
PySpark: add only a few columns from a database table in AWS Glue Python


I want to add only two columns, EXECUTION_MARKET and TA_NUMBERSYS, from the database table DSUSER00.SECTANR in my Glue Python script, but I am not sure how to do it.

Below is my code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import datetime
from pyspark.sql import functions as F
from pyspark.sql.functions import to_date, col


args = getResolvedOptions(sys.argv, ['target_BucketName', 'JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

#s3 bucket path for creation of BCR table folder
outputbucketname = args['target_BucketName']
timestamp = datetime.datetime.now().strftime("%Y%m%d")

#sectanr
filename_sectanr = f"sectanr{timestamp}"
output_path_sectanr = f"{outputbucketname}/{filename_sectanr}"


#------------------------------------SECTANR----------------------------------------------------------

# Script generated for node Relational DB
RelationalDB_node_sectanr = glueContext.create_dynamic_frame.from_options(
    connection_type = "oracle",
    connection_options = {
        "useConnectionProperties": "true",
        "dbtable": "DSUSER00.SECTANR",
        "connectionName": "DS7PSEC",
        "hashfield":"AENDERUNGSDATLO",
        "hashpartitions":"152",
        "jobBookmarkKeys":["AENDERUNGSDATLO"],
        "jobBookmarkKeysSortOrder":"asc",
    },
    transformation_ctx = "RelationalDB_node_sectanr"
)

# Convert DynamicFrame to DataFrame sectanr
df_sectanr = RelationalDB_node_sectanr.toDF()

#Filter last 5 years data
#df_sectanr_filtered = df_sectanr.filter(F.col("to_date('19700101','yyyymmdd') + (((AENDERUNGSDATLO/60)/60)/24)") > "to_date('20220101','yyyymmdd')")
df_sectanr_filtered = df_sectanr.filter(
    to_date('19700101', 'yyyyMMdd').cast('timestamp') + 
    (col('AENDERUNGSDATLO') / 60 / 60 / 24).cast('interval') > 
    to_date('20220101', 'yyyyMMdd')
)

# Repartition the DataFrame to control output files sectanr
df_repartitioned_sectanr = df_sectanr_filtered.repartition(20)  # Adjust '20' based on your data volume


# Check for empty partitions and write only if data is present
if not df_repartitioned_sectanr.rdd.isEmpty():
    df_repartitioned_sectanr.write.format("csv") \
        .option("compression", "gzip") \
        .option("header", "true") \
        .option("delimiter", "Ə") \
        .option("ignoreLeadingWhiteSpace", "false") \
        .option("ignoreTrailingWhiteSpace", "false") \
        .save(output_path_sectanr)
        
job.commit()
asked 8 months ago · 72 views
2 Answers

Hi,

Adding the two columns EXECUTION_MARKET and TA_NUMBERSYS from the database table DSUSER00.SECTANR in a Glue Python script can be done as follows:

  1. First, create a DynamicFrame from the source table and select only the columns you want:
from awsglue.dynamicframe import DynamicFrame

# Create a DynamicFrame from your source table
source_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="database_name",
    table_name="table_name"
)
# Select only the desired columns
selected_dyf = source_dyf.select_fields(['EXECUTION_MARKET', 'TA_NUMBERSYS'])
  2. Or convert the DynamicFrame to a DataFrame, do the selection there, and convert back:
source_df = source_dyf.toDF()
selected_df = source_df.select("EXECUTION_MARKET", "TA_NUMBERSYS")
selected_dyf = DynamicFrame.fromDF(selected_df, glueContext, "selected_dyf")
  3. And write the data to the desired path as per your use case:
glueContext.write_dynamic_frame.from_options(
    frame=selected_dyf,
    connection_type="s3",
    connection_options={
        "path": "s3://bucket/folder/"
    },
    format="csv"
)

This approach will ensure that only the specified columns are read from the database and processed in your Glue job.
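If you prefer to keep your existing JDBC read, the same column selection can be applied right after the read in your script. A minimal sketch using the variable names from your code (the AENDERUNGSDATLO column is kept because the rest of the script still filters on it):

# Keep only the columns needed downstream, plus the column used for filtering and bookmarks
selected_dyf = RelationalDB_node_sectanr.select_fields(
    ['EXECUTION_MARKET', 'TA_NUMBERSYS', 'AENDERUNGSDATLO']
)

# Continue as before with the reduced column set
df_sectanr = selected_dyf.toDF()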

Note: The code snippets shared above are just for your reference. Kindly modify them according to your use case.

Sources:

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python.html
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html

AWS
answered 8 months ago
  1. First, when creating the dynamic frame, specify the columns you want to select:
RelationalDB_node_sectanr = glueContext.create_dynamic_frame.from_options(
    connection_type = "oracle",
    connection_options = {
        "useConnectionProperties": "true",
        "dbtable": "DSUSER00.SECTANR",
        "connectionName": "DS7PSEC",
        "hashfield":"AENDERUNGSDATLO",
        "hashpartitions":"152",
        "jobBookmarkKeys":["AENDERUNGSDATLO"],
        "jobBookmarkKeysSortOrder":"asc",
        "customSql": "SELECT EXECUTION_MARKET, TA_NUMBERSYS, AENDERUNGSDATLO FROM DSUSER00.SECTANR"
    },
    transformation_ctx = "RelationalDB_node_sectanr"
)
  2. Then, when you convert the DynamicFrame to a DataFrame, you'll only have these columns:
df_sectanr = RelationalDB_node_sectanr.toDF()
  3. You can keep your filtering logic, but it will only apply to the AENDERUNGSDATLO column (see the corrected sketch after this list):
df_sectanr_filtered = df_sectanr.filter(
    to_date('19700101', 'yyyyMMdd').cast('timestamp') + 
    (col('AENDERUNGSDATLO') / 60 / 60 / 24).cast('interval') > 
    to_date('20220101', 'yyyyMMdd')
)
  4. The rest of your code can remain the same. When you write the DataFrame to CSV, it will only contain the EXECUTION_MARKET and TA_NUMBERSYS columns, plus the AENDERUNGSDATLO column used for filtering.
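As a side note, the filter shown above may fail at runtime: to_date('19700101', 'yyyyMMdd') is interpreted as a reference to a column named 19700101, and a numeric value cannot be cast to 'interval'. Assuming AENDERUNGSDATLO holds seconds since the Unix epoch, a minimal corrected sketch could be:

from pyspark.sql import functions as F

# Convert the epoch-seconds column to a timestamp and compare it against the
# 2022-01-01 cutoff (sketch, assuming AENDERUNGSDATLO is seconds since 1970-01-01)
cutoff = F.to_timestamp(F.lit("20220101"), "yyyyMMdd")
df_sectanr_filtered = df_sectanr.filter(
    F.from_unixtime(F.col("AENDERUNGSDATLO")).cast("timestamp") > cutoff
)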

This approach will ensure that only the specified columns are read from the database and processed in your Glue job, which can improve performance and reduce data transfer if these columns are significantly smaller than the full table.

Remember to adjust any other parts of your script that might depend on columns that are no longer present in the DataFrame after this change.

Sources
Program AWS Glue ETL scripts in PySpark - AWS Glue
pyspark trimming all fields bydefault while writing into csv in aws glue python | AWS re:Post

answered 8 months ago
