Why are CloudWatch logs always synchronous?

My Lambda function reads image names from an **SQS** queue, downloads the images from an **S3** bucket, processes them (two different processing steps), uploads the results to an **S3** bucket, and then deletes the local copy of each image. I've written an asynchronous version of the function that performs the same tasks using Python asyncio. The asynchronous version uses a separate S3 bucket and a different SQS queue.

When I run the two functions, the asynchronous Lambda function prints its logs in the same sequential order as the synchronous one. Both functions use the Python 3.8 runtime and have the same configuration.
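
The ordering described above can be reproduced without AWS. The following is a minimal sketch, assuming the helper methods behave like ordinary blocking calls. The names `blocking_job` and `cooperative_job` are hypothetical, and `time.sleep` only stands in for a blocking download or upload. It shows that coroutines which never reach an `await` run one after another even under `asyncio.gather`, so their `print` output comes out in sequential order:

```python
import asyncio
import time

events = []  # records the order in which steps run


async def blocking_job(name):
    # Hypothetical coroutine whose body only makes blocking calls.
    events.append(f"{name} start")
    time.sleep(0.05)  # blocks the event loop; no await, so no task switch
    events.append(f"{name} end")


async def cooperative_job(name):
    # Same shape, but the wait is awaited, so other tasks can run meanwhile.
    events.append(f"{name} start")
    await asyncio.sleep(0.05)
    events.append(f"{name} end")


async def main():
    await asyncio.gather(blocking_job("a"), blocking_job("b"))
    blocking_order = list(events)
    events.clear()
    await asyncio.gather(cooperative_job("a"), cooperative_job("b"))
    return blocking_order, list(events)


blocking_order, cooperative_order = asyncio.run(main())
print(blocking_order)     # "a" finishes before "b" starts
print(cooperative_order)  # both jobs start before either one ends
```

If the real helpers are blocking in the same way, the log order would be a property of how the code runs rather than of the logging service.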

I'm very new to both AWS and Python. Can anyone explain why the logs in CloudWatch appear in synchronous order?

For your reference, I'm sharing the original and the asyncio implementation below:

ORIGINAL CODE (synchronous)

    def run(self):
        try:
            messages = self._extract_tasks()
            print("Number of messages extracted from SQS: ", len(messages))
            if len(messages) == 0:
                return

            for image_key in messages:
                image_name = self._get_name_from_key(image_key)
                # Split on the last dot only so names such as "a.b.jpg" keep their full stem.
                stem, suffix = image_name.rsplit(".", 1)
                # Random suffix avoids collisions between local temp files.
                local_path = f"{stem}-{random.randrange(100000)}.{suffix}"
                self._download_image(image_key, local_path)
                self.bw_image_processor.monochrome_and_upload(local_path)
                self.brighten_image_processor.brighten_and_upload(local_path)
                delete_file(local_path)
        except Exception as e:
            print("Failed to process message from SQS queue...")
            print(e)

ASYNC CODE (asynchronous)

    async def process_image(self, image_key):
        image_name = self._get_name_from_key(image_key)
        # Split on the last dot only so names such as "a.b.jpg" keep their full stem.
        stem, suffix = image_name.rsplit(".", 1)
        local_path = f"{stem}-{random.randrange(100000)}.{suffix}"

        # Called without `await`: if this is an ordinary (blocking) method,
        # the event loop waits here until the download finishes.
        self._download_image(image_key, local_path)

        # asyncio.gather needs awaitables; the two steps only overlap if both
        # methods are coroutines that actually await their I/O.
        tasks = [
            self.bw_image_processor.monochrome_and_upload(local_path),
            self.brighten_image_processor.brighten_and_upload(local_path),
        ]

        await asyncio.gather(*tasks)
        delete_file(local_path)
    

    async def process_messages(self):
        try:
            messages = self._extract_tasks()
            print("Number of messages extracted from SQS: ", len(messages))
            if len(messages) == 0:
                return

            # Process messages concurrently
            await asyncio.gather(*[self.process_image(image_key) for image_key in messages])

        except Exception as e:
            print("Failed to process messages from SQS queue...")
            print(e)

    def run(self):
        asyncio.run(self.process_messages())
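
One possible way to get real overlap on Python 3.8, sketched under the assumption that the download, processing, and upload helpers are plain blocking functions, is to hand them to the event loop's default thread pool with `loop.run_in_executor`. Everything below is illustrative: `blocking_step` is a hypothetical stand-in for those helpers, not the actual code of this function.

```python
import asyncio
import threading
import time


def blocking_step(label, path):
    # Hypothetical stand-in for a blocking helper such as an image
    # transform followed by an upload. Returns the thread it ran on.
    time.sleep(0.05)
    return f"{label}:{path}", threading.current_thread().name


async def process_image(path):
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) submits the call to the loop's default
    # thread pool and returns an awaitable, so the event loop stays free.
    tasks = [
        loop.run_in_executor(None, blocking_step, "monochrome", path),
        loop.run_in_executor(None, blocking_step, "brighten", path),
    ]
    return await asyncio.gather(*tasks)


async def process_messages(paths):
    # One process_image coroutine per message, all awaited together.
    return await asyncio.gather(*[process_image(p) for p in paths])


results = asyncio.run(process_messages(["a.jpg", "b.jpg"]))
for image_results in results:
    print(image_results)
```

With this shape the blocking calls run on worker threads, so log lines from different images can interleave. Whether that is safe for the real helpers (for example, two steps reading the same local file) is not something this sketch checks.
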
tariq
asked 9 months ago, 85 views