Performing UPSERT operations on S3 data with the awswrangler library
Hi,
I am trying to perform an upsert on an Iceberg table. The script below creates a Parquet-format table over raw data stored in an S3 bucket, then creates an empty Iceberg table to be populated and updated. It fails when trying to insert the data; see the error below.
Script:
import pandas as pd
import awswrangler as wr
import boto3
database = "test"
iceberg_database = "iceberg_mid_sized_demo"
bucket_name = "test-bucket"
folder_name = "iceberg_mid_sized/raw_input"
path = f"s3://{bucket_name}/{folder_name}"
session = boto3.Session()
glue_client = session.client('glue')
try:
    glue_client.create_database(DatabaseInput={'Name': database})
    print('Database created')
except glue_client.exceptions.AlreadyExistsException:
    print("The database already exists")
# Create an external table over the input parquet files.
create_raw_data_table_query = """
CREATE EXTERNAL TABLE test.raw_input(
op string,
id bigint,
name string,
city string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET
LOCATION 's3://test-bucket/iceberg_mid_sized/raw_input/'
TBLPROPERTIES ("parquet.compression"="SNAPPY");
"""
create_raw_data_table_query_exec_id = wr.athena.start_query_execution(sql=create_raw_data_table_query, database=database)
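# The DDL above runs asynchronously; block until the external table exists
wr.athena.wait_query(query_execution_id=create_raw_data_table_query_exec_id)
# Hypothetical sample delta rows matching the raw_input schema (op, id, name, city);
# staging them as parquet gives the external table data and provides the val_df
# that the merge below operates on
val_df = pd.DataFrame({
    "op": ["U", "I"],  # e.g. 'U' = update, 'I' = insert, 'D' = delete
    "id": [1, 4],
    "name": ["John", "Daisy"],
    "city": ["Chicago", "Seattle"],
})
wr.s3.to_parquet(df=val_df, path=path, dataset=True)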
# Create the database for the Iceberg tables
try:
    glue_client.create_database(DatabaseInput={'Name': iceberg_database})
    print('Database created')
except glue_client.exceptions.AlreadyExistsException:
    print("The database already exists")
# Create the output Iceberg table. Replace the S3 bucket name with your bucket name
create_output_iceberg_query = """
CREATE TABLE iceberg_mid_sized_demo.iceberg_output (
id bigint,
name string,
city string
)
LOCATION 's3://test-bucket/iceberg_mid_sized/iceberg_output/'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet'
)
"""
create_iceberg_table_query_exec_id = wr.athena.start_query_execution(sql=create_output_iceberg_query, database=iceberg_database)
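# start_query_execution returns immediately; wait for the Iceberg DDL to
# finish before attempting the merge
wr.athena.wait_query(query_execution_id=create_iceberg_table_query_exec_id)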
primary_key = ['id']
wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)
This last line returns the following traceback and error:
ArrowInvalid Traceback (most recent call last)
/var/folders/y8/11mxbknn1sxbbq7vvhd14frr0000gn/T/ipykernel_17075/2358353780.py in <module>
1 primary_key = ['id']
----> 2 wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_merge_upsert_table.py in merge_upsert_table(delta_df, database, table, primary_key, boto3_session)
111 if wr.catalog.does_table_exist(database=database, table=table, boto3_session=boto3_session):
112 # Read the existing table into a pandas dataframe
--> 113 existing_df = wr.s3.read_parquet_table(database=database, table=table, boto3_session=boto3_session)
114 # Check if data quality inside dataframes to be merged are sufficient
115 if _is_data_quality_sufficient(existing_df=existing_df, delta_df=delta_df, primary_key=primary_key):
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_config.py in wrapper(*args_raw, **kwargs)
448 del args[name]
449 args = {**args, **keywords}
--> 450 return function(**args)
451
452 wrapper.__doc__ = _inject_config_doc(doc=function.__doc__, available_configs=available_configs)
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in read_parquet_table(table, database, filename_suffix, filename_ignore_suffix, catalog_id, partition_filter, columns, validate_schema, categories, safe, map_types, chunked, use_threads, boto3_session, s3_additional_kwargs)
969 use_threads=use_threads,
970 boto3_session=boto3_session,
--> 971 s3_additional_kwargs=s3_additional_kwargs,
972 )
973 partial_cast_function = functools.partial(
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in read_parquet(path, path_root, path_suffix, path_ignore_suffix, version_id, ignore_empty, ignore_index, partition_filter, columns, validate_schema, chunked, dataset, categories, safe, map_types, use_threads, last_modified_begin, last_modified_end, boto3_session, s3_additional_kwargs, pyarrow_additional_kwargs)
767 if len(paths) == 1:
768 return _read_parquet(
--> 769 path=paths[0], version_id=versions[paths[0]] if isinstance(versions, dict) else None, **args
770 )
771 if validate_schema is True:
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _read_parquet(path, version_id, columns, categories, safe, map_types, boto3_session, dataset, path_root, s3_additional_kwargs, use_threads, pyarrow_additional_kwargs)
538 use_threads=use_threads,
539 version_id=version_id,
--> 540 pyarrow_additional_kwargs=pyarrow_args,
541 ),
542 categories=categories,
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _read_parquet_file(path, columns, categories, boto3_session, s3_additional_kwargs, use_threads, version_id, pyarrow_additional_kwargs)
480 source=f,
481 read_dictionary=categories,
--> 482 coerce_int96_timestamp_unit=pyarrow_args["coerce_int96_timestamp_unit"],
483 )
484 if pq_file is None:
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _pyarrow_parquet_file_wrapper(source, read_dictionary, coerce_int96_timestamp_unit)
41 try:
42 return pyarrow.parquet.ParquetFile(
---> 43 source=source, read_dictionary=read_dictionary, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
44 )
45 except TypeError as ex:
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/parquet.py in __init__(self, source, metadata, common_metadata, read_dictionary, memory_map, buffer_size, pre_buffer, coerce_int96_timestamp_unit)
232 buffer_size=buffer_size, pre_buffer=pre_buffer,
233 read_dictionary=read_dictionary, metadata=metadata,
--> 234 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
235 )
236 self.common_metadata = common_metadata
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.ParquetReader.open()
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()
I then tried replacing this line:
wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)
with the following code:
merge_into_query = """
MERGE INTO iceberg_mid_sized_demo.iceberg_output t
USING test.raw_input s
ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.city = s.city
WHEN NOT MATCHED THEN INSERT (id, name, city) VALUES (s.id, s.name, s.city)
;
"""
merge_into_query_id = wr.athena.start_query_execution(
    sql=merge_into_query,
    database="iceberg_mid_sized_demo",
    workgroup='athena3'
)
With this, I now get:
---------------------------------------------------------------------------
InvalidRequestException Traceback (most recent call last)
/var/folders/y8/11mxbknn1sxbbq7vvhd14frr0000gn/T/ipykernel_17075/2112489404.py in <module>
1 merge_into_query_id = wr.athena.start_query_execution(sql=merge_into_query,
2 database="iceberg_mid_sized_demo",
----> 3 workgroup='athena3'
4 )
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_config.py in wrapper(*args_raw, **kwargs)
448 del args[name]
449 args = {**args, **keywords}
--> 450 return function(**args)
451
452 wrapper.__doc__ = _inject_config_doc(doc=function.__doc__, available_configs=available_configs)
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/athena/_utils.py in start_query_execution(sql, database, s3_output, workgroup, encryption, kms_key, params, boto3_session, max_cache_seconds, max_cache_query_inspections, max_remote_cache_entries, max_local_cache_entries, data_source, wait)
494 encryption=encryption,
495 kms_key=kms_key,
--> 496 boto3_session=session,
497 )
498 if wait:
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/athena/_utils.py in _start_query_execution(sql, wg_config, database, data_source, s3_output, workgroup, encryption, kms_key, boto3_session)
101 ex_code="ThrottlingException",
102 max_num_tries=5,
--> 103 **args,
104 )
105 return cast(str, response["QueryExecutionId"])
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_utils.py in try_it(f, ex, ex_code, base, max_num_tries, **kwargs)
341 for i in range(max_num_tries):
342 try:
--> 343 return f(**kwargs)
344 except ex as exception:
345 if ex_code is not None and hasattr(exception, "response"):
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
389 "%s() only accepts keyword arguments." % py_operation_name)
390 # The "self" in this scope is referring to the BaseClient.
--> 391 return self._make_api_call(operation_name, kwargs)
392
393 _api_call.__name__ = str(py_operation_name)
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
717 error_code = parsed_response.get("Error", {}).get("Code")
718 error_class = self.exceptions.from_code(error_code)
--> 719 raise error_class(parsed_response, operation_name)
720 else:
721 return parsed_response
InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 5:31: mismatched input '.'. Expecting: '='
How do I perform an upsert on an Athena table? Thanks
1 Answer
The following query solves it (make sure you are using Athena engine version 3):
merge_into_query = """
MERGE INTO "iceberg_mid_sized_demo"."iceberg_output" t
USING "test"."raw_input" s
ON (t.id = s.id)
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET city = s.city
WHEN NOT MATCHED THEN INSERT (id, name, city) VALUES (s.id, s.name, s.city);
"""
merge_into_query_id = wr.athena.start_query_execution(sql=merge_into_query, database="iceberg_mid_sized_demo", workgroup='athena3')
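Two details matter here. First, wr.s3.merge_upsert_table reads the target table with wr.s3.read_parquet_table, which does not understand Iceberg metadata files; that is what the first traceback shows failing with ArrowInvalid, so MERGE INTO through Athena is the right tool for Iceberg tables. Second, MERGE INTO requires Athena engine version 3, and the target column in the UPDATE SET clause must be unqualified (city = s.city, not t.city = s.city), which is exactly where the mismatched input '.' parse error pointed. A minimal sketch, reusing the 'athena3' workgroup name from the question, for confirming the workgroup's engine version with boto3 before running the merge:

import boto3

athena = boto3.client('athena')
wg = athena.get_work_group(WorkGroup='athena3')
# The effective engine version should read "Athena engine version 3"
print(wg['WorkGroup']['Configuration']['EngineVersion']['EffectiveEngineVersion'])

Since start_query_execution is asynchronous, you can block until the merge completes with wr.athena.wait_query(query_execution_id=merge_into_query_id).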