Reprocessing failed DDB streams


We've been using DDB streams as an event source for the Lambda functions serving our analytics pipeline. We also configured the Lambda to send the failed records' metadata to a DLQ when processing fails after a given number of retries; as a result, we get the event metadata rather than the actual failing records. Once we fix the underlying issue, we currently have no easy way to reprocess the failed records.

We looked at the KCL, but it requires Java to be installed. As we use Python, it seems strange to install Java just to retrieve the impacted DDB records. To me, the data sent to the DLQ should include the impacted records.

Would appreciate any easy way to fetch the impacted records given the metadata sent in such a case.

3 Answers

This should be possible in Python using boto3.client('dynamodbstreams'). First, get a shard iterator using the metadata of the failed record(s), then use that shard iterator to fetch the records themselves.
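
For reference, a minimal sketch of that two-step flow with boto3. It assumes the DLQ message gives you the stream ARN, shard ID, and starting sequence number; the function and parameter names below are illustrative, not a drop-in solution.

```python
import boto3

streams = boto3.client("dynamodbstreams")

def fetch_failed_records(stream_arn, shard_id, start_sequence_number):
    """Replay the stream records a failed Lambda batch pointed at."""
    # Step 1: turn the shard metadata into a shard iterator.
    iterator = streams.get_shard_iterator(
        StreamArn=stream_arn,
        ShardId=shard_id,
        ShardIteratorType="AT_SEQUENCE_NUMBER",
        SequenceNumber=start_sequence_number,
    )["ShardIterator"]

    # Step 2: page through the shard with that iterator.
    records = []
    while iterator:
        resp = streams.get_records(ShardIterator=iterator, Limit=1000)
        records.extend(resp["Records"])
        if not resp["Records"]:
            break  # caught up with the shard; could also stop at the batch's end sequence number
        iterator = resp.get("NextShardIterator")
    return records
```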

answered 2 years ago
  • Thanks for this answer, I was wondering whether there's an option using a native AWS service to get this information. If no other answers come up I'll mark it as the answer.


Yes, you don't get the impacted records themselves, only the shard information.

You need to call GetShardIterator with that metadata, and then call GetRecords to fetch the actual impacted records.
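
If your event source mapping uses an on-failure destination, the SQS message body should contain that metadata in a DDBStreamBatchInfo block. A small sketch of pulling it out before calling GetShardIterator; treat the field names as assumptions and verify them against a real message from your queue.

```python
import json

def parse_failure_message(sqs_body: str) -> dict:
    """Extract the shard details from a DLQ / on-failure destination message."""
    payload = json.loads(sqs_body)
    batch_info = payload["DDBStreamBatchInfo"]  # field name is an assumption, check your payload
    return {
        "stream_arn": batch_info["streamArn"],
        "shard_id": batch_info["shardId"],
        "start_sequence_number": batch_info["startSequenceNumber"],
        "end_sequence_number": batch_info["endSequenceNumber"],
    }
```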


In response to your question

Would appreciate any easy way to fetch the impacted records given the metadata sent in such a case.

Two reasons why this is not provided by AWS out of the box:

Reason 1:

SQS messages have a size limit, and the size of your DDB record updates may exceed it. By delivering only the shard information, which has a fixed size, AWS doesn't have to worry internally about message size limitations.

Reason 2:

The above is the approach to get the impacted records. You might ask why DynamoDB or Lambda can't deliver the impacted records to the DLQ directly. To answer this, we need to understand what a DDB trigger and a shard are.

A DDB trigger provides real-time updates to your table data using a Kinesis-style shard mechanism behind the scenes. If a particular read from the shard fails processing, its metadata is delivered to the DLQ and processing continues with the next data in the shard.

So within a shard, let's say your record Id=ABC was updated multiple times (call the versions v0, v1, v2, v3), and one update (v1) failed processing in your Lambda, so it was sidelined to the DLQ. It's possible that later updates to the same record Id=ABC were successfully processed by the Lambda. Let's assume your Lambda is writing to S3 (eventually overwriting older versions). In this scenario:

  • v0 was written to S3, as the Lambda processed it successfully
  • v1 was delivered to the DLQ, since the Lambda failed
  • v2 was written to S3, as the Lambda processed it successfully
  • v3 was written to S3, as the Lambda processed it successfully

Now, you need a processor over the DLQ (shard reader + GetRecords) to read the records and decide whether you should write them to S3 or not. In this case you should ignore and delete the DLQ message, since v3 was written by the Lambda later on.
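
A rough sketch of that decision step, assuming you keep track of the last sequence number written per key. The helpers latest_processed_sequence() and write_to_s3() are hypothetical; you would implement them against your own S3 layout and bookkeeping.

```python
def handle_replayed_record(record):
    """Decide whether a record replayed from the DLQ should still be written."""
    key = record["dynamodb"]["Keys"]                 # e.g. {"Id": {"S": "ABC"}}
    seq = int(record["dynamodb"]["SequenceNumber"])  # sequence numbers are only comparable within a shard

    if latest_processed_sequence(key) >= seq:
        # A newer update (v2/v3 in the example) already reached S3: drop the DLQ message.
        return "discard"
    write_to_s3(record)  # the failed update is still the newest known state: replay it
    return "reprocessed"
```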

Since the Lambda failed on one of the shard batches, and your business logic in the Lambda could be doing anything with the data, it delivers the shard metadata to the DLQ and not the messages themselves.

Hope this information helps you in understanding the problem.

Let me know if anything is unclear.

mtk
answered 2 years ago
  • This is clear, but unfortunately it means you can't process messages after 24 hours have passed, because they're no longer in the stream. Right?


Would it work in your case to use a Lambda function to consume (and enrich) the records from the SQS dead letter queue? You could write the processing logic in Python and only activate the Lambda function once you have recovered from the incident and are ready to process the backlog from the SQS queue.

https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
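
For illustration, a minimal sketch of such a reprocessing Lambda, assuming the DLQ message carries the shard metadata described in the other answers. The DDBStreamBatchInfo field names are assumptions about the failure payload, fetch_failed_records() is a helper like the one in the first answer, and write_to_s3() stands in for your own pipeline logic.

```python
import json

def lambda_handler(event, context):
    for message in event["Records"]:          # SQS delivers a batch of DLQ messages
        info = json.loads(message["body"])
        batch = info["DDBStreamBatchInfo"]    # field name is an assumption, check your payload
        records = fetch_failed_records(
            stream_arn=batch["streamArn"],
            shard_id=batch["shardId"],
            start_sequence_number=batch["startSequenceNumber"],
        )
        for record in records:
            write_to_s3(record)               # or whatever your reprocessing logic is
```

Note that this only works while the records are still in the stream (see the comment on the previous answer about the 24-hour retention).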

answered 2 years ago
