Problem with time sync in Kinesis Data Analytics


I have two streams. In Studio I create one table per stream, join the two tables with an INNER JOIN, and write the result to S3. The problem seems to be that the timestamps in the two streams differ, so AWS is not able to join the data on the given key.

Below is the code that joins the two tables. When I tested it earlier with simple data, everything worked. Now there is no error, but the tables are not merging.


INSERT INTO s3_join 
SELECT * FROM ExampleInputStream1
INNER JOIN ExampleInputStream
ON ExampleInputStream1.seq_num = ExampleInputStream.seq_num
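To see why a join can fail without any error, here is a minimal plain-Python sketch of inner-join semantics. It is an illustration, not Flink code; the function name `inner_join` and the sample records are hypothetical. Rows whose key has no partner on the other side are simply dropped, so if the seq_num values never overlap, the result is empty rather than an error.

```python
from collections import defaultdict


def inner_join(left, right, key):
    """Plain-Python analogue of an SQL INNER JOIN on a single key column.

    Every left record is paired with every right record that has the same key;
    records without a partner on the other side are silently dropped.
    """
    # Index the right side by key so each lookup is O(1).
    index = defaultdict(list)
    for row in right:
        index[row[key]].append(row)

    joined = []
    for row in left:
        for match in index.get(row[key], []):
            # Prefix column names so the two sides don't overwrite each other.
            merged = {f"s1_{k}": v for k, v in row.items()}
            merged.update({f"s2_{k}": v for k, v in match.items()})
            joined.append(merged)
    return joined


stream1 = [{"seq_num": 1, "value": "a"}, {"seq_num": 2, "value": "b"}]
stream2 = [{"seq_num": 2, "value": "x"}, {"seq_num": 3, "value": "y"}]
print(inner_join(stream1, stream2, "seq_num"))
# [{'s1_seq_num': 2, 's1_value': 'b', 's2_seq_num': 2, 's2_value': 'x'}]

# No overlapping seq_num values: no error, just an empty result.
print(inner_join(stream1, [{"seq_num": 99, "value": "z"}], "seq_num"))
# []
```

Flink's regular streaming join keeps both sides in state and emits a row whenever the second record with a matching key arrives, so for a finite input it should produce the same pairs as this batch version, assuming no state expiry has been configured.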
Asked 1 year ago · Viewed 255 times
1 answer

It sounds like the issue might be related to the timestamp format of the data in your two input streams. An inner join only emits a row when the join condition matches exactly, so if the timestamps are formatted differently in the two streams, the join may not succeed.

One way to address this is to convert the timestamps to a common format before joining the two streams. You could use the Flink SQL function DATE_FORMAT() to render each timestamp as a string in the same format, and then join the two streams on that string.
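The normalization idea can be tried outside Flink first. The following plain-Python sketch parses timestamps written in different formats and renders them with Python's `%Y-%m-%d %H:%M:%S`, the equivalent of the Flink pattern 'yyyy-MM-dd HH:mm:ss'. The helper `normalize_ts` is hypothetical, and the input formats in `KNOWN_FORMATS` are assumptions, since the question does not show what the two ts columns look like.

```python
from datetime import datetime

# Hypothetical input formats: each stream is assumed to use one of these.
KNOWN_FORMATS = [
    "%Y-%m-%dT%H:%M:%S.%f",  # e.g. 2023-03-01T12:34:56.789
    "%Y-%m-%dT%H:%M:%S",     # e.g. 2023-03-01T12:34:56
    "%d/%m/%Y %H:%M:%S",     # e.g. 01/03/2023 12:34:56
]


def normalize_ts(raw: str) -> str:
    """Parse a timestamp string and re-render it as 'YYYY-MM-DD HH:MM:SS'.

    Like DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:ss'), this drops sub-second digits,
    so two events in the same second produce the same string.
    """
    for fmt in KNOWN_FORMATS:
        try:
            return datetime.strptime(raw, fmt).strftime("%Y-%m-%d %H:%M:%S")
        except ValueError:
            continue  # Not this format; try the next one.
    raise ValueError(f"unrecognized timestamp: {raw!r}")


a = normalize_ts("2023-03-01T12:34:56.789")
b = normalize_ts("01/03/2023 12:34:56")
print(a, b, a == b)  # 2023-03-01 12:34:56 2023-03-01 12:34:56 True
```

Note the limits of exact string equality: events a few milliseconds apart on either side of a second boundary (12:34:56.999 and 12:34:57.001) normalize to different strings, and if the two producers' clocks drift by even a second, the pair is dropped.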

Here's an example of how you could modify your query to convert the timestamp format:

%flink.ssql(type=update)
INSERT INTO s3_join
SELECT * FROM (
  SELECT *, DATE_FORMAT(ExampleInputStream1.ts, 'yyyy-MM-dd HH:mm:ss') AS ts_formatted
  FROM ExampleInputStream1
) stream1
INNER JOIN (
  SELECT *, DATE_FORMAT(ExampleInputStream.ts, 'yyyy-MM-dd HH:mm:ss') AS ts_formatted
  FROM ExampleInputStream
) stream2
ON stream1.seq_num = stream2.seq_num
AND stream1.ts_formatted = stream2.ts_formatted

In this example, I use DATE_FORMAT() to convert the ts column in each stream to a string in the format 'yyyy-MM-dd HH:mm:ss', and then join the two streams on both the seq_num and ts_formatted columns. As long as both streams record the same event time to the second, the formatted strings will match even if the original timestamp formats differ. Hope this helps!

Answered 1 year ago
  • Hi, I really appreciate the reply, but this is not working. I think the problem might be that the streams are not in sync. I have two streams collecting a huge amount of data, and even though I start data collection on both at the same time, something is going wrong: either Kinesis is not capturing all the packets (so the two streams end up with different, seemingly random packets), or it is something else. I'm not sure. Do you have any recommendations?
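One way to test the hypothesis that the two streams captured different packets is to compare the join keys each stream actually received over the same period. The sketch below is plain Python; `key_overlap_report` is a hypothetical helper, and it assumes seq_num is a value the producer writes into each record. (Kinesis-assigned sequence numbers are generated per shard, so they would not line up between two separate streams.) If `in_both` is close to zero, the inner join has almost nothing to match, whatever the timestamp format.

```python
def key_overlap_report(keys1, keys2):
    """Compare the join keys captured by two streams.

    Returns how many distinct keys appear in both streams, how many appear in
    only one, and the fraction of all distinct keys that an inner join can use.
    """
    s1, s2 = set(keys1), set(keys2)
    both = s1 & s2
    total = len(s1 | s2)
    return {
        "in_both": len(both),
        "only_stream1": len(s1 - s2),
        "only_stream2": len(s2 - s1),
        # Guard against division by zero when both samples are empty.
        "join_ratio": len(both) / total if total else 0.0,
    }


# Hypothetical samples: seq_num values read from each stream over the same period.
report = key_overlap_report([1, 2, 3, 4], [3, 4, 5, 6, 7, 8])
print(report)
# {'in_both': 2, 'only_stream1': 2, 'only_stream2': 4, 'join_ratio': 0.25}
```

If many keys are missing from one side, the producer is a good place to look: a PutRecords call can partially fail (the response reports a FailedRecordCount), and the failed records are lost unless the producer retries them.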
