Performing an UPSERT on S3 data with the awswrangler library
Hello,
I am trying to perform an upsert on an Iceberg table. The script below first creates a table over raw Parquet data stored in an S3 bucket, then creates an empty Iceberg table to be populated and updated. The upsert fails when trying to insert the data; see the error below.
Script:
import pandas as pd
import awswrangler as wr
import boto3
database = "test"
iceberg_database = "iceberg_mid_sized_demo"
bucket_name = "test-bucket"
folder_name = "iceberg_mid_sized/raw_input"
path = f"s3://{bucket_name}/{folder_name}"
session = boto3.Session()
glue_client = session.client('glue')
try:
    glue_client.create_database(DatabaseInput={'Name': database})
    print('Database created')
except glue_client.exceptions.AlreadyExistsException as e:
    print("The database already exists")
# Create an external table over the input Parquet files.
create_raw_data_table_query = """
CREATE EXTERNAL TABLE test.raw_input(
op string,
id bigint,
name string,
city string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET
LOCATION 's3://test-bucket/iceberg_mid_sized/raw_input/'
TBLPROPERTIES ('parquet.compression'='SNAPPY');
"""
create_raw_data_table_query_exec_id = wr.athena.start_query_execution(sql=create_raw_data_table_query, database=database)
# Create the database for the Iceberg tables
try:
    glue_client.create_database(DatabaseInput={'Name': iceberg_database})
    print('Database created')
except glue_client.exceptions.AlreadyExistsException as e:
    print("The database already exists")
# Create the output Iceberg table. Replace the S3 bucket name with your own bucket name.
create_output_iceberg_query = """
CREATE TABLE iceberg_mid_sized_demo.iceberg_output (
id bigint,
name string,
city string
)
LOCATION 's3://test-bucket/iceberg_mid_sized/iceberg_output/'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet'
)
"""
create_iceberg_table_query_exec_id = wr.athena.start_query_execution(sql=create_output_iceberg_query, database=iceberg_database)
primary_key = ['id']
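# NOTE: val_df (the delta DataFrame to merge) is assumed to be defined elsewhere; it is not shown in the post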
wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)
This last line returns the following traceback and error:
ArrowInvalid Traceback (most recent call last)
/var/folders/y8/11mxbknn1sxbbq7vvhd14frr0000gn/T/ipykernel_17075/2358353780.py in <module>
1 primary_key = ['id']
----> 2 wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_merge_upsert_table.py in merge_upsert_table(delta_df, database, table, primary_key, boto3_session)
111 if wr.catalog.does_table_exist(database=database, table=table, boto3_session=boto3_session):
112 # Read the existing table into a pandas dataframe
--> 113 existing_df = wr.s3.read_parquet_table(database=database, table=table, boto3_session=boto3_session)
114 # Check if data quality inside dataframes to be merged are sufficient
115 if _is_data_quality_sufficient(existing_df=existing_df, delta_df=delta_df, primary_key=primary_key):
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_config.py in wrapper(*args_raw, **kwargs)
448 del args[name]
449 args = {**args, **keywords}
--> 450 return function(**args)
451
452 wrapper.__doc__ = _inject_config_doc(doc=function.__doc__, available_configs=available_configs)
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in read_parquet_table(table, database, filename_suffix, filename_ignore_suffix, catalog_id, partition_filter, columns, validate_schema, categories, safe, map_types, chunked, use_threads, boto3_session, s3_additional_kwargs)
969 use_threads=use_threads,
970 boto3_session=boto3_session,
--> 971 s3_additional_kwargs=s3_additional_kwargs,
972 )
973 partial_cast_function = functools.partial(
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in read_parquet(path, path_root, path_suffix, path_ignore_suffix, version_id, ignore_empty, ignore_index, partition_filter, columns, validate_schema, chunked, dataset, categories, safe, map_types, use_threads, last_modified_begin, last_modified_end, boto3_session, s3_additional_kwargs, pyarrow_additional_kwargs)
767 if len(paths) == 1:
768 return _read_parquet(
--> 769 path=paths[0], version_id=versions[paths[0]] if isinstance(versions, dict) else None, **args
770 )
771 if validate_schema is True:
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _read_parquet(path, version_id, columns, categories, safe, map_types, boto3_session, dataset, path_root, s3_additional_kwargs, use_threads, pyarrow_additional_kwargs)
538 use_threads=use_threads,
539 version_id=version_id,
--> 540 pyarrow_additional_kwargs=pyarrow_args,
541 ),
542 categories=categories,
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _read_parquet_file(path, columns, categories, boto3_session, s3_additional_kwargs, use_threads, version_id, pyarrow_additional_kwargs)
480 source=f,
481 read_dictionary=categories,
--> 482 coerce_int96_timestamp_unit=pyarrow_args["coerce_int96_timestamp_unit"],
483 )
484 if pq_file is None:
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _pyarrow_parquet_file_wrapper(source, read_dictionary, coerce_int96_timestamp_unit)
41 try:
42 return pyarrow.parquet.ParquetFile(
---> 43 source=source, read_dictionary=read_dictionary, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
44 )
45 except TypeError as ex:
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/parquet.py in __init__(self, source, metadata, common_metadata, read_dictionary, memory_map, buffer_size, pre_buffer, coerce_int96_timestamp_unit)
232 buffer_size=buffer_size, pre_buffer=pre_buffer,
233 read_dictionary=read_dictionary, metadata=metadata,
--> 234 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
235 )
236 self.common_metadata = common_metadata
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.ParquetReader.open()
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()
I tried replacing the following line:
wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)
with this code instead:
merge_into_query = """
MERGE INTO iceberg_mid_sized_demo.iceberg_output t
USING test.raw_input s
ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.city = s.city
WHEN NOT MATCHED THEN INSERT (id, name, city) VALUES (s.id, s.name, s.city)
;
"""
merge_into_query_id = wr.athena.start_query_execution(
    sql=merge_into_query,
    database="iceberg_mid_sized_demo",
    workgroup='wgname'
)
With this change, I now get:
---------------------------------------------------------------------------
InvalidRequestException Traceback (most recent call last)
/var/folders/y8/11mxbknn1sxbbq7vvhd14frr0000gn/T/ipykernel_17075/2112489404.py in <module>
1 merge_into_query_id = wr.athena.start_query_execution(sql=merge_into_query,
2 database="iceberg_mid_sized_demo",
----> 3 workgroup='athena3'
4 )
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_config.py in wrapper(*args_raw, **kwargs)
448 del args[name]
449 args = {**args, **keywords}
--> 450 return function(**args)
451
452 wrapper.__doc__ = _inject_config_doc(doc=function.__doc__, available_configs=available_configs)
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/athena/_utils.py in start_query_execution(sql, database, s3_output, workgroup, encryption, kms_key, params, boto3_session, max_cache_seconds, max_cache_query_inspections, max_remote_cache_entries, max_local_cache_entries, data_source, wait)
494 encryption=encryption,
495 kms_key=kms_key,
--> 496 boto3_session=session,
497 )
498 if wait:
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/athena/_utils.py in _start_query_execution(sql, wg_config, database, data_source, s3_output, workgroup, encryption, kms_key, boto3_session)
101 ex_code="ThrottlingException",
102 max_num_tries=5,
--> 103 **args,
104 )
105 return cast(str, response["QueryExecutionId"])
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_utils.py in try_it(f, ex, ex_code, base, max_num_tries, **kwargs)
341 for i in range(max_num_tries):
342 try:
--> 343 return f(**kwargs)
344 except ex as exception:
345 if ex_code is not None and hasattr(exception, "response"):
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
389 "%s() only accepts keyword arguments." % py_operation_name)
390 # The "self" in this scope is referring to the BaseClient.
--> 391 return self._make_api_call(operation_name, kwargs)
392
393 _api_call.__name__ = str(py_operation_name)
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
717 error_code = parsed_response.get("Error", {}).get("Code")
718 error_class = self.exceptions.from_code(error_code)
--> 719 raise error_class(parsed_response, operation_name)
720 else:
721 return parsed_response
InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 5:31: mismatched input '.'. Expecting: '='
How do I perform an upsert on an Athena table? Thanks.
The following query provides the solution (make sure to use Athena engine version 3):
merge_into_query = """
MERGE INTO "iceberg_mid_sized_demo"."iceberg_output" t
USING "test"."raw_input" s
ON (t.id = s.id)
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET city = s.city
WHEN NOT MATCHED THEN INSERT (id, name, city) VALUES (s.id, s.name, s.city);
"""
merge_into_query_id = wr.athena.start_query_execution(sql=merge_into_query, database="iceberg_mid_sized_demo", workgroup='athena3')
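Note that the working query drops the table alias on the left-hand side of SET (UPDATE SET city = s.city rather than t.city): Athena's MERGE grammar expects an unqualified column name there, which is what the earlier "mismatched input '.'. Expecting: '='" error pointed at. As a minimal sketch of checking the result afterwards, assuming the same table and the 'athena3' workgroup from above (wr.athena.wait_query and wr.athena.read_sql_query are standard awswrangler calls):
# Block until the MERGE finishes; raises if Athena reports a failure
wr.athena.wait_query(query_execution_id=merge_into_query_id)
# Read the merged Iceberg table back through Athena to inspect the upsert result
result_df = wr.athena.read_sql_query(
    sql='SELECT * FROM "iceberg_mid_sized_demo"."iceberg_output"',
    database="iceberg_mid_sized_demo",
    workgroup="athena3",
)
print(result_df.head())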