How do I resolve the "MalformedJson" error when I use the default Data Pipeline template to import CSV or TSV files into DynamoDB?
When I try to use the default AWS Data Pipeline template to import a CSV or TSV file into Amazon DynamoDB, I get a "MalformedJson" error.
Resolution
**Note:** This resolution applies to Amazon EMR 4.7.0 and later.
You can use the Import DynamoDB backup data from S3 template only if you first exported the data using the Export DynamoDB table to S3 template. If you didn't use the Export DynamoDB table to S3 template, then create a new pipeline with a DynamoDBDataFormat object that includes a HiveActivity object. Use the following script for the HiveActivity object. This script removes incompatible JAR files from the Hive classpath.
```
delete jar /usr/lib/hive/lib/hive-contrib.jar ;
delete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;
delete jar /mnt/taskRunner/emr-hive-goodies.jar ;
delete jar /mnt/taskRunner/open-csv.jar ;
delete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;
delete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;
delete jar /mnt/taskRunner/pipeline-serde.jar ;
INSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};
```
The following is an example pipeline definition that imports a CSV file from Amazon Simple Storage Service (Amazon S3) into DynamoDB. The example pipeline launches the Amazon EMR resource in a private subnet, which is more secure than a public subnet. For more information, see Configure an Amazon EMR cluster in a private subnet. The CSV file in this example contains the following data:
- AnyCompany1,100
- AnyCompany2,20
- AnyCompany3,30
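To see how these rows line up with the pipeline's DynamoDBDataFormat columns ("company string", "id bigint"), the following is a minimal local sketch, not part of the pipeline itself. It converts the sample rows into DynamoDB attribute-value items, with strings mapped to the S type and bigints to the N type; the function name `rows_to_items` is hypothetical.

```python
import csv
import io

# Sample rows from the article. The column layout mirrors the
# DynamoDBDataFormat columns in the pipeline definition:
# "company string", "id bigint".
SAMPLE_CSV = "AnyCompany1,100\nAnyCompany2,20\nAnyCompany3,30\n"

def rows_to_items(text):
    """Convert CSV rows to DynamoDB attribute-value items.

    Strings use the S type and bigints the N type (DynamoDB sends
    numbers across the wire as strings).
    """
    items = []
    for company, value in csv.reader(io.StringIO(text)):
        items.append({"company": {"S": company}, "id": {"N": value}})
    return items

items = rows_to_items(SAMPLE_CSV)
print(items[0])  # {'company': {'S': 'AnyCompany1'}, 'id': {'N': '100'}}
```

If the CSV columns don't match the order and types declared in the DynamoDBDataFormat object, the Hive step fails or writes incorrect attributes, so it's worth checking this mapping before activating the pipeline.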
**Note:** The DynamoDB table must already exist before you run the pipeline. Be sure to specify values for the variables listed in the Values section. For more information, see Add myVariable to the pipeline definition.
```json
{
  "objects": [
    {
      "name": "DefaultEmrCluster1",
      "id": "EmrClusterId_kvKJa",
      "releaseLabel": "emr-5.23.0",
      "type": "EmrCluster",
      "subnetId": "#{mySubnetId}",
      "emrManagedSlaveSecurityGroupId": "#{myCoreAndTaskSecurityGroup}",
      "emrManagedMasterSecurityGroupId": "#{myMasterSecurityGroup}",
      "serviceAccessSecurityGroupId": "#{myServiceAccessSecurityGroup}",
      "terminateAfter": "24 Hours"
    },
    {
      "dataFormat": {
        "ref": "DynamoDBDataFormatId_YMozb"
      },
      "name": "DefaultDataNode2",
      "id": "DataNodeId_WFWdO",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormatId_ciZN3"
      },
      "name": "DefaultDataNode1",
      "id": "DataNodeId_OZ8Nz",
      "type": "S3DataNode"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDynamoDBDataFormat1",
      "id": "DynamoDBDataFormatId_YMozb",
      "type": "DynamoDBDataFormat"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDataFormat1",
      "id": "DataFormatId_ciZN3",
      "type": "CSV"
    },
    {
      "output": {
        "ref": "DataNodeId_WFWdO"
      },
      "input": {
        "ref": "DataNodeId_OZ8Nz"
      },
      "stage": "true",
      "maximumRetries": "0",
      "name": "DefaultHiveActivity1",
      "hiveScript": "delete jar /usr/lib/hive/lib/hive-contrib.jar ;\ndelete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/open-csv.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/pipeline-serde.jar ;\nINSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};",
      "id": "HiveActivityId_AwIZ9",
      "runsOn": {
        "ref": "EmrClusterId_kvKJa"
      },
      "type": "HiveActivity"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "pipelineLogUri": "s3://awsdoc-example-bucket/dplogs/",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    }
  ],
  "parameters": [
    {
      "description": "Input S3 folder",
      "id": "myInputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Destination DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    }
  ],
  "values": {
    "myDDBTableName": "companyid",
    "myInputS3Loc": "s3://awsdoc-example-bucket1/csvddb/",
    "mySubnetId": "subnet_id",
    "myCoreAndTaskSecurityGroup": "core and task security group",
    "myMasterSecurityGroup": "master security group",
    "myServiceAccessSecurityGroup": "service access security group"
  }
}
```
AWS OFFICIAL — Updated 2 years ago