By using AWS re:Post, you agree to the Terms of Use
/How to set document id when delivering data from Kinesis Data Firehose to Opensearch index/

How to set document id when delivering data from Kinesis Data Firehose to Opensearch index


What I'm trying to do: 1. I am using a kinesis data stream to ingest data from a python client sending in JSON's. 2. I setup a kinesis firehose delivery stream with source as the kinesis-data-stream from the previous step and destination as an index on opensearch. I also use a lambda to transform the stream before delivering to opensearch.

Now i would like to set the document id for the record I'm transforming in the lambda . I tried setting the key on the transformed record object to be id but that creates a document attribute named id and sets a value to it. What i would like is to set the _id on the search doc.When i try to set the _id attribute directly by returning it within the transformed record back to firehose delivery stream it generated a destination error :

"message": "{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse field [_id] of type [_id] in document with id \\u002749627661727302149016659535428367209810930350656512327682.0\\u0027. Preview of field\\u0027s value: \\u00279b428743-b19a-4fb1-90f2-f132a423b2e8\\u0027\",\"caused_by\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.\"}}",
    "errorCode": "400",

Is there anyway to the doc _id for the documents loaded from firehose to opensearch or would i need to take a different approach by selecting the destination to be a http endpoint and using the rest API's provided by Opensearch (which would kinda be a hassle compared to directly being able to just set the _id attrib) ?

What I'm really trying to do is update the indexed documents on a change event. I understand that firehose uses the bulk api from Opensearch , but am unsure about how the upsert operation is handled internally by the kinesis destination connector to opensearch. Hence specifying a fixed id from another DB would be ideal for both insert and update ops to Opensearch in my case.It would be super useful to atleast be able to dictate the type of operation based on some attribute of the kinesis record to opensearch with a reference id for the doc to update.

1 Answers

Hi, have you figured out how Firehose handles upsert to OpenSearch? I also need this information for my use case

answered 2 months ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions