Why are CloudWatch logs always synchronous?


My Lambda function reads image names from an **SQS** queue, downloads the images from an **S3** bucket, applies two different processing steps to each one, uploads the results to an **S3** bucket, and then deletes the local copy. I've also written an asynchronous version of the function that performs the same tasks using Python `asyncio`; it uses a separate S3 bucket and a different SQS queue.

When I run the two functions, the asynchronous Lambda function writes its logs in the same sequential order as the synchronous one. Both functions use the Python 3.8 runtime and have identical configurations.

I'm very new to AWS and Python. Can anyone explain why the logs in CloudWatch appear synchronous?
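As a sanity check, here is a small standalone script (not my Lambda code; `blocking_work` is just a placeholder for a blocking boto3-style call) that reproduces the behaviour I'm seeing: coroutines that never `await` anything run one after another even inside `asyncio.gather`.

```python
import asyncio
import time

def blocking_work(label):
    # Stand-in for a blocking call (download, process, upload).
    time.sleep(0.2)
    return label

async def process(label):
    # There is no await before the blocking call, so this coroutine
    # never yields control back to the event loop.
    return blocking_work(label)

async def main():
    return await asyncio.gather(process("a"), process("b"), process("c"))

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start

# The three 0.2 s calls run back to back: total is roughly 0.6 s.
print(f"elapsed: {elapsed:.2f}s, results: {results}")
```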

For your reference, I'm sharing the original and the asyncio implementation below:

ORIGINAL CODE (synchronous)

    def run(self):
        try:
            messages = self._extract_tasks()
            print("Number of messages extracted from SQS: ", len(messages))
            if len(messages) == 0:
                return

            for image_key in messages:
                image_name = self._get_name_from_key(image_key)
                image_name_without_file_suffix = image_name.split(".")[-2]
                image_file_suffix = image_name.split(".")[-1]
                # Random suffix so concurrent invocations don't collide on disk.
                temp_file = image_name_without_file_suffix + "-" + str(random.randrange(100000)) + "." + image_file_suffix
                self._download_image(image_key, temp_file)
                self.bw_image_processor.monochrome_and_upload(temp_file)
                self.brighten_image_processor.brighten_and_upload(temp_file)
                delete_file(temp_file)
        except Exception as e:
            print("Failed to process message from SQS queue...")
            print(e)

ASYNC CODE (asynchronous)

    async def process_image(self, image_key):
        image_name = self._get_name_from_key(image_key)
        image_name_without_file_suffix = image_name.split(".")[-2]
        image_file_suffix = image_name.split(".")[-1]
        # Random suffix so concurrent invocations don't collide on disk.
        temp_file = image_name_without_file_suffix + "-" + str(random.randrange(100000)) + "." + image_file_suffix

        self._download_image(image_key, temp_file)

        tasks = [
            self.bw_image_processor.monochrome_and_upload(temp_file),
            self.brighten_image_processor.brighten_and_upload(temp_file)
        ]

        await asyncio.gather(*tasks)
        delete_file(temp_file)
    

    async def process_messages(self):
        try:
            messages = self._extract_tasks()
            print("Number of messages extracted from SQS: ", len(messages))
            if len(messages) == 0:
                return

            # Process messages concurrently
            await asyncio.gather(*[self.process_image(image_key) for image_key in messages])

        except Exception as e:
            print("Failed to process messages from SQS queue...")
            print(e)

    def run(self):
        asyncio.run(self.process_messages())
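For comparison, this is roughly the concurrent behaviour I expected. It is again a standalone sketch with a placeholder `blocking_work`, using `loop.run_in_executor` since the Python 3.8 runtime does not have `asyncio.to_thread`:

```python
import asyncio
import time

def blocking_work(label):
    # Stand-in for a blocking call (download, process, upload).
    time.sleep(0.2)
    return label

async def process(label):
    loop = asyncio.get_running_loop()
    # Hand the blocking call to the default thread pool so the event
    # loop is free to start the other coroutines in the meantime.
    return await loop.run_in_executor(None, blocking_work, label)

async def main():
    return await asyncio.gather(process("a"), process("b"), process("c"))

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start

# The three 0.2 s calls overlap, so this finishes in roughly 0.2 s,
# not 0.6 s, and gather still returns results in submission order.
print(f"elapsed: {elapsed:.2f}s, results: {results}")
```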
tariq asked 8 months ago · 81 views

No Answers
