Beanstalk daemon in Worker environment for Python does not delete messages from SQS queue even after sending '200 OK' response


There is a Beanstalk worker environment set up in Python to handle a large zip-extraction process, implemented with the code below. Once the process is done, the application code sends a '200 OK' response to Beanstalk, but the Beanstalk daemon does not delete the message from the SQS queue it reads messages from. The extraction process takes around 30-32 minutes to complete.

SQS: Visibility timeout: 1 hr 30 mins
Beanstalk: Visibility/Inactivity timeout: 5400 seconds

Code:

import json
import zipfile


def application(environ, start_response):

    status = '400 Bad Request'
    response = b'Pending'
    process_error = False
    process_log_message = ''

    path = environ['PATH_INFO']
    method = environ['REQUEST_METHOD']

    # The Elastic Beanstalk daemon always delivers messages with HTTP POST
    if method == 'POST':
        # 'HTTP path' is set to '/' in the Elastic Beanstalk worker configuration
        if path == '/':
            message_body_size = int(environ.get('CONTENT_LENGTH') or 0)
            # The request body arrives as raw bytes; parse it as JSON before indexing into it
            message = json.loads(environ['wsgi.input'].read(message_body_size) or b'{}')

            if message:
                resources = message.get('resources', '')
                if resources:
                    for item in resources:
                        zip_path = item.get('zip_file_path', '').strip()
                        extract_path = item.get('zip_extract_destination', '').strip()

                        if all([zip_path, extract_path]):
                            try:
                                with zipfile.ZipFile(zip_path, mode='r') as archive:
                                    archive.extractall(extract_path)
                            except Exception as error:
                                process_error = True
                                process_log_message = f'Exception raised when extracting package: {zip_path} - {str(error)}'
                                break
                        else:
                            process_error = True
                            process_log_message = 'Source or destination location for input package is blank'
                            break

                if process_error:
                    # write_to_log_stream() is an application helper defined elsewhere
                    write_to_log_stream(process_log_message)
                else:
                    status = '200 OK'
                    response = b'Done'

    # Send the response to the Elastic Beanstalk daemon
    headers = [('Content-type', 'text/plain')]
    start_response(status, headers)
    return [response]

Where am I going wrong?

1 Answer

After calling the API to retrieve a message from SQS (receive_message), do you also call the API to delete it (delete_message)?
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/delete_message.html

Alternatively, change the code to call delete_message when a 200 status is returned.
SQS does not delete a message just because it has been retrieved from the queue; the message remains there.
So you need to call the delete API after processing is complete.
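
For reference, if the application were polling SQS directly with boto3 (which, as the comments below explain, is not how this worker environment operates), the explicit delete would look roughly like the sketch below; the queue URL is a placeholder:

import boto3

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/example-worker-queue'  # placeholder

# Receiving a message only hides it for the visibility timeout; it stays in the queue
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=10)

for msg in resp.get('Messages', []):
    # ... process msg['Body'] here ...
    # Delete the message explicitly once processing has succeeded
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])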

EXPERT
answered 9 months ago
  • The architecture of our application is a bit different.

    1. There is a Lambda function that posts messages to the SQS queue configured for the Python Beanstalk worker environment.
    2. The Python application (the code above) receives those messages through the 'environ' parameter that AWS passes to the WSGI entry point:

    def application(environ, start_response):

    So here we are not directly accessing SQS to get or delete messages; they are delivered to the application over HTTP by the default AWS HTTP server (Gunicorn).
    3. Next, after successful processing, we need to send a '200 OK' response to Beanstalk. As a result, the Beanstalk daemon considers the processing successful and sends a DeleteMessage call to that SQS queue to remove that particular message. As mentioned in the docs:

    https://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html

    "When the application in the worker environment returns a 200 OK response to acknowledge that it has received and successfully processed the request, the daemon sends a DeleteMessage call to the Amazon SQS queue to delete the message from the queue. If the application returns any response other than 200 OK, Elastic Beanstalk waits to put the message back in the queue after the configured ErrorVisibilityTimeout period. If there is no response, Elastic Beanstalk waits to put the message back in the queue after the InactivityTimeout period so that the message is available for another attempt at processing."

  • I see, so the code in Elastic Beanstalk seems to be OK then. Are there any logs or other output anywhere related to how the messages are retrieved from SQS? If logs are being produced, I would check them for errors, etc. https://repost.aws/knowledge-center/elastic-beanstalk-cron-job-worker-tier

  • I had to increase the 'KeepAliveTimeout' and 'Timeout' values in the Apache httpd configuration. They were 60 seconds before, so short processes had no issue, but for long-running processes the server connection broke and Beanstalk requeued the request in SQS. I set them to 5400 seconds; see the sketch below.
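
A minimal sketch of how those httpd timeouts could be raised through an .ebextensions configuration file on an Apache-based platform; the file name and path are illustrative, and the values should match the worker's inactivity timeout:

files:
  "/etc/httpd/conf.d/custom_timeouts.conf":
    mode: "000644"
    owner: root
    group: root
    content: |
      # Allow long-running worker requests (matches the 5400-second inactivity timeout)
      Timeout 5400
      KeepAliveTimeout 5400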
