Apache Iceberg on AWS Glue에서 한글 테이블명과 컬럼명을 사용하는 방법
온프레미스의 데이터웨어하우스(예: Greenplum)에서 고객들은 종종 한글테이블명과 한글컬럼명을 사용하는 사례들이 있습니다. 이런 고객들이 AWS Glue에서 지원하는 Apache Iceberg 테이블 포맷으로 트랜잭션 데이터레이크를 구성할 때, 한글 테이블명을 처리하는데 어려움을 겪을 수 있습니다. 이번 글은 이런 환경에서 SparkSQL로 한글 테이블명 Apache Iceberg 테이블을 생성할 때 발생할 수 있는 오류의 원인과 해결 방법에 대해 설명합니다.
조건
- Amazon S3에 저장된 데이터는 한글컬럼명을 가짐
- Amazon S3의 폴더명에 한글이 포함됨
- Apache Iceberg 테이블포맷으로 한글 테이블명인 테이블을 생성하기 원함
문제 현상
AWS Glue ETL 또는 Interactive Session에서 아래 SparkSQL을 수행하면 에러가 발생합니다.
t_table_name = "주문내역"
partition_column = "`기준일`"
spark.sql(f"""create table {catalog_name}.{database_name}.`{t_table_name}`
using iceberg
tblproperties ('format-version'='2')
partitioned by ({partition_column})
location 's3://{t_bucket_name}/{t_bucket_prefix}/{t_table_name}'
as (select * from input_data)""")
[에러 메세지]
IllegalArgumentException: Invalid table identifier: aws_korean_db.주문내역
원인
아래 설명에 있는 것처럼, Hive 호환성을 유지하기 위해 AWS Glue Catalog에 생성되는 데이터베이스명, 테이블명, 컬럼명은 소문자, 숫자, 밑줄(_)을 사용하도록 권고하고 있습니다.
참고 : https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html#schema-names
Database, table, and column names
When you create schema in AWS Glue to query in Athena, consider the following:
* A database name cannot be longer than 255 characters.
* A table name cannot be longer than 255 characters.
* A column name cannot be longer than 255 characters.
* The only acceptable characters for database names, table names, and column names are lowercase letters, numbers, and the underscore character.
해결 방법
Apache Iceberg 0.14.1 버전부터 Spark Configuration glue.skip-name-validation=true|false이 도입되었습니다.
현재 AWS Glue 버전별로 지원되는 Apache Iceberg 버전은 아래와 같습니다.
따라서, glue.skip-name-validation 옵션을 설정하기 위해서는 AWS Glue 4.0을 사용해야 합니다.
AWS Glue version | Supported Iceberg version |
---|---|
4.0 | 1.0.0 |
3.0 | 0.13.1 |
참고 : https://iceberg.apache.org/docs/1.0.0/aws/#skip-name-validation
Skip Name Validation
Allow user to skip name validation for table name and namespaces.
It is recommended to stick to Glue best practice in https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html to make sure operations are Hive compatible.
This is only added for users that have existing conventions using non-standard characters.
When database name and table name validation are skipped, there is no guarantee that downstream systems would all support the names.
주의 : 위에 언급되어 있는 것처럼, Hive 호환성을 유지하기 위해 Glue 명명규칙을 따르기를 권고하고 있습니다. 이 옵션을 사용하는 경우, 이 점을 유의해야 합니다.
예제 코드
아래 예제 코드는 AWS Glue Interactive Session(Notebook)에서 수행한 예제입니다. 현재 AWS Glue Interactive Session도 Glue 4.0을 지원하고 있습니다.
%session_id_prefix iceberg-korean-tablename-
%glue_version 4.0
%idle_timeout 60
%%configure
{
"--datalake-formats": "iceberg"
}
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, max
from pyspark.conf import SparkConf
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
catalog_name = "glue_catalog"
database_name = "aws_korean_db"
s_bucket_name = "aws-kor-raw-data"
t_bucket_name = "aws-kor-stg-data"
s_bucket_prefix = "iron/demo"
t_bucket_prefix = "iron/demo"
s_table_name = "주문내역"
t_table_name = "주문내역"
partition_column = "`기준일`"
s_table_path = f"s3://{s_bucket_name}/{s_bucket_prefix}/{s_table_name}"
t_table_path = f"s3://{t_bucket_name}/{t_bucket_prefix}/{t_table_name}"
## Configure iceberg configuration
conf = SparkConf()
conf.set(f"spark.sql.catalog.{catalog_name}.warehouse", f"{t_table_path}")
conf.set(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog")
conf.set(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set(f"spark.sql.catalog.{catalog_name}.glue.skip-name-validation", "true")
conf.set(f"spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set(f"spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set(f"spark.sql.iceberg.handle-timestamp-without-timezone","true")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
table_df = spark.read.parquet(s_table_path)
table_df.createOrReplaceTempView("s_table")
sql_select ="SELECT * FROM s_table where `기준일` = '2023-06-05' "
table_sql = spark.sql(sql_select).drop("yyyy","mm","dd")
rawDF = table_sql.sortWithinPartitions(f"{partition_column}")
rawDF.createOrReplaceTempView("input_data")
rawDF.printSchema()
rawDF.show(1)
## Check if the iceberg table already exists
existing_tables = spark.sql(f"SHOW TABLES IN {catalog_name}.{database_name}")
#existing_tables.show()
df_existing_tables = existing_tables.select('tableName').rdd.flatMap(lambda x:x).collect()
# delete
if f"{t_table_name}" in df_existing_tables:
print(df_existing_tables)
print(f"Table {t_table_name} already exists")
del_sql = f"DELETE FROM {catalog_name}.{database_name}.`{t_table_name}` where `기준일` = '2023-06-05' "
print(f"Executing SparkSQL:\n`{del_sql}`")
spark.sql(del_sql).count()
else:
print(f"Table {t_table_name} doesn't exist")
# create & insert
if(rawDF.count() > 0):
if f"{t_table_name}" in df_existing_tables:
print(f"Table {t_table_name} already exists : Insert")
rawDF.writeTo(f"{catalog_name}.{database_name}.`{t_table_name}`").overwritePartitions()
else:
print(f"Table {t_table_name} doesn't exist")
spark.sql(f"""create table {catalog_name}.{database_name}.`{t_table_name}`
using iceberg
tblproperties ('format-version'='2')
partitioned by ({partition_column})
location 's3://{t_bucket_name}/{t_bucket_prefix}/{t_table_name}'
as (select * from input_data)""")
else:
print(f"No Data changed.")
결과
AWS Glue : Iceberg Table
Amazon Athena Query Result
Amazon S3 : Iceberg Table
관련 콘텐츠
- 질문됨 일 년 전lg...
- 질문됨 2년 전lg...
- 질문됨 10달 전lg...
- AWS 공식업데이트됨 2달 전
- AWS 공식업데이트됨 일 년 전
- AWS 공식업데이트됨 2달 전