Apache Iceberg on AWS Glue에서 한글 테이블명과 컬럼명을 사용하는 방법

4분 분량
콘텐츠 수준: 고급
0

온프레미스의 데이터웨어하우스(예: Greenplum)에서 고객들은 종종 한글테이블명과 한글컬럼명을 사용하는 사례들이 있습니다. 이런 고객들이 AWS Glue에서 지원하는 Apache Iceberg 테이블 포맷으로 트랜잭션 데이터레이크를 구성할 때, 한글 테이블명을 처리하는데 어려움을 겪을 수 있습니다. 이번 글은 이런 환경에서 SparkSQL로 한글 테이블명 Apache Iceberg 테이블을 생성할 때 발생할 수 있는 오류의 원인과 해결 방법에 대해 설명합니다.

조건

  • Amazon S3에 저장된 데이터는 한글컬럼명을 가짐
  • Amazon S3의 폴더명에 한글이 포함됨
  • Apache Iceberg 테이블포맷으로 한글 테이블명인 테이블을 생성하기 원함

문제 현상

AWS Glue ETL 또는 Interactive Session에서 아래 SparkSQL을 수행하면 에러가 발생합니다.

t_table_name = "주문내역"
partition_column = "`기준일`"

spark.sql(f"""create table {catalog_name}.{database_name}.`{t_table_name}`
               using iceberg
               tblproperties ('format-version'='2')
               partitioned by ({partition_column})
               location 's3://{t_bucket_name}/{t_bucket_prefix}/{t_table_name}'
               as (select * from input_data)""")

[에러 메세지]
IllegalArgumentException: Invalid table identifier: aws_korean_db.주문내역

원인

아래 설명에 있는 것처럼, Hive 호환성을 유지하기 위해 AWS Glue Catalog에 생성되는 데이터베이스명, 테이블명, 컬럼명은 소문자, 숫자, 밑줄(_)을 사용하도록 권고하고 있습니다.

참고 : https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html#schema-names

Database, table, and column names
  
When you create schema in AWS Glue to query in Athena, consider the following:   
* A database name cannot be longer than 255 characters.
* A table name cannot be longer than 255 characters.
* A column name cannot be longer than 255 characters.
* The only acceptable characters for database names, table names, and column names are lowercase letters, numbers, and the underscore character.

해결 방법

Apache Iceberg 0.14.1 버전부터 Spark Configuration glue.skip-name-validation=true|false이 도입되었습니다.

현재 AWS Glue 버전별로 지원되는 Apache Iceberg 버전은 아래와 같습니다.

따라서, glue.skip-name-validation 옵션을 설정하기 위해서는 AWS Glue 4.0을 사용해야 합니다.

AWS Glue versionSupported Iceberg version
4.01.0.0
3.00.13.1

참고 : https://iceberg.apache.org/docs/1.0.0/aws/#skip-name-validation

Skip Name Validation

Allow user to skip name validation for table name and namespaces. 
It is recommended to stick to Glue best practice in https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html to make sure operations are Hive compatible. 
This is only added for users that have existing conventions using non-standard characters. 
When database name and table name validation are skipped, there is no guarantee that downstream systems would all support the names.

주의 : 위에 언급되어 있는 것처럼, Hive 호환성을 유지하기 위해 Glue 명명규칙을 따르기를 권고하고 있습니다. 이 옵션을 사용하는 경우, 이 점을 유의해야 합니다.

예제 코드

아래 예제 코드는 AWS Glue Interactive Session(Notebook)에서 수행한 예제입니다. 현재 AWS Glue Interactive Session도 Glue 4.0을 지원하고 있습니다.

%session_id_prefix iceberg-korean-tablename-
%glue_version 4.0
%idle_timeout 60
%%configure 
{
  "--datalake-formats": "iceberg"
}
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, max
from pyspark.conf import SparkConf
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
catalog_name = "glue_catalog"
database_name = "aws_korean_db"
s_bucket_name = "aws-kor-raw-data"
t_bucket_name = "aws-kor-stg-data"
s_bucket_prefix = "iron/demo"
t_bucket_prefix = "iron/demo"
s_table_name = "주문내역"
t_table_name = "주문내역"

partition_column = "`기준일`"
s_table_path = f"s3://{s_bucket_name}/{s_bucket_prefix}/{s_table_name}"
t_table_path = f"s3://{t_bucket_name}/{t_bucket_prefix}/{t_table_name}"
## Configure iceberg configuration
conf = SparkConf()
conf.set(f"spark.sql.catalog.{catalog_name}.warehouse", f"{t_table_path}")
conf.set(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog")
conf.set(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set(f"spark.sql.catalog.{catalog_name}.glue.skip-name-validation", "true")
conf.set(f"spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set(f"spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set(f"spark.sql.iceberg.handle-timestamp-without-timezone","true")

spark = SparkSession.builder.config(conf=conf).getOrCreate()
table_df = spark.read.parquet(s_table_path)
table_df.createOrReplaceTempView("s_table")

sql_select ="SELECT * FROM s_table  where `기준일` = '2023-06-05' "
table_sql = spark.sql(sql_select).drop("yyyy","mm","dd")
rawDF = table_sql.sortWithinPartitions(f"{partition_column}")
rawDF.createOrReplaceTempView("input_data")
rawDF.printSchema()
rawDF.show(1)
## Check if the iceberg table already exists
existing_tables = spark.sql(f"SHOW TABLES IN {catalog_name}.{database_name}")
#existing_tables.show()
df_existing_tables = existing_tables.select('tableName').rdd.flatMap(lambda x:x).collect()
# delete
if f"{t_table_name}" in df_existing_tables:
    print(df_existing_tables)
    print(f"Table {t_table_name} already exists")
    del_sql = f"DELETE FROM {catalog_name}.{database_name}.`{t_table_name}` where `기준일` = '2023-06-05' "
    print(f"Executing SparkSQL:\n`{del_sql}`")
    spark.sql(del_sql).count()
else:
    print(f"Table {t_table_name} doesn't exist")

# create & insert
if(rawDF.count() > 0):
   
    if f"{t_table_name}" in df_existing_tables:
        print(f"Table {t_table_name} already exists : Insert")
        rawDF.writeTo(f"{catalog_name}.{database_name}.`{t_table_name}`").overwritePartitions()
    else:
        print(f"Table {t_table_name} doesn't exist")
        spark.sql(f"""create table {catalog_name}.{database_name}.`{t_table_name}`
                  using iceberg
                  tblproperties ('format-version'='2')
                  partitioned by ({partition_column})
                  location 's3://{t_bucket_name}/{t_bucket_prefix}/{t_table_name}'
                  as (select * from input_data)""")
else:
    print(f"No Data changed.")

결과

AWS Glue : Iceberg Table

AWS Glue : Iceberg Table

Amazon Athena Query Result

Amazon Athena Query Result

Amazon S3 : Iceberg Table

Amazon S3 : Iceberg Table

profile pictureAWS
전문가
IRON
게시됨 9달 전445회 조회