PySpark on Athena - how do you parallel map/reduce binary files without RDDs?

0

I have a large number of binary files in an S3 bucket. These files are in a National Instruments format called "tdms." There's a Python library for parsing these files, and I was able to follow the Athena docs to add this library to my Athena notebook with no real problems.

Now I'm trying to do the actual analysis. I'm comfortable with Spark using both Scala and Python, but Spark on Athena is a little bit different. The main thing that's throwing me off is no RDDs.

I'm starting by loading my files from S3 like this:

dfs = spark.read.format("binaryFile").load("s3://mybucketname/*.tdms") 

That yields a DataFrame that looks like this:

DataFrame[path: string, modificationTime: timestamp, length: bigint, content: binary]

Perfect so far. Now, what I want to do is process the files in parallel across a bunch of workers (DPUs), one file at a time. As a little test, the first thing I tried was:

def count_the_size(x) -> int:
    return x.length

dfs.applymap(count_the_size).sum()

but that doesn't work, since this type of DataFrame doesn't have apply, applymap, etc. And because there are no RDDs, there is no sparkContext.parallelize().

So now that I have this DataFrame, what is the correct way to apply some processing with a lambda, working through all the files in parallel, and then return something I can .reduce?

asked 2 years ago · 414 views
1 Answer
0

Hello,

Hope you are doing well, and thanks for contacting us for support.

Per your description, I understand that you are facing an issue calling the "applymap" method on a DataFrame. Kindly see my explanation below:

dfs = spark.read.format("binaryFile").load("s3://mybucketname/*.tdms") 

The above code returns a "pyspark.sql.dataframe.DataFrame" object, which, per [1], does not have an "applymap" function.
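
As a side note, for the quick size test in your question you don't strictly need "applymap": the built-in aggregate functions of that DataFrame already cover it. A small, untested sketch, using the "length" column from the binaryFile schema you showed:

from pyspark.sql import functions as F

# Sum the size of every file without converting the DataFrame.
dfs.agg(F.sum("length")).show()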

If you need to use the "applymap" function, you need to convert the DataFrame to a "pyspark.pandas.frame.DataFrame" object, which does have that function [2]. Sample code for the conversion is below [3]:

pd_dfs = dfs.to_pandas_on_spark()
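
As an illustration of what the converted frame allows (untested; "length" again refers to the column from the binaryFile schema above), an elementwise call like the one in your question becomes possible:

# applymap runs the lambda on every element of the selected column(s).
pd_dfs[["length"]].applymap(lambda v: int(v)).sum()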

Besides, you can check the actual data types of the returned DataFrames, for example:

print(type(dfs))
print(type(pd_dfs))

Output:
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.pandas.frame.DataFrame'>

For your further question about the correct way to process the files using a lambda, we would need to look at your use case and understand more about the context before we can give a correct answer. Hence, I would request you to create a support case, so that we can go deeper into the use case with you and provide an appropriate solution.
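
That said, as a very general and untested sketch (not a recommendation for your specific data): a common pattern on a plain "pyspark.sql.dataframe.DataFrame" is to run a Python UDF over the "content" column, so that each file is processed on the workers in parallel, and then aggregate the results. The processing body below is a placeholder, not real TDMS parsing:

from pyspark.sql import functions as F
from pyspark.sql.types import LongType

# Hypothetical per-file processing; replace len(content) with real parsing.
process_one_file = F.udf(lambda content: len(content), LongType())

results = dfs.select(process_one_file("content").alias("result"))
total = results.agg(F.sum("result")).collect()[0][0]
print(total)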

Thank you, and have a great day ahead.

===============

Reference:

[1] - https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html

[2] - https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.html

[3] - https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.DataFrame.to_pandas_on_spark.html

AWS
answered 2 years ago
