
How do I queue messages in SQS to wait for a Step Function to complete?


I am working on a process that is as follows:

  1. New S3 file is PUT in a bucket/folder (i.e. bucket_name/folder_name/file_name.json)
  2. Lambda 1 is triggered by S3 PutObject and sends a message containing the file path to SQS (Message: "folder_name/file_name.json")
  3. FIFO SQS receives message and triggers Lambda 2
  4. Lambda 2 triggers Step Function which triggers a series of Glue Jobs

I need the messages to sit in SQS until the Step Function has completed, because the Glue jobs cannot run concurrently when the same jobs are requested at around the same time.

Right now, I can place a file in S3 and it triggers the whole process. However, if I add another file immediately after, the whole process is triggered again and two Step Function executions run at the same time.

How do I force the SQS messages to wait until my Step Function is completed? I only want one iteration of the step function running at one time.

Edit After Accepting Answer: The process I ended up with was a single Lambda, a standard SQS queue, and my Step Function. When a file is placed in S3, it triggers the Lambda. Since the trigger was from S3, the Lambda checks whether the Step Function is running. If it is not, the Step Function is triggered; if it is, a message is sent to SQS. At the end of the Step Function I added a step that invokes the Lambda again (bypassing EventBridge). This time the Lambda sees that the trigger came from the Step Function, so it checks whether there is a message waiting in SQS. If there is, it triggers the Step Function; if there isn't, the process is complete and the Lambda waits for the next S3 file drop.

3 Answers
Accepted Answer

There is no built-in way to do what you want; however, there are a few options to implement it.

One option is to catch the Step Functions execution SUCCEEDED event in EventBridge and use it to invoke a Lambda that checks whether there is another message in the queue; if there is, it starts a new state machine execution. You will not be able to use the SQS-to-Lambda trigger (Step 3 above). So Lambda 1 checks whether there is a running execution. If there is, it just puts the message in the queue. If there is none, it starts the state machine.
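For the first option, the EventBridge rule can match the execution-completed event directly. A sketch of the event pattern, with a placeholder state machine ARN:

```json
{
  "source": ["aws.states"],
  "detail-type": ["Step Functions Execution Status Change"],
  "detail": {
    "status": ["SUCCEEDED"],
    "stateMachineArn": ["arn:aws:states:us-east-1:123456789012:stateMachine:my-state-machine"]
  }
}
```

The matched event's detail includes the execution ARN and status, which the target Lambda receives in its event object.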

Another option is to start a Step Function execution for each object but, before starting the Glue jobs, check whether another execution is already in progress. If there is one, sleep for a few seconds and check again; if there is none, start the Glue jobs. You can use DynamoDB to mark when you start the Glue jobs and when you are done.

AWS
EXPERT
answered a year ago
  • So the process would be:

    1. S3 file placed -> Lambda checks SF status
    2. If SF running -> Lambda sends message to SQS; if SF not running -> Lambda triggers SF
    3. EventBridge gets notified of SF success -> triggers Lambda (same as above?) to check for SQS messages
    4. If SQS message -> Lambda triggers SF (how do I check if an SQS message is waiting? I cannot find a way to do this); if no SQS message -> process completed until new S3 file received

    Does that flow make sense? Can I do everything in one Lambda, or would I need two?

  • Yes. Exactly.

    To check if there is a message in the queue, you do not use an SQS integration, but rather, you use the AWS SQS SDK to read a message from the queue. If you get a message back, you handle it. If not, you do nothing.

    The functions do not need to be the same. Usually I would recommend having two different functions. However, in this case, to prevent race conditions, it may be better to have one function and limit its concurrency to 1 (using reserved concurrency). Note that each event source has a different event object structure, so you will need to check for it in your handler.
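Reserved concurrency can be set from the CLI (the function name here is a placeholder):

```
aws lambda put-function-concurrency \
    --function-name my-orchestrator \
    --reserved-concurrent-executions 1
```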

  • Edit: I realize my error. I was using a colon instead of an equals sign for QueueUrl.

    Ok great! One final question. I was working on this process yesterday, and the step I'm on is the Lambda checking whether there are messages in SQS. If there is a message in the queue, I'll read it and pass it to the state machine. But I keep getting an error when I call get_queue_attributes(). I'm using Python:

        sqs = boto3.client('sqs')
        sqs_response = sqs.get_queue_attributes(
            QueueUrl: 'https://sqs.here-is-my-sqs-url.fifo',
            AttributeNames=['ApproximateNumberOfMessages']
        )

    No matter the order of the parameters to get_queue_attributes, I always get an error on the first one, and it's just a basic Runtime.UserCodeSyntaxError, so I can't tell what's wrong. Hopefully once that is figured out I'll be able to get this process working. I appreciate your help, Uri!

  • There is no reason to check if there are messages in the queue. Just read one message. Specify 0 for the wait time and 1 for the batch size:

    response = client.receive_message(
        QueueUrl='https://sqs.here-is-my-sqs-url.fifo',
        MaxNumberOfMessages=1,
        WaitTimeSeconds=0
    )
    

    If there is a message, the response's Messages attribute will contain one message. If there are none, the Messages key will be absent from the response.

    The only reason might be cost ($0.40 vs $0.50 per million requests), but I do not think it is a factor in your case.
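The receive-and-handle step can be sketched as a small helper. `pop_message` is a hypothetical name, not a boto3 API; pass it a boto3 SQS client and the queue URL.

```python
def pop_message(sqs, queue_url):
    """Read at most one message from the queue; return its body or None.

    When the queue is empty, receive_message omits the 'Messages' key,
    so .get() with a default handles both cases.
    """
    resp = sqs.receive_message(QueueUrl=queue_url,
                               MaxNumberOfMessages=1, WaitTimeSeconds=0)
    messages = resp.get("Messages", [])
    if not messages:
        return None
    msg = messages[0]
    # Delete only after the work is safely handed off; otherwise the
    # message reappears when its visibility timeout expires.
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
    return msg["Body"]
```

In the Lambda, `key = pop_message(boto3.client("sqs"), QUEUE_URL)` either yields the next file path to start an execution with, or None when the process is done.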


When the messages are injected into the FIFO SQS queue from Lambda 1, consider using a single message group ID. This ensures that only one Lambda instance processes messages for a particular message group ID at a time, and that messages within a group are handled sequentially.

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/fifo-queue-lambda-behavior.html#lambda-concurrency-fifo-queues

Then, in the FIFO SQS -> Lambda setup, set the batch size to 1; this ensures only one message from the queue is consumed per invocation.

https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html#invocation-eventsourcemapping-batching

Effectively, with this setup, S3 invokes Lambda 1, and Lambda 1 injects messages into the FIFO queue under a single message group ID. The single message group ID ensures only one Lambda instance processes messages, and with a batch size of 1, Lambda 2 will process only one message at a time and invoke the Step Function.
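The send side of this setup can be sketched as follows. `send_to_fifo` and the `glue-pipeline` group ID are illustrative; the explicit MessageDeduplicationId is only needed if the queue does not have content-based deduplication enabled.

```python
import hashlib


def send_to_fifo(sqs, queue_url, file_key):
    """Enqueue a file path on a FIFO queue under a single message group.

    Using one constant MessageGroupId serializes processing: Lambda
    handles the group's messages one at a time, in order.
    """
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=file_key,
        MessageGroupId="glue-pipeline",  # one group -> serialized processing
        # Required on FIFO queues without content-based deduplication.
        MessageDeduplicationId=hashlib.sha256(file_key.encode()).hexdigest(),
    )
```

Lambda 1 would call `send_to_fifo(boto3.client("sqs"), QUEUE_URL, "folder_name/file_name.json")` for each S3 object it is notified about.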

AWS
SUPPORT ENGINEER
answered a year ago

If I were you, I think I'd just use EventBridge S3 notifications to trigger your Step Functions state machine directly (no need to deploy and manage intermediary pieces), then implement the concurrency control in the workflow logic. I'm assuming you won't have massive volume; at very high volume, placing a queue in the middle for extra flow control might be helpful. The advantage of doing it this way is that you have fewer moving parts and better visibility and control (e.g., you can see which executions are waiting and cancel some if you don't want them to proceed).

I wrote up this blog post a while back on how to implement concurrency control in your workflow definition: https://aws.amazon.com/blogs/compute/controlling-concurrency-in-distributed-systems-using-aws-step-functions/

More recently I included a slightly altered approach in this app, where the logic for acquiring and releasing the locks is externalized for re-use and where the key you use for your lock is dynamic. https://github.com/aws-samples/aws-stepfunctions-examples/tree/main/sam/app-meeting-summarization

AWS
answered a year ago
