
PySpark: provide correct filter for database table in AWS Glue Python


I want to provide a filter for the table _test_rw_omuc_baag__test1_app where to_date('19700101','yyyymmdd') + (((AENDERUNGSDATLO/60)/60)/24) > to_date('20220101','yyyymmdd') in a Glue Python script, but I am not sure how to do it.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import datetime
from pyspark.sql import functions as F

args = getResolvedOptions(sys.argv, ['target_BucketName', 'JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

#s3 bucket path for creation of BCR table folder
outputbucketname = args['target_BucketName']
timestamp = datetime.datetime.now().strftime("%Y%m%d")

#sectanr
filename_sectanr = f"sectanr{timestamp}"
output_path_sectanr = f"{outputbucketname}/{filename_sectanr}"


#------------------------------------SECTANR----------------------------------------------------------

# Script generated for node Relational DB
RelationalDB_node_sectanr = glueContext.create_dynamic_frame.from_options(
    connection_type = "oracle",
    connection_options = {
        "useConnectionProperties": "true",
        "dbtable": "DSUSER00.SECTANR",
        "connectionName": "DS7PSEC",
        "hashfield":"AENDERUNGSDATLO",
        "hashpartitions":"152",
        "jobBookmarkKeys":["AENDERUNGSDATLO"],
        "jobBookmarkKeysSortOrder":"asc",
    },
    transformation_ctx = "RelationalDB_node_sectanr"
)

# Convert DynamicFrame to DataFrame sectanr
df_sectanr = RelationalDB_node_sectanr.toDF()

# Repartition the DataFrame to control output files sectanr
# (df_sectanr_filtered is the filtered DataFrame I am trying to create; see the sketch after the script)
df_repartitioned_sectanr = df_sectanr_filtered.repartition(20)  # Adjust '20' based on your data volume


# Check for empty partitions and write only if data is present
if not df_repartitioned_sectanr.rdd.isEmpty():
    df_repartitioned_sectanr.write.format("csv") \
        .option("compression", "gzip") \
        .option("header", "true") \
        .option("delimiter", "Ə") \
        .option("ignoreLeadingWhiteSpace", "false") \
        .option("ignoreTrailingWhiteSpace", "false") \
        .save(output_path_sectanr)
        
job.commit()
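
In PySpark terms, the condition amounts to keeping only the rows whose AENDERUNGSDATLO (seconds since the epoch) converts to a date after 2022-01-01. A minimal sketch of what the missing df_sectanr_filtered could look like, assuming the column really holds epoch seconds, is:

# Sketch (untested): convert epoch seconds to a date and compare
df_sectanr_filtered = df_sectanr.filter(
    F.from_unixtime(F.col("AENDERUNGSDATLO")).cast("date") >
    F.to_date(F.lit("20220101"), "yyyyMMdd")
)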
asked 8 months ago · 123 views
2 Answers

Hi,

To provide a filter for the table _test_rw_omuc_baag__test1_app where to_date('19700101','yyyymmdd') + (((AENDERUNGSDATLO/60)/60)/24) > to_date('20220101','yyyymmdd') in a Glue Python script, kindly refer to the steps below.

Sample data on which the code snippet is tested:

id,timestamp_string
1,20250314
2,20250315
  1. For the conversion from string values to date format, first create a DynamicFrame and convert it to a DataFrame:
# Read from the Glue catalog table (source_database and source_table
# are placeholders for your own catalog database and table names)
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database=source_database,
    table_name=source_table
)

# Convert DynamicFrame to DataFrame
df = dynamic_frame.toDF()

# Print source schema and sample data
print("Source Schema:")
dynamic_frame.printSchema()
print("\nSample Input Data:")
df.show(5)
  2. Then convert the string to date format using the script below:
from pyspark.sql.functions import when, col, to_date

# Convert string to date with basic error handling:
# values that are not 8-digit strings become NULL
df = df.withColumn("timestamp_date",
    when(col("timestamp_string").rlike("^\\d{8}$"),
        to_date(col("timestamp_string"), "yyyyMMdd")
    ).otherwise(None)
)

# Print sample of converted data
print("\nSample Output Data:")
df.show(5)
  3. Convert your DataFrame back to a DynamicFrame:
# Convert back to DynamicFrame
from awsglue.dynamicframe import DynamicFrame

output_dyf = DynamicFrame.fromDF(df, glueContext, "output_dyf")
  4. Finally, write the data to the desired S3 path as per your use case:
# Write to S3
glueContext.write_dynamic_frame.from_options(
    frame=output_dyf,
    connection_type="s3",
    connection_options={
        "path": output_path
    },
    format="parquet"
)

Output for the tested sample data:

+---+----------------+--------------+
| id|timestamp_string|timestamp_date|
+---+----------------+--------------+
|  1|        20250314|    2025-03-14|
|  2|        20250315|    2025-03-15|
+---+----------------+--------------+

This approach ensures a clean conversion from string values to date format in the Glue job.

Note: The code snippets shared above are for reference only. Kindly modify them according to your use case.
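
The question's actual column holds seconds since the epoch rather than a yyyyMMdd string, so the same convert-then-filter pattern might look like the sketch below (aenderungs_date is just an illustrative column name, and AENDERUNGSDATLO is assumed to hold epoch seconds):

from pyspark.sql.functions import col, from_unixtime, to_date, lit

# Convert epoch seconds to a date, then filter on the derived column
df = df.withColumn(
    "aenderungs_date",
    from_unixtime(col("AENDERUNGSDATLO")).cast("date")
).filter(col("aenderungs_date") > to_date(lit("20220101"), "yyyyMMdd"))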

Sources:
[1] Program AWS Glue ETL scripts in PySpark: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python.html
[2] DynamicFrame class: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html

AWS
answered 8 months ago

To apply the filter you described for the table _test_rw_omuc_baag__test1_app in your AWS Glue Python script, you can modify your code as follows:

First, import the necessary functions:

from pyspark.sql.functions import to_date, col, lit, from_unixtime

Then, after creating your DynamicFrame and converting it to a DataFrame, apply the filter:

# Convert DynamicFrame to DataFrame
df_sectanr = RelationalDB_node_sectanr.toDF()

# Apply the filter: AENDERUNGSDATLO holds seconds since the epoch,
# so convert it to a date before comparing
df_sectanr_filtered = df_sectanr.filter(
    from_unixtime(col('AENDERUNGSDATLO')).cast('date') >
    to_date(lit('20220101'), 'yyyyMMdd')
)

# Repartition the filtered DataFrame
df_repartitioned_sectanr = df_sectanr_filtered.repartition(20)

This filter converts AENDERUNGSDATLO (seconds since the epoch) to a date via from_unixtime and compares it with the specified date. It keeps only the rows where the calculated date is after January 1, 2022.
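
If you prefer to mirror the original Oracle arithmetic more literally, an equivalent expr-based sketch is shown below (note that date_add needs an integer day count, so the fractional part of the division is truncated):

from pyspark.sql.functions import expr

# Equivalent filter written as a Spark SQL expression
df_sectanr_filtered = df_sectanr.filter(
    expr("date_add(to_date('19700101', 'yyyyMMdd'), "
         "cast(AENDERUNGSDATLO / 60 / 60 / 24 AS INT)) "
         "> to_date('20220101', 'yyyyMMdd')")
)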

Make sure to use the filtered DataFrame (df_sectanr_filtered) in your subsequent operations, such as repartitioning and writing to CSV.
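
For example, picking up the later steps from your own script:

# Downstream usage, mirroring the question's original write logic
df_repartitioned_sectanr = df_sectanr_filtered.repartition(20)

if not df_repartitioned_sectanr.rdd.isEmpty():
    df_repartitioned_sectanr.write.format("csv") \
        .option("compression", "gzip") \
        .option("header", "true") \
        .save(output_path_sectanr)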

Remember to adjust the column name 'AENDERUNGSDATLO' if it's different in your actual data. Also, ensure that the AENDERUNGSDATLO column in your data contains the expected numeric values (seconds since epoch) for this calculation to work correctly.
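
A quick way to sanity-check that assumption, reusing the functions imported above:

# Spot-check: converted timestamps should look like plausible change
# dates if AENDERUNGSDATLO really holds seconds since the epoch
df_sectanr.select(
    col('AENDERUNGSDATLO'),
    from_unixtime(col('AENDERUNGSDATLO')).alias('as_timestamp')
).show(5)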

This approach should provide the correct filter for your database table in the AWS Glue Python script.
Sources
How to provide filter with date in glue python | AWS re:Post
Program AWS Glue ETL scripts in PySpark - AWS Glue
AWS Glue PySpark transforms reference - AWS Glue

answered 8 months ago
