
Can I configure the AWS Glue Data Catalog Crawler to use specific (or precise) data types when creating tables from Parquet files in S3?


Hello, I have the following data pipeline: RDS DB → DMS → S3 (Parquet) ↔ Glue Data Catalog ← Redshift (Spectrum). I'm using an AWS Glue Crawler to create tables for the external database (based on Parquet files in S3).

Currently, when the crawler runs, it assigns the tinyint data type to some columns, which Redshift Spectrum does not support. As a result, I have to edit each table manually after every crawler run.

Is there any way to automate this process?

Thanks.

asked 10 months ago · 188 views
1 Answer
1
Accepted Answer

To avoid creating a script for each table and hardcoding field names, you can process all tables in a specific database and apply schema changes using a script.

Updated Glue Job Script

This script iterates through all tables in a specified database, dynamically retrieves their schemas, and applies the desired transformations without hardcoding field names.

import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import trim
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'DATABASE_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

database_name = args['DATABASE_NAME']

# Get all tables in the specified database via the Glue API
glue_client = boto3.client('glue')
paginator = glue_client.get_paginator('get_tables')
tables = []
for page in paginator.paginate(DatabaseName=database_name):
    tables.extend(page['TableList'])

for table in tables:
    table_name = table['Name']
    print(f"Processing table: {table_name}")

    # Read the data from the Glue Data Catalog
    datasource = glueContext.create_dynamic_frame.from_catalog(
        database=database_name,
        table_name=table_name
    )

    # Convert to DataFrame to modify schema
    df = datasource.toDF()

    # Dynamically cast columns based on their current data type
    for column, dtype in df.dtypes:
        if dtype == "tinyint":  # Example: Change tinyint to smallint
            df = df.withColumn(column, df[column].cast("smallint"))
        elif dtype == "string":  # Example: Trim strings
            df = df.withColumn(column, trim(df[column]))

    # Convert back to DynamicFrame
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    # Write the data back through the Glue Data Catalog
    # (the output format and location come from the catalog table definition)
    glueContext.write_dynamic_frame.from_catalog(
        frame=dynamic_frame,
        database=database_name,
        table_name=table_name
    )

job.commit()

Notes

  • Modify the transformation logic to fit your requirements.
  • Pass the target database as the DATABASE_NAME job parameter (the script reads it via getResolvedOptions).
  • Ensure the IAM role used by the Glue job has permissions to access the Glue Data Catalog and the S3 buckets involved.
  • Test the script on a subset of tables before running it on the entire database.
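As an alternative to rewriting the data itself, the unsupported types can also be patched directly in the Data Catalog after each crawler run (for example, from a small Lambda triggered when the crawler finishes). Below is a minimal sketch using boto3; `TYPE_MAP`, `remap_columns`, and `patch_table_types` are illustrative names, not part of any AWS API:

```python
# Catalog types the crawler emits that Redshift Spectrum cannot read,
# mapped to compatible replacements (illustrative; extend as needed).
TYPE_MAP = {"tinyint": "smallint"}

def remap_columns(columns, type_map=TYPE_MAP):
    """Return a copy of a Glue column list with unsupported types replaced."""
    return [{**col, "Type": type_map.get(col.get("Type"), col.get("Type"))}
            for col in columns]

def patch_table_types(database_name, table_name):
    """Rewrite a catalog table's column types in place via the Glue API."""
    import boto3  # deferred so remap_columns stays usable without AWS access
    glue = boto3.client("glue")
    table = glue.get_table(DatabaseName=database_name, Name=table_name)["Table"]
    columns = table["StorageDescriptor"]["Columns"]
    patched = remap_columns(columns)
    if patched != columns:
        # update_table accepts only a subset of get_table's output, so copy
        # just the TableInput fields and swap in the patched column list.
        table_input = {k: v for k, v in table.items()
                       if k in ("Name", "Description", "Owner", "Retention",
                                "StorageDescriptor", "PartitionKeys",
                                "TableType", "Parameters")}
        table_input["StorageDescriptor"] = dict(table["StorageDescriptor"],
                                                Columns=patched)
        glue.update_table(DatabaseName=database_name, TableInput=table_input)
```

This avoids rewriting the Parquet files in S3 and only changes the metadata Redshift Spectrum reads, but note the crawler will reapply the inferred types on its next run unless the fix is re-run afterwards.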
answered 10 months ago
  • Thanks! I will try it.
