Query/Transform nested and inconsistent data types in AWS Glue


I have an array stored in an S3 bucket that looks like this:

[
    {
        "bucket_name": "ababa",
        "bucket_creation_date": "130999",
        "additional_data": {
            "bucket_acl": [
                {
                    "Grantee": {
                        "DisplayName": "abaabbb",
                        "ID": "abaaaa",
                        "Type": "CanonicalUser"
                    },
                    "Permission": "FULL_CONTROL"
                }
            ],
            "bucket_policy": {
                "Version": "2012-10-17",
                "Id": "abaaa",
                "Statement": [
                    {
                        "Sid": "iddd",
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "logging.s3.amazonaws.com"
                        },
                        "Action": "s3:PutObject",
                        "Resource": "aarnnn"
                    },
                    {
                        "Effect": "Deny",
                        "Principal": "*",
                        "Action": [
                            "s3:GetBucket*",
                            "s3:List*",
                            "s3:DeleteObject*"
                        ],
                        "Resource": [
                            "arn:aws:s3:::1111-aaa/*",
                            "arn:aws:s3:::1111-bbb"
                        ],
                        "Condition": {
                            "Bool": {
                                "aws_SecureTransport": "false"
                            }
                        }
                    }
                ]
            },
            "public_access_block_configuration": {
                "BlockPublicAcls": true,
                "IgnorePublicAcls": true,
                "BlockPublicPolicy": true,
                "RestrictPublicBuckets": true
            },
            "website_hosting": {},
            "bucket_tags": [
                {
                    "Key": "keyyy",
                    "Value": "valueee"
                }
            ]
        },
        "processed_data": {}
    },
.......................
]

NOTE: some of the fields may be a string/array/struct depending on the data we get (e.g., Action can be an array or a string).

END GOAL: Using AWS Glue, I want to query this data against multiple conditions and then create a field inside processed_data, set to true/false based on the result.

Example: For each object inside the array, I want to check:

1- if bucket_acl has Grantee.Type=CanonicalUser and Permission=FULL_CONTROL
AND
2- if bucket_policy has a statement that contains Effect=Allow and Principal=* and Action = ...... and Resource = ...... and Condition is empty
AND
3- website_hosting is empty

and then create a field inside processed_data and set it to true if all of the above conditions are satisfied, e.g. processed_data: { isPublic: true } (see the sketch after this list).
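
Conceptually, the check might look something like this in PySpark (a sketch only, assuming a DataFrame df where the nested fields are real structs/arrays; F.exists needs Spark 3.1+, and Spark may drop the empty website_hosting struct during schema inference, so that check is omitted here):

from pyspark.sql import functions as F

# Condition 1: any ACL grant with Type=CanonicalUser and FULL_CONTROL
acl_ok = F.exists(
    "additional_data.bucket_acl",
    lambda g: (g["Grantee"]["Type"] == "CanonicalUser")
    & (g["Permission"] == "FULL_CONTROL"),
)

# Condition 2: any statement with Effect=Allow and Principal=*
# (the elided Action/Resource/Condition checks would go here too)
policy_ok = F.exists(
    "additional_data.bucket_policy.Statement",
    lambda s: (s["Effect"] == "Allow") & (s["Principal"] == "*"),
)

# Write the verdict into processed_data
df = df.withColumn(
    "processed_data",
    F.struct((acl_ok & policy_ok).alias("isPublic")),
)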

Approaches I Tried:

1- I tried saving the data to the S3 bucket in Parquet format using awswrangler (AWS SDK for pandas) for faster querying, and then reading it in AWS Glue using a Glue DynamicFrame:

S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={"paths": ["s3://abaabbb/abaaaaa/"], "recurse": True},
    transformation_ctx="S3bucket_node1",
)
S3bucket_node1.printSchema()
S3bucket_node1.show()

Output:

root
|-- bucket_name: string
|-- bucket_creation_date: string
|-- additional_data: string
|-- processed_data: string

{"bucket_name": "abaaaa", "bucket_creation_date": "139999", "additional_data": "{'bucket_acl': [{'Grantee': {'DisplayName': 'abaaaaaa', 'ID': 'abaaa', 'Type': 'CanonicalUser'}, 'Permission': 'FULL_CONTROL'}], 'bucket_policy': {}, 'public_access_block_configuration': {'BlockPublicAcls': True, 'IgnorePublicAcls': True, 'BlockPublicPolicy': True, 'RestrictPublicBuckets': True}, 'website_hosting': {}, 'bucket_tags': []}", "processed_data": "{}"}

Everything comes back as a string; it seems these libraries don't preserve nested data types. Note that the nested values above are single-quoted Python dict reprs rather than valid JSON, so they can't even be parsed back directly (see the sketch below).
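
For reference, if the column did hold valid JSON, something like from_json could rebuild the structure (a sketch only; the values would first have to be serialized with json.dumps upstream, and the schema here is trimmed to bucket_acl):

from pyspark.sql import functions as F

# Sketch: recover a nested column from a JSON string. This only works if
# additional_data contains valid JSON, not a Python dict repr.
df = S3bucket_node1.toDF()
acl_schema = "struct<bucket_acl:array<struct<Grantee:struct<DisplayName:string,ID:string,Type:string>,Permission:string>>>"
df = df.withColumn("additional_data", F.from_json("additional_data", acl_schema))
df.printSchema()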

2- Tried saving the data as-is (in JSON) using the PutObject API and then reading it in AWS Glue using a Glue DynamicFrame:

piece1 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": True},
    connection_type="s3",
    format="json",
    connection_options={"paths": ["s3://raghav-test-df/raghav3.json"], "recurse": True},
    transformation_ctx="S3bucket_node1",
)
piece1.printSchema()
piece1.show()
piece1.count()

Output:

root


0

No schema is detected and the count is 0, presumably because the file is a single top-level JSON array rather than one object per line.

3- Tried reading the data using a Spark DataFrame:

sparkDF=spark.read.option("inferSchema", "true").option("multiline", "true").json("s3://ababa/abaa.json")
sparkDF.printSchema()
sparkDF.count()
sparkDF.show()

Output:

root
 |-- additional_data: struct (nullable = true)
 |    |-- bucket_acl: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Grantee: struct (nullable = true)
 |    |    |    |    |-- DisplayName: string (nullable = true)
 |    |    |    |    |-- ID: string (nullable = true)
 |    |    |    |    |-- Type: string (nullable = true)
 |    |    |    |-- Permission: string (nullable = true)
 |    |-- bucket_policy: struct (nullable = true)
 |    |    |-- Id: string (nullable = true)
 |    |    |-- Statement: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Action: string (nullable = true)
 |    |    |    |    |-- Condition: struct (nullable = true)
 |    |    |    |    |    |-- Bool: struct (nullable = true)
 |    |    |    |    |    |    |-- aws:SecureTransport: string (nullable = true)
 |    |    |    |    |    |-- StringEquals: struct (nullable = true)
 |    |    |    |    |    |    |-- AWS:SourceAccount: string (nullable = true)
 |    |    |    |    |    |    |-- AWS:SourceArn: string (nullable = true)
 |    |    |    |    |    |    |-- aws:PrincipalAccount: string (nullable = true)
 |    |    |    |    |    |    |-- s3:x-amz-acl: string (nullable = true)
 |    |    |    |    |-- Effect: string (nullable = true)
 |    |    |    |    |-- Principal: string (nullable = true)
 |    |    |    |    |-- Resource: string (nullable = true)
 |    |    |    |    |-- Sid: string (nullable = true)
 |    |    |-- Version: string (nullable = true)
 |    |-- bucket_tags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Key: string (nullable = true)
 |    |    |    |-- Value: string (nullable = true)
 |    |-- public_access_block_configuration: struct (nullable = true)
 |    |    |-- BlockPublicAcls: boolean (nullable = true)
 |    |    |-- BlockPublicPolicy: boolean (nullable = true)
 |    |    |-- IgnorePublicAcls: boolean (nullable = true)
 |    |    |-- RestrictPublicBuckets: boolean (nullable = true)
 |-- bucket_creation_date: string (nullable = true)
 |-- bucket_name: string (nullable = true)

This gets the schema and the correct count, but some fields have inconsistent data types (e.g., Action can be a string or an array) and Spark defaults them to string. I think querying the data against multiple conditions using SQL will be too complex.

Do I need to change the approach or try something else? I am stuck here. Can someone please help me achieve the end goal?

  • If you know the schema ahead of time, could you define the schema and try? For the cases where fields can be a string or an array, Spark defaults them to string; you could programmatically check the string and define your logic based on how it looks. I don't know if there is a way to define a field as a list for some records and a string for others. (A sketch of this idea follows.)
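
A sketch of that suggestion, pinning the ambiguous fields to StringType so both shapes survive the read (hypothetical and untested; the schema is trimmed to the bucket_policy part, with names as in the sample):

from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Pin the polymorphic Statement fields to StringType; Spark then keeps the
# raw JSON text for arrays/structs, and the shape can be branched on later.
stmt = StructType([
    StructField("Sid", StringType()),
    StructField("Effect", StringType()),
    StructField("Principal", StringType()),  # "*" or {"Service": ...} as text
    StructField("Action", StringType()),     # scalar or array as text
    StructField("Resource", StringType()),   # scalar or array as text
])
policy = StructType([
    StructField("Version", StringType()),
    StructField("Id", StringType()),
    StructField("Statement", ArrayType(stmt)),
])
schema = StructType([
    StructField("bucket_name", StringType()),
    StructField("bucket_creation_date", StringType()),
    StructField("additional_data", StructType([StructField("bucket_policy", policy)])),
])

sparkDF = spark.read.schema(schema).option("multiline", "true").json("s3://ababa/abaa.json")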

asked a year ago · 686 views
1 Answer

Hi Raghav,

The approach you are using is correct. AWS Glue DynamicFrames are a perfect fit for this kind of data. Transforming the data to a simpler layout will certainly help you simplify your queries.

I was able to get the schema populated for the sample data above:

Code Snippet I used:

piece1 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": True, "jsonPath":"$[*]"},
    connection_type="s3",
    format="json",
    connection_options={"paths": ["s3://BUCKET_NAME/PREFIX_KEY/repost_sample.json"]},
    transformation_ctx="S3bucket_node1",
)

piece1.printSchema()

Notice that I omitted the recurse parameter: since you are reading a single file rather than a nested directory, it is not required. Also, I added jsonPath to format_options to specify the location of the records within the JSON; your file is a single top-level array, so $[*] selects each element as a record.

I was able to get the right schema, with choice datatypes created as per Glue's default behaviour, which lets us use ResolveChoice to resolve the datatype conflicts (refer to the example in the docs):

root
|-- bucket_name: string
|-- bucket_creation_date: string
|-- additional_data: struct
|    |-- bucket_acl: array
|    |    |-- element: struct
|    |    |    |-- Grantee: struct
|    |    |    |    |-- DisplayName: string
|    |    |    |    |-- ID: string
|    |    |    |    |-- Type: string
|    |    |    |-- Permission: string
|    |-- bucket_policy: struct
|    |    |-- Version: string
|    |    |-- Id: string
|    |    |-- Statement: array
|    |    |    |-- element: struct
|    |    |    |    |-- Sid: string
|    |    |    |    |-- Effect: string
|    |    |    |    |-- Principal: choice
|    |    |    |    |    |-- string
|    |    |    |    |    |-- struct
|    |    |    |    |    |    |-- Service: string
|    |    |    |    |-- Action: choice
|    |    |    |    |    |-- array
|    |    |    |    |    |    |-- element: string
|    |    |    |    |    |-- string
|    |    |    |    |-- Resource: choice
|    |    |    |    |    |-- array
|    |    |    |    |    |    |-- element: string
|    |    |    |    |    |-- string
|    |    |    |    |-- Condition: struct
|    |    |    |    |    |-- Bool: struct
|    |    |    |    |    |    |-- aws_SecureTransport: string
|    |-- public_access_block_configuration: struct
|    |    |-- BlockPublicAcls: boolean
|    |    |-- IgnorePublicAcls: boolean
|    |    |-- BlockPublicPolicy: boolean
|    |    |-- RestrictPublicBuckets: boolean
|    |-- website_hosting: struct
|    |-- bucket_tags: array
|    |    |-- element: struct
|    |    |    |-- Key: string
|    |    |    |-- Value: string
|-- processed_data: struct

In your case, you can use the make_struct or make_cols specification (refer to the docs for more info on these specs) to resolve the type conflicts, and then you can easily write a query that checks the two columns (nested columns, if you use make_struct) for values.
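
For instance, a sketch of resolving with make_struct (the field paths are assumptions based on the schema above):

# Sketch: make_struct turns each choice into a struct with one field per
# candidate type, e.g. Action becomes struct<array: array<string>, string: string>.
resolved = piece1.resolveChoice(
    specs=[
        ("additional_data.bucket_policy.Statement[].Principal", "make_struct"),
        ("additional_data.bucket_policy.Statement[].Action", "make_struct"),
        ("additional_data.bucket_policy.Statement[].Resource", "make_struct"),
    ],
    transformation_ctx="resolved",
)
resolved.printSchema()

After make_struct, a statement's action lives in either Action.string or Action.array, so a downstream query can coalesce the two.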

If you wish to continue using Spark DataFrames, I'd say string is actually the right choice for the Action column, since it has to accommodate both string and array values. You can use JSON functions within your query engine to extract the necessary values; a sketch follows.
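
For example, normalizing Action with Spark JSON functions (a sketch; paths assume the DataFrame schema shown in the question):

from pyspark.sql import functions as F

# Explode the statements, then normalize Action to array<string> whether
# it arrived as a bare string or as the JSON text of an array.
stmts = sparkDF.select(
    "bucket_name",
    F.explode("additional_data.bucket_policy.Statement").alias("stmt"),
)
stmts = stmts.withColumn(
    "actions",
    F.when(
        F.col("stmt.Action").startswith("["),
        F.from_json("stmt.Action", "array<string>"),
    ).otherwise(F.array("stmt.Action")),
)
stmts.select("bucket_name", "stmt.Effect", "actions").show(truncate=False)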

AWS SUPPORT ENGINEER · answered a year ago
