When I try to import a CSV or TSV file into Amazon DynamoDB using the default AWS Data Pipeline templates, I get a "MalformedJson" error.
Resolution
**Note:** This resolution applies to Amazon EMR 4.7.0 and later versions.
You can use the Import DynamoDB backup data from S3 template only if you first exported the data using the Export DynamoDB table to S3 template. If you didn't use the Export DynamoDB table to S3 template, create a new pipeline that uses a DynamoDBDataFormat object with a HiveActivity object. Use the following script for the HiveActivity object. This script removes incompatible jars from the Hive classpath.
delete jar /usr/lib/hive/lib/hive-contrib.jar ;
delete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;
delete jar /mnt/taskRunner/emr-hive-goodies.jar ;
delete jar /mnt/taskRunner/open-csv.jar ;
delete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;
delete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;
delete jar /mnt/taskRunner/pipeline-serde.jar ;
INSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};
The following is an example pipeline definition that imports a CSV file from Amazon Simple Storage Service (Amazon S3) into DynamoDB. This example pipeline launches the Amazon EMR resource in a private subnet, which is more secure than a public subnet. For more information, see Configure an Amazon EMR cluster in a private subnet. The CSV file in this example contains the following data:
- AnyCompany1,100
- AnyCompany2,20
- AnyCompany3,30
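To make the mapping concrete, the sketch below (a hypothetical helper, not part of the pipeline) shows how each CSV row above corresponds to a DynamoDB item under the column definitions used later in the data formats (`company string`, `id bigint`):

```python
import csv
import io

# Sample data from this article's CSV file.
CSV_DATA = "AnyCompany1,100\nAnyCompany2,20\nAnyCompany3,30\n"

def rows_to_items(csv_text):
    """Map CSV rows to DynamoDB-style items (S = string, N = number),
    matching the 'company string' / 'id bigint' column definitions."""
    items = []
    for company, id_value in csv.reader(io.StringIO(csv_text)):
        items.append({"company": {"S": company}, "id": {"N": id_value}})
    return items

items = rows_to_items(CSV_DATA)
print(items[0])  # {'company': {'S': 'AnyCompany1'}, 'id': {'N': '100'}}
```

In the pipeline itself, this conversion is performed by the HiveActivity, using the DynamoDBDataFormat and CSV data format objects defined below.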
**Note:** The DynamoDB table must already exist before you run the pipeline. Be sure to specify values for the variables listed in the values section. For more information, see Add myVariable to the pipeline definition.
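If the destination table doesn't exist yet, you can create it first. The request below is a sketch: the `company` partition key is an assumption based on the sample data, so adjust the key schema to your actual table design. Pass the dict to `boto3.client("dynamodb").create_table(**TABLE_SPEC)`, or create the table in the DynamoDB console.

```python
# Hypothetical table specification for the sample data in this article.
# The partition key choice ("company") is an assumption, not prescribed
# by the pipeline definition; change it to match your access patterns.
TABLE_SPEC = {
    "TableName": "companyid",
    "AttributeDefinitions": [
        {"AttributeName": "company", "AttributeType": "S"},
    ],
    "KeySchema": [
        {"AttributeName": "company", "KeyType": "HASH"},
    ],
    "BillingMode": "PAY_PER_REQUEST",
}
```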
{
  "objects": [
    {
      "name": "DefaultEmrCluster1",
      "id": "EmrClusterId_kvKJa",
      "releaseLabel": "emr-5.23.0",
      "type": "EmrCluster",
      "subnetId": "#{mySubnetId}",
      "emrManagedSlaveSecurityGroupId": "#{myCoreAndTaskSecurityGroup}",
      "emrManagedMasterSecurityGroupId": "#{myMasterSecurityGroup}",
      "serviceAccessSecurityGroupId": "#{myServiceAccessSecurityGroup}",
      "terminateAfter": "24 Hours"
    },
    {
      "dataFormat": {
        "ref": "DynamoDBDataFormatId_YMozb"
      },
      "name": "DefaultDataNode2",
      "id": "DataNodeId_WFWdO",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormatId_ciZN3"
      },
      "name": "DefaultDataNode1",
      "id": "DataNodeId_OZ8Nz",
      "type": "S3DataNode"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDynamoDBDataFormat1",
      "id": "DynamoDBDataFormatId_YMozb",
      "type": "DynamoDBDataFormat"
    },
    {
      "column": [
        "company string",
        "id bigint"
      ],
      "name": "DefaultDataFormat1",
      "id": "DataFormatId_ciZN3",
      "type": "CSV"
    },
    {
      "output": {
        "ref": "DataNodeId_WFWdO"
      },
      "input": {
        "ref": "DataNodeId_OZ8Nz"
      },
      "stage": "true",
      "maximumRetries": "0",
      "name": "DefaultHiveActivity1",
      "hiveScript": "delete jar /usr/lib/hive/lib/hive-contrib.jar ;\ndelete jar /mnt/taskRunner/emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/open-csv.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hadoop-goodies.jar ;\ndelete jar /mnt/taskRunner/oncluster-emr-hive-goodies.jar ;\ndelete jar /mnt/taskRunner/pipeline-serde.jar ;\nINSERT OVERWRITE TABLE ${output1} SELECT * FROM ${input1};",
      "id": "HiveActivityId_AwIZ9",
      "runsOn": {
        "ref": "EmrClusterId_kvKJa"
      },
      "type": "HiveActivity"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "pipelineLogUri": "s3://awsdoc-example-bucket/dplogs/",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    }
  ],
  "parameters": [
    {
      "description": "Input S3 folder",
      "id": "myInputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Destination DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    }
  ],
  "values": {
    "myDDBTableName": "companyid",
    "myInputS3Loc": "s3://awsdoc-example-bucket1/csvddb/",
    "mySubnetId": "subnet_id",
    "myCoreAndTaskSecurityGroup": "core and task security group",
    "myMasterSecurityGroup": "master security group",
    "myServiceAccessSecurityGroup": "service access security group"
  }
}
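Before uploading a definition like this, it can help to check that every `"ref"` points to an object `"id"` that actually exists, since a dangling reference is an easy mistake to make when editing the JSON by hand. The helper below is a hypothetical local sanity check, not an AWS tool; it only inspects top-level fields of each object.

```python
def unresolved_refs(definition):
    """Return refs in a pipeline definition that don't match any object id."""
    ids = {obj["id"] for obj in definition["objects"]}
    missing = []
    for obj in definition["objects"]:
        for value in obj.values():
            if isinstance(value, dict) and "ref" in value and value["ref"] not in ids:
                missing.append(value["ref"])
    return missing

# Trimmed example: a HiveActivity whose runsOn ref resolves correctly.
definition = {
    "objects": [
        {"id": "EmrClusterId_kvKJa", "type": "EmrCluster"},
        {"id": "HiveActivityId_AwIZ9", "type": "HiveActivity",
         "runsOn": {"ref": "EmrClusterId_kvKJa"}},
    ]
}
print(unresolved_refs(definition))  # []
```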
Related information
Export data from DynamoDB to Amazon S3