Hi everyone,
I am using AWS EMR to run ETL operations on very large datasets (millions to billions of records). I use PySpark and read the CSV files with `spark.read.csv`, and the results are written to a Postgres database.
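For context, the DataFrame and the Postgres cursor used in the profiling step are created elsewhere in the code. The setup looks roughly like this (the paths, credentials, and the app name are placeholders, not my real values; only `df`, `conn`, and `cursor` carry over to the snippet below):

```python
import psycopg2
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("etl-profiling").getOrCreate()

# Read the raw CSV files into a DataFrame (path and options are illustrative)
df = spark.read.csv("s3://my-bucket/input/*.csv", header=True, inferSchema=True)

# Open the Postgres connection used to store the profiling results
conn = psycopg2.connect(host="my-postgres-host", dbname="etl", user="etl_user", password="***")
cursor = conn.cursor()
```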
The profiling part of the code (do note that this is only a small portion of the codebase):

```python
import datetime
import json

from pyspark.sql.functions import col, count, countDistinct, when


# Method on the ETL job class; only this part of the class is shown.
def data_profiling(self, cursor, df, attribute_ids, existing_attribute_ids):
    # Each of the aggregations below triggers a separate full pass over the DataFrame.
    total_rows = df.count()
    null_count = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0]
    unique_count = df.agg(*(countDistinct(c).alias(c) for c in df.columns)).collect()[0]
    duplicate_count = df.groupBy(df.columns).count().filter("count > 1").count()
    # Values that do not start with an alphanumeric character are treated as junk.
    junk_count = df.select([count(when(~col(c).rlike("^[A-Za-z0-9]"), c)).alias(c) for c in df.columns]).collect()[0]

    # Calculate percentages
    null_percentage = {c: (null_count[c] / total_rows) * 100 for c in df.columns}
    unique_percentage = {c: (unique_count[c] / total_rows) * 100 for c in df.columns}
    duplicate_percentage = (duplicate_count / total_rows) * 100 if duplicate_count > 0 else 0
    junk_percentage = {c: (junk_count[c] / total_rows) * 100 for c in df.columns}

    try:
        for column in df.columns:
            if column == "record_id":
                continue
            attribute_id = attribute_ids.get(column)
            if attribute_id is not None and attribute_id not in existing_attribute_ids:
                creation_time = datetime.datetime.now()
                null_payload = {"count": null_count[column], "percentage": null_percentage[column]}
                unique_payload = {"count": unique_count[column], "percentage": unique_percentage[column]}
                duplicate_payload = {"count": duplicate_count, "percentage": duplicate_percentage}
                junk_payload = {"count": junk_count[column], "percentage": junk_percentage[column]}

                profile_query = (
                    "INSERT INTO profiles (profile_type, payload, updated_at, created_at, attribute_id) "
                    "VALUES (%s, %s, %s, %s, %s)"
                )
                cursor.execute(profile_query, ("null", json.dumps(null_payload), creation_time, creation_time, attribute_id))
                cursor.execute(profile_query, ("unique", json.dumps(unique_payload), creation_time, creation_time, attribute_id))
                cursor.execute(profile_query, ("duplicate", json.dumps(duplicate_payload), creation_time, creation_time, attribute_id))
                cursor.execute(profile_query, ("junk", json.dumps(junk_payload), creation_time, creation_time, attribute_id))
        conn.commit()  # conn is the psycopg2 connection created elsewhere in the code
    except Exception:
        conn.rollback()  # the real exception handling is omitted from this excerpt
        raise
```
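For completeness, the method is invoked from the driver roughly like this (the dictionaries are built earlier in the job; the values and the `job` instance name are purely illustrative):

```python
# attribute_ids maps DataFrame column names to attribute IDs stored in Postgres;
# existing_attribute_ids holds IDs that already have profiles and should be skipped.
attribute_ids = {"customer_name": 101, "email": 102}  # illustrative values
existing_attribute_ids = {103}                        # illustrative values

job.data_profiling(cursor, df, attribute_ids, existing_attribute_ids)
```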
While running this code on the datasets described above, I get the following error:
**Warning: The Spark session does not have enough YARN resources to start.
The code failed because of a fatal error:
Session 10 unexpectedly reached final status 'dead'. See logs:
stdout:
stderr:
Mar 13, 2024 8:47:06 AM org.apache.spark.launcher.Log4jHotPatchOption staticJavaAgentOption
WARNING: spark.log4jHotPatch.enabled is set to true, but /usr/share/log4j-cve-2021-44228-hotpatch/jdk17/Log4jHotPatchFat.jar does not exist at the configured location
24/03/13 08:47:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/13 08:47:12 INFO DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at ip-10-0-9-124.ec2.internal/10.0.9.124:8032
24/03/13 08:47:13 INFO Configuration: resource-types.xml not found
24/03/13 08:47:13 INFO ResourceUtils: Unable to find 'resource-types.xml'.
24/03/13 08:47:13 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (5120 MB per container)
24/03/13 08:47:13 INFO Client: Will allocate AM container, with 2432 MB memory including 384 MB overhead
24/03/13 08:47:13 INFO Client: Setting up container launch context for our AM
24/03/13 08:47:13 INFO Client: Setting up the launch environment for our AM container
24/03/13 08:47:13 INFO Client: Preparing resources for our AM container
24/03/13 08:47:14 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
24/03/13 08:47:35 INFO Client: Uploading resource file:/mnt/tmp/spark-e7b2da8b-7afd-4fae-bdc4-65fff410b216/__spark_libs__16724108829559673263.zip -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/__spark_libs__16724108829559673263.zip
24/03/13 08:47:44 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/kryo-shaded-4.0.2.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/kryo-shaded-4.0.2.jar
24/03/13 08:47:44 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/livy-api-0.7.1-incubating.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/livy-api-0.7.1-incubating.jar
24/03/13 08:47:44 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/livy-rsc-0.7.1-incubating.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/livy-rsc-0.7.1-incubating.jar
24/03/13 08:47:44 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/livy-thriftserver-session-0.7.1-incubating.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/livy-thriftserver-session-0.7.1-incubating.jar
24/03/13 08:47:45 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/minlog-1.3.0.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/minlog-1.3.0.jar
24/03/13 08:47:45 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/netty-all-4.1.17.Final.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/netty-all-4.1.17.Final.jar
24/03/13 08:47:45 INFO Client: Uploading resource file:/usr/lib/livy/rsc-jars/objenesis-2.5.1.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/objenesis-2.5.1.jar
24/03/13 08:47:45 INFO Client: Uploading resource file:/usr/lib/livy/repl_2.12-jars/commons-codec-1.9.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/commons-codec-1.9.jar
24/03/13 08:47:45 WARN Client: Same name resource file:///usr/lib/livy/repl_2.12-jars/kryo-shaded-4.0.2.jar added multiple times to distributed cache
24/03/13 08:47:45 INFO Client: Uploading resource file:/usr/lib/livy/repl_2.12-jars/livy-core_2.12-0.7.1-incubating.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/livy-core_2.12-0.7.1-incubating.jar
24/03/13 08:47:45 INFO Client: Uploading resource file:/usr/lib/livy/repl_2.12-jars/livy-repl_2.12-0.7.1-incubating.jar -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/livy-repl_2.12-0.7.1-incubating.jar
24/03/13 08:47:45 WARN Client: Same name resource file:///usr/lib/livy/repl_2.12-jars/minlog-1.3.0.jar added multiple times to distributed cache
24/03/13 08:47:45 WARN Client: Same name resource file:///usr/lib/livy/repl_2.12-jars/objenesis-2.5.1.jar added multiple times to distributed cache
24/03/13 08:47:45 INFO Client: Uploading resource file:/etc/spark/conf.dist/hive-site.xml -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/hive-site.xml
24/03/13 08:47:46 INFO Client: Uploading resource file:/etc/hudi/conf.dist/hudi-defaults.conf -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/hudi-defaults.conf
24/03/13 08:47:46 INFO Client: Uploading resource file:/usr/lib/spark/R/lib/sparkr.zip#sparkr -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/sparkr.zip
24/03/13 08:47:46 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/pyspark.zip -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/pyspark.zip
24/03/13 08:47:47 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/py4j-0.10.9.7-src.zip
24/03/13 08:47:48 INFO Client: Uploading resource file:/mnt/tmp/spark-e7b2da8b-7afd-4fae-bdc4-65fff410b216/__spark_conf__10224235219861396504.zip -> hdfs://ip-10-0-9-124.ec2.internal:8020/user/livy/.sparkStaging/application_1710314844894_0011/spark_conf.zip
24/03/13 08:47:51 INFO SecurityManager: Changing view acls to: livy
24/03/13 08:47:51 INFO SecurityManager: Changing modify acls to: livy
24/03/13 08:47:51 INFO SecurityManager: Changing view acls groups to:
24/03/13 08:47:51 INFO SecurityManager: Changing modify acls groups to:
24/03/13 08:47:51 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: livy; groups with view permissions: EMPTY; users with modify permissions: livy; groups with modify permissions: EMPTY
24/03/13 08:47:51 INFO Client: Submitting application application_1710314844894_0011 to ResourceManager
24/03/13 08:47:52 INFO YarnClientImpl: Submitted application application_1710314844894_0011
24/03/13 08:47:52 INFO Client: Application report for application_1710314844894_0011 (state: ACCEPTED)
24/03/13 08:47:52 INFO Client:
client token: N/A
diagnostics: [Wed Mar 13 08:47:51 +0000 2024] Application is added to the scheduler and is not yet activated. Queue's AM resource limit exceeded. Details : AM Partition = <DEFAULT_PARTITION>; AM Resource Request = <memory:2432, max memory:5120, vCores:1, max vCores:8>; Queue Resource Limit for AM = <memory:5120, vCores:1>; User AM Resource Limit of the queue = <memory:5120, vCores:1>; Queue AM Resource Usage = <memory:4864, vCores:2>;
Application host: N/A
Application RPC port: -1
queue: default
start time: 1710319671958
final status: UNDEFINED
tracking URL: http://ip-10-0-9-124.ec2.internal:20888/proxy/application_1710314844894_0011/
user: livy
24/03/13 08:47:52 INFO ShutdownHookManager: Shutdown hook called
24/03/13 08:47:52 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-0c342f25-f293-4bea-9139-12e36b315e71
24/03/13 08:47:52 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-e7b2da8b-7afd-4fae-bdc4-65fff410b216
YARN Diagnostics:
Application application_1710314844894_0011 was killed by user livy at 10.0.9.124.**
I am new to AWS, and the same code used to run on Databricks without any issues (although it was slow). What can I do to resolve this?