DynamoDB to OpenSearch zero-ETL pipeline cannot stream delete operations

Hello, I created a zero-ETL pipeline between a DynamoDB table and an OpenSearch index. My table is configured like this:

this.videoTable = new Table(this, 'Videos', {
	tableName: 'Videos',
	removalPolicy: RemovalPolicy.DESTROY,
	billingMode: BillingMode.PAY_PER_REQUEST,
	partitionKey: { name: 'PK', type: AttributeType.STRING },
	sortKey: { name: 'SK', type: AttributeType.STRING },
	stream: StreamViewType.NEW_AND_OLD_IMAGES, // Enable DynamoDB Streams
	pointInTimeRecovery: true
});
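For reference, with StreamViewType.NEW_AND_OLD_IMAGES a REMOVE record in the raw DynamoDB stream has no NewImage; the deleted item's attributes survive only in OldImage (Keys are always present). The sketch uses small hand-written types modeled on that record shape (not the AWS SDK types) and hypothetical key values to show the difference between reading only the new image and falling back to the old one. It is an assumption-labeled illustration and says nothing about how the zero-ETL pipeline itself builds its events.

```typescript
// Hand-written types modeled on the DynamoDB Streams record shape
// (only the fields used here; not the AWS SDK's own types).
type StreamAttr = { S?: string; N?: string };
type StreamImage = Record<string, StreamAttr>;

interface StreamRecordSketch {
  eventName: "INSERT" | "MODIFY" | "REMOVE";
  dynamodb: {
    Keys: StreamImage;
    NewImage?: StreamImage; // absent on REMOVE records
    OldImage?: StreamImage; // present on MODIFY/REMOVE with NEW_AND_OLD_IMAGES
  };
}

// Reads entityType from the new image only.
function entityTypeFromNewImage(rec: StreamRecordSketch): string | undefined {
  return rec.dynamodb.NewImage?.["entityType"]?.S;
}

// Falls back to the old image, which is all a REMOVE record still carries.
function entityTypeWithFallback(rec: StreamRecordSketch): string | undefined {
  const image = rec.dynamodb.NewImage ?? rec.dynamodb.OldImage;
  return image?.["entityType"]?.S;
}

// Hypothetical REMOVE record for a deleted video (key values are made up).
const removeRecord: StreamRecordSketch = {
  eventName: "REMOVE",
  dynamodb: {
    Keys: { PK: { S: "VIDEO#123" }, SK: { S: "METADATA" } },
    OldImage: {
      PK: { S: "VIDEO#123" },
      SK: { S: "METADATA" },
      entityType: { S: "Video" },
    },
  },
};

console.log(entityTypeFromNewImage(removeRecord)); // undefined
console.log(entityTypeWithFallback(removeRecord)); // Video
```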

And my pipeline configuration looks like this:

const pipelineConfig = {
	version: '2',
	'dynamodb-pipeline': {
		source: {
			dynamodb: {
				acknowledgments: true,
				tables: [
					{
						table_arn: this.videoTable.tableArn,
						stream: {
							start_position: 'LATEST'
						},
						export: {
							s3_bucket: bucket.bucketName,
							s3_region: this.region,
						}
					}
				],
				aws: {
					sts_role_arn: pipelineRole.roleArn,
					region: this.region
				}
			}
		},
		route: [
			{
				videos_route: '/entityType == "Video"'
			}
		],
		sink: [
			{
				opensearch: {
					hosts: [`https://${this.eventDomain.domainEndpoint}`],
					index: 'videos',
					routes: ['videos_route'],
					index_type: 'custom',
					document_id: '${getMetadata("primary_key")}',
					action: '${getMetadata("opensearch_action")}',
					document_version: '${getMetadata("document_version")}',
					document_version_type: 'external',
					aws: {
						sts_role_arn: pipelineRole.roleArn,
						region: this.region,
					},
					dlq: {
						s3: {
							bucket: dlqBucket.bucketName,
							region: this.region,
							sts_role_arn: pipelineRole.roleArn,
							key_path_prefix: 'dynamodb-pipeline-dlq/'
						}
					}
				}
			}
		]
	}
};
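The sink settings map each event's metadata onto the OpenSearch request: document_id from primary_key, action from opensearch_action and, with document_version_type: 'external', a write only applies when its document_version is strictly higher than the stored one. The in-memory model is a hypothetical sketch of those semantics, not Data Prepper's implementation; it assumes opensearch_action is either "index" or "delete", and the document ID used is made up. It shows that a delete which does reach the sink needs only the document ID and a newer version, not a body.

```typescript
// Toy in-memory model of the OpenSearch sink settings:
// document_id <- primary_key, action <- opensearch_action,
// document_version <- document_version with external versioning.
// Hypothetical sketch only; not Data Prepper's implementation.
interface PipelineEvent {
  data: Record<string, unknown>;
  metadata: {
    primary_key: string;
    opensearch_action: "index" | "delete"; // assumed values
    document_version: number;
  };
}

interface StoredDoc {
  version: number;
  source: Record<string, unknown>;
}

// Returns true if the write was applied to the index.
function applyToIndex(index: Map<string, StoredDoc>, ev: PipelineEvent): boolean {
  const id = ev.metadata.primary_key;
  const version = ev.metadata.document_version;
  const existing = index.get(id);
  // External versioning: reject the write unless its version is
  // strictly greater than the stored document's version.
  if (existing !== undefined && version <= existing.version) {
    return false;
  }
  if (ev.metadata.opensearch_action === "delete") {
    // Only the document ID is used; the event body is ignored.
    return index.delete(id);
  }
  index.set(id, { version, source: ev.data });
  return true;
}

const index = new Map<string, StoredDoc>();
applyToIndex(index, {
  data: { entityType: "Video" },
  metadata: { primary_key: "video-1", opensearch_action: "index", document_version: 1 },
});
const removed = applyToIndex(index, {
  data: {}, // an empty body is enough for a delete
  metadata: { primary_key: "video-1", opensearch_action: "delete", document_version: 2 },
});
console.log(removed, index.size); // true 0
```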

But when I delete a video from DynamoDB, it is not removed from OpenSearch. I then added a second sink, an S3 sink, like this:

- s3:
    aws:
      sts_role_arn: 'role_arn'
      # AWS region of the bucket.
      region: eu-west-1
    # Replace with the bucket to send the events to
    bucket: 'bucket_name'
    threshold:
      event_collect_timeout: 60s
    codec:
      ndjson:

I can see that when an item is removed from DynamoDB, it is written to S3 as an empty object, {}. What am I doing wrong? I couldn't find any specific documentation or existing question about this, so any help is appreciated. Thank you in advance.
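One possible explanation, not confirmed in this thread: the OpenSearch sink lists routes: ['videos_route'], so it only receives events for which /entityType == "Video" is true, while the added S3 sink has no routes and therefore receives every event. If a delete event carries an empty body, as the {} in S3 suggests, the condition has no entityType to compare and presumably evaluates to false, so the delete never reaches OpenSearch. The toy model hand-translates the route conditions into TypeScript predicates; it is an illustration under that assumption, not Data Prepper's routing code, and the metadata value "delete" for opensearch_action is assumed.

```typescript
// Toy model of pipeline routing, assuming a delete event reaches the
// router with an empty body ({}). Conditions are hand-translated into
// TypeScript predicates; this is not Data Prepper's implementation.
interface RoutedEvent {
  data: Record<string, unknown>;
  metadata: Record<string, string>;
}

type RouteCondition = (ev: RoutedEvent) => boolean;

// '/entityType == "Video"' from the pipeline config.
const videosRoute: RouteCondition = (ev) => ev.data["entityType"] === "Video";

// Hypothetical, untested alternative:
// '/entityType == "Video" or getMetadata("opensearch_action") == "delete"'
const videosOrDeleteRoute: RouteCondition = (ev) =>
  ev.data["entityType"] === "Video" || ev.metadata["opensearch_action"] === "delete";

// A sink with routes receives an event only if one of its routes matches;
// a sink with no routes (like the added S3 sink) receives every event.
function reachesSink(ev: RoutedEvent, sinkRoutes: RouteCondition[]): boolean {
  return sinkRoutes.length === 0 || sinkRoutes.some((route) => route(ev));
}

const deleteEvent: RoutedEvent = {
  data: {}, // what the S3 sink recorded for a deleted item
  metadata: { opensearch_action: "delete" },
};

console.log(reachesSink(deleteEvent, []));                    // true  (S3 sink)
console.log(reachesSink(deleteEvent, [videosRoute]));         // false (OpenSearch sink as configured)
console.log(reachesSink(deleteEvent, [videosOrDeleteRoute])); // true
```

If that is what is happening, a route along the lines of the alternative condition (untested; it relies on the getMetadata function in route expressions) would let deletes through to the OpenSearch sink. It would also forward deletes of non-video items to the videos index, so it is worth checking whether those not-found deletes are ignored or end up in the DLQ bucket.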

  • Hi, may I suggest that you format your post with code blocks (i.e. </>) to make your config more readable and to help us better understand your problem? Thanks

  • Hello, sorry, I didn't notice it had posted like that. I've formatted it now.

Asked 4 months ago, 250 views
No answers
