I'm trying to build a simple collaborative filtering recommendation engine using Apache Spark MLlib on Amazon EMR. I created an EMR on EC2 cluster with the following configuration:


My ALS step fails every time, while a simple step that just formats some data completes successfully.

Here is the code I used to train the ALS model:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StructType, StructField, IntegerType
import sys
import logging

# Configure logging so the logging.info() calls actually show up
logging.basicConfig(level=logging.INFO)

args = sys.argv[1:]
bucket = args[0]
logging.info("bucket %s", bucket)

with SparkSession.builder.appName("CollaborativeFilteringRecommendationEngine").config(
    "spark.sql.adaptive.enabled", "true"
).config(
    "spark.sql.adaptive.coalescePartitions.enabled", "true"
).getOrCreate() as spark:
    spark.sparkContext.setLogLevel("DEBUG")
    logging.info("Spark Session Created")

    schema = StructType(
        [
            StructField("user_id", IntegerType(), True),
            StructField("item_id", IntegerType(), True),
            StructField("rating", IntegerType(), True),
        ]
    )

    # Load data from S3
    ratings_df = (
        spark.read.option("header", "true")
        .schema(schema)
        .csv(f"s3://{bucket}/data/rating.csv")
    )
    ratings_df.show(10)

    # 80% train, 20% test
    (training, test) = ratings_df.randomSplit([0.8, 0.2], seed=42)
    training.cache()
    test.cache()

    als = ALS(
        maxIter=10,
        regParam=0.1,
        userCol="user_id",
        itemCol="item_id",
        ratingCol="rating",
        coldStartStrategy="drop",
        nonnegative=True,
        implicitPrefs=False,
    )

    # Train the model
    print("Training ALS model...")
    model = als.fit(training)

    # Make predictions on test data
    predictions = model.transform(test)
    predictions.show(10)

    # Evaluate model performance
    evaluator = RegressionEvaluator(
        metricName="rmse", labelCol="rating", predictionCol="prediction"
    )
    rmse = evaluator.evaluate(predictions)
    print(f"Root Mean Square Error (RMSE): {rmse}")

    mae_evaluator = RegressionEvaluator(
        metricName="mae", labelCol="rating", predictionCol="prediction"
    )
    mae = mae_evaluator.evaluate(predictions)
    print(f"Mean Absolute Error (MAE): {mae}")

    # Persist the trained model back to S3
    model_path = f"s3://{bucket}/model"
    model.save(model_path)
Here is the log of the failed step:
stderr
25/06/14 10:23:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/14 10:23:31 INFO DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at ip-10-0-13-71.ec2.internal/10.0.13.71:8032
25/06/14 10:23:32 INFO Configuration: resource-types.xml not found
25/06/14 10:23:32 INFO ResourceUtils: Unable to find 'resource-types.xml'.
25/06/14 10:23:32 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (12288 MB per container)
25/06/14 10:23:32 INFO Client: Will allocate AM container, with 2432 MB memory including 384 MB overhead
25/06/14 10:23:32 INFO Client: Setting up container launch context for our AM
25/06/14 10:23:32 INFO Client: Setting up the launch environment for our AM container
25/06/14 10:23:32 INFO Client: Preparing resources for our AM container
25/06/14 10:23:32 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
25/06/14 10:23:35 INFO Client: Uploading resource file:/mnt/tmp/spark-d6ec1322-83b1-4baa-8502-6eb83f2ae046/__spark_libs__8023384351060371926.zip -> hdfs://ip-10-0-13-71.ec2.internal:8020/user/hadoop/.sparkStaging/application_1749896428067_0001/__spark_libs__8023384351060371926.zip
25/06/14 10:23:36 INFO Client: Uploading resource file:/etc/hudi/conf.dist/hudi-defaults.conf -> hdfs://ip-10-0-13-71.ec2.internal:8020/user/hadoop/.sparkStaging/application_1749896428067_0001/hudi-defaults.conf
25/06/14 10:23:37 INFO ClientConfigurationFactory: Set initial getObject socket timeout to 2000 ms.
25/06/14 10:23:38 INFO Client: Uploading resource s3://h-dataset-asl/gpt/main.py -> hdfs://ip-10-0-13-71.ec2.internal:8020/user/hadoop/.sparkStaging/application_1749896428067_0001/main.py
25/06/14 10:23:39 INFO S3NativeFileSystem: Opening 's3://h-dataset-asl/gpt/main.py' for reading
25/06/14 10:23:39 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/pyspark.zip -> hdfs://ip-10-0-13-71.ec2.internal:8020/user/hadoop/.sparkStaging/application_1749896428067_0001/pyspark.zip
25/06/14 10:23:39 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip -> hdfs://ip-10-0-13-71.ec2.internal:8020/user/hadoop/.sparkStaging/application_1749896428067_0001/py4j-0.10.9.5-src.zip
25/06/14 10:23:40 INFO Client: Uploading resource file:/mnt/tmp/spark-d6ec1322-83b1-4baa-8502-6eb83f2ae046/__spark_conf__7120570014526316113.zip -> hdfs://ip-10-0-13-71.ec2.internal:8020/user/hadoop/.sparkStaging/application_1749896428067_0001/__spark_conf__.zip
25/06/14 10:23:40 INFO SecurityManager: Changing view acls to: hadoop
25/06/14 10:23:40 INFO SecurityManager: Changing modify acls to: hadoop
25/06/14 10:23:40 INFO SecurityManager: Changing view acls groups to:
25/06/14 10:23:40 INFO SecurityManager: Changing modify acls groups to:
25/06/14 10:23:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); groups with view permissions: Set(); users with modify permissions: Set(hadoop); groups with modify permissions: Set()
25/06/14 10:23:40 INFO Client: Submitting application application_1749896428067_0001 to ResourceManager
25/06/14 10:23:41 INFO YarnClientImpl: Submitted application application_1749896428067_0001
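
The stderr above stops right after submission, so I assume the real error is in the YARN container logs. I believe the full driver/executor output can be fetched with something like this on the primary node (the application ID comes from the log above; the output file name is just my own choice):

```shell
# Run on the EMR primary node; application ID taken from the submission log.
APP_ID=application_1749896428067_0001
# Collect all container logs into a single local file for inspection.
yarn logs -applicationId "$APP_ID" > als_step.log 2>&1 || true
```
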
controller
2025-06-14T10:23:28.498Z INFO Ensure step 1 jar file command-runner.jar
2025-06-14T10:23:28.499Z INFO StepRunner: Created Runner for step 1
INFO startExec 'hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --deploy-mode cluster s3://h-dataset-asl/gpt/main.py'
INFO Environment:
PATH=/usr/lib64/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/opt/aws/puppet/bin/
SECURITY_PROPERTIES=/emr/instance-controller/lib/security.properties
HISTCONTROL=ignoredups
HISTSIZE=1000
HADOOP_ROOT_LOGGER=INFO,DRFA
JAVA_HOME=/etc/alternatives/jre
AWS_DEFAULT_REGION=us-east-1
LANG=en_US.UTF-8
MAIL=/var/spool/mail/hadoop
LOGNAME=hadoop
PWD=/
HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-01799401JPIB6JZE2M3Z/tmp
_=/etc/alternatives/jre/bin/java
LESSOPEN=||/usr/bin/lesspipe.sh %s
SHELL=/bin/bash
QTINC=/usr/lib64/qt-3.3/include
USER=hadoop
HADOOP_LOGFILE=syslog
HOSTNAME=ip-10-0-13-71
QTDIR=/usr/lib64/qt-3.3
HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-01799401JPIB6JZE2M3Z
EMR_STEP_ID=s-01799401JPIB6JZE2M3Z
QTLIB=/usr/lib64/qt-3.3/lib
HOME=/home/hadoop
SHLVL=1
HADOOP_IDENT_STRING=hadoop
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-01799401JPIB6JZE2M3Z/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-01799401JPIB6JZE2M3Z/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-01799401JPIB6JZE2M3Z
INFO ProcessRunner started child process 736
2025-06-14T10:23:28.501Z INFO HadoopJarStepRunner.Runner: startRun() called for s-01799401JPIB6JZE2M3Z Child Pid: 736
INFO Synchronously wait child process to complete : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
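
For completeness: the stderr reports a 12288 MB per-container ceiling, and ALS can be memory-hungry, so a variant of the submit command with explicit memory settings is something I have been considering. The sizes below are guesses on my part, not tested values, and the script reads the bucket name as its first positional argument:

```
spark-submit --deploy-mode cluster \
  --conf spark.executor.memory=8g \
  --conf spark.executor.memoryOverhead=1g \
  --conf spark.driver.memory=4g \
  s3://h-dataset-asl/gpt/main.py <my-bucket-name>
```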
The log for the completed step looks normal:


Please help me figure out what's going wrong. Thank you!