PySpark gets stuck in Running due to an import issue


In short: I run a PySpark application on AWS EMR. When I map an RDD using a custom function that resides in an external module inside an external package (shipped in a .zip file via --py-files), the cluster gets stuck: the step stays in Running status and no more log lines appear until I manually terminate it.

What it is not: It is not an ordinary import error - that would have terminated the application while executing the import lines and raised the appropriate exception, which does not happen. Also, as seen below, calling a function that resides in the "problematic" module and does its own map with a similar lambda works fine.

What it is: The bug occurs only when the main program passes a function from that module as the mapping function of a transformation written in the main program. Additionally, if I remove the import line highlighted in the external file (the "problematic" module) - an import that is never actually used in this minimal reproduction (though it is used in the real application) - the bug disappears.

Below is the code for a minimal reproduction of the bug, with comments marking the two important lines, followed by some technical details. Any help would be appreciated.

Here is the main program:

import spark_context_holder
from reproducing_bugs_external_package import reproducing_bugs_external_file


sc = spark_context_holder.sc
log = spark_context_holder.log


def make_nums_rdd():
    return sc.parallelize([1, 2, 3] * 300).map(lambda x: x * x / 1.45)

log.warn("Starting my code!")
sum = sc.parallelize([1,2,3]*300).map(lambda x: x*x/1.45).sum()
log.warn("The calculated sum using in-line expression, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(sum))
simple_sum_rdd = make_nums_rdd()
log.warn("The calculated sum using the in-file function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
simple_sum_rdd = reproducing_bugs_external_file.make_nums_rdd(sc)
log.warn("The calculated sum using the external file's function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
simple_sum_rdd = sc.parallelize([1,2,3]*300).map(reproducing_bugs_external_file.calc_func)
log.warn("The calculated sum using the external file's mapping function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
# This last line does not get logged, while the others up until this one do. Here the cluster gets stuck on Running status without outputting any more log lines

In the zip file shipped as --py-files I have the following structure:

-spark_context_holder.py
-reproducing_bugs_external_package
-- __init__.py
-- reproducing_bugs_external_file.py

And here are their respective contents:

spark_context_holder.py

from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("kac_walk_experiment")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
log4jLogger = sc._jvm.org.apache.log4j
log = log4jLogger.LogManager.getLogger("dbg_et")

# sc.setLogLevel("ALL")

def getParallelismAlternative():
    return int(sc.getConf().get('spark.cores.max'))

__init__.py

from . import reproducing_bugs_external_file

__all__ = [reproducing_bugs_external_file]

reproducing_bugs_external_file.py

import numpy
import spark_context_holder  # If this is removed - the bug stops!


def make_nums_rdd(sc):
    return sc.parallelize([1, 2, 3] * 300).map(lambda x: x * x / 1.45)


def calc_func(x):
    return x*x/1.45

More technical details:

Release label: emr-5.17.0
Hadoop distribution: Amazon 2.8.4
Applications: Spark 2.3.1
Python: 3.4, which is the Python 3 version installed on AWS's machines as of this writing

All of this can also be seen on SO: https://stackoverflow.com/questions/54011119/an-import-issue-gets-pyspark-on-aws-stuck-in-running-status

etl
asked 5 years ago · 465 views
1 Answer

I got an answer on SO: https://stackoverflow.com/a/54208443/3836051
Essentially, the problem was that the executors ended up running the code that creates a new SparkContext, because "spark_context_holder" is imported at module level by a module that the executors themselves import and use.
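
For future readers, here is a minimal sketch of one way to apply that fix, assuming the diagnosis above: make spark_context_holder create the context lazily inside a function rather than at import time, so that merely importing it on an executor (through reproducing_bugs_external_file) has no side effects. The function and variable names below are my own illustration, not the exact code I ended up with:

# spark_context_holder.py - hypothetical lazy variant (illustration only)
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

_sc = None      # cached driver-side SparkContext
_spark = None   # cached SparkSession
_log = None     # cached log4j logger


def get_sc():
    """Create the SparkContext on first use (on the driver) instead of at import time."""
    global _sc, _spark, _log
    if _sc is None:
        conf = SparkConf().setAppName("kac_walk_experiment")
        _sc = SparkContext.getOrCreate(conf=conf)
        _spark = SparkSession(_sc)
        _log = _sc._jvm.org.apache.log4j.LogManager.getLogger("dbg_et")
    return _sc


def get_log():
    get_sc()
    return _log

With a structure like that, the main program calls spark_context_holder.get_sc() once on the driver, while the executors can still import the module without spinning up a SparkContext of their own.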

etl
answered 5 years ago
