Why does my Kinesis stream invoke my Lambda function more than once?


I am consuming Amazon Connect CTRs through Amazon Kinesis and inserting the data into Postgres. I am seeing unexpected behavior from Kinesis and my Lambda function: whenever a CTR comes through Kinesis, my Lambda is invoked, and after it inserts that record into Postgres, it is invoked again. Here is my code; if anything is wrong with it, please correct me:

import json
import base64

import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.extensions import AsIs

hostt = 'ec2-8585854236252958.compute-1.amazonaws.com'
username = 'sa'
passwordd = '212312asd@'
databasee = 'postgres'


def lambda_handler(event, context):
    print(event)
    # Open one connection per invocation instead of one per record.
    conn = psycopg2.connect(
        host=hostt,
        user=username,
        password=passwordd,
        database=databasee,
    )
    try:
        cur = conn.cursor(cursor_factory=RealDictCursor)
        for record in event['Records']:
            # Kinesis delivers each record's data base64-encoded.
            payload = base64.b64decode(record['kinesis']['data'])
            de_serialize_payload = json.loads(payload)
            print(de_serialize_payload)
            try:
                # Each Kinesis record carries a single CTR (a dict), so it is
                # processed once; iterating over the dict would only yield its keys.
                if (de_serialize_payload['Agent'] is not None
                        and de_serialize_payload['CustomerEndpoint'] is not None
                        and de_serialize_payload['Recording'] is not None
                        and de_serialize_payload['TransferredToEndpoint'] is not None):
                    required_data = {
                        'arn': de_serialize_payload['Agent']['ARN'],
                        'aftercontactworkduration': de_serialize_payload['Agent']['AfterContactWorkDuration'],
                        'aftercontactworkendtimestamp': de_serialize_payload['Agent']['AfterContactWorkEndTimestamp'],
                        'aftercontactworkstarttimestamp': de_serialize_payload['Agent']['AfterContactWorkStartTimestamp'],
                        'agentconnectionattempts': de_serialize_payload['AgentConnectionAttempts'],
                        'agentinteractionduration': de_serialize_payload['Agent']['AgentInteractionDuration'],
                        'answeringmachinedetectionstatus': de_serialize_payload['AnsweringMachineDetectionStatus'],
                        'channel': de_serialize_payload['Channel'],
                        'connectedtoagenttimestamp': de_serialize_payload['Agent']['ConnectedToAgentTimestamp'],
                        'connectedtosystemtimestamp': de_serialize_payload['ConnectedToSystemTimestamp'],
                        'customerendpointaddress': de_serialize_payload['CustomerEndpoint']['Address'],
                        'customerendpointtype': de_serialize_payload['CustomerEndpoint']['Type'],
                        'customerholdduration': de_serialize_payload['Agent']['CustomerHoldDuration'],
                        'dequeuetimestamp': de_serialize_payload['Queue']['DequeueTimestamp'],
                        'disconnectreason': de_serialize_payload['DisconnectReason'],
                        'disconnecttimestamp': de_serialize_payload['DisconnectTimestamp'],
                        'queueduration': de_serialize_payload['Queue']['Duration'],
                        'enqueuetimestamp': de_serialize_payload['Queue']['EnqueueTimestamp'],
                        'hierarchygroups': de_serialize_payload['Agent']['HierarchyGroups'],
                        'initialcontactid': de_serialize_payload['InitialContactId'],
                        'initiationmethod': de_serialize_payload['InitiationMethod'],
                        'initiationtimestamp': de_serialize_payload['InitiationTimestamp'],
                        'instancearn': de_serialize_payload['InstanceARN'],
                        'lastupdatetimestamp': de_serialize_payload['LastUpdateTimestamp'],
                        'longestholdduration': de_serialize_payload['Agent']['LongestHoldDuration'],
                        'nextcontactid': de_serialize_payload['NextContactId'],
                        'numberofholds': de_serialize_payload['Agent']['NumberOfHolds'],
                        'previouscontactid': de_serialize_payload['PreviousContactId'],
                        'queuearn': de_serialize_payload['Queue']['ARN'],
                        'queuename': de_serialize_payload['Queue']['Name'],
                        'recordingdeletionreason': de_serialize_payload['Recording']['DeletionReason'],
                        'recordinglocation': de_serialize_payload['Recording']['Location'],
                        'recordingstatus': de_serialize_payload['Recording']['Status'],
                        'recordingtype': de_serialize_payload['Recording']['Type'],
                        'routingprofilearn': de_serialize_payload['Agent']['RoutingProfile']['ARN'],
                        'routingprofilename': de_serialize_payload['Agent']['RoutingProfile']['Name'],
                        'scheduledtimestamp': de_serialize_payload['ScheduledTimestamp'],
                        'systemendpointaddress': de_serialize_payload['SystemEndpoint']['Address'],
                        'systemendpointtype': de_serialize_payload['SystemEndpoint']['Type'],
                        'transfercompletedtimestamp': de_serialize_payload['TransferCompletedTimestamp'],
                        'transferredtoendpoint': de_serialize_payload['TransferredToEndpoint']['Address'],
                        'username': de_serialize_payload['Agent']['Username'],
                        'voiceidresult': de_serialize_payload['VoiceIdResult'],
                        'id': de_serialize_payload['ContactId'],
                    }
                    columns = required_data.keys()
                    values = [required_data[column] for column in columns]
                    insert_statement = "insert into public.ctr (%s) values %s;"
                    params = (AsIs(','.join(columns)), tuple(values))
                    print(cur.mogrify(insert_statement, params))
                    cur.execute(insert_statement, params)
                    conn.commit()
            except (Exception, psycopg2.Error) as error:
                # Roll back so a failed statement does not leave the connection
                # in an aborted transaction for the next record.
                conn.rollback()
                print("Failed to insert record into the table", error)
    finally:
        conn.close()

I would really appreciate it if someone could guide me and help sort out this issue.

2 Answers
Accepted Answer

Hi

Very important!!! You've included a password in your question; I'd strongly recommend that you rotate it.

As for why your function is invoked twice, I would check the CloudWatch Logs to see whether the first Lambda invocation ends in success. If an invocation fails, Lambda invokes the function again with the entire batch of records, i.e. the same payload. Can you change the insert statement to an upsert to make the change idempotent?
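A minimal sketch of that upsert, assuming `public.ctr.id` (the `ContactId`) has a primary key or unique constraint, which `ON CONFLICT` requires (PostgreSQL 9.5 or later). The helper name `build_upsert_sql` is hypothetical; it only builds the SQL text with psycopg2-style `%s` placeholders, so replaying the same batch overwrites the existing row instead of inserting a duplicate. Table and column names are pasted directly into the string, so they must come from trusted constants such as the `required_data` keys, never from the payload.

```python
from typing import Sequence


def build_upsert_sql(table: str, columns: Sequence[str], key: str) -> str:
    """Return an INSERT ... ON CONFLICT statement with %s placeholders.

    Identifiers are pasted into the SQL text, so `table` and `columns`
    must be trusted constants, never values taken from the CTR payload.
    """
    col_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    # On a duplicate key, overwrite every non-key column with the new value.
    updates = ", ".join(f"{col} = EXCLUDED.{col}" for col in columns if col != key)
    # With no non-key columns there is nothing to update, so just skip the row.
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({key}) {action};"
    )


# Illustrative call with two of the question's columns:
print(build_upsert_sql("public.ctr", ["id", "channel"], "id"))
```

Inside the handler, the insert would then become something like `cur.execute(build_upsert_sql('public.ctr', list(required_data), 'id'), list(required_data.values()))`, where psycopg2 fills the `%s` placeholders from the value list.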

Thanks

Nick

AWS
Nick
answered 2 years ago
  • Hi Nick, first of all, thanks for the warning about the password, but it is fake. Secondly, where do you want me to make changes in the code?

  • Glad the password is fake!

    I don't want you to make changes to your code (yet). Can you go into CloudWatch Logs and check the output from the function when it runs? The first step of the investigation is to find out whether all invocations of the function end in success.
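To make that check easier to read in CloudWatch Logs, the handler can log each record's Kinesis sequence number before doing any work. A retried batch repeats the same sequence numbers, whereas a genuinely new record arrives with a new one. The helper name `describe_batch` is hypothetical; `sequenceNumber` and `partitionKey` are standard fields under `record['kinesis']` in a Kinesis event, and the sample event below uses made-up values for illustration.

```python
from typing import Any, Dict, List, Tuple


def describe_batch(event: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Return (sequenceNumber, partitionKey) for each Kinesis record in the event."""
    return [
        (record["kinesis"]["sequenceNumber"], record["kinesis"]["partitionKey"])
        for record in event["Records"]
    ]


def lambda_handler(event, context):
    # Log the identifiers first so they appear even if the invocation later fails.
    print("records in batch:", describe_batch(event))
    # ... decode and insert the records as in the question ...


# Illustrative event with made-up values:
sample_event = {"Records": [{"kinesis": {"sequenceNumber": "seq-1", "partitionKey": "pk-1", "data": ""}}]}
lambda_handler(sample_event, None)
```

If the same sequence number shows up in two invocations, the second one is a retry of a failed batch rather than a new CTR.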


I see you have a Kinesis stream as the trigger that invokes your Lambda. An important point to note here is that Amazon Connect generates multiple events for a single call.

Please refer to this:
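If Amazon Connect publishes more than one CTR for the same contact, one option is to keep only the most recent version per `ContactId`, using the CTR's `LastUpdateTimestamp` (both fields are already read in the question's code). A minimal sketch for a single batch, assuming the timestamps are ISO 8601 strings in UTC such as `2023-05-01T10:15:30Z`; `latest_per_contact` is a hypothetical helper name.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List


def _parse_ts(value: str) -> datetime:
    # Assumed format: ISO 8601 with a trailing "Z". fromisoformat() only
    # accepts "Z" from Python 3.11, so normalise it to "+00:00" first.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def latest_per_contact(ctrs: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only the newest CTR for each ContactId, compared by LastUpdateTimestamp."""
    latest: Dict[str, Dict[str, Any]] = {}
    for ctr in ctrs:
        contact_id = ctr["ContactId"]
        current = latest.get(contact_id)
        if current is None or _parse_ts(ctr["LastUpdateTimestamp"]) > _parse_ts(
            current["LastUpdateTimestamp"]
        ):
            latest[contact_id] = ctr
    return list(latest.values())
```

This only collapses duplicates inside one batch. To keep the table itself at one row per contact across invocations, the same comparison can be moved into the database as a `WHERE` condition on an `ON CONFLICT ... DO UPDATE` branch that compares `lastupdatetimestamp`.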

answered a year ago
