AWS EMR parallel tasks and performance issue

Trying to load 200 GB of data into DynamoDB using Spark on EMR, but facing performance issues.

"""
Copy and paste the following code into your Lambda function. Make sure to change the
following key parameters for the API as per your account:

- Name (name of the Spark cluster)
- LogUri (S3 bucket to store EMR logs)
- Ec2SubnetId (the subnet to launch the cluster into)
- JobFlowRole (service role for EC2)
- ServiceRole (service role for Amazon EMR)

The following parameters are additional parameters for the Spark job itself. Change the
bucket name and prefix for the Spark job (located at the bottom):

- s3://your-bucket-name/prefix/lambda-emr/SparkProfitCalc.jar (Spark jar file)
- s3://your-bucket-name/prefix/fake_sales_data.csv (input data file in S3)
- s3://your-bucket-name/prefix/outputs/report_1/ (output location in S3)
"""
import json
import boto3

client = boto3.client('emr')

def lambda_handler(event, context):

    # Read the migration parameters passed in the Lambda event.
    env = event['env']
    cb_bucket_name = event['cb-bucket-name']
    ddb_table_name = event['ddb-table-name']
    existing_table_size_in_DDB_mb = event['existing-table-size-in-DDB-mb']
    s3_location_cbexport = event['s3-location-cbexport']
    s3_location_largerecords = event['s3-location-largerecords']
    s3_location_region_largerecords = event['s3-location-region-largerecords']
    time_stamp_for_ddb_table = event['time-stamp-for-ddb-table']
    just_migrate_large_doc_to_s3 = event['just-migrate-large-doc-to-s3']
    debug_mode = event['debug-mode']

    print(env)
    print(cb_bucket_name)
    print(ddb_table_name)
    print(existing_table_size_in_DDB_mb)
    print(s3_location_cbexport)
    print(s3_location_largerecords)
    print(s3_location_region_largerecords)
    print(time_stamp_for_ddb_table)
    print(just_migrate_large_doc_to_s3)
    print(debug_mode)

    dataMigrationJobId = "data-migration-job-" + cb_bucket_name + "-" + env
    # Launch a transient EMR cluster and submit the Spark migration step.
    response = client.run_job_flow(
        Name=dataMigrationJobId,
        LogUri='s3://cls-cb-migration-artifacts-qaint/logs/myjob',
        ReleaseLabel='emr-6.2.0',
        Instances={
            'InstanceGroups': [
                {
                    'Name': 'Master node',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'r5d.2xlarge',
                    'InstanceCount': 1,
                },
                {
                    'Name': 'Core nodes',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'r5d.4xlarge',
                    'InstanceCount': 16,
                },
                {
                    'Name': 'Task nodes',
                    'Market': 'SPOT',
                    'InstanceRole': 'TASK',
                    'InstanceType': 'r5d.2xlarge',
                    'InstanceCount': 20,
                    'BidPrice': '0.6'
                }
            ],
            'Ec2KeyName': 'cbtoddb',
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
            'Ec2SubnetId': 'subnet-057dfe60cac41b111'
        },
        Applications=[{'Name': 'Spark'}],
        VisibleToAllUsers=True,
        JobFlowRole='CouchbaseRole',
        ServiceRole='EMR_DefaultRole',
        Steps=[
            {
                'Name': dataMigrationJobId,
                'ActionOnFailure': 'TERMINATE_CLUSTER',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': [
                        'spark-submit',
                        '--deploy-mode', 'cluster',
                        '--executor-memory', '4G',
                        '--driver-memory', '16G',
                        '--conf', 'spark.executor.cores=16',
                        '--conf', 'spark.dynamicAllocation.enabled=true',
                        '--conf', 'spark.dynamicAllocation.minExecutors=50',
                        '--conf', 'spark.dynamicAllocation.maxExecutors=200',
                        '--conf', 'spark.sql.shuffle.partitions=4000',
                        '--conf', 'spark.sql.autoBroadcastJoinThreshold=10485760',
                        '--jars', '/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar',
                        '--class', 'com.pearson.Migration',
                        's3://myoptimizedjarcs/v6/CBToDDB-1.0-SNAPSHOT.jar',
                        env,
                        cb_bucket_name,
                        ddb_table_name,
                        existing_table_size_in_DDB_mb,
                        s3_location_cbexport,
                        s3_location_largerecords,
                        s3_location_region_largerecords,
                        time_stamp_for_ddb_table,
                        just_migrate_large_doc_to_s3,
                        debug_mode
                    ]
                }
            }
        ]
    )

    return response

I am using the above configuration, but the performance is not good; I want to write the data within 60 minutes.
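For a rough sense of the required write rate, here is a back-of-envelope sketch (the 5 KB average item size is an assumption; substitute your real value):

# Back-of-envelope sizing for 200 GB written in 60 minutes.
# DynamoDB consumes 1 WCU per 1 KB written (rounded up per item),
# so the data volume sets a floor on the sustained write capacity needed.
total_kb = 200 * 1024 * 1024       # 200 GB expressed in KB
window_s = 60 * 60                 # 60-minute target window
wcu_needed = total_kb / window_s   # roughly 58,000 WCU sustained
avg_item_kb = 5                    # assumption: average item size in KB
items_per_s = wcu_needed / avg_item_kb
print(round(wcu_needed), round(items_per_s))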

Asked 2 months ago · 719 views
4 Answers

Hi,

Did you test the DDB bulk import feature (from S3) to compare with EMR? https://aws.amazon.com/about-aws/whats-new/2022/08/amazon-dynamodb-supports-bulk-imports-amazon-s3-new-dynamodb-tables/

This blog post may also help: https://aws.amazon.com/blogs/database/amazon-dynamodb-can-now-import-amazon-s3-data-into-a-new-table/

I would personally try this first to see what performance you get from this simpler way to ingest large data. If your EMR job is required to process the data before loading it, you can still use the S3 bulk import as a performance baseline.

If you get the performance you want with S3 bulk import, you can also envision using EMR to process the data, write the results to S3, and then bulk import them into DDB via S3 import.
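A minimal sketch of kicking off such an S3 import with boto3 (the bucket, prefix, table name, and key schema below are placeholders to adapt to your data):

import boto3

ddb = boto3.client('dynamodb')

# Start a bulk import from S3 into a NEW table (the import creates the table).
response = ddb.import_table(
    S3BucketSource={
        'S3Bucket': 'your-export-bucket',       # placeholder
        'S3KeyPrefix': 'cb-export/',            # placeholder
    },
    InputFormat='DYNAMODB_JSON',                # also supports 'CSV' and 'ION'
    InputCompressionType='GZIP',                # or 'ZSTD' / 'NONE'
    TableCreationParameters={
        'TableName': 'your-new-table',          # placeholder
        'AttributeDefinitions': [
            {'AttributeName': 'pk', 'AttributeType': 'S'},
        ],
        'KeySchema': [
            {'AttributeName': 'pk', 'KeyType': 'HASH'},
        ],
        'BillingMode': 'PAY_PER_REQUEST',
    },
)
print(response['ImportTableDescription']['ImportArn'])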

Best,

Didier

AWS Expert
Answered 2 months ago
Reviewed by an Expert 2 months ago
Hi, is this bulk import capable of handling large documents, meaning documents larger than the DynamoDB item size limit? We are using Spark because some docs are bigger, hence we persist them to S3. Can you help with how we can mitigate this?
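For reference, the pattern described here (documents over DynamoDB's 400 KB item limit kept in S3, with only a pointer stored in the table) looks roughly like the sketch below; the bucket, table, and attribute names are placeholders:

import json
import boto3

s3 = boto3.client('s3')
ddb = boto3.client('dynamodb')

def put_document(doc_id, doc, table='your-table', bucket='your-large-doc-bucket'):
    """Store oversized documents in S3 and keep only a pointer in DynamoDB."""
    body = json.dumps(doc).encode('utf-8')
    if len(body) > 350 * 1024:  # stay safely under the 400 KB item limit
        key = f'large-docs/{doc_id}.json'
        s3.put_object(Bucket=bucket, Key=key, Body=body)
        item = {'pk': {'S': doc_id}, 's3_pointer': {'S': f's3://{bucket}/{key}'}}
    else:
        item = {'pk': {'S': doc_id}, 'doc': {'S': body.decode('utf-8')}}
    ddb.put_item(TableName=table, Item=item)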

Answered 2 months ago

Hi,

Can you please suggest an appropriate way to handle this load within the given time? I would like an efficient and performance-friendly solution, and would like to focus on getting the performance through EMR.

Answered 2 months ago
  1. Please check whether your EMR cluster is utilizing its maximum resources to execute the Spark application. You can configure the maximizeResourceAllocation property available in EMR Spark to leverage the available resources (see the sketch after this list).

  2. If your EMR cluster requires more nodes for your workload, go ahead and extend it, or use auto scaling as needed.

  3. On the DynamoDB side, make sure the job utilizes the full provisioned write capacity (WCU). Also, make sure your EMR cluster and DynamoDB table are in the same Region. You can also leverage this document to further optimize the performance.
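A minimal sketch of how that Spark setting could be attached to the cluster in the Lambda's run_job_flow call (only the standard EMR "spark" classification is shown; it is not tuned for this specific workload):

# Configurations block to pass alongside Instances/Steps in client.run_job_flow(...)
# so EMR Spark sizes executors to the chosen instance types automatically.
spark_configurations = [
    {
        'Classification': 'spark',
        'Properties': {'maximizeResourceAllocation': 'true'},
    }
]

# e.g. client.run_job_flow(..., Configurations=spark_configurations, ...)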

AWS Support Engineer
Answered 2 months ago
