How to set the document ID when delivering data from Kinesis Data Firehose to an OpenSearch index


What I'm trying to do:

  1. I am using a Kinesis data stream to ingest JSON records sent by a Python client.
  2. I set up a Kinesis Data Firehose delivery stream whose source is the Kinesis data stream from the previous step and whose destination is an index in OpenSearch. I also use a Lambda function to transform the records before they are delivered to OpenSearch.
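For reference, the producer side of step 1 might look roughly like this. It is a sketch that assumes a boto3 Kinesis client is passed in (for example boto3.client("kinesis")) and that each JSON record carries its key from the source system in a field named id (a hypothetical name). That key is reused as the partition key so all changes to one record land on the same shard.

```python
import json
import uuid


def build_put_records_entries(docs, key_field="id"):
    """Turn a list of dicts into PutRecords entries for a Kinesis data stream."""
    entries = []
    for doc in docs:
        key = doc.get(key_field)
        if key is None:
            # No external key on this record: fall back to a random partition key.
            key = uuid.uuid4()
        entries.append({
            "Data": json.dumps(doc).encode("utf-8"),
            "PartitionKey": str(key),
        })
    return entries


def send_batch(kinesis_client, stream_name, docs):
    """Send docs with an injected boto3 Kinesis client; returns the failure count."""
    response = kinesis_client.put_records(
        StreamName=stream_name,
        Records=build_put_records_entries(docs),
    )
    # PutRecords is not all-or-nothing: failed entries must be retried by the caller.
    return response["FailedRecordCount"]
```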

Now I would like to set the document ID for each record I transform in the Lambda function. I tried setting an id key on the transformed record, but that just creates a document attribute named id with that value. What I want is to set the _id of the search document. When I try to set the _id attribute directly by returning it in the transformed record to the Firehose delivery stream, I get a destination error:
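For context, a Firehose transformation Lambda receives base64-encoded records and must return each one with its recordId, a result, and the re-encoded data. The sketch below keeps the external key as an ordinary field instead of _id, which avoids the mapper_parsing_exception; the field name source_id is hypothetical. Note that this only makes the key searchable: Firehose still chooses the document's actual _id.

```python
import base64
import json


def lambda_handler(event, context):
    """Firehose data-transformation Lambda (sketch)."""
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        # _id is an OpenSearch metadata field and is rejected inside the body,
        # so move it to an ordinary (hypothetical) field named source_id.
        external_id = payload.pop("_id", None)
        if external_id is not None:
            payload["source_id"] = str(external_id)
        transformed = json.dumps(payload).encode("utf-8")
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(transformed).decode("utf-8"),
        })
    return {"records": output}
```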

"message": "{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse field [_id] of type [_id] in document with id \\u002749627661727302149016659535428367209810930350656512327682.0\\u0027. Preview of field\\u0027s value: \\u00279b428743-b19a-4fb1-90f2-f132a423b2e8\\u0027\",\"caused_by\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.\"}}",
    "errorCode": "400",

Is there any way to set the doc _id for documents loaded from Firehose into OpenSearch, or would I need to take a different approach, such as selecting an HTTP endpoint as the destination and using the REST APIs provided by OpenSearch (which would be a hassle compared to simply setting the _id attribute)?
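To illustrate what the error means by "Use the index API request parameters": with the OpenSearch REST API the ID goes in the URL path (PUT /index/_doc/id), not in the JSON body. The sketch below only builds the request; the endpoint and index names are placeholders, and authentication (for example SigV4 signing for Amazon OpenSearch Service) is deliberately left out.

```python
import json
import urllib.parse
import urllib.request


def build_index_request(endpoint, index, doc_id, doc):
    """Build (but do not send) a PUT /<index>/_doc/<_id> request."""
    url = "{}/{}/_doc/{}".format(
        endpoint.rstrip("/"),
        urllib.parse.quote(index, safe=""),
        # The document ID travels in the path; the body holds only ordinary fields.
        urllib.parse.quote(str(doc_id), safe=""),
    )
    return urllib.request.Request(
        url,
        data=json.dumps(doc).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )
```

Sending the same request again with the same ID replaces the document rather than creating a second one.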

What I'm really trying to do is update the indexed documents on a change event. I understand that Firehose uses the OpenSearch bulk API, but I'm unsure how the Kinesis Data Firehose destination connector for OpenSearch handles upserts internally. Using a fixed ID from another database would therefore be ideal for both insert and update operations in my case. It would be very useful to at least be able to choose the type of operation sent to OpenSearch based on some attribute of the Kinesis record, together with a reference ID for the document to update.
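If you send to OpenSearch yourself (HTTP endpoint or your own Lambda), the bulk API lets each record choose its action and _id. Here is a sketch that builds a POST /_bulk body from change events, assuming a hypothetical event shape {"op": "upsert" | "delete", "id": ..., "doc": {...}}. An update with doc_as_upsert creates the document if it is missing and otherwise merges the fields.

```python
import json


def to_bulk_ndjson(index, events):
    """Build an OpenSearch _bulk request body from change events."""
    lines = []
    for event in events:
        meta = {"_index": index, "_id": str(event["id"])}
        if event["op"] == "delete":
            # delete takes only the action line, no source line.
            lines.append(json.dumps({"delete": meta}))
        elif event["op"] == "upsert":
            # update + doc_as_upsert: insert if missing, partial update otherwise.
            lines.append(json.dumps({"update": meta}))
            lines.append(json.dumps({"doc": event["doc"], "doc_as_upsert": True}))
        else:
            raise ValueError("unsupported op: {!r}".format(event["op"]))
    # The bulk body is newline-delimited JSON and must end with a newline.
    return "\n".join(lines) + "\n"
```

The body is typically sent with Content-Type: application/x-ndjson.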

2 Answers

Hi, have you figured out how Firehose handles upserts to OpenSearch? I also need this information for my use case.

answered 2 years ago

Firehose delivery stream destinations are append-only, and the OpenSearch destination does not support upserts. Firehose generates a unique ID for each record it delivers and uses it as the document ID; this cannot be configured by the user at this time. If you are an AWS Enterprise Support customer, you can request this feature for Firehose through your Solutions Architect (SA) or Technical Account Manager (TAM).

One possible short-term solution is to have the Kinesis data stream trigger a Lambda function that upserts documents into OpenSearch using the OpenSearch APIs. The Python client would push JSON data to the Kinesis data stream, and instead of only performing transformations, the Lambda function would be triggered by records in the stream, transform them, and handle the upserts to OpenSearch.
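A rough sketch of that Lambda, assuming the opensearch-py package is bundled with the function and that each JSON record has a hypothetical id field holding the key from the source database. The environment variable names OPENSEARCH_HOST and OPENSEARCH_INDEX are also assumptions, and real Amazon OpenSearch Service domains usually need SigV4 request signing, which is omitted here.

```python
import base64
import json
import os


def kinesis_event_to_actions(event, index):
    """Convert a Kinesis-triggered Lambda event into opensearch-py bulk actions."""
    actions = []
    for record in event["Records"]:
        doc = json.loads(base64.b64decode(record["kinesis"]["data"]))
        actions.append({
            "_op_type": "update",
            "_index": index,
            "_id": str(doc["id"]),  # hypothetical key field from the source DB
            "doc": doc,
            "doc_as_upsert": True,  # create if missing, otherwise update
        })
    return actions


def lambda_handler(event, context):
    # Imported lazily so the conversion above can be tested without the package.
    from opensearchpy import OpenSearch, helpers

    client = OpenSearch(
        hosts=[{"host": os.environ["OPENSEARCH_HOST"], "port": 443}],
        use_ssl=True,
        # SigV4 signing (http_auth=...) is usually required here; omitted.
    )
    index = os.environ.get("OPENSEARCH_INDEX", "documents")
    success, errors = helpers.bulk(client, kinesis_event_to_actions(event, index))
    return {"indexed": success, "errors": errors}
```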

answered 2 years ago
