It sounds like the issue might be related to the timestamp format of the data in your two input streams. An inner join only emits rows where the join condition matches exactly, so if the timestamp format differs between the two streams, the join may return no rows.
One way to address this could be to convert the timestamp format to a common format before joining the two streams. You could use a Flink SQL function like DATE_FORMAT() to convert the timestamp to a string with a common format, and then join the two streams based on that string.
Here's an example of how you could modify your query to convert the timestamp format:
%flink.ssql(type=update)
INSERT INTO s3_join
SELECT *
FROM (
    SELECT *, DATE_FORMAT(ExampleInputStream1.ts, 'yyyy-MM-dd HH:mm:ss') AS ts_formatted
    FROM ExampleInputStream1
) stream1
INNER JOIN (
    SELECT *, DATE_FORMAT(ExampleInputStream.ts, 'yyyy-MM-dd HH:mm:ss') AS ts_formatted
    FROM ExampleInputStream
) stream2
ON stream1.seq_num = stream2.seq_num
AND stream1.ts_formatted = stream2.ts_formatted
In this example, I'm using DATE_FORMAT() to convert the ts column in each stream to a string with the format 'yyyy-MM-dd HH:mm:ss'. I'm then joining the two streams on both the seq_num and ts_formatted columns. This should ensure that the join condition matches exactly, even if the timestamp format differs between the two streams. Note that this pattern drops fractional seconds, so the timestamps are compared at one-second granularity. Hope this helps!
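To see why normalizing the timestamp can matter, here is a minimal Python sketch of the same idea outside Flink. The sample records, the two input formats, and the helper names `normalize_ts` and `inner_join` are all made up for illustration; your streams may use different formats.

```python
from datetime import datetime

# Assumed input formats (hypothetical): ISO-style with milliseconds, and plain seconds.
FORMATS = ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%d %H:%M:%S")


def normalize_ts(raw):
    """Parse a timestamp in one of the assumed formats and return it as
    'YYYY-MM-DD HH:MM:SS' (fractional seconds are dropped)."""
    for fmt in FORMATS:
        try:
            return datetime.strptime(raw, fmt).strftime("%Y-%m-%d %H:%M:%S")
        except ValueError:
            continue
    raise ValueError(f"unrecognized timestamp: {raw!r}")


def inner_join(left, right, key):
    """Return (left_row, right_row) pairs whose keys are exactly equal."""
    index = {}
    for row in right:
        index.setdefault(key(row), []).append(row)
    return [(l, r) for l in left for r in index.get(key(l), [])]


# Made-up sample data: same event (seq_num 1) written in two different formats.
stream1 = [
    {"seq_num": 1, "ts": "2023-03-01T10:00:00.250"},
    {"seq_num": 2, "ts": "2023-03-01T10:00:01.000"},
]
stream2 = [
    {"seq_num": 1, "ts": "2023-03-01 10:00:00"},
    {"seq_num": 3, "ts": "2023-03-01 10:00:02"},
]

# Joining on the raw strings finds nothing, because the strings differ.
raw_matches = inner_join(stream1, stream2, lambda r: (r["seq_num"], r["ts"]))

# Joining on the normalized timestamp matches seq_num 1.
normalized_matches = inner_join(
    stream1, stream2, lambda r: (r["seq_num"], normalize_ts(r["ts"]))
)

print(len(raw_matches), len(normalized_matches))
```

Rows whose seq_num appears in only one stream (2 and 3 here) still drop out of the inner join, so normalizing the timestamp only helps when both streams actually contain the same records.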
Hi, I really appreciate the reply, but this is not working. I believe the problem might be that the streams are not in sync. I have two streams collecting a huge amount of data, and even though I start data collection on both at the same time, something is going wrong: either Kinesis is not able to collect all the packets, so the two streams are picking up different, seemingly random packets, or it is something else. I'm not sure. Do you have any recommendations?