Questions tagged with Analytics
I have an array which is stored inside s3 bucket that looks like
```
[
    {
        "bucket_name": "ababa",
        "bucket_creation_date": "130999",
        "additional_data": {
            "bucket_acl": [
                {
                    "Grantee": {
                        "DisplayName": "abaabbb",
                        "ID": "abaaaa",
                        "Type": "CanonicalUser"
                    },
                    "Permission": "FULL_CONTROL"
                }
            ],
            "bucket_policy": {
                "Version": "2012-10-17",
                "Id": "abaaa",
                "Statement": [
                    {
                        "Sid": "iddd",
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "logging.s3.amazonaws.com"
                        },
                        "Action": "s3:PutObject",
                        "Resource": "aarnnn"
                    },
                    {
                        "Effect": "Deny",
                        "Principal": "*",
                        "Action": [
                            "s3:GetBucket*",
                            "s3:List*",
                            "s3:DeleteObject*"
                        ],
                        "Resource": [
                            "arn:aws:s3:::1111-aaa/*",
                            "arn:aws:s3:::1111-bbb"
                        ],
                        "Condition": {
                            "Bool": {
                                "aws_SecureTransport": "false"
                            }
                        }
                    }
                ]
            },
            "public_access_block_configuration": {
                "BlockPublicAcls": true,
                "IgnorePublicAcls": true,
                "BlockPublicPolicy": true,
                "RestrictPublicBuckets": true
            },
            "website_hosting": {},
            "bucket_tags": [
                {
                    "Key": "keyyy",
                    "Value": "valueee"
                }
            ]
        },
        "processed_data": {}
    },
    .......................
]
```
NOTE - some of the fields may be a string/array/struct depending on the data we get (e.g. Action can be an array or a string).
END GOAL - I want to query this data for multiple conditions and then create a field inside processed_data, set to true/false based on the result, using AWS Glue.
Example - For each object inside the array, I want to check:
```
1- if bucket_acl has grantee.type=CanonicalUser and Permission=FULL_CONTROL
AND
2- if bucket_policy has statement that contains Effect=Allow and Principal=* and Action = ...... and Resources = ...... and condition is empty
AND
3- website_hosting is empty
and then create a field inside processed_data and set it to true if the above conditions are satisfied, e.g. processed_data: { isPublic: True }
```
Approaches I Tried:
1 - I tried saving the data to the S3 bucket in Parquet format using awswrangler (AWS SDK for pandas) for faster querying, and then reading it in AWS Glue using a Glue dynamic frame:
```
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={"paths": ["s3://abaabbb/abaaaaa/"], "recurse": True},
    transformation_ctx="S3bucket_node1",
)
S3bucket_node1.printSchema()
S3bucket_node1.show()
```
Output:
```
root
|-- bucket_name: string
|-- bucket_creation_date: string
|-- additional_data: string
|-- processed_data: string
{"bucket_name": "abaaaa", "bucket_creation_date": "139999", "additional_data": "{'bucket_acl': [{'Grantee': {'DisplayName': 'abaaaaaa', 'ID': 'abaaa', 'Type': 'CanonicalUser'}, 'Permission': 'FULL_CONTROL'}], 'bucket_policy': {}, 'public_access_block_configuration': {'BlockPublicAcls': True, 'IgnorePublicAcls': True, 'BlockPublicPolicy': True, 'RestrictPublicBuckets': True}, 'website_hosting': {}, 'bucket_tags': []}", "processed_data": "{}"}
```
I'm getting everything as strings; it seems most of these libraries don't support nested data types.
2 - I tried saving the data as is (in JSON) using the PutObject API and then reading it in AWS Glue using a Glue dynamic frame:
```
piece1 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": True},
    connection_type="s3",
    format="json",
    connection_options={"paths": ["s3://raghav-test-df/raghav3.json"], "recurse": True},
    transformation_ctx="S3bucket_node1",
)
piece1.printSchema()
piece1.show()
piece1.count()
```
Output:
```
root
0
```
I'm getting no schema and a count of 0.
3 - I tried reading the data using a Spark DataFrame:
```
sparkDF=spark.read.option("inferSchema", "true").option("multiline", "true").json("s3://ababa/abaa.json")
sparkDF.printSchema()
sparkDF.count()
sparkDF.show()
```
Output:
```
root
|-- additional_data: struct (nullable = true)
| |-- bucket_acl: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- Grantee: struct (nullable = true)
| | | | |-- DisplayName: string (nullable = true)
| | | | |-- ID: string (nullable = true)
| | | | |-- Type: string (nullable = true)
| | | |-- Permission: string (nullable = true)
| |-- bucket_policy: struct (nullable = true)
| | |-- Id: string (nullable = true)
| | |-- Statement: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- Action: string (nullable = true)
| | | | |-- Condition: struct (nullable = true)
| | | | | |-- Bool: struct (nullable = true)
| | | | | | |-- aws:SecureTransport: string (nullable = true)
| | | | | |-- StringEquals: struct (nullable = true)
| | | | | | |-- AWS:SourceAccount: string (nullable = true)
| | | | | | |-- AWS:SourceArn: string (nullable = true)
| | | | | | |-- aws:PrincipalAccount: string (nullable = true)
| | | | | | |-- s3:x-amz-acl: string (nullable = true)
| | | | |-- Effect: string (nullable = true)
| | | | |-- Principal: string (nullable = true)
| | | | |-- Resource: string (nullable = true)
| | | | |-- Sid: string (nullable = true)
| | |-- Version: string (nullable = true)
| |-- bucket_tags: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- Key: string (nullable = true)
| | | |-- Value: string (nullable = true)
| |-- public_access_block_configuration: struct (nullable = true)
| | |-- BlockPublicAcls: boolean (nullable = true)
| | |-- BlockPublicPolicy: boolean (nullable = true)
| | |-- IgnorePublicAcls: boolean (nullable = true)
| | |-- RestrictPublicBuckets: boolean (nullable = true)
|-- bucket_creation_date: string (nullable = true)
|-- bucket_name: string (nullable = true)
```
I'm getting the schema and the correct count, but some of the fields have different data types (e.g. Action can be a string or an array) and Spark defaults them to string. I think querying the data on multiple conditions using SQL will be too complex.
Do I need to change the approach, or do something else? I am stuck here.
Can someone please help with achieving the end goal?
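A possible direction (a minimal sketch, not a verified solution), building on approach 3 and the inferred schema shown above: Spark's higher-order function `exists` (Spark 3.1+, i.e. Glue 3.0 or later) can express checks 1 and 2 directly on the nested columns, and the result can be written into a new `processed_data` struct. Note that `website_hosting` does not appear in the inferred schema (Spark cannot infer a type for an always-empty object), so check 3 is omitted here, and the predicates below assume the string forms of `Action`/`Principal` from the sample data:
``` python
from pyspark.sql import functions as F

# Check 1: any ACL grant with Grantee.Type = CanonicalUser and Permission = FULL_CONTROL
acl_public = F.exists(
    "additional_data.bucket_acl",
    lambda g: (g["Grantee"]["Type"] == "CanonicalUser") & (g["Permission"] == "FULL_CONTROL"),
)

# Check 2: any policy statement with Effect = Allow and Principal = *
# (extend with Action/Resource/Condition tests as needed)
policy_public = F.exists(
    "additional_data.bucket_policy.Statement",
    lambda s: (s["Effect"] == "Allow") & (s["Principal"] == "*"),
)

# Write the combined flag into a processed_data struct
flagged = sparkDF.withColumn(
    "processed_data",
    F.struct((acl_public & policy_public).alias("isPublic")),
)
flagged.select("bucket_name", "processed_data.isPublic").show(truncate=False)
```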
The error message I'm getting:
Error message not found: ATHENA_CLIENT_ERROR. Can't find bundle for base name com.simba.athena.athena.jdbc42.messages, locale en_US
We have a datalake architecture that we stood up on AWS S3. When I run queries against the tables in the Curated db in Athena, I get results. When I copy the same query and paste it into the custom SQL function in Tableau, it gives me an error. This issue is affecting our business and needs to be resolved as soon as possible.
Please send me an answer if you have previously dealt with this kind of issue.
Important stuff:
I have the Simba Athena JDBC 4.2 driver.
I have an athena properties file directing to our S3 location.
I have Java 8.0 installed with JAVA_HOME set.
I have * access, meaning I have all access in AWS.
I am able to connect to tables in the database.
I am able to view all the tables in the database.
I have also made a couple of dashboards using this database.
I want to create a QuickSight dashboard that allows users to upload a simple CSV file to an existing dataset.
Suppose one dataset has one column, customer_id, with 10 entries.
I want to enable users to upload a new CSV file with a new set of 50 customer_ids and append it to that existing dataset. Could you please help/suggest how to achieve that in QuickSight? Thanks.
How can we achieve real-time integration with Salesforce using Kinesis or Glue or any other service (not a third-party tool)?
Our goal is to get the CDC data from Salesforce and move it to an S3 bucket, which can be used for other projects.
Hello, I have a question about IoT Analytics. When I run a query to get some data from my datastore, it works, but it deletes the previous content of this dataset.
I can't have more than one dataset content at a time.
Does anyone know why, and whether I can change this?
Hi,
I am trying to perform an upsert of an Iceberg table.
The script below creates a table with raw data stored in Parquet format in an S3 bucket.
Then it creates an empty Iceberg table to be populated and eventually updated.
When trying to insert data, it fails; please see the error further down.
The script:
```
import pandas as pd
import awswrangler as wr
import boto3

database = "test"
iceberg_database = "iceberg_mid_sized_demo"
bucket_name = "test-bucket"
folder_name = "iceberg_mid_sized/raw_input"
path = f"s3://{bucket_name}/{folder_name}"

session = boto3.Session()
glue_client = session.client('glue')

try:
    glue_client.create_database(DatabaseInput={'Name': database})
    print('Database created')
except glue_client.exceptions.AlreadyExistsException as e:
    print("The database already exists")

# Create external table on the input parquet files.
create_raw_data_table_query = """
CREATE EXTERNAL TABLE test.raw_input(
    op string,
    id bigint,
    name string,
    city string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS PARQUET
LOCATION 's3://test-bucket/iceberg_mid_sized/raw_input/'
tblproperties ("parquet.compress"="SNAPPY");
"""
create_raw_data_table_query_exec_id = wr.athena.start_query_execution(
    sql=create_raw_data_table_query, database=database
)

# Create iceberg tables database
try:
    glue_client.create_database(DatabaseInput={'Name': iceberg_database})
    print('Database created')
except glue_client.exceptions.AlreadyExistsException as e:
    print("The database already exists")

# Create output Iceberg table with partitioning. Replace the S3 bucket name with your bucket name
create_output_iceberg_query = """
CREATE TABLE iceberg_mid_sized_demo.iceberg_output (
    id bigint,
    name string,
    city string
)
LOCATION 's3://test-bucket/iceberg-mid_sized/iceberg_output/'
TBLPROPERTIES (
    'table_type'='ICEBERG',
    'format'='parquet'
)
"""
create_iceberg_table_query_exec_id = wr.athena.start_query_execution(
    sql=create_output_iceberg_query, database=iceberg_database
)

primary_key = ['id']
wr.s3.merge_upsert_table(
    delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key
)
```
This last line returns the following traceback and error:
``` python
ArrowInvalid Traceback (most recent call last)
/var/folders/y8/11mxbknn1sxbbq7vvhd14frr0000gn/T/ipykernel_17075/2358353780.py in <module>
1 primary_key = ['id']
----> 2 wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_merge_upsert_table.py in merge_upsert_table(delta_df, database, table, primary_key, boto3_session)
111 if wr.catalog.does_table_exist(database=database, table=table, boto3_session=boto3_session):
112 # Read the existing table into a pandas dataframe
--> 113 existing_df = wr.s3.read_parquet_table(database=database, table=table, boto3_session=boto3_session)
114 # Check if data quality inside dataframes to be merged are sufficient
115 if _is_data_quality_sufficient(existing_df=existing_df, delta_df=delta_df, primary_key=primary_key):
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_config.py in wrapper(*args_raw, **kwargs)
448 del args[name]
449 args = {**args, **keywords}
--> 450 return function(**args)
451
452 wrapper.__doc__ = _inject_config_doc(doc=function.__doc__, available_configs=available_configs)
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in read_parquet_table(table, database, filename_suffix, filename_ignore_suffix, catalog_id, partition_filter, columns, validate_schema, categories, safe, map_types, chunked, use_threads, boto3_session, s3_additional_kwargs)
969 use_threads=use_threads,
970 boto3_session=boto3_session,
--> 971 s3_additional_kwargs=s3_additional_kwargs,
972 )
973 partial_cast_function = functools.partial(
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in read_parquet(path, path_root, path_suffix, path_ignore_suffix, version_id, ignore_empty, ignore_index, partition_filter, columns, validate_schema, chunked, dataset, categories, safe, map_types, use_threads, last_modified_begin, last_modified_end, boto3_session, s3_additional_kwargs, pyarrow_additional_kwargs)
767 if len(paths) == 1:
768 return _read_parquet(
--> 769 path=paths[0], version_id=versions[paths[0]] if isinstance(versions, dict) else None, **args
770 )
771 if validate_schema is True:
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _read_parquet(path, version_id, columns, categories, safe, map_types, boto3_session, dataset, path_root, s3_additional_kwargs, use_threads, pyarrow_additional_kwargs)
538 use_threads=use_threads,
539 version_id=version_id,
--> 540 pyarrow_additional_kwargs=pyarrow_args,
541 ),
542 categories=categories,
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _read_parquet_file(path, columns, categories, boto3_session, s3_additional_kwargs, use_threads, version_id, pyarrow_additional_kwargs)
480 source=f,
481 read_dictionary=categories,
--> 482 coerce_int96_timestamp_unit=pyarrow_args["coerce_int96_timestamp_unit"],
483 )
484 if pq_file is None:
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/s3/_read_parquet.py in _pyarrow_parquet_file_wrapper(source, read_dictionary, coerce_int96_timestamp_unit)
41 try:
42 return pyarrow.parquet.ParquetFile(
---> 43 source=source, read_dictionary=read_dictionary, coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
44 )
45 except TypeError as ex:
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/parquet.py in __init__(self, source, metadata, common_metadata, read_dictionary, memory_map, buffer_size, pre_buffer, coerce_int96_timestamp_unit)
232 buffer_size=buffer_size, pre_buffer=pre_buffer,
233 read_dictionary=read_dictionary, metadata=metadata,
--> 234 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
235 )
236 self.common_metadata = common_metadata
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.ParquetReader.open()
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()
ArrowInvalid: Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
```
I have also tried to run the script replacing the following line:
``` python
wr.s3.merge_upsert_table(delta_df=val_df, database='iceberg_mid_sized_demo', table='iceberg_output', primary_key=primary_key)
```
with these lines:
``` python
merge_into_query = """
MERGE INTO iceberg_mid_sized_demo.iceberg_output t
USING test.raw_input s
ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.city = s.city
WHEN NOT MATCHED THEN INSERT (id, name, city) VALUES (s.id, s.name, s.city)
;
"""
merge_into_query_id = wr.athena.start_query_execution(
    sql=merge_into_query,
    database="iceberg_mid_sized_demo",
    workgroup='wgname'
)
```
however, now I am getting:
``` python
---------------------------------------------------------------------------
InvalidRequestException Traceback (most recent call last)
/var/folders/y8/11mxbknn1sxbbq7vvhd14frr0000gn/T/ipykernel_17075/2112489404.py in <module>
1 merge_into_query_id = wr.athena.start_query_execution(sql=merge_into_query,
2 database="iceberg_mid_sized_demo",
----> 3 workgroup='athena3'
4 )
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_config.py in wrapper(*args_raw, **kwargs)
448 del args[name]
449 args = {**args, **keywords}
--> 450 return function(**args)
451
452 wrapper.__doc__ = _inject_config_doc(doc=function.__doc__, available_configs=available_configs)
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/athena/_utils.py in start_query_execution(sql, database, s3_output, workgroup, encryption, kms_key, params, boto3_session, max_cache_seconds, max_cache_query_inspections, max_remote_cache_entries, max_local_cache_entries, data_source, wait)
494 encryption=encryption,
495 kms_key=kms_key,
--> 496 boto3_session=session,
497 )
498 if wait:
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/athena/_utils.py in _start_query_execution(sql, wg_config, database, data_source, s3_output, workgroup, encryption, kms_key, boto3_session)
101 ex_code="ThrottlingException",
102 max_num_tries=5,
--> 103 **args,
104 )
105 return cast(str, response["QueryExecutionId"])
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/awswrangler/_utils.py in try_it(f, ex, ex_code, base, max_num_tries, **kwargs)
341 for i in range(max_num_tries):
342 try:
--> 343 return f(**kwargs)
344 except ex as exception:
345 if ex_code is not None and hasattr(exception, "response"):
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
389 "%s() only accepts keyword arguments." % py_operation_name)
390 # The "self" in this scope is referring to the BaseClient.
--> 391 return self._make_api_call(operation_name, kwargs)
392
393 _api_call.__name__ = str(py_operation_name)
/opt/anaconda3/envs/data_analysis/lib/python3.7/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
717 error_code = parsed_response.get("Error", {}).get("Code")
718 error_class = self.exceptions.from_code(error_code)
--> 719 raise error_class(parsed_response, operation_name)
720 else:
721 return parsed_response
InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 5:31: mismatched input '.'. Expecting: '='
```
How do you perform an UPSERT on Athena tables?
Thanks
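For what it's worth, the parser error points at line 5, column 31 of the MERGE statement, which is the `t.` in `UPDATE SET t.city = s.city`. In Athena engine v3 (Trino-based) the target columns of the SET clause are written unqualified, so a variant like the sketch below (same tables and `awswrangler` call, with an unqualified `city`) may get past the parser; this is an assumption, not a verified fix:
``` python
import awswrangler as wr

# Same MERGE as above, but with the target column left unqualified in SET
merge_into_query = """
MERGE INTO iceberg_mid_sized_demo.iceberg_output t
USING test.raw_input s
ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET city = s.city
WHEN NOT MATCHED THEN INSERT (id, name, city) VALUES (s.id, s.name, s.city)
"""

merge_into_query_id = wr.athena.start_query_execution(
    sql=merge_into_query,
    database="iceberg_mid_sized_demo",
    workgroup="wgname",  # a workgroup running Athena engine v3
)
```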
Hello All,
We need to do a small POC. In it we need to pick up data from Salesforce and push it to Azure Data Lake using Glue. Can we connect to Azure Data Lake from Glue?
I configured the Glue job according to the [official document](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html).
But it always throws the error shown below when running.
```
23/01/18 10:38:24 ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
File "/tmp/test_job.py", line 16, in <module>
AWSGlueDataCatalog_node1674017752048 = glueContext.create_dynamic_frame.from_catalog(
File "/opt/amazon/lib/python3.7/site-packages/awsglue/dynamicframe.py", line 629, in from_catalog
return self._glue_context.create_dynamic_frame_from_catalog(db, table_name, redshift_tmp_dir, transformation_ctx, push_down_predicate, additional_options, catalog_id, **kwargs)
File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 188, in create_dynamic_frame_from_catalog
return source.getFrame(**kwargs)
File "/opt/amazon/lib/python3.7/site-packages/awsglue/data_source.py", line 36, in getFrame
jframe = self._jsource.getDynamicFrame()
File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/opt/amazon/lib/python3.7/site-packages/pyspark/sql/utils.py", line 190, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o101.getDynamicFrame.
: java.lang.Exception: Unsupported dataframe format for job bookmarks
at org.apache.spark.sql.wrapper.SparkSqlDecoratorDataSource.resolveRelation(SparkSqlDecoratorDataSource.scala:103)
at com.amazonaws.services.glue.SparkSQLDataSource.$anonfun$getDynamicFrame$24(DataSource.scala:794)
at com.amazonaws.services.glue.util.FileSchemeWrapper.$anonfun$executeWithQualifiedScheme$1(FileSchemeWrapper.scala:90)
at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:83)
at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:90)
at com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:762)
at com.amazonaws.services.glue.DataSource.getDynamicFrame(DataSource.scala:102)
at com.amazonaws.services.glue.DataSource.getDynamicFrame$(DataSource.scala:102)
at com.amazonaws.services.glue.AbstractSparkSQLDataSource.getDynamicFrame(DataSource.scala:726)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
```
Script of glue job:
```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1674017752048 = glueContext.create_dynamic_frame.from_catalog(
    database="source_db",
    table_name="source_table",
    transformation_ctx="AWSGlueDataCatalog_node1674017752048",
)

# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
    frame=AWSGlueDataCatalog_node1674017752048,
    mappings=[
        ("time", "timestamp", "time", "timestamp"),
        ("name", "string", "name", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)

# Script generated for node MySQL table
MySQLtable_node3 = glueContext.write_dynamic_frame.from_catalog(
    frame=ApplyMapping_node2,
    database="target_db",
    table_name="target_table",
    transformation_ctx="MySQLtable_node3",
)
job.commit()
```
Source table definition:
```
CREATE TABLE source_db.source_table (
time timestamp,
name string)
PARTITIONED BY (`name`)
LOCATION 's3://source_db/source_table'
TBLPROPERTIES (
'table_type'='iceberg'
);
```
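The exception mentions job bookmarks, and the bookmark machinery apparently does not handle this Iceberg source. Two things that may be worth trying (assumptions, not verified fixes): run the job with bookmarks disabled (`--job-bookmark-option job-bookmark-disable`), or read the Iceberg table through the Spark session and wrap the result as a DynamicFrame, roughly as in this sketch:
``` python
# Sketch only: assumes --datalake-formats is set to "iceberg" and the Iceberg Spark
# catalog ("glue_catalog") is configured as in the linked Glue Iceberg documentation.
from awsglue.dynamicframe import DynamicFrame

source_df = spark.table("glue_catalog.source_db.source_table")
source_dyf = DynamicFrame.fromDF(source_df, glueContext, "source_dyf")
# source_dyf can then feed the existing ApplyMapping / write_dynamic_frame steps.
```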
Cron job problem: it works Monday to Friday, but it stops on Saturday and Sunday. Please help with this problem.
Hello Team, is there a limit to the number of tables that can be scanned using the Glue crawler? I have a crawler that scans S3 buckets from a single source for data from January 2021 until December 2022. I have partitions for year and month. The crawler is not updating the data for November and December 2022. I am using this data to query in Athena and eventually in QuickSight. Can anyone suggest what could be wrong?
Hi all,
I have followed the instructions at https://docs.aws.amazon.com/athena/latest/ug/connect-data-source-serverless-app-repo.html to deploy Timestream as an additional data source for Athena, and I can successfully query Timestream data via the Athena console by using the catalog "TimestreamCatalog" I added.
Now I need to use the same catalog "TimestreamCatalog" when building a Glue job.
I run:
```
DataCatalogtable_node1 = glueContext.create_dynamic_frame.from_catalog(
    catalog_id="TimestreamCatalog",
    database="mydb",
    table_name="mytable",
    transformation_ctx="DataCatalogtable_node1",
)
```
and run into this error, even when the role in question has an Administrator policy, i.e. action:* on resource:*, attached (for the sake of experiment):
```
An error occurred while calling o86.getCatalogSource. User: arn:aws:sts::*******:assumed-role/AWSGlueServiceRole-andrei/GlueJobRunnerSession is not authorized to perform: glue:GetTable on resource: arn:aws:glue:eu-central-1:TimestreamCatalog:catalog (Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException; Request ID: 36d7e411-8ca9-4993-9066-b6ca1d7ea4a3; Proxy: null)
```
When calling `aws athena list-data-catalogs`, I get:
```
{
    "DataCatalogsSummary": [
        {
            "CatalogName": "AwsDataCatalog",
            "Type": "GLUE"
        },
        {
            "CatalogName": "TimestreamCatalog",
            "Type": "LAMBDA"
        }
    ]
}
```
I am not sure if using the data source name as catalog_id is correct here, so any hint on what catalog_id is supposed to be for a custom data source is appreciated, or any hint on how to resolve the issue above.
Thanks, Andrei
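One note that may help (an assumption about the cause, not a confirmed answer): in the Glue API, `catalog_id` is the AWS account ID that owns the Data Catalog (it defaults to the caller's account), not the name of an Athena data source, which is why the error shows `TimestreamCatalog` sitting in the account-ID position of the Glue table ARN. A federated Athena catalog of type `LAMBDA` is served by its connector rather than by the Glue Data Catalog, so it would not be reachable this way. The equivalent boto3 call makes the expected shape of `catalog_id` visible (the account ID below is a hypothetical placeholder):
``` python
import boto3

glue = boto3.client("glue")

# CatalogId is an AWS account ID, not an Athena data source name
response = glue.get_table(
    CatalogId="123456789012",
    DatabaseName="mydb",
    Name="mytable",
)
print(response["Table"]["Name"])
```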