UDF stored in S3 unable to run as a glue job


Trying to use a UDF to convert a column to uppercase. The UDF is in a .zip file in an S3 bucket, which is accessed by the Glue job Python script (attached below for reference). ISSUE: The UDF is neither writing the data to the expected S3 bucket nor reading the data from the S3 bucket provided.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Create a Spark context
sc = SparkContext()

# Create a Glue context
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Import the UDF code from Amazon S3
# (addPyFile needs an existing SparkContext, so this runs after the contexts are created)
spark.sparkContext.addPyFile("s3://**********")
from mylibrary import udf_uppercase

# Define the AWS Glue job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
job = Job(glueContext)
job.init("uppercase_testUDF", args)

# Read data from S3
datasource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://***********"]},
    format="json")

# Apply the UDF to the data
df = datasource0.toDF()
transformed_data = df.withColumn("column_name", udf_uppercase(df["column_name"]))

transformed_data.show()
# Write the transformed data back to S3
glueContext.write_dynamic_frame.from_options(
    frame=transformed_data,
    connection_type="s3",
    connection_options={
        "path": "s3://*****************",
        "format": "json"
    },
    transformation_ctx="test_uppercase_bookmark"
)

# Run the AWS Glue job
job.commit()

2 Answers
  1. Your UDF file should be at the root level of your zip file, if you are not building a proper Python package.
  2. Instead of importing like from mylibrary import udf_uppercase, just use import udf_uppercase, given that udf_uppercase.py is your UDF file name (see the sketch below).
  3. Use the Python library path text box in the job properties tab and give the full path, e.g. s3://BucketName/utils/utils.zip

Refer: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-libraries.html
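For illustration, a minimal sketch of points 1–3. The bucket/zip names and the function defined inside udf_uppercase.py are placeholders, not your actual names:

# Minimal sketch, assuming udf_uppercase.py sits at the root of utils.zip and
# s3://BucketName/utils/utils.zip is set as the job's Python library path.
import udf_uppercase  # module name matches the file name at the zip root

# datasource0 as defined in the question's script; the module is assumed to
# define a Spark UDF also called udf_uppercase.
df = datasource0.toDF()
df = df.withColumn("column_name", udf_uppercase.udf_uppercase(df["column_name"]))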

answered a year ago

That should work (assuming that your file is named mylibrary.py), so the import itself should be fine.
The issue I see there is that you convert to a DataFrame, add a column (still a DataFrame), and then try to pass it to write_dynamic_frame, which expects a DynamicFrame.

You should receive an error like this: TypeError: frame_or_dfc must be DynamicFrame or DynamicFrameCollection. Got <class 'pyspark.sql.dataframe.DataFrame'>
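One way around that is to convert the DataFrame back to a DynamicFrame before writing. A sketch, keeping the rest of the script as posted (the bucket path stays masked):

# Sketch: wrap the transformed DataFrame back into a DynamicFrame for the Glue writer
from awsglue.dynamicframe import DynamicFrame

transformed_dyf = DynamicFrame.fromDF(transformed_data, glueContext, "transformed_dyf")

glueContext.write_dynamic_frame.from_options(
    frame=transformed_dyf,
    connection_type="s3",
    connection_options={"path": "s3://*****************"},
    format="json",
    transformation_ctx="test_uppercase_bookmark"
)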

BTW, I guess that's just an example, but it's much more efficient not to use UDFs and to do the uppercasing with a built-in SQL function instead (sketch below).
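For example, a sketch with the built-in function, so no zip or addPyFile is needed at all (column name as in the question):

# Sketch: same transformation using Spark's built-in upper() instead of a Python UDF
from pyspark.sql.functions import col, upper

df = datasource0.toDF()
transformed_data = df.withColumn("column_name", upper(col("column_name")))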

AWS
EXPERT
answered a year ago
