DynamoDB to OpenSearch zero-ETL pipeline does not propagate delete operations

Hello, I created a zero-ETL pipeline between a DynamoDB table and an OpenSearch index. My table is configured like this:

this.videoTable = new Table(this, 'Videos', {
	tableName: 'Videos',
	removalPolicy: RemovalPolicy.DESTROY,
	billingMode: BillingMode.PAY_PER_REQUEST,
	partitionKey: { name: 'PK', type: AttributeType.STRING },
	sortKey: { name: 'SK', type: AttributeType.STRING },
	stream: StreamViewType.NEW_AND_OLD_IMAGES, // Enable DynamoDB Streams
	pointInTimeRecovery: true
})

And my pipeline configuration looks like this:

const pipelineConfig = {
	version: '2',
	'dynamodb-pipeline': {
		source: {
			dynamodb: {
				acknowledgments: true,
				tables: [
					{
						table_arn: this.videoTable.tableArn,
						stream: {
							start_position: 'LATEST'
						},
						export: {
							s3_bucket: bucket.bucketName,
							s3_region: this.region,
						}
					}
				],
				aws: {
					sts_role_arn: pipelineRole.roleArn,
					region: this.region
				}
			}
		},
		route: [
			{
				videos_route: '/entityType == "Video"'
			}
		],
		sink: [
			{
				opensearch: {
					hosts: [`https://${this.eventDomain.domainEndpoint}`],
					index: 'videos',
					routes: ['videos_route'],
					index_type: 'custom',
					document_id: '${getMetadata("primary_key")}',
					action: '${getMetadata("opensearch_action")}',
					document_version: '${getMetadata("document_version")}',
					document_version_type: 'external',
					aws: {
						sts_role_arn: pipelineRole.roleArn,
						region: this.region,
					},
					dlq: {
						s3: {
							bucket: dlqBucket.bucketName,
							region: this.region,
							sts_role_arn: pipelineRole.roleArn,
							key_path_prefix: "dynamodb-pipeline-dlq/"
						}
					}
				}
			}
		]
	}
};

But when I delete a video from DynamoDB, it is not removed from OpenSearch. To investigate, I added a second sink, S3, like this:

- s3:
        aws:
          sts_role_arn: 'role_arn'
          # Provide the region of the domain.
          region: eu-west-1
        # Replace with the bucket to send the logs to
        bucket: 'bucket_name'
        threshold:
          event_collect_timeout: 60s
        codec:
          ndjson:

I see that when an item is deleted from DynamoDB, it is written to S3 as an empty object: {}. What am I doing wrong here? I couldn't find any documentation or existing question that covers this. Any help is appreciated. Thank you in advance.
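
One possible reading of the {} observation, offered as a sketch rather than a confirmed diagnosis: if the delete event really reaches the sinks with no attributes, the route condition '/entityType == "Video"' has nothing to match against, so the event would land in the unrouted S3 sink but never in the 'videos' OpenSearch sink. The TypeScript below only imitates that check. The function matchesVideosRoute and the sample key values are hypothetical and are not part of Data Prepper or of the pipeline above.

```typescript
// Hypothetical stand-in for the pipeline's route check; this is NOT Data Prepper code.
type PipelineEvent = Record<string, unknown>;

// Mirrors the route condition '/entityType == "Video"' from the pipeline config.
function matchesVideosRoute(event: PipelineEvent): boolean {
  return event["entityType"] === "Video";
}

// An insert/update event carries the item's attributes (key values are made up).
const upsertEvent: PipelineEvent = { PK: "VIDEO#1", SK: "META", entityType: "Video" };

// The delete event as it was observed in the S3 sink: an empty object.
const deleteEvent: PipelineEvent = {};

// Only events that match the route would be handed to the OpenSearch sink.
const upsertRouted = matchesVideosRoute(upsertEvent);
const deleteRouted = matchesVideosRoute(deleteEvent);

console.log({ upsertRouted, deleteRouted });
```

If that is what is happening, it may be worth checking whether the DynamoDB source's view_on_remove stream option (documented for Data Prepper with the values NEW_IMAGE and OLD_IMAGE) is available in this pipeline version, since an old-image view of the deleted item would still carry entityType. This has not been verified against this exact setup.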

  • Hi, may I suggest that you format your post with code blocks (the </> button) to make your config more readable and help us better understand your problem? Thanks.

  • Hello, sorry, I didn't see that it was posted like that. I have formatted it now.

asked 3 months ago · 245 views
No Answers