
EMR step to build a simple ALS model with PySpark MLlib fails, while a simple formatting step succeeds


I'm trying to build a simple collaborative-filtering recommendation engine using Apache Spark MLlib on Amazon EMR, so I created an EMR on EC2 cluster with the following configuration:

(screenshots of the EMR cluster configuration)

My ALS step fails every time, yet a simple step that just reformats some data completes successfully:

(screenshot of the step results)

Here is the code I used to train ALS model:

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import StructType, StructField, IntegerType
import sys
import logging

# Configure the root logger before the first logging call,
# otherwise INFO messages are silently dropped
logging.basicConfig(level=logging.INFO)

args = sys.argv[1:]
bucket = args[0]

logging.info("bucket %s", bucket)

with SparkSession.builder.appName("CollaborativeFilteringRecommendationEngine").config(
    "spark.sql.adaptive.enabled", "true"
).config(
    "spark.sql.adaptive.coalescePartitions.enabled", "true"
).getOrCreate() as spark:

    spark.sparkContext.setLogLevel("DEBUG")
    logging.info("Spark Session Created")

    schema = StructType(
        [
            StructField("user_id", IntegerType(), True),
            StructField("item_id", IntegerType(), True),
            StructField("rating", IntegerType(), True),
        ]
    )

    # Load data from S3
    ratings_df = (
        spark.read.option("header", "true")
        .schema(schema)
        .csv(f"s3://{bucket}/data/rating.csv")
    )

    ratings_df.show(10)

    # 80% train, 20% test
    (training, test) = ratings_df.randomSplit([0.8, 0.2], seed=42)

    training.cache()
    test.cache()

    als = ALS(
        maxIter=10,
        regParam=0.1,
        userCol="user_id",
        itemCol="item_id",
        ratingCol="rating",
        coldStartStrategy="drop",
        nonnegative=True,
        implicitPrefs=False,
    )

    # Train the model
    print("Training ALS model...")
    model = als.fit(training)

    # Make predictions on test data
    predictions = model.transform(test)
    predictions.show(10)

    # Evaluate model performance
    evaluator = RegressionEvaluator(
        metricName="rmse", labelCol="rating", predictionCol="prediction"
    )
    rmse = evaluator.evaluate(predictions)
    print(f"Root Mean Square Error (RMSE): {rmse}")

    mae_evaluator = RegressionEvaluator(
        metricName="mae", labelCol="rating", predictionCol="prediction"
    )
    mae = mae_evaluator.evaluate(predictions)
    print(f"Mean Absolute Error (MAE): {mae}")

    model_path = f"s3://{bucket}/model"
    model.save(model_path)

Here is the log of the failed step:

stderr

25/06/14 10:23:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/14 10:23:31 INFO DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at ip-10-0-13-71.ec2.internal/10.0.13.71:8032
25/06/14 10:23:32 INFO Configuration: resource-types.xml not found
25/06/14 10:23:32 INFO ResourceUtils: Unable to find 'resource-types.xml'.
25/06/14 10:23:32 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (12288 MB per container)
25/06/14 10:23:32 INFO Client: Will allocate AM container, with 2432 MB memory including 384 MB overhead
25/06/14 10:23:32 INFO Client: Setting up container launch context for our AM
25/06/14 10:23:32 INFO Client: Setting up the launch environment for our AM container
25/06/14 10:23:32 INFO Client: Preparing resources for our AM container
25/06/14 10:23:32 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
25/06/14 10:23:35 INFO Client: Uploading resource file:/mnt/tmp/spark-d6ec1322-83b1-4baa-8502-6eb83f2ae046/__spark_libs__8023384351060371926.zip -> hdfs://ip-10-0-13-71.ec2.internal:8020/user/hadoop/.sparkStaging/application_1749896428067_0001/__spark_libs__8023384351060371926.zip
25/06/14 10:23:36 INFO Client: Uploading resource file:/etc/hudi/conf.dist/hudi-defaults.conf -> hdfs://ip-10-0-13-71.ec2.internal:8020/user/hadoop/.sparkStaging/application_1749896428067_0001/hudi-defaults.conf
25/06/14 10:23:37 INFO ClientConfigurationFactory: Set initial getObject socket timeout to 2000 ms.
25/06/14 10:23:38 INFO Client: Uploading resource s3://h-dataset-asl/gpt/main.py -> hdfs://ip-10-0-13-71.ec2.internal:8020/user/hadoop/.sparkStaging/application_1749896428067_0001/main.py
25/06/14 10:23:39 INFO S3NativeFileSystem: Opening 's3://h-dataset-asl/gpt/main.py' for reading
25/06/14 10:23:39 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/pyspark.zip -> hdfs://ip-10-0-13-71.ec2.internal:8020/user/hadoop/.sparkStaging/application_1749896428067_0001/pyspark.zip
25/06/14 10:23:39 INFO Client: Uploading resource file:/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip -> hdfs://ip-10-0-13-71.ec2.internal:8020/user/hadoop/.sparkStaging/application_1749896428067_0001/py4j-0.10.9.5-src.zip
25/06/14 10:23:40 INFO Client: Uploading resource file:/mnt/tmp/spark-d6ec1322-83b1-4baa-8502-6eb83f2ae046/__spark_conf__7120570014526316113.zip -> hdfs://ip-10-0-13-71.ec2.internal:8020/user/hadoop/.sparkStaging/application_1749896428067_0001/__spark_conf__.zip
25/06/14 10:23:40 INFO SecurityManager: Changing view acls to: hadoop
25/06/14 10:23:40 INFO SecurityManager: Changing modify acls to: hadoop
25/06/14 10:23:40 INFO SecurityManager: Changing view acls groups to: 
25/06/14 10:23:40 INFO SecurityManager: Changing modify acls groups to: 
25/06/14 10:23:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
25/06/14 10:23:40 INFO Client: Submitting application application_1749896428067_0001 to ResourceManager
25/06/14 10:23:41 INFO YarnClientImpl: Submitted application application_1749896428067_0001

controller

2025-06-14T10:23:28.498Z INFO Ensure step 1 jar file command-runner.jar
2025-06-14T10:23:28.499Z INFO StepRunner: Created Runner for step 1
INFO startExec 'hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --deploy-mode cluster s3://h-dataset-asl/gpt/main.py'
INFO Environment:
  PATH=/usr/lib64/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/opt/aws/puppet/bin/
  SECURITY_PROPERTIES=/emr/instance-controller/lib/security.properties
  HISTCONTROL=ignoredups
  HISTSIZE=1000
  HADOOP_ROOT_LOGGER=INFO,DRFA
  JAVA_HOME=/etc/alternatives/jre
  AWS_DEFAULT_REGION=us-east-1
  LANG=en_US.UTF-8
  MAIL=/var/spool/mail/hadoop
  LOGNAME=hadoop
  PWD=/
  HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-01799401JPIB6JZE2M3Z/tmp
  _=/etc/alternatives/jre/bin/java
  LESSOPEN=||/usr/bin/lesspipe.sh %s
  SHELL=/bin/bash
  QTINC=/usr/lib64/qt-3.3/include
  USER=hadoop
  HADOOP_LOGFILE=syslog
  HOSTNAME=ip-10-0-13-71
  QTDIR=/usr/lib64/qt-3.3
  HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-01799401JPIB6JZE2M3Z
  EMR_STEP_ID=s-01799401JPIB6JZE2M3Z
  QTLIB=/usr/lib64/qt-3.3/lib
  HOME=/home/hadoop
  SHLVL=1
  HADOOP_IDENT_STRING=hadoop
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-01799401JPIB6JZE2M3Z/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-01799401JPIB6JZE2M3Z/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-01799401JPIB6JZE2M3Z
INFO ProcessRunner started child process 736
2025-06-14T10:23:28.501Z INFO HadoopJarStepRunner.Runner: startRun() called for s-01799401JPIB6JZE2M3Z Child Pid: 736
INFO Synchronously wait child process to complete : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...

The log for the completed step looks normal:

(screenshots of the successful step's log)

Please help me to figure it out. Thank you!

asked 10 months ago · 115 views
1 Answer

Based on the logs you've provided, it appears your ALS model training job is failing, but the logs don't show the complete error message. The logs only show the job submission process to YARN, but not the actual execution or failure point of your PySpark application.

Here are some potential issues and solutions:

  1. Incomplete Error Logs: The logs you've shared only show the job submission to YARN, not the actual execution or error. You'll need to check the YARN application logs for the complete error message. You can access these through the EMR console by navigating to the Application History tab or directly through the YARN ResourceManager UI.

  2. Python Dependencies: Your ALS model might require specific Python libraries that aren't available in the default EMR environment. For PySpark jobs, you may need to create a Python virtual environment with all required dependencies and submit it with your job.

  3. Spark Configuration: The ALS algorithm is more resource-intensive than simple data formatting tasks. You might need to adjust your Spark configuration parameters for memory allocation, especially if you're working with a large dataset.

  4. Code Structure: Ensure your PySpark code is properly structured for cluster execution. The way you're initializing SparkSession with a context manager (with SparkSession...) might be causing issues in a distributed environment.
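For point 1, the application ID appears in the stderr you posted (`application_1749896428067_0001`), so one way to pull the full aggregated container logs is from the master node with the YARN CLI (a sketch; the ID is taken from your log and will differ per run):

```shell
# Run on the EMR master node after the application finishes.
# Prints driver and executor logs, including the actual Python traceback.
yarn logs -applicationId application_1749896428067_0001 | less
```

If log aggregation to S3 is enabled for the cluster, the same logs also land under the cluster's configured S3 log URI after a few minutes.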

Here's what I recommend:

  1. Modify your code to use a more standard SparkSession initialization:
spark = (
    SparkSession.builder.appName("CollaborativeFilteringRecommendationEngine")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Your code here

spark.stop()  # instead of relying on the with-context
  2. Add more logging to help identify where the failure occurs:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Add log statements at key points
logger.info("Starting application")
logger.info("Loading data")
# etc.
  3. Consider adding Spark configuration parameters to allocate more resources:
--conf spark.executor.memory=4g 
--conf spark.driver.memory=2g
  4. If you're using custom Python libraries, package them in a virtual environment archive and ship it with your application. On EMR on EC2 (YARN), point the Python interpreter at the unpacked archive:
--conf spark.archives=s3://your-bucket/pyspark_venv.tar.gz#environment
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python

(The spark.emr-serverless.driverEnv.* variants apply only to EMR Serverless, not to an EMR on EC2 cluster.)
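Putting points 3 and 4 together, the build-and-submit flow could look like the following sketch. The bucket name, package list, and memory sizes are placeholders to adapt; it assumes `venv-pack` is installed on the machine building the archive, and that its Python version matches the cluster's:

```shell
# Build a relocatable virtual environment containing the job's dependencies
python3 -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install venv-pack numpy
venv-pack -o pyspark_venv.tar.gz
deactivate

# Upload the archive and the job script, then submit with extra resources
aws s3 cp pyspark_venv.tar.gz s3://your-bucket/pyspark_venv.tar.gz
spark-submit \
  --deploy-mode cluster \
  --archives s3://your-bucket/pyspark_venv.tar.gz#environment \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
  --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python \
  --conf spark.executor.memory=4g \
  --conf spark.driver.memory=2g \
  s3://your-bucket/main.py
```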

By implementing these changes and checking the complete application logs, you should be able to identify and resolve the issue with your ALS model training job.
Sources
EMR Jupyter Notebook: PySpark Imports Work in Shell, Not in Notebook- Issue is importing custom files | AWS re:Post
Add a Spark step - Amazon EMR
Using Python libraries with EMR Serverless - Amazon EMR
Build a data lakehouse in a hybrid Environment using Amazon EMR Serverless, Apache DolphinScheduler, and TiDB | AWS Big Data Blog
spark env issues post custom ami creation for EMR on EC2 cluster stuck in steps failure | AWS re:Post

answered 10 months ago
