Why Cloudwatch logs are always synchronous?

0

My lambda function reads image names from an **SQS **queue, downloads images from an **S3 **bucket, processes them (2 different processing), uploads them into an **S3 **bucket, and then removes the image from the local. I've written an asynchronous version of my lambda function to perform the above tasks in an asynchronous manner using Python asyncio. The asynchronous version uses a separate S3 bucket and a different SQS queue.

When I run the two functions, the asynchronous lambda function prints logs in a synchronous order like the synchronous function. My functions are using python-3.8 runtime and both have the same configuration.

I'm very new to AWS and Python as well. Can anyone suggest to me why the logs in Cloudwatch are synchronous?

For your reference, I'm sharing the original and the asyncio implementation below:

ORIGINAL CODE (synchronous)

    def run(self):
        try:
            messages = self._extract_tasks()
            print("Number of messages extracted from SQS: ", len(messages))
            if len(messages) == 0:
                return

            for image_key in messages:
                image_name = self._get_name_from_key(image_key)
                # print("Image name: " + image_name)
                image_name_without_file_suffix = image_name.split(".")[-2]
                image_file_suffix = image_name.split(".")[-1]
                temp_image_path = image_name_without_file_suffix + "-" + str(random.randrange(100000))
                self._download_image(image_key, temp_image_path + "." + image_file_suffix)
                self.bw_image_processor.monochrome_and_upload(temp_image_path + "." + image_file_suffix)
                self.brighten_image_processor.brighten_and_upload(temp_image_path + "." + image_file_suffix)
                # print("Calling delete from ImageProcessor.run()")
                delete_file(temp_image_path + "." + image_file_suffix)
        except Exception as e:
            print("Failed to process message from SQS queue...")
            print(e)

ASYNC CODE (asynchronous)

    async def process_image(self, image_key):
        image_name = self._get_name_from_key(image_key)
        image_name_without_file_suffix = image_name.split(".")[-2]
        image_file_suffix = image_name.split(".")[-1]
        temp_image_path = image_name_without_file_suffix + "-" + str(random.randrange(100000))
        
        self._download_image(image_key, temp_image_path + "." + image_file_suffix)
        
        tasks = [
            self.bw_image_processor.monochrome_and_upload(temp_image_path + "." + image_file_suffix),
            self.brighten_image_processor.brighten_and_upload(temp_image_path + "." + image_file_suffix)
        ]
        
        await asyncio.gather(*tasks)
        delete_file(temp_image_path + "." + image_file_suffix)
    

    async def process_messages(self):
        try:
            messages = self._extract_tasks()
            print("Number of messages extracted from SQS: ", len(messages))
            if len(messages) == 0:
                return

            # Process messages concurrently
            await asyncio.gather(*[self.process_image(image_key) for image_key in messages])

        except Exception as e:
            print("Failed to process messages from SQS queue...")
            print(e)

    def run(self):
        asyncio.run(self.process_messages())
tariq
質問済み 9ヶ月前85ビュー
回答なし

ログインしていません。 ログイン 回答を投稿する。

優れた回答とは、質問に明確に答え、建設的なフィードバックを提供し、質問者の専門分野におけるスキルの向上を促すものです。

質問に答えるためのガイドライン

関連するコンテンツ