Questions in Analytics


Cross-Account Connect Athena (account X) to Glue + S3 (account Y)

Hello,

This question https://repost.aws/questions/QUSdk1j9-FT02t91W3AU0Qng/cross-account-access-from-athena-to-s-3 from 3 years ago seems to be similar. I did everything that is suggested there apart from using Lake Formation; I wanted to try creating the permissions manually first.

**Account Y**: I have JSON data in an S3 bucket and used Glue to create the catalog in account Y. I configured this owner account as in Step 1.a of https://docs.aws.amazon.com/athena/latest/ug/security-iam-cross-account-glue-catalog-access.html. I also configured the S3 bucket according to "Apply a cross-account bucket policy" from https://tomgregory.com/s3-bucket-access-from-the-same-and-another-aws-account/

**Account X**: I want to configure Athena to query the S3 data using the catalog created by Glue. I configured this borrower account as in Step 1.b of https://docs.aws.amazon.com/athena/latest/ug/security-iam-cross-account-glue-catalog-access.html. I also configured the IAM policies according to "Apply a cross-account bucket policy" from https://tomgregory.com/s3-bucket-access-from-the-same-and-another-aws-account/. Both the S3 and Glue policies are attached to the concerned users in this account.

**Problem**: In account X, Athena can access Glue and displays the database, tables, and catalog. However, when I run a query (the same query that succeeds in account Y) I get this error:

```
Permission denied on S3 path: s3://asdf
This query ran against the "dbname" database, unless qualified by the query. Please post the error message on our forum or contact customer support with Query Id: a3a3a3a...
```

Apparently I'm missing an S3 permission, but I can't find information about it. Any help is much appreciated. Thanks,
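A "Permission denied on S3 path" error from cross-account Athena usually means the caller in account X is allowed to read the Glue catalog but not the underlying S3 objects in account Y. As a minimal sketch (the bucket name and the account X principal ARN below are placeholders, not values from the question), the bucket policy in account Y would need to grant that principal at least `s3:GetObject` and `s3:ListBucket`, for example applied with boto3 from account Y:

```python
import json
import boto3

# Placeholders: replace with the real data bucket in account Y and the
# principal (user or role) that runs the Athena query in account X.
BUCKET = "data-bucket-in-account-y"
ACCOUNT_X_PRINCIPAL = "arn:aws:iam::111122223333:role/athena-query-role"

bucket_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowCrossAccountAthenaRead",
            "Effect": "Allow",
            "Principal": {"AWS": ACCOUNT_X_PRINCIPAL},
            "Action": ["s3:GetObject", "s3:ListBucket", "s3:GetBucketLocation"],
            "Resource": [
                f"arn:aws:s3:::{BUCKET}",
                f"arn:aws:s3:::{BUCKET}/*",
            ],
        }
    ],
}

# Apply the policy as the bucket owner (account Y).
s3 = boto3.client("s3")
s3.put_bucket_policy(Bucket=BUCKET, Policy=json.dumps(bucket_policy))
```

The querying identity in account X also needs matching `s3:GetObject` / `s3:ListBucket` statements on the same bucket ARNs in its own IAM policy; the bucket policy alone is not enough for cross-account access.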
0 answers · 0 votes · 3 views · asked 3 hours ago

Not able to get data in Athena query results for AWS Config from the S3 bucket

Hi, I have been trying to implement a monitoring solution to monitor the resources of the AWS accounts in my organization with AWS Config, Amazon Athena, and QuickSight. I have set up all the services, however Athena is not able to query all the data from the S3 bucket where the Config data for all the accounts is stored. It can only query the data for the current account, the one from which I am running the query, even though I can see the Config data for all accounts in the S3 bucket.

**Athena table creation query**

```
CREATE EXTERNAL TABLE aws_config_configuration_snapshot (
    fileversion STRING,
    configSnapshotId STRING,
    configurationitems ARRAY < STRUCT <
        configurationItemVersion : STRING,
        configurationItemCaptureTime : STRING,
        configurationStateId : BIGINT,
        awsAccountId : STRING,
        configurationItemStatus : STRING,
        resourceType : STRING,
        resourceId : STRING,
        resourceName : STRING,
        ARN : STRING,
        awsRegion : STRING,
        availabilityZone : STRING,
        configurationStateMd5Hash : STRING,
        configuration : STRING,
        supplementaryConfiguration : MAP < STRING, STRING >,
        tags : MAP < STRING, STRING >,
        resourceCreationTime : STRING > >
)
PARTITIONED BY (accountid STRING, dt STRING, region STRING)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<S3_BUCKET_NAME>/AWSLogs/';
```

**The Lambda function used for partitioning the data per account**

```
import datetime
import re
import boto3
import os

TABLE_NAME = 'aws_config_configuration_snapshot'
DATABASE_NAME = 'sampledb'
ACCOUNT_ID = None  # Determined at runtime
LATEST_PARTITION_VALUE = 'latest'

athena = boto3.client('athena')

def lambda_handler(event, context):
    global ACCOUNT_ID

    object_key = event['Records'][0]['s3']['object']['key']
    match = get_configuration_snapshot_object_key_match(object_key)
    if match is None:
        print('Ignoring event for non-configuration snapshot object key', object_key)
        return
    print('Adding partitions for configuration snapshot object key', object_key)

    ACCOUNT_ID = context.invoked_function_arn.split(':')[4]
    object_key_parent = 's3://{bucket_name}/{object_key_parent}/'.format(
        bucket_name=event['Records'][0]['s3']['bucket']['name'],
        object_key_parent=os.path.dirname(object_key))
    configuration_snapshot_accountid = get_configuration_snapshot_accountid(match)
    configuration_snapshot_region = get_configuration_snapshot_region(match)
    configuration_snapshot_date = get_configuration_snapshot_date(match)

    drop_partition(configuration_snapshot_accountid, configuration_snapshot_region, LATEST_PARTITION_VALUE)
    add_partition(configuration_snapshot_accountid, configuration_snapshot_region, LATEST_PARTITION_VALUE, object_key_parent)
    add_partition(configuration_snapshot_accountid, configuration_snapshot_region,
                  get_configuration_snapshot_date(match).strftime('%Y-%m-%d'), object_key_parent)

def get_configuration_snapshot_object_key_match(object_key):
    # Matches object keys like AWSLogs/123456789012/Config/us-east-1/2018/4/11/ConfigSnapshot/123456789012_Config_us-east-1_ConfigSnapshot_20180411T054711Z_a970aeff-cb3d-4c4e-806b-88fa14702hdb.json.gz
    return re.match('^AWSLogs/(\d+)/Config/([\w-]+)/(\d+)/(\d+)/(\d+)/ConfigSnapshot/[^\\\]+$', object_key)

def get_configuration_snapshot_accountid(match):
    print('AccountId:', match.group(1))
    return match.group(1)

def get_configuration_snapshot_region(match):
    return match.group(2)

def get_configuration_snapshot_date(match):
    return datetime.date(int(match.group(3)), int(match.group(4)), int(match.group(5)))

def add_partition(accountid_partition_value, region_partition_value, dt_partition_value, partition_location):
    execute_query('ALTER TABLE {table_name} ADD PARTITION {partition} location \'{partition_location}\''.format(
        table_name=TABLE_NAME,
        partition=build_partition_string(accountid_partition_value, region_partition_value, dt_partition_value),
        partition_location=partition_location))

def drop_partition(accountid_partition_value, region_partition_value, dt_partition_value):
    execute_query('ALTER TABLE {table_name} DROP PARTITION {partition}'.format(
        table_name=TABLE_NAME,
        partition=build_partition_string(accountid_partition_value, region_partition_value, dt_partition_value)))

def build_partition_string(accountid_partition_value, region_partition_value, dt_partition_value):
    return "(accountid='{accountid_partition_value}', dt='{dt_partition_value}', region='{region_partition_value}')".format(
        accountid_partition_value=accountid_partition_value,
        dt_partition_value=dt_partition_value,
        region_partition_value=region_partition_value)

def execute_query(query):
    print('Executing query:', query)
    query_output_location = 's3://aws-athena-query-results-{account_id}-{region}'.format(
        account_id=ACCOUNT_ID,
        region=os.environ['AWS_REGION'])
    start_query_response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DATABASE_NAME
        },
        ResultConfiguration={
            'OutputLocation': query_output_location,
        }
    )
    print('Query started')

    is_query_running = True
    while is_query_running:
        get_query_execution_response = athena.get_query_execution(
            QueryExecutionId=start_query_response['QueryExecutionId']
        )
        query_state = get_query_execution_response['QueryExecution']['Status']['State']
        is_query_running = query_state in ('RUNNING', 'QUEUED')

        if not is_query_running and query_state != 'SUCCEEDED':
            raise Exception('Query failed')
    print('Query completed')
```

**Sample query tried:**

```
CREATE OR REPLACE VIEW v_config_ec2_vpcs AS
SELECT DISTINCT
    "accountId" "633328536665"
  , "region" "us-east-1"
  , "configurationItem"."resourceid" "ResourceId"
  , "configurationItem"."tags"['name'] "TagName"
  , "json_extract_scalar"("configurationItem"."configuration", '$.isdefault') "IsDefault"
  , "json_extract_scalar"("configurationItem"."configuration", '$.cidrblockassociationset[0].cidrblock') "CidrBlock0"
  , "json_extract_scalar"("configurationItem"."configuration", '$.cidrblockassociationset[1].cidrblock') "CidrBlock1"
  , "json_extract_scalar"("configurationItem"."configuration", '$.cidrblockassociationset[2].cidrblock') "CidrBlock2"
  , "json_extract_scalar"("configurationItem"."configuration", '$.cidrblockassociationset[3].cidrblock') "CidrBlock3"
  , "json_extract_scalar"("configurationItem"."configuration", '$.cidrblockassociationset[4].cidrblock') "CidrBlock4"
FROM default.aws_config_configuration_snapshot
CROSS JOIN UNNEST("configurationitems") t (configurationItem)
WHERE (("dt" = 'latest') AND ("configurationItem"."resourcetype" = 'AWS::EC2::VPC'))
```

For some reason Athena does not return the data for all the AWS accounts from the S3 bucket (only the data for the current account is returned). I have checked the S3 bucket policy and it is set up as per the solutions below.

Solutions referred:

* https://aws.amazon.com/blogs/mt/visualizing-aws-config-data-using-amazon-athena-and-amazon-quicksight/
* https://aws.amazon.com/blogs/mt/how-to-query-your-aws-resource-configuration-states-using-aws-config-and-amazon-athena/

Thanks and Regards,
Mahesh B.
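Since the table is partitioned by `accountid`, Athena only returns rows for account IDs whose partitions were actually registered. If the partitioning Lambda only ever fired for the current account's snapshot objects, the behaviour described above is exactly what you would see. A diagnostic sketch (the database name and query-results location are assumptions, not taken from the question) that lists the registered partitions so you can check which account IDs are present:

```python
# Diagnostic sketch: list the partitions Athena has registered for the Config table.
# DATABASE and OUTPUT_LOCATION are placeholders for your environment.
import time
import boto3

DATABASE = "sampledb"
OUTPUT_LOCATION = "s3://aws-athena-query-results-<account-id>-<region>/"

athena = boto3.client("athena")
execution = athena.start_query_execution(
    QueryString="SHOW PARTITIONS aws_config_configuration_snapshot",
    QueryExecutionContext={"Database": DATABASE},
    ResultConfiguration={"OutputLocation": OUTPUT_LOCATION},
)
query_id = execution["QueryExecutionId"]

# Wait for the query to finish, then print each partition spec.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state not in ("RUNNING", "QUEUED"):
        break
    time.sleep(1)

for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
    print(row["Data"][0].get("VarCharValue", ""))
```

If only one `accountid=...` shows up, the S3 event notifications (or the Lambda's permissions) for the other accounts' snapshot keys are the place to look.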
0 answers · 0 votes · 6 views · asked 3 hours ago

Redshift serverless: error while trying to create an external table

The error:

```
SQL Error [500310] [XX000]: [Amazon](500310) Invalid operation: Unknown std exception when calling external catalog API:
-----------------------------------------------
error:    The requested role arn:aws:iam::*******:role/service-role/AWSGlueServiceRole-analytics is not associated to cluster
code:     30000
context:  query: 0
location: xen_aws_credentials_mgr.cpp:403
process:  padbmaster [pid=16004]
-----------------------------------------------;
```

I was able to create a schema prior to creating the external table, with no errors:

```
create external schema rsqa
from data catalog
database 'actions'
region 'us-west-2'
iam_role 'arn:aws:iam::**********:role/service-role/AWSGlueServiceRole-analytics';
```

But after that, this is the SQL that produces the error above:

```
create external table dev.rsqa.user_actions(
    event_id varchar(256),
    api_key varchar(32),
    event_type varchar(32),
    action_id INT8,
    action_name varchar(128),
    action_data_id INT8,
    action_time INT4,
    create_time INT4,
    metadata varchar(256),
    user_id INT8,
    username varchar(256),
    user_first_name varchar(128),
    user_last_name varchar(128),
    value FLOAT4,
    passive BOOL
)
PARTITIONED BY (
    ics varchar(32),
    site_id varchar(32),
    year varchar(32),
    month varchar(32),
    week varchar(32),
    dt varchar(32)
)
stored as parquet
location 's3://my-datalake-qa/actions/user_actions/';
```

I found this post with a potential fix for the above: https://www.repost.aws/questions/QUDySJMPusRgSuQ7O8q2oTag/red-shift-serverless-spectrum-glue-access-issue but I created a user, gave it the right permissions, logged in with it, ran the query again, and I still get the same error. What am I doing wrong?
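The error text ("role ... is not associated to cluster") suggests the Glue service role has not been attached to the Redshift Serverless namespace itself, which is a separate step from referencing it in the `create external schema` statement. A hedged sketch of associating it with boto3 (the namespace name is a placeholder, and the parameter names reflect the `redshift-serverless` API as I understand it):

```python
# Sketch: attach the IAM role to the Redshift Serverless namespace so external
# catalog calls can assume it. Namespace name and role ARN are placeholders.
import boto3

ROLE_ARN = "arn:aws:iam::123456789012:role/service-role/AWSGlueServiceRole-analytics"

rs = boto3.client("redshift-serverless")
rs.update_namespace(
    namespaceName="default",
    iamRoles=[ROLE_ARN],
    defaultIamRoleArn=ROLE_ARN,
)

# Verify the association took effect.
print(rs.get_namespace(namespaceName="default")["namespace"]["iamRoles"])
```

The same association can be done in the console under the namespace's "Security and encryption" settings; the role must also trust `redshift.amazonaws.com`.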
0 answers · 0 votes · 7 views · asked 14 hours ago

EMR Studio - Can you import local code into a notebook?

We are trying to use EMR Studio as a development environment for a medium-complexity project which has the code split out into multiple files for testing and maintainability. There's simply too much code to have in one long file. I cannot work out how to import local code into a notebook to run or test it.

## Example layout

Here is a simplified example (our project is much larger):

```shell
my_notebook.ipynb
my_project/
    __init__.py
    model.py
    report.py
```

In the notebook we might have a cell like:

```python
from my_project.model import DataModel
from my_project.report import Report

report = Report(DataModel(spark))
report.show()
```

The current result is:

```
An error was encountered:
No module named 'my_project'
Traceback (most recent call last):
ModuleNotFoundError: No module named 'my_project'
```

Is this possible?

## Execution environment

It appears that the Python execution environment and the shell environment are completely separate, and the current directory is not available to the Python interpreter:

| Execution environment | Key | Value |
| --- | --- | --- |
| Python | User | `livy` |
| Python | Current working dir | `/mnt/var/lib/livy` |
| `%%sh` | User | `emr-notebook` |
| `%%sh` | Current working dir | `/home/emr-notebook/e-<HEX_STRING>` |

The `/home/emr-notebook/...` dir appears to contain our code, but the `livy` user, which we appear to be running as, doesn't have permission to look at it. So even if we could guess the CWD and add it to the Python path, it appears Python would not have permission to read the code.
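One workaround that fits the Livy-backed execution model is to distribute the package through Spark itself rather than the notebook's working directory. A sketch, assuming you zip `my_project/` and upload it to an S3 location of your own (the bucket and key are placeholders; `addPyFile` accepts paths on Hadoop-supported filesystems, which on EMR should include S3):

```python
# Run in a PySpark-kernel cell; `spark` is the session EMR Studio already provides.
# The zip must contain the my_project/ package at its root (with __init__.py).
spark.sparkContext.addPyFile("s3://my-bucket/artifacts/my_project.zip")

# After addPyFile, the zip is shipped to the driver and executors and added to sys.path,
# so the normal imports from the question should resolve.
from my_project.model import DataModel
from my_project.report import Report

report = Report(DataModel(spark))
report.show()
```

This keeps the notebook thin while the real code lives in versioned files, at the cost of re-zipping and re-uploading after each change.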
0 answers · 0 votes · 10 views · asked a day ago

Connecting Users to AWS Athena and AWS Lake Formation via Tableau Desktop using the Simba Athena JDBC Driver and Okta as Identity Provider

Hello, according to the step-by-step guide in the official AWS Athena user guide (link at the end of the question), it should be possible to connect Tableau Desktop to Athena and Lake Formation via the Simba Athena JDBC Driver using Okta as IdP. The challenge I am facing right now is that, although I followed each step as documented in the Athena user guide, I cannot make the connection work. The error message that I receive whenever I try to connect from Tableau Desktop states:

> [Simba][AthenaJDBC](100071) An error has been thrown from the AWS Athena client. The security token included in the request is invalid. [Execution ID not available] Invalid Username or Password.

My athena.properties file used to configure the driver on the Tableau side via a connection-string URL looks as follows (user name and password are masked):

```
jdbc:awsathena://AwsRegion=eu-central-1;
S3OutputLocation=s3://athena-query-results;
AwsCredentialsProviderClass=com.simba.athena.iamsupport.plugin.OktaCredentialsProvider;
idp_host=1234.okta.com;
User=*****.*****@example.com;
Password=******************;
app_id=****************************;
ssl_insecure=true;
okta_mfa_type=oktaverifywithpush;
LakeFormationEnabled=true;
```

The configuration settings used here are from the official Simba Athena JDBC driver documentation (version 2.0.31). Furthermore, I assigned the required permissions for my users and groups inside Lake Formation as stated in the step-by-step guide linked below. Right now I am not able to work out why the connection fails, so I would be very grateful for any support or ideas towards a solution.

Best regards

Link: https://docs.aws.amazon.com/athena/latest/ug/security-athena-lake-formation-jdbc-okta-tutorial.html#security-athena-lake-formation-jdbc-okta-tutorial-step-1-create-an-okta-account
0 answers · 0 votes · 10 views · asked 2 days ago

Amazon Athena error on querying DynamoDB exported data

**Background**

We've configured an export to S3 from DynamoDB using the native DynamoDB S3 export, with ION as the output format. After this, we've created a table in Athena:

```
CREATE EXTERNAL TABLE export_2022_07_01_v4 (
  `_PK` STRING,
  URL STRING,
  Item struct<
    `_PK`:string,
    URL:string
  >
)
ROW FORMAT SERDE 'com.amazon.ionhiveserde.IonHiveSerDe'
WITH SERDEPROPERTIES (
  "ignore_malformed_ion" = "true"
)
STORED AS ION
LOCATION '...';
```

Querying this works all right for small, simple queries, but attempting to produce a full output with

```
UNLOAD (
  SELECT Item.URL
  FROM "helioptileexports"."export_2022_07_01_v4"
  WHERE Item.URL IS NOT NULL
)
to '...'
WITH (format = 'TEXTFILE')
```

results in this error:

```
HIVE_CURSOR_ERROR: Syntax error at line 1 offset 2: invalid syntax [state:STATE_BEFORE_ANNOTATION_DATAGRAM on token:TOKEN_SYMBOL_OPERATOR]
This query ran against the "helioptileexports" database, unless qualified by the query. Please post the error message on our forum or contact customer support with Query Id: f4ca5812-1194-41f1-bfda-00a1a5b2471b
```

**Questions**

1. Is there a way to make Athena more tolerant of formatting errors in specific files? As shown in the example, we are attempting, without success, to use `ignore_malformed_ion`. Is there anything beyond that that can be done?
2. Is this a bug in the DynamoDB ION export process?
3. Is there any mechanism or logging to identify the files which contain the malformed data so that we can remove them?
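On question 3, one possibility worth ruling out (hedged, since the `LOCATION` above is elided): a native DynamoDB export writes `manifest-files.json` and `manifest-summary.json` next to the `data/` prefix under `AWSDynamoDB/<export-id>/`. If the table `LOCATION` points at the export root rather than the `data/` prefix, Athena will also try to parse the manifest objects as ION, which can produce exactly this kind of syntax error. A quick listing shows whether non-data objects sit under the table location (bucket and prefix are placeholders):

```python
# Sketch: list everything under the Athena table LOCATION and flag objects that
# are not under the export's data/ prefix (e.g. manifest files).
import boto3

BUCKET = "my-export-bucket"
PREFIX = "exports/AWSDynamoDB/"  # whatever the table LOCATION points at

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX):
    for obj in page.get("Contents", []):
        key = obj["Key"]
        if "/data/" not in key:
            print("Not a data file:", key)
```

If manifests do show up, pointing the `LOCATION` at `.../data/` (or moving the data files to their own prefix) is a cheaper first test than relaxing the SerDe settings.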
0 answers · 0 votes · 16 views · asked 5 days ago

AWS Glue Studio Data Preview Fails instantly (Py4JJavaError)

Hi, I'm using AWS Glue Studio and once I click "Data preview" it fails with the following error. The flow consists of 2 actions: a PostgreSQL JDBC data source and a Select Fields action. The error is thrown instantly once the "Data Preview" button is clicked. The overall flow runs successfully if I click the "Run" button: there are no errors, and I get the expected outcome if I add a target to dump the results back into a PostgreSQL table. It's just the Data Preview functionality that fails. Any idea what could be wrong and how to troubleshoot it?

```
Py4JJavaError: An error occurred while calling o538.getSampleDynamicFrame.
: java.lang.UnsupportedOperationException: empty.reduceLeft
	at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:180)
	at scala.collection.AbstractTraversable.reduceLeft(Traversable.scala:104)
	at scala.collection.TraversableOnce$class.reduce(TraversableOnce.scala:208)
	at scala.collection.AbstractTraversable.reduce(Traversable.scala:104)
	at com.amazonaws.services.glue.SparkSQLDataSource.com$amazonaws$services$glue$SparkSQLDataSource$$getPaths(DataSource.scala:724)
	at com.amazonaws.services.glue.SparkSQLDataSource$$anonfun$getSampleDynamicFrame$7.apply(DataSource.scala:799)
	at com.amazonaws.services.glue.SparkSQLDataSource$$anonfun$getSampleDynamicFrame$7.apply(DataSource.scala:793)
	at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:89)
	at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:89)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:82)
	at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:89)
	at com.amazonaws.services.glue.SparkSQLDataSource.getSampleDynamicFrame(DataSource.scala:792)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
```
0 answers · 0 votes · 15 views · asked 5 days ago

MWAA apache-airflow-providers-amazon DAGS can't import operator

I'm trying to use the newest package for Amazon integration in MWAA. In this particular case I want to use `GlueJobOperator`, which is part of the latest `apache-airflow-providers-amazon` package. ([Link to the documentation](https://airflow.apache.org/docs/apache-airflow-providers-amazon/4.0.0/operators/glue.html))

MWAA Airflow version: `2.2.2`

I added this to the *requirements.txt*:

```
apache-airflow-providers-amazon==4.0.0
```

and tried to import it and use it like in the examples:

```
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

glue_job = GlueJobOperator(
    task_id='airflow_id',
    job_name='job_name',
    wait_for_completion=True,
    script_location='bucket_script_prefix',
    s3_bucket='bucket_name',
    iam_role_name='iam_role',
    create_job_kwargs=job_arguments,
    script_args=script_arguments
)
```

Unfortunately, whenever the DAG is parsed I get this error:

```
...
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
ImportError: cannot import name 'GlueJobOperator' from 'airflow.providers.amazon.aws.operators.glue' (/usr/local/lib/python3.7/site-packages/airflow/providers/amazon/aws/operators/glue.py)
```

It's not my first rodeo with MWAA and some extra packages, plugins, etc., but I am lost. In this case I tried many things, went through the docs from cover to cover, and I still couldn't find the reason. I verified in MWAA that the package was successfully installed, both in the logs and in the UI, with the version given in *requirements.txt*:

| Package Name | Version | Description |
| --- | --- | --- |
| apache-airflow-providers-amazon | 4.0.0 | Amazon integration (including Amazon Web Services (AWS)). |

Fun fact: I'm using `S3Hook` in some other DAGs and it parses just fine.

```
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

s3_hook = S3Hook()
...
```
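One possibility to rule out (hedged, since MWAA resolves *requirements.txt* against a constraints file and may keep an older provider importable at parse time): in amazon provider releases before 4.0.0 the same operator was exposed as `AwsGlueJobOperator`. A small sketch that prints the version the scheduler actually sees and falls back to the old name:

```python
# Sketch: confirm which provider version is importable in the MWAA environment,
# and fall back to the pre-4.0 operator name if the new one is missing.
import pkg_resources

print("installed:", pkg_resources.get_distribution("apache-airflow-providers-amazon").version)

try:
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
except ImportError:
    # Older provider releases shipped the operator under this name.
    from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator as GlueJobOperator
```

If the printed version is lower than 4.0.0, pinning a matching constraints file (or using the older operator name and arguments) is the quicker path than fighting the import.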
0 answers · 0 votes · 22 views · asked 6 days ago

Kinesis Firehose component for AWS Greengrass not sending data streams

Hello, I'm having a problem with linking the Kinesis Firehose aws greengrass component to the AWS Kinesis Service, so i would like to know why it's not working even with following the documentation ; In my Raspberry PI I deployed couple of components but for the sake of this question, i'm only going to invoke the Kinesis Firehose component and my custom python component to send data. in the deployment configs * aws.greengrass.KinesisFirehose ``` { "containerMode": "GreengrassContainer", "containerParams": { "devices": {}, "memorySize": 65535, "mountROSysfs": false, "volumes": {} }, "inputPayloadEncodingType": "binary", "lambdaExecutionParameters": { "EnvironmentVariables": { "DEFAULT_DELIVERY_STREAM_ARN": "arn:aws:firehose:eu-central-1:xxxxx:deliverystream/Tiny-video-stream", "DELIVERY_STREAM_QUEUE_SIZE": "5000", "PUBLISH_INTERVAL": "10" } }, "maxIdleTimeInSeconds": 60, "maxInstancesCount": 100, "maxQueueSize": 1000, "pinned": true, "pubsubTopics": { "0": { "topic": "kinesisfirehose/message/binary/#", "type": "PUB_SUB" }, "1": { "topic": "kinesisfirehose/message", "type": "PUB_SUB" }, "2": { "topic": "tinyml/message", "type": "PUB_SUB" } }, "statusTimeoutInSeconds": 60, "timeoutInSeconds": 10 } ``` * com.example.HelloWorld ``` { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "My first AWS IoT Greengrass component.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:mqttproxy:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } }, "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToTopic", "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } }, "Message": "world" } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "Run": "python3 -u {artifacts:path}/hello_world.py \"{configuration:/Message}\"" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "Run": "py -3 -u {artifacts:path}/hello_world.py \"{configuration:/Message}\"" } } ] } ``` According to the documentation, kinesis component accepts : > JSON data on the kinesisfirehose/message topic > Binary data on the kinesisfirehose/message/binary/# topic And both of them through local topics So here is my python code where I send a message on the local json topic and subscribe to the "kinesisfirehose/message/status" : ``` import json import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, JsonMessage, BinaryMessage ) TIMEOUT = 30 ipc_client = awsiot.greengrasscoreipc.connect() topic = "kinesisfirehose/message" message = "Hello, World" message_data = { "request": { "data": "Data to send to the delivery stream." 
}, "id": "request123" } request = PublishToTopicRequest() request.topic = topic publish_message = PublishMessage() publish_message.json_message = JsonMessage() publish_message.json_message.message = message_data request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) future = operation.get_response() future.result(TIMEOUT) print(f"{operation} ============= {future}") import time import traceback import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( IoTCoreMessage, QOS, SubscribeToIoTCoreRequest ) TIMEOUT = 10 class StreamHandler(client.SubscribeToIoTCoreStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: IoTCoreMessage) -> None: try: message = str(event.message.payload, "utf-8") topic_name = event.message.topic_name # Handle message. print(f"RECIEVED =======: {topic_name} --------- {message}") except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "kinesisfirehose/message/status" qos = QOS.AT_MOST_ONCE request = SubscribeToIoTCoreRequest() request.topic_name = topic request.qos = qos handler = StreamHandler() operation = ipc_client.new_subscribe_to_iot_core(handler) future = operation.activate(request) future.result(TIMEOUT) # Keep the main thread alive, or the process will exit. try : while True: time.sleep(10) except Exception as err : print(f"{err} =====================") finally: # To stop subscribing, close the operation stream. operation.close() ``` Policy attached to the greengrass's iam role : ``` { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": "arn:aws:s3:::s3-name-xxxx/*" }, { "Action": [ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect": "Allow", "Resource": [ "arn:aws:firehose:eu-central-1:xxxxx:deliverystream/Tiny-video-stream" ] } ] } ``` After multiple tests i noticed : * I can send MQTT * I can send to local topics * No new logs in the aws.greengrass.Kinesis Any ideas what am i have forgot to do?
0 answers · 0 votes · 27 views · asked 7 days ago

Not able to abort redshift connection - having a statement in waiting state

At certain point of time, all java threads which abort the redshift db connections get blocked in the service. Thread dump: ``` thread-2" #377 prio=5 os_prio=0 cpu=23073.41ms elapsed=1738215.53s tid=0x00007fd1c413a000 nid=0x5a1f waiting for monitor entry [0x00007fd193dfe000] java.lang.Thread.State: BLOCKED (on object monitor) at com.amazon.jdbc.common.SStatement.close(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) - waiting to lock <0x00000006086ac800> (a com.amazon.redshift.core.jdbc42.PGJDBC42Statement) at com.amazon.jdbc.common.SConnection.closeChildStatements(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) at com.amazon.jdbc.common.SConnection.closeChildObjects(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) at com.amazon.jdbc.common.SConnection.abortInternal(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) - locked <0x0000000607941af8> (a com.amazon.redshift.core.jdbc42.S42NotifiedConnection) at com.amazon.jdbc.jdbc41.S41Connection.access$000(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) at com.amazon.jdbc.jdbc41.S41Connection$1.run(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.9.1/ThreadPoolExecutor.java:1128) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.9.1/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.9.1/Thread.java:829) ``` These are blocked on the threads which are still running statement on these connections. ``` thread-366" #23081 daemon prio=5 os_prio=0 cpu=972668.98ms elapsed=1553882.44s tid=0x00007fd1642b3000 nid=0x73ff waiting on condition [0x00007fd1920ac000] java.lang.Thread.State: TIMED_WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.9.1/Native Method) - parking to wait for <0x00000006086ae350> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.9.1/LockSupport.java:234) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(java.base@11.0.9.1/AbstractQueuedSynchronizer.java:2123) at java.util.concurrent.ArrayBlockingQueue.poll(java.base@11.0.9.1/ArrayBlockingQueue.java:432) at com.amazon.jdbc.communications.InboundMessagesPipeline.validateCurrentContainer(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) at com.amazon.jdbc.communications.InboundMessagesPipeline.getNextMessageOfClass(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) at com.amazon.redshift.client.PGMessagingContext.doMoveToNextClass(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) at com.amazon.redshift.client.PGMessagingContext.getReadyForQuery(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) at com.amazon.redshift.client.PGMessagingContext.closeOperation(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) at com.amazon.redshift.dataengine.PGAbstractQueryExecutor.close(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) at com.amazon.jdbc.common.SStatement.replaceQueryExecutor(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) at com.amazon.jdbc.common.SStatement.executeNoParams(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) at com.amazon.jdbc.common.SStatement.execute(com.foo.drivers.redshift@1.2.43.1067/Unknown Source) - locked <0x00000006086ac800> (a com.amazon.redshift.core.jdbc42.PGJDBC42Statement) ``` Statement executed in these threads : `statement.execute(“SHOW SEARCH_PATH”);` Once the java service is restarted, it works fine. But after a few days, this issue comes up again. Q1 a. 
Why is a close-connection thread blocked even if its child statement is in a queued state?
Q1 b. Is there a way to force-close the connection?
Q2. Why are the child statements in the waiting state?
1 answer · 0 votes · 30 views · asked 8 days ago

Kinesis Analytics for SQL Application Issue

Hello, I am having trouble properly handling a query with a tumbling window. My application sends 15 sensor data messages per second to a Kinesis Data Stream, which is used as the input stream for a Kinesis Analytics application. I am trying to run an aggregation query using a GROUP BY clause to process rows in a tumbling window with a 60-second interval. The output stream then sends data to a Lambda function. I expect the messages to arrive at the Lambda every 60 seconds, but instead they arrive much faster, almost every second, and the aggregations don't work as expected.

Here is the CloudFormation template that I am using:

```
ApplicationCode:
  CREATE OR REPLACE STREAM "SENSORCALC_STREAM" (
      "name" VARCHAR(16),
      "facilityId" INTEGER,
      "processId" BIGINT,
      "sensorId" INTEGER NOT NULL,
      "min_value" REAL,
      "max_value" REAL,
      "stddev_value" REAL);

  CREATE OR REPLACE PUMP "SENSORCALC_STREAM_PUMP" AS
  INSERT INTO "SENSORCALC_STREAM"
  SELECT STREAM "name", "facilityId", "processId", "sensorId",
      MIN("sensorData") AS "min_value",
      MAX("sensorData") AS "max_value",
      STDDEV_SAMP("sensorData") AS "stddev_value"
  FROM "SOURCE_SQL_STREAM_001"
  GROUP BY "facilityId", "processId", "sensorId", "name",
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);

KinesisAnalyticsSensorApplicationOutput:
  Type: "AWS::KinesisAnalytics::ApplicationOutput"
  DependsOn: KinesisAnalyticsSensorApplication
  Properties:
    ApplicationName: !Ref KinesisAnalyticsSensorApplication
    Output:
      Name: "SENSORCALC_STREAM"
      LambdaOutput:
        ResourceARN: !GetAtt SensorStatsFunction.Arn
        RoleARN: !GetAtt KinesisAnalyticsSensorRole.Arn
      DestinationSchema:
        RecordFormatType: "JSON"
```

I would really appreciate your help in pointing out what I am missing.

Thank you,
Serge
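To confirm how often the application really emits results (a diagnostic sketch, not a fix), the destination Lambda can log how many records arrive per invocation and what they contain; Kinesis Data Analytics delivers output records in batches and expects an `Ok`/`DeliveryFailed` status back for every `recordId`:

```python
# Diagnostic sketch for the destination Lambda: log batch sizes and payloads so you can
# see whether results arrive once per 60-second window per group, or every second.
import base64
import json


def lambda_handler(event, context):
    records = event.get("records", [])
    print("Received", len(records), "records from the analytics output stream")

    results = []
    for record in records:
        payload = json.loads(base64.b64decode(record["data"]))
        print("sensorId=%s min=%s max=%s stddev=%s" % (
            payload.get("sensorId"),
            payload.get("min_value"),
            payload.get("max_value"),
            payload.get("stddev_value"),
        ))
        # Acknowledge each record so the application does not retry it.
        results.append({"recordId": record["recordId"], "result": "Ok"})

    return {"records": results}
```

If the log shows one result per group per minute but many Lambda invocations, the window is working and only the delivery batching looks "fast"; if it shows new aggregates every second, the windowing itself is the problem.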
0 answers · 0 votes · 12 views · asked 8 days ago

Glue Hudi get the freshly added or updated records

Hello, I'm using the Hudi connector in Glue. First, I bulk-inserted the initial dataset into a Hudi table; I'm adding daily incremental records and I can query them using Athena. What I'm trying to do is get the newly added, updated, or deleted records into a separate parquet file. I've tried different approaches and configurations, using both copy-on-write and merge-on-read tables, but I still can't get the updates into a separate file.

I used these configurations in different combinations:

```
'className': 'org.apache.hudi',
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.payload.event.time.field': 'ts',
'hoodie.table.name': 'table_name',
'hoodie.datasource.hive_sync.database': 'hudi_db',
'hoodie.datasource.hive_sync.table': 'table_name',
'hoodie.datasource.hive_sync.enable': 'false',
# 'hoodie.datasource.write.partitionpath.field': 'date:SIMPLE',
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.meta.sync.client.tool.class': 'org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool',
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'path': 's3://path/to/output/',
# 'hoodie.datasource.write.operation': 'bulk_insert',
'hoodie.datasource.write.operation': 'upsert',
# 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
# 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
# 'hoodie.compaction.payload.class': 'org.apache.hudi.common.model.OverwriteWithLatestAvroPayload',
# 'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
'hoodie.cleaner.delete.bootstrap.base.file': 'true',
"hoodie.index.type": "GLOBAL_BLOOM",
'hoodie.file.index.enable': 'true',
'hoodie.bloom.index.update.partition.path': 'true',
'hoodie.bulkinsert.shuffle.parallelism': 1,
# 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator'
```

Thank you.
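The write-side options above control how data lands in the Hudi table, not how changes are extracted from it. Pulling only the records changed since a given commit is usually done with a Hudi incremental query on the read side and then writing that DataFrame out as plain Parquet. A sketch, assuming the Glue job's `spark` session already has the Hudi connector on the classpath (paths and the commit timestamp are placeholders):

```python
# Sketch: incremental read of a Hudi table, then dump the changed rows to a separate
# Parquet location. begin_instant is the commit time you last processed (exclusive).
begin_instant = "20220715000000"  # placeholder Hudi commit timestamp

incremental_df = (
    spark.read.format("org.apache.hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", begin_instant)
    .load("s3://path/to/output/")
)

incremental_df.write.mode("overwrite").parquet("s3://path/to/daily-changes/")
```

One caveat: incremental queries surface inserts and updates, but hard deletes generally will not appear in the result, so deleted keys need to be tracked separately (for example with a soft-delete flag).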
1 answer · 0 votes · 18 views · asked 8 days ago

EMR Serverless 6.6.0 Python SWIG Lib dependency

I'm trying to create an isolated Python virtual environment to package Python libraries necessary for a Pyspark job. I was successful to make it work by simply following these steps https://github.com/aws-samples/emr-serverless-samples/tree/main/examples/pyspark/dependencies However, there is one Python library dependency (SWIG) failing to install because it requires additional libs to be installed such as gcc gcc-c++ python3-devel. LIB: https://github.com/51Degrees/Device-Detection/tree/master/python So I added RUN yum install -y gcc gcc-c++ python3-devel to the Dockerfile image https://github.com/aws-samples/emr-serverless-samples/blob/main/examples/pyspark/dependencies/Dockerfile and it installed sucessfully and then I packaged the virtual env. However, the emr job fails with that lib python modules not being found, which makes me think that python3-devel is not present in EMR Serverless 6.6.0 Since I don't have control over the serverless environment, is any way around this? Or am I missing something? stderr ``` An error occurred while calling o198.count. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 0.0 failed 4 times, most recent failure: Lost task 19.3 in stage 0.0 (TID 89) ([2600:1f18:153d:6601:bfcc:6ff:50bc:240e] executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/home/hadoop/environment/lib64/python3.7/site-packages/FiftyOneDegrees/fiftyone_degrees_mobile_detector_v3_wrapper.py", line 15, in swig_import_helper return importlib.import_module(mname) File "/usr/lib64/python3.7/importlib/__init__.py", line 127, in import_module return _bootstrap._gcd_import(name[level:], package, level) File "<frozen importlib._bootstrap>", line 1006, in _gcd_import File "<frozen importlib._bootstrap>", line 983, in _find_and_load File "<frozen importlib._bootstrap>", line 965, in _find_and_load_unlocked ModuleNotFoundError: No module named 'FiftyOneDegrees._fiftyone_degrees_mobile_detector_v3_wrapper' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main process() File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process serializer.dump_stream(out_iter, outfile) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream vs = list(itertools.islice(iterator, batch)) File "./jobs.zip/jobs/parsed_events_orc_processor/etl.py", line 360, in enrich_events event['device'] = calculate_device_data(event) File "./jobs.zip/jobs/parsed_events_orc_processor/etl.py", line 152, in calculate_device_data device_data = mobile_detector.match(user_agent) File "/home/hadoop/environment/lib64/python3.7/site-packages/fiftyone_degrees/mobile_detector/__init__.py", line 225, in match else settings.DETECTION_METHOD) File "/home/hadoop/environment/lib64/python3.7/site-packages/fiftyone_degrees/mobile_detector/__init__.py", line 63, in instance cls._INSTANCES[method] = cls._METHODS[method]() File "/home/hadoop/environment/lib64/python3.7/site-packages/fiftyone_degrees/mobile_detector/__init__.py", line 98, in __init__ from FiftyOneDegrees import fiftyone_degrees_mobile_detector_v3_wrapper File "/home/hadoop/environment/lib64/python3.7/site-packages/FiftyOneDegrees/fiftyone_degrees_mobile_detector_v3_wrapper.py", line 18, in <module> _fiftyone_degrees_mobile_detector_v3_wrapper = swig_import_helper() File 
"/home/hadoop/environment/lib64/python3.7/site-packages/FiftyOneDegrees/fiftyone_degrees_mobile_detector_v3_wrapper.py", line 17, in swig_import_helper return importlib.import_module('_fiftyone_degrees_mobile_detector_v3_wrapper') File "/usr/lib64/python3.7/importlib/__init__.py", line 127, in import_module return _bootstrap._gcd_import(name[level:], package, level) ModuleNotFoundError: No module named '_fiftyone_degrees_mobile_detector_v3_wrapper' at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:954) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:133) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2559) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2508) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2507) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2507) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1149) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1149) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1149) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2747) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2689) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2678) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:154) at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88) at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66) at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:241) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:240) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:509) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:471) at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3053) at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3052) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3770) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3768) at org.apache.spark.sql.Dataset.count(Dataset.scala:3052) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File 
"/home/hadoop/environment/lib64/python3.7/site-packages/FiftyOneDegrees/fiftyone_degrees_mobile_detector_v3_wrapper.py", line 15, in swig_import_helper return importlib.import_module(mname) File "/usr/lib64/python3.7/importlib/__init__.py", line 127, in import_module return _bootstrap._gcd_import(name[level:], package, level) File "<frozen importlib._bootstrap>", line 1006, in _gcd_import File "<frozen importlib._bootstrap>", line 983, in _find_and_load File "<frozen importlib._bootstrap>", line 965, in _find_and_load_unlocked ModuleNotFoundError: No module named 'FiftyOneDegrees._fiftyone_degrees_mobile_detector_v3_wrapper' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main process() File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process serializer.dump_stream(out_iter, outfile) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream vs = list(itertools.islice(iterator, batch)) File "./jobs.zip/jobs/parsed_events_orc_processor/etl.py", line 360, in enrich_events event['device'] = calculate_device_data(event) File "./jobs.zip/jobs/parsed_events_orc_processor/etl.py", line 152, in calculate_device_data device_data = mobile_detector.match(user_agent) File "/home/hadoop/environment/lib64/python3.7/site-packages/fiftyone_degrees/mobile_detector/__init__.py", line 225, in match else settings.DETECTION_METHOD) File "/home/hadoop/environment/lib64/python3.7/site-packages/fiftyone_degrees/mobile_detector/__init__.py", line 63, in instance cls._INSTANCES[method] = cls._METHODS[method]() File "/home/hadoop/environment/lib64/python3.7/site-packages/fiftyone_degrees/mobile_detector/__init__.py", line 98, in __init__ from FiftyOneDegrees import fiftyone_degrees_mobile_detector_v3_wrapper File "/home/hadoop/environment/lib64/python3.7/site-packages/FiftyOneDegrees/fiftyone_degrees_mobile_detector_v3_wrapper.py", line 18, in <module> _fiftyone_degrees_mobile_detector_v3_wrapper = swig_import_helper() File "/home/hadoop/environment/lib64/python3.7/site-packages/FiftyOneDegrees/fiftyone_degrees_mobile_detector_v3_wrapper.py", line 17, in swig_import_helper return importlib.import_module('_fiftyone_degrees_mobile_detector_v3_wrapper') File "/usr/lib64/python3.7/importlib/__init__.py", line 127, in import_module return _bootstrap._gcd_import(name[level:], package, level) ModuleNotFoundError: No module named '_fiftyone_degrees_mobile_detector_v3_wrapper' at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution ```
2 answers · 0 votes · 57 views · asked 9 days ago

Hudi Clustering

I am using EMR 6.6.0, which has Hudi 0.10.1. I am trying to bulk insert and do inline clustering using Hudi, but it does not seem to be clustering the files to the file size set in the configuration; it is still producing files only in the KB range. I tried the configuration below:

```
hudi_clusteringopt = {
    'hoodie.table.name': 'myhudidataset_upsert_legacy_new7',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'creation_date',
    'hoodie.datasource.write.precombine.field': 'last_update_time',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'my_hudi_db',
    'hoodie.datasource.hive_sync.table': 'myhudidataset_upsert_legacy_new7',
    'hoodie.datasource.hive_sync.partition_fields': 'creation_date',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.write.operation": "bulk_insert",
}
# "hoodie.datasource.write.operation": "bulk_insert",

try:
    inputDF.write.format("org.apache.hudi"). \
        options(**hudi_clusteringopt). \
        option("hoodie.parquet.small.file.limit", "0"). \
        option("hoodie.clustering.inline", "true"). \
        option("hoodie.clustering.inline.max.commits", "0"). \
        option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824"). \
        option("hoodie.clustering.plan.strategy.small.file.limit", "629145600"). \
        option("hoodie.clustering.plan.strategy.sort.columns", "pk_col"). \
        mode('append'). \
        save("s3://xxxxxxxxxxxxxx")
except Exception as e:
    print(e)
```

Here is the dataset if someone wants to reproduce it:

```
inputDF = spark.createDataFrame(
    [
        ("1001", 1001, "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("1011", 1011, "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("1021", 1021, "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("1031", 1031, "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("1041", 1041, "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("1051", 1051, "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "id_val", "creation_date", "last_update_time"]
)
```
1 answer · 0 votes · 7 views · asked 10 days ago

Describe table in Athena fails with insufficient lake formation permissions

When I try to run the following query via the Athena JDBC Driver ```sql describe gitlab.issues ``` I get the following error: > [Simba][AthenaJDBC](100071) An error has been thrown from the AWS Athena client. FAILED: SemanticException Unable to fetch table gitlab. Insufficient Lake Formation permission(s) on gitlab (Service: AmazonDataCatalog; Status Code: 400; Error Code: AccessDeniedException; Request ID: be6aeb1b-fc06-410d-9723-2df066307b35; Proxy: null) [Execution ID: a2534d22-c4df-49e9-8515-80224779bf01] the following query works: ```sql select * from gitlab.issues limit 10 ``` The role that is used has the `DESCRIBE` permission on the `gitlab` database and `DESCRIBE, SELECT` permissions on the table `issues`. It also has the following IAM permissions: ```json { "Version": "2012-10-17", "Statement": [ { "Action": [ "athena:BatchGetNamedQuery", "athena:BatchGetQueryExecution", "athena:CreatePreparedStatement", "athena:DeletePreparedStatement", "athena:GetDataCatalog", "athena:GetDatabase", "athena:GetNamedQuery", "athena:GetPreparedStatement", "athena:GetQueryExecution", "athena:GetQueryResults", "athena:GetQueryResultsStream", "athena:GetTableMetadata", "athena:GetWorkGroup", "athena:ListDatabases", "athena:ListNamedQueries", "athena:ListPreparedStatements", "athena:ListDataCatalogs", "athena:ListEngineVersions", "athena:ListQueryExecutions", "athena:ListTableMetadata", "athena:ListTagsForResource", "athena:ListWorkGroups", "athena:StartQueryExecution", "athena:StopQueryExecution", "athena:UpdatePreparedStatement" ], "Resource": "*", "Effect": "Allow" }, { "Action": [ "glue:BatchGetCustomEntityTypes", "glue:BatchGetPartition", "glue:GetCatalogImportStatus", "glue:GetColumnStatisticsForPartition", "glue:GetColumnStatisticsForTable", "glue:GetCustomEntityType", "glue:GetDatabase", "glue:GetDatabases", "glue:GetPartition", "glue:GetPartitionIndexes", "glue:GetPartitions", "glue:GetSchema", "glue:GetSchemaByDefinition", "glue:GetSchemaVersion", "glue:GetSchemaVersionsDiff", "glue:GetTable", "glue:GetTableVersion", "glue:GetTableVersions", "glue:GetTables", "glue:GetUserDefinedFunction", "glue:GetUserDefinedFunctions", "glue:ListCustomEntityTypes", "glue:ListSchemaVersions", "glue:ListSchemas", "glue:QuerySchemaVersionMetadata", "glue:SearchTables" ], "Resource": "*", "Effect": "Allow" }, { "Condition": { "ForAnyValue:StringEquals": { "aws:CalledVia": "athena.amazonaws.com" } }, "Action": [ "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::aws-athena-query-results-123456789012-eu-west-1", "arn:aws:s3:::aws-athena-query-results-123456789012-eu-west-1/*", "arn:aws:s3:::aws-athena-federation-spill-123456789012-eu-west-1", "arn:aws:s3:::aws-athena-federation-spill-123456789012-eu-west-1/*" ], "Effect": "Allow" }, { "Action": [ "lakeformation:CancelTransaction", "lakeformation:CommitTransaction", "lakeformation:DescribeResource", "lakeformation:DescribeTransaction", "lakeformation:ExtendTransaction", "lakeformation:GetDataAccess", "lakeformation:GetQueryState", "lakeformation:GetQueryStatistics", "lakeformation:GetTableObjects", "lakeformation:GetWorkUnitResults", "lakeformation:GetWorkUnits", "lakeformation:StartQueryPlanning", "lakeformation:StartTransaction" ], "Resource": "*", "Effect": "Allow" }, { "Condition": { "ForAnyValue:StringEquals": { "aws:CalledVia": "athena.amazonaws.com" } }, "Action": "lambda:InvokeFunction", "Resource": 
"arn:aws:lambda:*:*:function:athena-federation-*", "Effect": "Allow" }, { "Condition": { "ForAnyValue:StringEquals": { "aws:CalledVia": "athena.amazonaws.com" } }, "Action": ["s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket"], "Resource": "*", "Effect": "Allow" } ] } ``` even if I make the role a LakeFormation Admin, Database Creator, assign Super Permissions to the table and database and add the AdministratorAccess IAM Policy to the role it still fails.
0 answers · 0 votes · 26 views · asked 13 days ago