Questions tagged with AWS Glue

Partition projection for VPC Flow Logs in Parquet with Hive-compatible prefixes

I am not sure if what I am asking for is possible, as I don't fully understand Athena, Glue, and all the nuances of how partition projection works. I am trying to switch all of our VPC Flow Logs from writing to CloudWatch Logs to writing directly to S3 in Parquet format with Hive-compatible prefixes turned on. I have successfully gotten my Parquet table and schema to work by loading the partitions with `MSCK REPAIR TABLE`. Initially I was going to call it done, but I'm worried about how long `MSCK REPAIR TABLE` will take to load new partitions, since we are centralizing our VPC Flow Logs from many accounts into one S3 bucket. I also don't know whether there are cost implications as the number of objects in the bucket grows. So my thought was to use partition projection, which has the added benefit of not having to run `MSCK REPAIR TABLE` on a schedule. I haven't seen any examples of someone using partition projection with Parquet-formatted objects or Hive-compatible S3 prefixes. Can someone confirm whether this is possible? Should I not be using Hive-compatible prefixes in this scenario?

Here is what my Glue table looks like where I am trying to use partition projection. It comes from a CloudFormation template, so the base bucket name is a parameter, but it should be enough information to see where I am going wrong:

```
GlueTableVpcFlowParquet:
  Type: AWS::Glue::Table
  Properties:
    CatalogId:
      Ref: AWS::AccountId
    DatabaseName:
      Ref: GlueDatabase
    TableInput:
      Name:
        Fn::Join:
          - ''
          - - Ref: TableNameVPCFlowParquet
            - Ref: CIAppend
      TableType: EXTERNAL_TABLE
      Parameters:
        skip.header.line.count: '1'
        projection.enabled: 'true'
        projection.aws_account_id.type: injected
        projection.year.type: integer
        projection.year.range: 2022,9999
        projection.month.type: integer
        projection.month.range: 01,12
        # projection.month.digits: '2'
        projection.day.type: integer
        projection.day.range: 01,31
        projection.aws_region.type: enum
        projection.aws_region.values:
          Ref: TableRegions
        storage.location.template:
          Fn::Join:
            - ''
            - - Ref: TableLocationVpcFlowParquet
              - "/awslogs/aws-account-id=${aws_account_id}/aws-service=vpcflowlogs/aws-region=${aws_region}/year=${year}/month=${month}/day=${day}"
      PartitionKeys:
        - Name: aws_account_id
          Type: string
        - Name: aws_region
          Type: string
        - Name: year
          Type: int
        - Name: month
          Type: int
        - Name: day
          Type: int
      StorageDescriptor:
        Columns:
          - Name: version
            Type: int
          - Name: account_id
            Type: string
          - Name: interface_id
            Type: string
          - Name: srcaddr
            Type: string
          - Name: dstaddr
            Type: string
          - Name: srcport
            Type: int
          - Name: dstport
            Type: int
          - Name: protocol
            Type: bigint
          - Name: packets
            Type: bigint
          - Name: bytes
            Type: bigint
          - Name: start
            Type: bigint
          - Name: end
            Type: bigint
          - Name: action
            Type: string
          - Name: log_status
            Type: string
        Location:
          Fn::Join:
            - ''
            - - Ref: TableLocationVpcFlowParquet
              - ""
        InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
        OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
        SerdeInfo:
          SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
        BucketColumns: []
        SortColumns: []
```

Object prefixes in S3 look like this:

```
s3://<bucketName>/AWSLogs/aws-account-id=012345678912/aws-service=vpcflowlogs/aws-region=us-east-1/year=2022/month=11/day=10/012345678912_vpcflowlogs_us-east-1_fl-003e10087957dd87a_20221110T1850Z_33d78e4c.log.parquet
```

I run an Athena query like this:

```
SELECT *
FROM "iaasdatasources"."vpcflowlogsparquet"
WHERE aws_account_id='012345678912'
  AND aws_region='us-east-1'
  AND year=2022 AND month=11 AND day=14
LIMIT 10
```

It seems to run for 20 seconds but returns zero results and no errors.

Also, if anyone has advice on doing a date-type partition projection instead of separate year/month/day partitions, that would be helpful. I wasn't sure what the format should be, considering the date portions of the path are Hive formatted: `year=<year>/month=<month>/day=<day>`.
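As a debugging aid, here is a minimal sketch (not part of the original template) that prints the location and projection parameters the Glue Data Catalog actually holds after the stack deploys, so they can be compared character by character against the real S3 prefixes. It assumes boto3 is configured with credentials and a region for the account that owns the catalog, and it reuses the database and table names from the Athena query above:

```
# Minimal sketch: dump the catalog table's location and partition-projection
# parameters so they can be compared against the real S3 prefixes.
# Assumes boto3 credentials/region are configured; the database and table
# names are taken from the Athena query in the question.
import boto3

glue = boto3.client("glue")

table = glue.get_table(
    DatabaseName="iaasdatasources",
    Name="vpcflowlogsparquet",
)["Table"]

print("Location:", table["StorageDescriptor"]["Location"])
for key, value in sorted(table.get("Parameters", {}).items()):
    if key.startswith("projection.") or key == "storage.location.template":
        print(f"{key} = {value}")
```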
1 answer · 0 votes · 44 views · asked 20 days ago

Creating a table in the Glue Catalog in Parquet format fails, even with enableUpdateCatalog

I am trying to modify a PySpark/Glue job to write a DynamicFrame to a new table in the Glue Catalog in Parquet format. The database exists; the table does not yet. I am following the instructions posted here: https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html

I am getting this error:

```
  File "/tmp/resource_logs_with_attributes.py", line 443, in write_transformed_data_to_s3
    format="glueparquet"
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 386, in write_dynamic_frame_from_catalog
    makeOptions(self._sc, additional_options), catalog_id)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o91.getCatalogSink.
: com.amazonaws.services.glue.model.EntityNotFoundException: Table api_logs_core not found.
  (Service: AWSGlue; Status Code: 400; Error Code: EntityNotFoundException;
   Request ID: 4ed5f8f4-e8bf-4764-afb8-425804d3bcd5; Proxy: null)
```

This is the code that produces it:

```
sink = glueContext.getSink(
    connection_type="s3",
    path="s3://mybucket/test",
    enableUpdateCatalog=True,
    format="glueparquet"
)
sink.setCatalogInfo(
    catalogDatabase='myexistinggluedatabase',
    catalogTableName='newtable'
)
newdyf = glueContext.write_dynamic_frame_from_catalog(
    frame=my_dyf,
    database='myexistinggluedatabase',
    table_name='newtable',
    format="glueparquet"
)
```

I have tried variants including:

* including the partition key array in the sink (`partitionKeys=["dt_pk"]`)
* `format="parquet"` + `format_options={"useGlueParquetWriter": True}`
* including `"updateBehavior": "UPDATE_IN_DATABASE"` in `additional_options`
* `sink.writeFrame()` with options as documented
* `my_dyf.write()` with `connection_options` and `format` + `format_options`

This is generally functional code that has worked like this:

* it reads data from a source into a DynamicFrame
* it does basic transformations to a new schema
* it works in the context of a Glue Catalog
* the existing code uses `write_dynamic_frame.with_options`, specifying the S3 location and Parquet format, partitioned by date, etc., writing to a table whose schema was discovered and written to the Glue Catalog by a Glue Crawler. This works in two phases: the Glue job runs and writes Parquet files to S3, then the Glue Crawler updates the Glue Catalog
* it writes to a partitioned S3 data set in Parquet format
* it works with either standard Parquet or the Glue Parquet Writer

My sole change is that I have the same schema, partitions, and data format as before, but I now want to write to a new S3 location and have the Glue job create the table and update the Data Catalog.
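For reference, the pattern on the linked documentation page that writes through the sink itself (rather than through `write_dynamic_frame_from_catalog`) looks roughly like the sketch below. It is adapted to the placeholder names used in this question; the database, table, S3 path, partition key, and `my_dyf` are all assumed to exist as in the snippets above and nothing here has been verified against a live job:

```
# Sketch of the getSink / setCatalogInfo / setFormat / writeFrame flow shown
# on the linked AWS Glue docs page, using this question's placeholder names.
from awsglue.context import GlueContext
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

# my_dyf is the DynamicFrame produced earlier in the job (not shown here).
sink = glueContext.getSink(
    connection_type="s3",
    path="s3://mybucket/test",
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["dt_pk"],
)
sink.setCatalogInfo(
    catalogDatabase="myexistinggluedatabase",
    catalogTableName="newtable",
)
sink.setFormat("glueparquet")
sink.writeFrame(my_dyf)
```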
0 answers · 0 votes · 15 views · asked 24 days ago