Performing UPSERT operations in S3 using the awswrangler library


Hello,

I am trying to perform an upsert on an Iceberg table. The script below first creates a table over raw data stored as Parquet in an S3 bucket, then creates an empty Iceberg table that is meant to be populated and updated. It fails when trying to insert the data; see the error below.

Script:

import pandas as pd
import awswrangler as wr
import boto3

database = "test"
iceberg_database = "iceberg_mid_sized_demo"
bucket_name = "test-bucket"
folder_name = "iceberg_mid_sized/raw_input"
path = f"s3://{bucket_name}/{folder_name}"

session = boto3.Session()

glue_client = session.client('glue')

try:
    glue_client.create_database(DatabaseInput={'Name': database})
    print('Database created')
    
except glue_client.exceptions.AlreadyExistsException as e:
    print("The database already exists")

# Create an external table over the input Parquet files.
create_raw_data_table_query = """
CREATE EXTERNAL TABLE test.raw_input(
  op string, 
  id bigint, 
  name string, 
  city string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS PARQUET
LOCATION 's3://test-bucket/iceberg_mid_sized/raw_input/'
tblproperties ("parquet.compress"="SNAPPY");
"""

create_raw_data_table_query_exec_id = wr.athena.start_query_execution(sql=create_raw_data_table_query, database=database)
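
# Note: Athena DDL submitted via start_query_execution runs asynchronously;
# only a query execution ID is returned. If the following steps depend on the
# table already existing, one option (not part of the original script) is:
# wr.athena.wait_query(query_execution_id=create_raw_data_table_query_exec_id)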

# Create the database for the Iceberg tables
try:
    glue_client.create_database(DatabaseInput={'Name': iceberg_database})
    print('Database created')
    
except glue_client.exceptions.AlreadyExistsException as e:
    print("The database already exists")

# Create the output Iceberg table. Replace the S3 bucket name with your bucket name
create_output_iceberg_query = """
CREATE TABLE iceberg_mid_sized_demo.iceberg_output (
  id bigint,
  name string,
  city string
  ) 
LOCATION 's3://test-bucket/iceberg-mid_sized/iceberg_output/' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet'
)
"""

create_iceberg_table_query_exec_id = wr.athena.start_query_execution(sql=create_output_iceberg_query, database=iceberg_database)

primary_key = ['id']
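# Note: val_df (the delta DataFrame of new/changed rows) is not defined in the
# snippet above; as a purely hypothetical example it could be loaded from the
# raw input, e.g.:
# val_df = wr.s3.read_parquet(path=path)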
wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)

This last line returns the following traceback and error:

ArrowInvalid                              Traceback (most recent call last)
/var/folders/y8/11mxbknn1sxbbq7vvhd14frr0000gn/T/ipykernel_17075/2358353780.py in <module>
      1 primary_key = ['id']
----> 2 wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_merge_upsert_table.py in merge_upsert_table(delta_df, database, table, primary_key, boto3_session)
    111     if wr.catalog.does_table_exist(database=database, table=table, boto3_session=boto3_session):
    112         # Read the existing table into a pandas dataframe
--> 113         existing_df = wr.s3.read_parquet_table(database=database, table=table, boto3_session=boto3_session)
    114         # Check if data quality inside dataframes to be merged are sufficient
    115         if _is_data_quality_sufficient(existing_df=existing_df, delta_df=delta_df, primary_key=primary_key):

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_config.py in wrapper(*args_raw, **kwargs)
    448                 del args[name]
    449                 args = {**args, **keywords}
--> 450         return function(**args)
    451 
    452     wrapper.__doc__ = _inject_config_doc(doc=function.__doc__, available_configs=available_configs)

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in read_parquet_table(table, database, filename_suffix, filename_ignore_suffix, catalog_id, partition_filter, columns, validate_schema, categories, safe, map_types, chunked, use_threads, boto3_session, s3_additional_kwargs)
    969         use_threads=use_threads,
    970         boto3_session=boto3_session,
--> 971         s3_additional_kwargs=s3_additional_kwargs,
    972     )
    973     partial_cast_function = functools.partial(

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in read_parquet(path, path_root, path_suffix, path_ignore_suffix, version_id, ignore_empty, ignore_index, partition_filter, columns, validate_schema, chunked, dataset, categories, safe, map_types, use_threads, last_modified_begin, last_modified_end, boto3_session, s3_additional_kwargs, pyarrow_additional_kwargs)
    767     if len(paths) == 1:
    768         return _read_parquet(
--> 769             path=paths[0], version_id=versions[paths[0]] if isinstance(versions, dict) else None, **args
    770         )
    771     if validate_schema is True:

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _read_parquet(path, version_id, columns, categories, safe, map_types, boto3_session, dataset, path_root, s3_additional_kwargs, use_threads, pyarrow_additional_kwargs)
    538             use_threads=use_threads,
    539             version_id=version_id,
--> 540             pyarrow_additional_kwargs=pyarrow_args,
    541         ),
    542         categories=categories,

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _read_parquet_file(path, columns, categories, boto3_session, s3_additional_kwargs, use_threads, version_id, pyarrow_additional_kwargs)
    480             source=f,
    481             read_dictionary=categories,
--> 482             coerce_int96_timestamp_unit=pyarrow_args["coerce_int96_timestamp_unit"],
    483         )
    484         if pq_file is None:

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _pyarrow_parquet_file_wrapper(source, read_dictionary, coerce_int96_timestamp_unit)
     41         try:
     42             return pyarrow.parquet.ParquetFile(
---> 43                 source=source, read_dictionary=read_dictionary, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
     44             )
     45         except TypeError as ex:

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/parquet.py in __init__(self, source, metadata, common_metadata, read_dictionary, memory_map, buffer_size, pre_buffer, coerce_int96_timestamp_unit)
    232             buffer_size=buffer_size, pre_buffer=pre_buffer,
    233             read_dictionary=read_dictionary, metadata=metadata,
--> 234             coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
    235         )
    236         self.common_metadata = common_metadata

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.ParquetReader.open()

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

I tried replacing the following line:

wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)

with this code instead:

merge_into_query = """
MERGE INTO iceberg_mid_sized_demo.iceberg_output t
USING test.raw_input s
ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.city = s.city
WHEN NOT MATCHED THEN INSERT (id, name, city) VALUES (s.id, s.name, s.city)
;
"""

merge_into_query_id = wr.athena.start_query_execution(sql=merge_into_query,
                                                     database="iceberg_mid_sized_demo",
                                                    workgroup='wgname'
                                                     )

Now I get the following:

---------------------------------------------------------------------------
InvalidRequestException                   Traceback (most recent call last)
/var/folders/y8/11mxbknn1sxbbq7vvhd14frr0000gn/T/ipykernel_17075/2112489404.py in <module>
      1 merge_into_query_id = wr.athena.start_query_execution(sql=merge_into_query,
      2                                                      database="iceberg_mid_sized_demo",
----> 3                                                     workgroup='athena3'
      4                                                      )

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_config.py in wrapper(*args_raw, **kwargs)
    448                 del args[name]
    449                 args = {**args, **keywords}
--> 450         return function(**args)
    451 
    452     wrapper.__doc__ = _inject_config_doc(doc=function.__doc__, available_configs=available_configs)

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/athena/_utils.py in start_query_execution(sql, database, s3_output, workgroup, encryption, kms_key, params, boto3_session, max_cache_seconds, max_cache_query_inspections, max_remote_cache_entries, max_local_cache_entries, data_source, wait)
    494             encryption=encryption,
    495             kms_key=kms_key,
--> 496             boto3_session=session,
    497         )
    498     if wait:

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/athena/_utils.py in _start_query_execution(sql, wg_config, database, data_source, s3_output, workgroup, encryption, kms_key, boto3_session)
    101         ex_code="ThrottlingException",
    102         max_num_tries=5,
--> 103         **args,
    104     )
    105     return cast(str, response["QueryExecutionId"])

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_utils.py in try_it(f, ex, ex_code, base, max_num_tries, **kwargs)
    341     for i in range(max_num_tries):
    342         try:
--> 343             return f(**kwargs)
    344         except ex as exception:
    345             if ex_code is not None and hasattr(exception, "response"):

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    389                     "%s() only accepts keyword arguments." % py_operation_name)
    390             # The "self" in this scope is referring to the BaseClient.
--> 391             return self._make_api_call(operation_name, kwargs)
    392 
    393         _api_call.__name__ = str(py_operation_name)

/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
    717             error_code = parsed_response.get("Error", {}).get("Code")
    718             error_class = self.exceptions.from_code(error_code)
--> 719             raise error_class(parsed_response, operation_name)
    720         else:
    721             return parsed_response

InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 5:31: mismatched input '.'. Expecting: '='

How can I perform an upsert on an Athena table? Thanks.

Expert
Asked 5 months ago · 20 views
1 Answer

The following query provides a solution (make sure you are using Athena engine version 3):

merge_into_query = """
MERGE INTO "iceberg_mid_sized_demo"."iceberg_output" t
USING "test"."raw_input" s
ON (t.id = s.id)
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET city = s.city
WHEN NOT MATCHED THEN INSERT (id, name, city) VALUES (s.id, s.name, s.city);
"""

merge_into_query_id = wr.athena.start_query_execution(
    sql=merge_into_query,
    database="iceberg_mid_sized_demo",
    workgroup="athena3",
)
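
For completeness, a minimal follow-up sketch (assuming the same database, table, and workgroup names used above) that waits for the MERGE to finish and reads the merged Iceberg table back for verification:

import awswrangler as wr

# Block until the MERGE query completes (raises if the query fails)
wr.athena.wait_query(query_execution_id=merge_into_query_id)

# Read the merged table back to check the upsert result
result_df = wr.athena.read_sql_query(
    sql='SELECT * FROM "iceberg_mid_sized_demo"."iceberg_output"',
    database="iceberg_mid_sized_demo",
    workgroup="athena3",
    ctas_approach=False,  # run as a plain query instead of a CTAS round trip
)
print(result_df.head())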

Expert
Answered 5 months ago
