
How to write data into S3 table buckets with Glue Job Spark

0

I've created an S3 table bucket named test-s3-table-bucket with a namespace named backend_reconciliation. In Athena, the Data source is AwsDataCatalog, the Catalog is s3tablescatalog/test-s3-table-bucket, and the Database is backend_reconciliation. I then created a table named daily_sales:

CREATE TABLE backend_reconciliation.daily_sales (
    sale_date date,
    product_category string,
    sales_amount double)
PARTITIONED BY (month(sale_date))
TBLPROPERTIES ('table_type' = 'iceberg')

Then I created a Glue job to insert a row into this table. The code is:

import sys
import logging
from pyspark.sql import SparkSession
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import lit

logger = logging.getLogger()
logger.setLevel(logging.INFO)

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder \
    .config("spark.sql.catalog.s3tablescatalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.s3tablescatalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.s3tablescatalog.glue.region", "us-east-1") \
    .config("spark.sql.catalog.s3tablescatalog.warehouse", "s3://test-s3-table-bucket/") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.defaultCatalog", "s3tablescatalog") \
    .getOrCreate()

glueContext = GlueContext(spark.sparkContext)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.createDataFrame([("2024-06-01", "electronics", 12345.67)], ["sale_date", "product_category", "sales_amount"])

df.writeTo("s3tablescatalog.test-s3-table-bucket.backend_reconciliation.daily_sales") \
    .option("checkpointLocation", "s3://abc/checkpoints/daily_sales/") \
    .tableProperty("format-version", "2") \
    .append()

logger.info("write complete.")
job.commit()

But I get this error:

Error Category: QUERY_ERROR; Failed Line Number: 34; Spark Error Class: TABLE_OR_VIEW_NOT_FOUND; AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view s3tablescatalog.test-s3-table-bucket.backend_reconciliation.daily_sales cannot be found. Verify the spelling and correctness of the schema and catalog.

I have tried many approaches. Spark does not support "/" in catalog names, but S3 table buckets automatically add the "s3tablescatalog" prefix, so every catalog name for an S3 table bucket contains "/". How can I solve this?

asked 9 months ago · 1.5K views
2 Answers
1
Accepted Answer

To write data into S3 table buckets with AWS Glue Spark jobs, you need to properly configure your SparkSession and use the correct table reference format. The error you're encountering is due to incorrect table referencing in your code.

Here's how to fix your Glue job to write to your S3 table bucket:

  1. First, update your SparkSession configuration:
spark = SparkSession.builder.appName("WriteToS3Tables") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3tables") \
.config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.s3tables.client.region", "us-east-1") \
.config("spark.sql.catalog.s3tables.glue.account-id", "YOUR_ACCOUNT_ID") \
.config("spark.sql.catalog.s3tables.glue.id", "YOUR_ACCOUNT_ID:s3tablescatalog/test-s3-table-bucket") \
.config("spark.sql.catalog.s3tables.warehouse", "s3://test-s3-table-bucket/") \
.getOrCreate()
  2. Then, use one of these approaches to write data to your table:

Option 1: Using SQL to insert data:

df = spark.createDataFrame([("2024-06-01", "electronics", 12345.67)], ["sale_date", "product_category", "sales_amount"])
df.createOrReplaceTempView("temp_data")

spark.sql("""
INSERT INTO backend_reconciliation.daily_sales
SELECT * FROM temp_data
""")

Option 2: Using DataFrame API:

df = spark.createDataFrame([("2024-06-01", "electronics", 12345.67)], ["sale_date", "product_category", "sales_amount"])

df.writeTo("backend_reconciliation.daily_sales") \
.append()

The key differences from your original code are:

  1. Setting up the catalog configuration correctly with the account ID and proper catalog naming
  2. Using the correct table reference format (namespace.table_name) without including the bucket name in the table path
  3. Ensuring your Glue job has the necessary IAM permissions to access the S3 table bucket

Make sure to replace "YOUR_ACCOUNT_ID" with your actual AWS account ID in the configuration.
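Rather than hardcoding the account ID in several places, one option (my own suggestion, not part of the original answer) is to build all of the catalog settings from a single account ID and bucket name. This is a minimal sketch; in a Glue job the account ID could come from STS at runtime, as noted in the comments:

```python
# Sketch: build the s3tables catalog settings from an account ID and bucket
# name, so "YOUR_ACCOUNT_ID" is not repeated by hand in several .config() calls.
# In a running Glue job the ID could be resolved at runtime with:
#   import boto3
#   account_id = boto3.client("sts").get_caller_identity()["Account"]

def s3tables_catalog_conf(account_id: str, bucket: str, region: str) -> dict:
    """Return the spark.sql.catalog.* settings for an S3 table bucket."""
    cat = "spark.sql.catalog.s3tables"
    return {
        cat: "org.apache.iceberg.spark.SparkCatalog",
        f"{cat}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{cat}.client.region": region,
        f"{cat}.glue.account-id": account_id,
        f"{cat}.glue.id": f"{account_id}:s3tablescatalog/{bucket}",
        f"{cat}.warehouse": f"s3://{bucket}/",
    }

# Usage sketch: apply each setting to the SparkSession builder, e.g.
#   builder = SparkSession.builder.appName("WriteToS3Tables")
#   for key, value in s3tables_catalog_conf(account_id,
#           "test-s3-table-bucket", "us-east-1").items():
#       builder = builder.config(key, value)
```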
Sources
Introducing AWS Glue 5.0 for Apache Spark | AWS Big Data Blog
New Amazon S3 Tables: Storage optimized for analytics workloads | AWS News Blog
Running ETL jobs on Amazon S3 tables with AWS Glue - Amazon Simple Storage Service

answered 9 months ago
AWS
SUPPORT ENGINEER
reviewed 8 months ago
  • You're right. I finally found that the cause was permissions: I needed to grant them in the Lake Formation console.
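As the comment above notes, the missing piece was a Lake Formation grant. Besides the console, the same grant can be made through the Lake Formation `grant_permissions` API. The sketch below only builds the request payload; the CatalogId format, role ARN, and permission list are my assumptions for illustration, not details from this thread:

```python
# Sketch of a Lake Formation grant for a table in an S3 Tables catalog.
# The payload could then be sent with boto3:
#   import boto3
#   boto3.client("lakeformation").grant_permissions(**request)
# The "<account>:s3tablescatalog/<bucket>" CatalogId format and the role ARN
# passed by the caller are illustrative assumptions.

def lf_grant_request(account_id: str, bucket: str, database: str,
                     table: str, role_arn: str) -> dict:
    """Build a grant_permissions payload for the Glue job's IAM role."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": role_arn},
        "Resource": {
            "Table": {
                "CatalogId": f"{account_id}:s3tablescatalog/{bucket}",
                "DatabaseName": database,
                "Name": table,
            }
        },
        # Enough for reading and inserting; adjust to your needs.
        "Permissions": ["SELECT", "INSERT", "DESCRIBE"],
    }
```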

0

Thanks for your response. However, when I change my code to this:

import sys
import logging
from pyspark.sql import SparkSession
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import lit

logger = logging.getLogger()
logger.setLevel(logging.INFO)

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.appName("WriteToS3Tables") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.defaultCatalog", "s3tables") \
    .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.s3tables.client.region", "us-east-1") \
    .config("spark.sql.catalog.s3tables.glue.account-id", "abc") \
    .config("spark.sql.catalog.s3tables.glue.id", "abc:s3tablescatalog/test-s3-table-bucket") \
    .config("spark.sql.catalog.s3tables.warehouse", "s3://test-s3-table-bucket/") \
    .getOrCreate()

glueContext = GlueContext(spark.sparkContext)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.createDataFrame([("2024-06-01", "electronics", 12345.67)], ["sale_date", "product_category", "sales_amount"])
df.createOrReplaceTempView("temp_data")

spark.sql("""
INSERT INTO backend_reconciliation.daily_sales
SELECT * FROM temp_data
""")

logger.info("write complete.")
job.commit()

or this:

import sys
import logging
from pyspark.sql import SparkSession
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import lit

logger = logging.getLogger()
logger.setLevel(logging.INFO)

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.appName("WriteToS3Tables") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.defaultCatalog", "s3tables") \
    .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.s3tables.client.region", "us-east-1") \
    .config("spark.sql.catalog.s3tables.glue.account-id", "abc") \
    .config("spark.sql.catalog.s3tables.glue.id", "abc:s3tablescatalog/test-s3-table-bucket") \
    .config("spark.sql.catalog.s3tables.warehouse", "s3://test-s3-table-bucket/") \
    .getOrCreate()

glueContext = GlueContext(spark.sparkContext)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.createDataFrame([("2024-06-01", "electronics", 12345.67)], ["sale_date", "product_category", "sales_amount"])

df.writeTo("backend_reconciliation.daily_sales") \
    .append()

logger.info("write complete.")
job.commit()

I still get this error:

Error Category: QUERY_ERROR; Failed Line Number: 34; Spark Error Class: TABLE_OR_VIEW_NOT_FOUND; AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view backend_reconciliation.daily_sales cannot be found. Verify the spelling and correctness of the schema and catalog.

"abc" is my fake account id, and the Glue Job IAM role has: AmazonS3FullAccess, AmazonS3TablesFullAccess, AWSGlueServiceRole and CloudWatchLogsFullAccess permissions

By the way, I want to point out that I'm using S3 table buckets, not a general purpose bucket.

answered 9 months ago
  • I added iceberg to the datalake-formats configuration, and it worked. Job details -> Advanced properties -> Job parameters -> --datalake-formats: iceberg
