
Questions tagged with Analytics


Browse through the questions and answers listed below or filter and sort to narrow down your results.

Not able to get all data in Athena query results for AWS Config from the S3 bucket

Hi,

I have been trying to implement a monitoring solution that monitors the resources of the AWS accounts in our organization using AWS Config, Amazon Athena, and Amazon QuickSight. I have set up all the services, however Athena is not able to query all the data from the S3 bucket where the Config data for all the accounts is stored. It can only query the data for the current account, the one from which I am running the query. I can see the Config data for all accounts in the S3 bucket.

**Athena table creation query**

```
CREATE EXTERNAL TABLE aws_config_configuration_snapshot (
    fileversion STRING,
    configSnapshotId STRING,
    configurationitems ARRAY < STRUCT <
        configurationItemVersion : STRING,
        configurationItemCaptureTime : STRING,
        configurationStateId : BIGINT,
        awsAccountId : STRING,
        configurationItemStatus : STRING,
        resourceType : STRING,
        resourceId : STRING,
        resourceName : STRING,
        ARN : STRING,
        awsRegion : STRING,
        availabilityZone : STRING,
        configurationStateMd5Hash : STRING,
        configuration : STRING,
        supplementaryConfiguration : MAP < STRING, STRING >,
        tags : MAP < STRING, STRING >,
        resourceCreationTime : STRING
    > >
)
PARTITIONED BY (accountid STRING, dt STRING, region STRING)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<S3_BUCKET_NAME>/AWSLogs/';
```

**The Lambda function used for data partitioning per account**

```
import datetime
import re
import boto3
import os

TABLE_NAME = 'aws_config_configuration_snapshot'
DATABASE_NAME = 'sampledb'
ACCOUNT_ID = None  # Determined at runtime
LATEST_PARTITION_VALUE = 'latest'

athena = boto3.client('athena')


def lambda_handler(event, context):
    global ACCOUNT_ID

    object_key = event['Records'][0]['s3']['object']['key']
    match = get_configuration_snapshot_object_key_match(object_key)
    if match is None:
        print('Ignoring event for non-configuration snapshot object key', object_key)
        return
    print('Adding partitions for configuration snapshot object key', object_key)

    ACCOUNT_ID = context.invoked_function_arn.split(':')[4]
    object_key_parent = 's3://{bucket_name}/{object_key_parent}/'.format(
        bucket_name=event['Records'][0]['s3']['bucket']['name'],
        object_key_parent=os.path.dirname(object_key))
    configuration_snapshot_accountid = get_configuration_snapshot_accountid(match)
    configuration_snapshot_region = get_configuration_snapshot_region(match)
    configuration_snapshot_date = get_configuration_snapshot_date(match)

    drop_partition(configuration_snapshot_accountid, configuration_snapshot_region, LATEST_PARTITION_VALUE)
    add_partition(configuration_snapshot_accountid, configuration_snapshot_region, LATEST_PARTITION_VALUE, object_key_parent)
    add_partition(configuration_snapshot_accountid, configuration_snapshot_region,
                  get_configuration_snapshot_date(match).strftime('%Y-%m-%d'), object_key_parent)


def get_configuration_snapshot_object_key_match(object_key):
    # Matches object keys like AWSLogs/123456789012/Config/us-east-1/2018/4/11/ConfigSnapshot/123456789012_Config_us-east-1_ConfigSnapshot_20180411T054711Z_a970aeff-cb3d-4c4e-806b-88fa14702hdb.json.gz
    return re.match('^AWSLogs/(\d+)/Config/([\w-]+)/(\d+)/(\d+)/(\d+)/ConfigSnapshot/[^\\\]+$', object_key)


def get_configuration_snapshot_accountid(match):
    print('AccountId:', match.group(1))
    return match.group(1)


def get_configuration_snapshot_region(match):
    return match.group(2)


def get_configuration_snapshot_date(match):
    return datetime.date(int(match.group(3)), int(match.group(4)), int(match.group(5)))


def add_partition(accountid_partition_value, region_partition_value, dt_partition_value, partition_location):
    execute_query('ALTER TABLE {table_name} ADD PARTITION {partition} location \'{partition_location}\''.format(
        table_name=TABLE_NAME,
        partition=build_partition_string(accountid_partition_value, region_partition_value, dt_partition_value),
        partition_location=partition_location))


def drop_partition(accountid_partition_value, region_partition_value, dt_partition_value):
    execute_query('ALTER TABLE {table_name} DROP PARTITION {partition}'.format(
        table_name=TABLE_NAME,
        partition=build_partition_string(accountid_partition_value, region_partition_value, dt_partition_value)))


def build_partition_string(accountid_partition_value, region_partition_value, dt_partition_value):
    return "(accountid='{accountid_partition_value}', dt='{dt_partition_value}', region='{region_partition_value}')".format(
        accountid_partition_value=accountid_partition_value,
        dt_partition_value=dt_partition_value,
        region_partition_value=region_partition_value)


def execute_query(query):
    print('Executing query:', query)
    query_output_location = 's3://aws-athena-query-results-{account_id}-{region}'.format(
        account_id=ACCOUNT_ID,
        region=os.environ['AWS_REGION'])

    start_query_response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DATABASE_NAME
        },
        ResultConfiguration={
            'OutputLocation': query_output_location,
        }
    )
    print('Query started')

    is_query_running = True
    while is_query_running:
        get_query_execution_response = athena.get_query_execution(
            QueryExecutionId=start_query_response['QueryExecutionId']
        )
        query_state = get_query_execution_response['QueryExecution']['Status']['State']
        is_query_running = query_state in ('RUNNING', 'QUEUED')

        if not is_query_running and query_state != 'SUCCEEDED':
            raise Exception('Query failed')
    print('Query completed')
```

**Sample query tried:**

```
CREATE OR REPLACE VIEW v_config_ec2_vpcs AS
SELECT DISTINCT
  "accountId" "633328536665"
, "region" "us-east-1"
, "configurationItem"."resourceid" "ResourceId"
, "configurationItem"."tags"['name'] "TagName"
, "json_extract_scalar"("configurationItem"."configuration", '$.isdefault') "IsDefault"
, "json_extract_scalar"("configurationItem"."configuration", '$.cidrblockassociationset[0].cidrblock') "CidrBlock0"
, "json_extract_scalar"("configurationItem"."configuration", '$.cidrblockassociationset[1].cidrblock') "CidrBlock1"
, "json_extract_scalar"("configurationItem"."configuration", '$.cidrblockassociationset[2].cidrblock') "CidrBlock2"
, "json_extract_scalar"("configurationItem"."configuration", '$.cidrblockassociationset[3].cidrblock') "CidrBlock3"
, "json_extract_scalar"("configurationItem"."configuration", '$.cidrblockassociationset[4].cidrblock') "CidrBlock4"
FROM default.aws_config_configuration_snapshot
CROSS JOIN UNNEST("configurationitems") t (configurationItem)
WHERE (("dt" = 'latest') AND ("configurationItem"."resourcetype" = 'AWS::EC2::VPC'))
```

For some reason it is not able to get the data from the S3 bucket for all the AWS accounts (only the data for the current account is returned by the query). I have checked the S3 bucket policy and it is set up as per the solutions referenced below.

Solutions referred:

* https://aws.amazon.com/blogs/mt/visualizing-aws-config-data-using-amazon-athena-and-amazon-quicksight/
* https://aws.amazon.com/blogs/mt/how-to-query-your-aws-resource-configuration-states-using-aws-config-and-amazon-athena/

Thanks and Regards,
Mahesh B.
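A quick way to narrow this down (an editor-added sketch, not part of the original post): since the table is partitioned, Athena only sees data for partitions that were actually registered. If the Lambda only fired for the current account's snapshot objects, only that account's partitions will exist even though every account's objects are in the bucket. The sketch below lists registered partitions per account via the Glue Data Catalog API; the database and table names are assumed from the question.

```
import boto3
from collections import Counter

# Names assumed from the question; adjust to your setup.
DATABASE_NAME = 'sampledb'
TABLE_NAME = 'aws_config_configuration_snapshot'

glue = boto3.client('glue')

def list_partition_accounts(database=DATABASE_NAME, table=TABLE_NAME):
    """Count registered partitions per account id, i.e. which accounts Athena can actually read."""
    counts = Counter()
    paginator = glue.get_paginator('get_partitions')
    for page in paginator.paginate(DatabaseName=database, TableName=table):
        for partition in page['Partitions']:
            accountid, dt, region = partition['Values']  # matches PARTITIONED BY (accountid, dt, region)
            counts[accountid] += 1
    return counts

if __name__ == '__main__':
    for accountid, n in sorted(list_partition_accounts().items()):
        print(accountid, n)
```

If only one account id shows up here, the gap is in partition creation (the S3 event notifications / Lambda side) rather than in the Athena query itself.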
0 answers · 0 votes · 6 views · asked 3 hours ago

Quicksight Athena - analysis error: "HIVE_UNSUPPORTED_FORMAT: Unable to create input format"

Hello. I'm trying to create an analysis from my DocumentDB instance. I'm using the AWS services Glue, Athena and QuickSight.

In Glue I have created a connection to the DocumentDB and a crawler for auto-creating tables. This works as expected and tables are created and displayed in Glue. Even though I specify that the crawler should not give the tables any prefixes, it does add the database name as a prefix.

When I look at the Glue catalog in Athena (the default AwsDataCatalog) I do see the database that was created in Glue, however it does not show any tables. If I click on edit, it takes me to the correct database in Glue, which displays the tables that have been created by the previously mentioned crawler. So my first question is: **Why don't the tables show up in Athena?** This is blocking me from performing queries in Athena.

When I go to QuickSight and select the default Athena Glue catalog ("AwsDataCatalog") I DO get the tables created by the crawler, and I can create datasets. However, when I try to create an analysis using these datasets, I get the error:

```
sourceErrorCode: 100071
sourceErrorMessage: [Simba][AthenaJDBC](100071) An error has been thrown from the AWS Athena client. HIVE_UNSUPPORTED_FORMAT: Unable to create input format
```

I have looked around a bit and some people said that this error is due to the table properties **"Input format"** and **"Output format"** being empty (which they indeed are for me). I have tried entering almost all the different formats on the table, but I keep getting the QuickSight error above, only now it has the input format at the end:

```
HIVE_UNSUPPORTED_FORMAT: Unable to create input format json
```

**So my second question is:** I do not see anywhere in the crawler where I can specify input or output format. Does it have to be done manually? And what are the correct input and output formats for my setup?
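For reference (an editor-added sketch, not from the original post): the "Input format" and "Output format" the error mentions live on the table's storage descriptor in the Glue Data Catalog, so one way to see exactly what the crawler wrote is to dump those fields with boto3. The database and table names below are placeholders.

```
import boto3

glue = boto3.client('glue')

# Placeholder names; substitute the database/table the crawler created.
DATABASE_NAME = 'my_documentdb_db'
TABLE_NAME = 'my_collection'

table = glue.get_table(DatabaseName=DATABASE_NAME, Name=TABLE_NAME)['Table']
sd = table.get('StorageDescriptor', {})

print('Classification :', table.get('Parameters', {}).get('classification'))
print('Location       :', sd.get('Location'))
print('InputFormat    :', sd.get('InputFormat'))
print('OutputFormat   :', sd.get('OutputFormat'))
print('SerDe          :', sd.get('SerdeInfo', {}).get('SerializationLibrary'))
```

Comparing this output against an S3-backed table that Athena can query makes it easier to see which storage-descriptor fields the DocumentDB-crawled table is missing.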
0 answers · 0 votes · 32 views · asked a month ago

Quicksight: Navigation Action with parameters doesn't update the controls in the new tab

Hello! I have an analysis in QuickSight with two tabs: one is an "overview" tab where we look at all of our units, and the other is a "deep dive" tab that gives more detail about a single unit.

The "overview" tab has a table that lists all of the units and various metrics for them. I want to be able to click on a row in that table and have it take me to the "deep dive" tab. When it goes to the deep dive tab, I want it to fill in a couple of controls on the new tab (like the unit serial number) so that all the visuals filter by that serial number and the tab shows just the information about that specific unit.

It seems like a navigation action with updating parameters is the way to go, but it doesn't work. When I click on the row I want (both in the analysis and the dashboard), it takes me to the deep dive tab, but it doesn't fill in the controls, so they still display "all". It seems like this is not the intended behaviour. In the "Actions" tab for the visual in the "overview" tab, I have selected the following options:

* Activation: "Select"
* Action type: Navigation action
* Target sheet: deep dive tab
* Parameters: here I have listed the parameters I want to fill, and the fields that correspond to them.

Again, when I click on the row, it takes me to the new tab, but it doesn't fill in the parameters. Is this a bug or is this as intended? If it's intended, how do I make it so that users can click on a row, get taken to the deep dive tab, and have the controls filled in so that the visuals filter by the control values?
1 answer · 1 vote · 23 views · asked 2 months ago

EMR Studio PySpark kernel uses an older version of pip

I am using a Jupyter notebook provided by an AWS managed service called EMR Studio. My understanding of how these notebooks work is that they are hosted on EC2 instances that I provision as part of my EMR cluster, specifically with the PySpark kernel using the task nodes.

Currently, when I run the command `sc.list_packages()` I see that pip is at version 9.0.1, whereas if I SSH onto the main node and run `pip list` I see that pip is at version 20.2.2. I have issues running the command `sc.install_pypi_package()` due to the older pip version in the notebook.

In a notebook cell, if I run `import pip` and then `pip`, I see that the module is located at:

```
<module 'pip' from '/mnt1/yarn/usercache/<LIVY_IMPERSONATION_ROLE>/appcache/application_1652110228490_0001/container_1652110228490_0001_01_000001/tmp/1652113783466-0/lib/python3.7/site-packages/pip/__init__.py'>
```

I am assuming this is most likely within a virtualenv of some sort running as an application on the task node? I am unsure of this and I have no concrete evidence of how the virtualenv is provisioned, if there is one.

If I run `sc.uninstall_package('pip')` and then `sc.list_packages()`, I see pip at version 20.2.2, which is what I am looking to start off with. The module path is the same as previously mentioned. How can I get pip 20.2.2 in the virtualenv instead of pip 9.0.1?

If I import a package like numpy, I see that the module is located at a different location from where pip is. Any reason for this?

```
<module 'numpy' from '/usr/local/lib64/python3.7/site-packages/numpy/__init__.py'>
```

As for pip 9.0.1, the only reference I can find at the moment is in `/lib/python2.7/site-packages/virtualenv_support/pip-9.0.1-py2.py3-none-any.whl`. One directory outside of this I see a file called `virtualenv-15.1.0-py2.7.egg-info` which, if I `cat` the file, states that it upgrades to pip 9.0.1. I have tried to remove the pip 9.0.1 wheel file and replace it with a pip 20.2.2 wheel, which caused issues with the PySpark kernel being able to provision properly. There is also a `virtualenv.py` file which does reference `__version__ = "15.1.0"`.

Lastly, I have noticed in this AWS blog post that there is a picture which shows pip at version 19.2.3, but I am not sure how that was achieved. It is below the console output for the command `sc.list_packages()`.

https://aws.amazon.com/blogs/big-data/install-python-libraries-on-a-running-cluster-with-emr-notebooks/
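A small, editor-added diagnostic (hypothetical, not from the post) that can help pin down which interpreter and which pip the kernel's virtualenv actually resolves, before attempting to swap wheel files on the nodes:

```
# Run in a PySpark notebook cell: prints the interpreter path, the importable pip module,
# and what "python -m pip" resolves to for that same interpreter. The two can differ when
# Livy/YARN provisions a per-application virtualenv.
import subprocess
import sys

import pip

print('interpreter :', sys.executable)
print('pip module  :', pip.__file__)
print('pip version :', pip.__version__)

print(subprocess.run([sys.executable, '-m', 'pip', '--version'],
                     capture_output=True, text=True).stdout)
```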
0 answers · 0 votes · 9 views · asked 2 months ago

Sending republished MQTT data to a DynamoDB table

I have successfully republished data from my "lorawan/+/uplink" topic to my new topic "datashow". I am now trying to send the data I extracted from the original topic to a DynamoDB table. I created a rule named "SensorDB_rule" to select the data from the "datashow" topic and send it to DynamoDB. The data I receive is listed below.

```
{
  "output": {
    "DeviceID": "eui-70b3d57ed004c725",
    "PayloadData": {
      "Battery": 100,
      "Sensor": -27.730278573288206,
      "alt": 63.10,
      "lat": 38.789196,
      "lon": -77.073914,
      "time": "18:52:43"
    }
  }
}
```

The rule SQL statement is listed below:

```
SELECT output.DeviceID as Device_ID,
       output.PayloadData.Battery as Battery,
       output.PayloadData.Sensor as Sensor,
       output.PayloadData.alt as Altitude,
       output.PayloadData.lat as Latitude,
       output.PayloadData.lon as Longitude,
       output.PayloadData.time as Time
FROM 'datashow'
```

I created a DynamoDB table named "SensorDB" with "DeviceID" as my partition key and "time" as my sort key, both set to String, and created the table with the default settings.

Back in "SensorDB_rule" I selected the table I just created ("SensorDB") and entered the following:

```
Partition key       - DeviceID
Partition key type  - STRING
Partition key value - $DeviceID
Sort key            - time
Sort key type       - STRING
Sort key value      - $time
```

and wrote the message to the column "MyMessageData".

Finally, I created a new IAM role named "SensorDB_role" and attached the following permissions:

* AWSIoTLogging
* AWSIoTRuleActions
* AWSIoTEventsFullAccess
* AWSIoTFullAccess
* AmazonDynamoDBFullAccess

I have double-checked all my information but I am still not getting anything inside my table. Any suggestions on how to fix this (adding policies, correcting my SQL statement, changing my table settings)?
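An editor-added sketch of the same rule expressed through the API (the role ARN and account id are placeholders, and it assumes the key-value substitution templates are evaluated against the incoming payload). The main thing it illustrates is the `${...}` substitution-template syntax for the partition/sort key values; a plain `$DeviceID` is treated as a literal string rather than a template.

```
import boto3

iot = boto3.client('iot')

iot.replace_topic_rule(
    ruleName='SensorDB_rule',
    topicRulePayload={
        'sql': "SELECT output.DeviceID as Device_ID, output.PayloadData.Battery as Battery, "
               "output.PayloadData.Sensor as Sensor, output.PayloadData.alt as Altitude, "
               "output.PayloadData.lat as Latitude, output.PayloadData.lon as Longitude, "
               "output.PayloadData.time as Time FROM 'datashow'",
        'awsIotSqlVersion': '2016-03-23',
        'actions': [{
            'dynamoDB': {
                'tableName': 'SensorDB',
                'roleArn': 'arn:aws:iam::123456789012:role/SensorDB_role',  # placeholder account id
                'hashKeyField': 'DeviceID',
                'hashKeyValue': '${output.DeviceID}',          # note the ${...} template syntax
                'hashKeyType': 'STRING',
                'rangeKeyField': 'time',
                'rangeKeyValue': '${output.PayloadData.time}',
                'rangeKeyType': 'STRING',
                'payloadField': 'MyMessageData',
            },
        }],
    },
)
```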
3 answers · 0 votes · 14 views · asked 2 months ago

What is the best way to plot array types in QuickSight?

Is it possible in QuickSight to plot the values in SignalX_Sampled_messsignal on the X-axis and the values in SignalX_Sampled_zeitvektor on the Y-axis?

Data example:

```
SignalX_Sampled_messsignal,SignalX_Sampled_zeitvektor
[47.5, 47.5, 47.5, 47.5, 47.5, 47.5, 47.5, 47.5, 47.5],[-1016, -816, -616, -416, -216, -16, 184, 384, 584]
[26.25, 26.25, 26.25, 26.25, 26.25, 26.25, 26.25, 26.25, 26.25],[-1015, -815, -615, -415, -215, -15, 185, 385, 585]
[18.25, 18.25, 18.25, 18.25, 18.25, 18.25, 18.25, 18.25, 18.25],[-1045, -845, -645, -445, -245, -45, 155, 355, 555]
[79.75, 79.75, 79.75, 79.75, 79.75, 79.75, 79.75, 79.75, 79.75],[-1045, -845, -645, -445, -245, -45, 155, 355, 555]
[7.75, 7.75, 7.75, 7.75, 7.75, 7.75, 7.75, 7.75, 7.75, 7.75],[-1076, -876, -676, -476, -276, -76, 124, 324, 524, 724]
[26.75, 26.75, 26.75, 26.75, 26.75, 26.75, 26.75, 26.75, 26.75],[-1006, -806, -606, -406, -206, -6, 194, 394, 594]
[13.75, 13.75, 13.75, 13.75, 13.75, 13.75, 13.75, 13.75, 13.75, 13.75],[-1144, -944, -744, -544, -344, -144, 56, 256, 456, 656]
[67.75, 67.75, 67.75, 67.75, 67.75, 67.75, 67.75, 67.75, 67.75],[-1046, -846, -646, -446, -246, -46, 154, 354, 554]
[43.25, 43.25, 43.25, 43.25, 43.25, 43.25, 43.25, 43.25, 43.25, 43.25],[-1127, -927, -727, -527, -327, -127, 73, 273, 473, 673]
[22.5, 22.5, 22.5, 22.5, 22.5, 22.5, 22.5, 22.5, 22.5],[-1046, -846, -646, -446, -246, -46, 154, 354, 554]
[25.75, 25.75, 25.75, 25.75, 25.75, 25.75, 25.75, 25.75, 25.75],[-1044, -844, -644, -444, -244, -44, 156, 356, 556]
[25.75, 25.75, 25.75, 25.75, 25.75, 25.75, 25.75, 25.75, 25.75, 25.75],[-1066, -866, -666, -466, -266, -66, 134, 334, 534, 734]
```
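An editor-added sketch of one common pre-processing step (not from the original post): flattening the paired arrays into one (x, y) pair per row before importing the dataset, so that standard line/scatter visuals can be used. The column names come from the question; the output file name is a placeholder, and only the first two sample rows are shown.

```
import pandas as pd

# Two rows taken from the question's sample; in practice these would come from the source table.
df = pd.DataFrame({
    'SignalX_Sampled_messsignal': [
        [47.5, 47.5, 47.5, 47.5, 47.5, 47.5, 47.5, 47.5, 47.5],
        [26.25, 26.25, 26.25, 26.25, 26.25, 26.25, 26.25, 26.25, 26.25],
    ],
    'SignalX_Sampled_zeitvektor': [
        [-1016, -816, -616, -416, -216, -16, 184, 384, 584],
        [-1015, -815, -615, -415, -215, -15, 185, 385, 585],
    ],
})

# One (x, y) pair per output row; the original row index becomes a signal id that can be
# used as a color/partition field in the visual.
long_df = (
    df.reset_index()
      .rename(columns={'index': 'signal_id'})
      .explode(['SignalX_Sampled_messsignal', 'SignalX_Sampled_zeitvektor'])
      .rename(columns={'SignalX_Sampled_messsignal': 'y', 'SignalX_Sampled_zeitvektor': 'x'})
)
long_df.to_csv('signals_long.csv', index=False)  # placeholder output; import this flat file as the dataset
```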
0 answers · 0 votes · 17 views · asked 2 months ago

AWS Glue trigger EventBatchingCondition/BatchWindow is not optional

Hi team,

I have a Glue workflow: trigger (type = "EVENT") => trigger a Glue job (to take data from S3 and push it to a MySQL RDS database).

I configured the Glue triggering criteria to kick off the Glue job after 5 events are received. In the console it says:

> Specify the number of events received or maximum elapsed time before firing this trigger.
> Time delay in seconds (optional)

The AWS documentation also says it's not required:

```
BatchWindow
Window of time in seconds after which EventBridge event trigger fires. Window starts when first event is received.
Type: Integer
Valid Range: Minimum value of 1. Maximum value of 900.
Required: No
```

I want my trigger to fire only after 5 events are received, and not depend on "Time delay in seconds (optional)". Currently, "Time delay in seconds (optional)" is set to 900 by default, and my job starts after 900s even if 5 events have not been received. That's not the behaviour we want: we want the job to start ONLY after x events are received.

I tried to edit the trigger in the console and remove the 900s from the "Time delay in seconds (optional)" input, but I can't save it until I put a value in it. It says it's optional, but it doesn't seem to be.

Is there a workaround to make the trigger ignore "Time delay in seconds (optional)" and fire only when it has received x events, and nothing else? Right now my job is triggered after 900s; we want to eliminate this case and have the job triggered only when x events are received. How can I make the "Time delay in seconds (optional)" input actually optional, given that the console forces me to put a value in it?

Thank you.
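An editor-added sketch (trigger and job names are placeholders): unlike the console form, the UpdateTrigger API marks BatchWindow as optional, so the field can simply be omitted when updating the trigger programmatically. The service may still apply its own default window, so the sketch reads the trigger back afterwards to verify what was actually stored.

```
import boto3

glue = boto3.client('glue')

# BatchWindow is omitted on purpose; only the event count is specified.
glue.update_trigger(
    Name='my-event-trigger',            # placeholder trigger name
    TriggerUpdate={
        'Actions': [{'JobName': 'my-glue-job'}],   # placeholder job name
        'EventBatchingCondition': {
            'BatchSize': 5,
        },
    },
)

# Verify whether the service kept the window unset or filled in a default.
print(glue.get_trigger(Name='my-event-trigger')['Trigger'].get('EventBatchingCondition'))
```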
1 answer · 0 votes · 22 views · asked 2 months ago

Is there a way to create a Redshift Table from a Glue table's schema?

Athena tables can be created from Glue tables, which can have schemas based on crawlers.

**Is it also possible to use the schema of a Glue table to generate a *Redshift-compatible* `CREATE TABLE` statement?**

I tried `SHOW CREATE TABLE encounter;` in Athena, and then I tried plugging the resulting `CREATE TABLE` statement into Redshift, but got an error:

```
ERROR: syntax error at or near "`" Position: 23.
```

I can go through the statement Athena generated and clean it up to fit Redshift requirements, like taking out the back-ticks, but I'm wondering if there's a more direct way to generate a table based on a Glue table?

This is the `CREATE TABLE` statement that Athena generated:

```
CREATE EXTERNAL TABLE `encounter`(
  `resourcetype` string COMMENT 'from deserializer',
  `id` string COMMENT 'from deserializer',
  `meta` struct<lastupdated:string,profile:array<string>> COMMENT 'from deserializer',
  `identifier` array<struct<use:string,system:string,value:string>> COMMENT 'from deserializer',
  `status` string COMMENT 'from deserializer',
  `class` struct<system:string,code:string> COMMENT 'from deserializer',
  `type` array<struct<coding:array<struct<system:string,code:string,display:string>>,text:string>> COMMENT 'from deserializer',
  `subject` struct<reference:string,display:string> COMMENT 'from deserializer',
  `participant` array<struct<type:array<struct<coding:array<struct<system:string,code:string,display:string>>,text:string>>,period:struct<start:string,end:string>,individual:struct<reference:string,display:string>>> COMMENT 'from deserializer',
  `period` struct<start:string,end:string> COMMENT 'from deserializer',
  `location` array<struct<location:struct<reference:string,display:string>>> COMMENT 'from deserializer',
  `serviceprovider` struct<reference:string,display:string> COMMENT 'from deserializer',
  `reasoncode` array<struct<coding:array<struct<system:string,code:string,display:string>>>> COMMENT 'from deserializer')
ROW FORMAT SERDE
  'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'paths'='class,id,identifier,location,meta,participant,period,reasonCode,resourceType,serviceProvider,status,subject,type')
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://bucket/Encounter/'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0',
  'CrawlerSchemaSerializerVersion'='1.0',
  'UPDATED_BY_CRAWLER'='healthlake-export-crawler',
  'averageRecordSize'='1561',
  'classification'='json',
  'compressionType'='none',
  'objectCount'='14',
  'recordCount'='53116',
  'sizeKey'='83059320',
  'typeOfData'='file')
```

Here's an example of what the original data looks like (it's synthetic data, so not PHI):

```
{
  "period": {
    "start": "2019-11-18T13:53:49-08:00",
    "end": "2019-11-18T14:23:49-08:00"
  },
  "subject": {
    "reference": "Patient/92e36d1e-66a2-4e77-9f50-155f7edf819c",
    "display": "Cyndi533 Bogan287"
  },
  "serviceProvider": {
    "reference": "Organization/3ecb1bdd-03d7-3fd2-b52d-8df2a04f5b0a",
    "display": "SOUTH SHORE SKIN CENTER, LLC"
  },
  "id": "b39745ae-14dd-46b3-9345-2916efa759ad",
  "type": [{
    "coding": [{
      "system": "http://snomed.info/sct",
      "code": "410620009",
      "display": "Well child visit (procedure)"
    }],
    "text": "Well child visit (procedure)"
  }],
  "class": {
    "system": "http://terminology.hl7.org/CodeSystem/v3-ActCode",
    "code": "AMB"
  },
  "participant": [{
    "period": {
      "start": "2019-11-18T13:53:49-08:00",
      "end": "2019-11-18T14:23:49-08:00"
    },
    "individual": {
      "reference": "Practitioner/c51e847b-fcd0-3f98-98a7-7e4274a2e6f3",
      "display": "Dr. Jacquelyne425 O'Reilly797"
    },
    "type": [{
      "coding": [{
        "system": "http://terminology.hl7.org/CodeSystem/v3-ParticipationType",
        "code": "PPRF",
        "display": "primary performer"
      }],
      "text": "primary performer"
    }]
  }],
  "resourceType": "Encounter",
  "status": "finished",
  "meta": {
    "lastUpdated": "2022-04-08T15:40:39.926Z"
  }
}
```
2 answers · 0 votes · 48 views · asked 2 months ago

How to use psycopg2 to load data into Redshift tables with the copy command

I am trying to load data from an EC2 instance into Redshift tables but cannot figure out how to do this using the COPY command. I have tried the following to create the SQL queries:

```
def copy_query_creator(table_name, schema):
    copy_sql_template = sql.SQL("COPY {table_name} from stdin iam_role 'iam_role' DATEFORMAT 'MM-DD-YYYY' TIMEFORMAT 'MM-DD-YYYY HH12:MI:SS AM' ACCEPTINVCHARS fixedwidth {schema}").format(
        table_name=sql.Identifier(table_name), schema=schema)
    return copy_sql_template
```

and

```
def copy_query_creator_2(table_name, iam_role, schema):
    copy_sql_base = """
        COPY {}
        FROM STDIN
        iam_role {}
        DATEFORMAT 'MM-DD-YYYY'
        TIMEFORMAT 'MM-DD-YYYY HH12:MI:SS AM'
        ACCEPTINVCHARS
        fixedwidth {}""".format(table_name, iam_role, schema)
    print(copy_sql_base)
    return copy_sql_base
```

where schema is the fixedwidth_spec in the example snippet below:

```
copy table_name
from 's3://mybucket/prefix'
iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole'
fixedwidth 'fixedwidth_spec';
```

The function that uses the query created looks like so:

```
def copy_query(self, filepath):
    schema = Query.create_schema()      # returns the formatted fixedwidth_spec
    table_name = Query.get_table_def()  # returns the table_name
    print(copy_query_creator_2(table_name, iam_role, schema))
    self.connect()
    with self.connection.cursor() as cursor:
        try:
            with open(filepath) as f:
                cursor.copy_expert(copy_query_creator_2(table_name, iam_role, schema), f)
                print('copy worked')
                logging.info(f'{copy_query_creator_2(table_name, iam_role, schema)} ran; {cursor.rowcount} records copied.')
        except (Exception, psycopg2.Error) as error:
            logging.error(error)
            print(error)
```

The two attempts return errors. The first returns 'Composed elements must be Composable, got %r instead', while the latter returns 'error at or near STDIN'. Please help.
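An editor-added sketch of one common pattern for this situation (an assumption about the intended setup, not the poster's code): Redshift's COPY loads from S3 and other AWS sources rather than from a client-side STDIN stream, so from an EC2 instance the file is usually staged in S3 first and then loaded with COPY ... FROM 's3://...' through psycopg2. It also shows why the first error occurred: `psycopg2.sql.SQL.format()` only accepts Composable arguments such as `sql.Identifier` or `sql.Literal`, not plain strings. Bucket, key, and role names below are placeholders.

```
import boto3
import psycopg2
from psycopg2 import sql

BUCKET = 'my-staging-bucket'                                   # placeholder
KEY = 'staging/data.txt'                                       # placeholder
IAM_ROLE = 'arn:aws:iam::0123456789012:role/MyRedshiftRole'    # placeholder from the question

def load_fixedwidth(conn, table_name, local_path, fixedwidth_spec):
    # Stage the local file in S3 so Redshift can read it.
    boto3.client('s3').upload_file(local_path, BUCKET, KEY)

    copy_stmt = sql.SQL(
        "COPY {table} FROM {s3_path} IAM_ROLE {role} "
        "DATEFORMAT 'MM-DD-YYYY' TIMEFORMAT 'MM-DD-YYYY HH12:MI:SS AM' "
        "ACCEPTINVCHARS FIXEDWIDTH {spec}"
    ).format(
        table=sql.Identifier(table_name),
        s3_path=sql.Literal(f's3://{BUCKET}/{KEY}'),
        role=sql.Literal(IAM_ROLE),
        spec=sql.Literal(fixedwidth_spec),   # every format() argument must be a Composable
    )
    with conn.cursor() as cur:
        cur.execute(copy_stmt)
    conn.commit()
```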
0 answers · 0 votes · 16 views · asked 2 months ago

AWS Glue job fails because of the escape character

Hi team,

I tried to load a big CSV file from S3 to RDS MySQL using AWS Glue. I have an escape character in the file (a special character), and this escape char is also defined on the crawled CSV table. Each time, the job fails with an error:

`An error occurred while calling o122.pyWriteDynamicFrame. Duplicate entry '123456' for key 'MySQL table.PRIMARY'`

I don't have any duplicate keys in my file, and the table is truncated each time before running the job. I tried to narrow down the issue by dividing the file into chunks: **every chunk runs successfully and I get the full data**, but the whole file in a single job always fails with the above error. I can't figure out why. Is this a Glue issue or a data issue?

I know the issue is related to my escape character, because when I remove it the whole file passes, **or** when I replace my special escape character with "\" the whole file also passes. Is that because Glue doesn't support certain escape characters (I have this issue with big files)? Not sure why the whole file with the escaper fails while every sub-chunk passes. Any idea?

Glue script:

```
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db_csv", table_name = "tbl_csvxx", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "string", "id", "string"), ("col1", "string", "col1", "string"), ("date1", "string", "date2", "timestamp"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string"), ("col4", "string", "col24", "string"), ("col5", "string", "col5", "string"), ...], transformation_ctx = "applymapping1")
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["col1", "col2", "col3", "id", "col4", "col5", ...], transformation_ctx = "selectfields2")
datasink3 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = selectfields2, catalog_connection = conn_name, connection_options = {"dbtable": "mysqltable", "database": db_name}, transformation_ctx = "datasink3")
```

Sample data:

```
"123","2018-02-09 12:16:38.000","2018-02-09 12:16:38.000","addr1 ®" addr2®" addr3®"",,,"22","city1","121",,,,,"CC"
"456","2018-02-09 12:16:38.000","2018-02-09 12:16:38.000","sds, dssdds®"F®", sds sds, dwr, re2",,,"ree364","ABD","288",,,,,"N"
"789","2018-02-09 12:16:38.000","2018-02-09 12:16:38.000","Alle# 02, Sept# 06, sdsx,",,"SAP# ®"C®"","DPPK# 05","dssd","313","Alkl",,,"1547","P"
```

Thank you.
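An editor-added sketch for isolating the parsing side of this (the S3 path and the assumption that the first column is the primary key are hypothetical): read the file directly with the escaper declared in `format_options`, then compare the total row count against the number of distinct key values. If they differ, rows are being mis-parsed around the escape character before they ever reach MySQL, which would explain the duplicate-key error.

```
from awsglue.context import GlueContext
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://my-bucket/path/to/bigfile.csv"]},  # placeholder path
    format="csv",
    format_options={
        "withHeader": False,
        "separator": ",",
        "quoteChar": '"',
        "escaper": "®",   # the special escape character from the question
    },
)

df = dyf.toDF()
id_col = df.columns[0]  # assuming the first column is the primary key ("id")
print("rows:", df.count(), "distinct ids:", df.select(id_col).distinct().count())
```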
1 answer · 0 votes · 32 views · asked 3 months ago

Creating a DynamicFrame using a MongoDB connection; successfully able to crawl data into the Glue Data Catalog

Hi All,

I created a MongoDB connection successfully, my connection tests successfully, and I was able to use a crawler to create metadata in the Glue Data Catalog. However, when I use the following, where I am adding my MongoDB database name and collection name in the additional_options parameter, I get an error:

```
data_catalog_database = 'tinkerbell'
data_catalog_table = 'tinkerbell_funds'

glueContext.create_dynamic_frame_from_catalog(
    database = data_catalog_database,
    table_name = data_catalog_table,
    additional_options = {"database": "tinkerbell", "collection": "funds"})
```

The following is the error:

```
An error was encountered:
An error occurred while calling o177.getDynamicFrame.
: java.lang.NoSuchMethodError: com.mongodb.internal.connection.DefaultClusterableServerFactory.<init>(Lcom/mongodb/connection/ClusterId;Lcom/mongodb/connection/ClusterSettings;Lcom/mongodb/connection/ServerSettings;Lcom/mongodb/connection/ConnectionPoolSettings;Lcom/mongodb/connection/StreamFactory;Lcom/mongodb/connection/StreamFactory;Lcom/mongodb/MongoCredential;Lcom/mongodb/event/CommandListener;Ljava/lang/String;Lcom/mongodb/MongoDriverInformation;Ljava/util/List;)V
```

When I use it without the additional_options:

```
glueContext.create_dynamic_frame_from_catalog(
    database = data_catalog_database,
    table_name = data_catalog_table)
```

I get the following error:

```
An error was encountered:
Missing collection name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.collection' property
Traceback (most recent call last):
  File "/home/glue_user/aws-glue-libs/PyGlue.zip/awsglue/context.py", line 179, in create_dynamic_frame_from_catalog
    return source.getFrame(**kwargs)
  File "/home/glue_user/aws-glue-libs/PyGlue.zip/awsglue/data_source.py", line 36, in getFrame
    jframe = self._jsource.getDynamicFrame()
  File "/home/glue_user/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/glue_user/spark/python/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: Missing collection name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.collection' property
```

Can someone please help me pass these parameters correctly?
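An editor-added sketch that bypasses the catalog entry and passes the MongoDB options directly to the source (the connection URI and credentials are placeholders). It only illustrates how the documented "mongodb" connection options are supplied; it does not address a possible MongoDB driver-version clash behind the NoSuchMethodError.

```
# Placeholders: URI, username, and password must match your environment.
mongo_options = {
    "uri": "mongodb://mongo-host:27017",
    "database": "tinkerbell",
    "collection": "funds",
    "username": "my_user",
    "password": "my_password",
}

dyf = glueContext.create_dynamic_frame_from_options(
    connection_type="mongodb",
    connection_options=mongo_options,
)
print(dyf.count())
```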
0 answers · 0 votes · 6 views · asked 3 months ago