Why does my Kinesis stream invoke my Lambda function more than once?

I am consuming Amazon Connect contact trace records (CTRs) through Amazon Kinesis and inserting the data into Postgres, and I am seeing very unexpected behavior from Kinesis and Lambda. Whenever a CTR arrives through Kinesis, my Lambda function is invoked, and after it inserts that record into Postgres, it is invoked again. Here is my code; if anything is wrong with it, please correct me:

import json
import base64
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.extensions import AsIs

hostt = 'ec2-8585854236252958.compute-1.amazonaws.com'
username = 'sa'
passwordd = '212312asd@'
databasee = 'postgres'

def lambda_handler(event, context):
    print(event)
    # Open one connection per invocation rather than one per record.
    conn = psycopg2.connect(
        host=hostt,
        user=username,
        password=passwordd,
        database=databasee
    )
    try:
        cur = conn.cursor(cursor_factory=RealDictCursor)
        for record in event['Records']:
            # Each Kinesis record carries one base64-encoded JSON CTR.
            payload = base64.b64decode(record['kinesis']['data'])
            de_serialize_payload = json.loads(payload)
            print(de_serialize_payload)
            try:
                # The payload is a single dict, so no inner loop is needed:
                # iterating over it walks its keys and repeats the same insert once per key.
                if (de_serialize_payload['Agent'] is not None
                        and de_serialize_payload['CustomerEndpoint'] is not None
                        and de_serialize_payload['Recording'] is not None
                        and de_serialize_payload['TransferredToEndpoint'] is not None):
                    required_data = {
                        'arn': de_serialize_payload['Agent']['ARN'],
                        'aftercontactworkduration': de_serialize_payload['Agent']['AfterContactWorkDuration'],
                        'aftercontactworkendtimestamp': de_serialize_payload['Agent']['AfterContactWorkEndTimestamp'],
                        'aftercontactworkstarttimestamp': de_serialize_payload['Agent']['AfterContactWorkStartTimestamp'],
                        'agentconnectionattempts': de_serialize_payload['AgentConnectionAttempts'],
                        'agentinteractionduration': de_serialize_payload['Agent']['AgentInteractionDuration'],
                        'answeringmachinedetectionstatus': de_serialize_payload['AnsweringMachineDetectionStatus'],
                        'channel': de_serialize_payload['Channel'],
                        'connectedtoagenttimestamp': de_serialize_payload['Agent']['ConnectedToAgentTimestamp'],
                        'connectedtosystemtimestamp': de_serialize_payload['ConnectedToSystemTimestamp'],
                        'customerendpointaddress': de_serialize_payload['CustomerEndpoint']['Address'],
                        'customerendpointtype': de_serialize_payload['CustomerEndpoint']['Type'],
                        'customerholdduration': de_serialize_payload['Agent']['CustomerHoldDuration'],
                        'dequeuetimestamp': de_serialize_payload['Queue']['DequeueTimestamp'],
                        'disconnectreason': de_serialize_payload['DisconnectReason'],
                        'disconnecttimestamp': de_serialize_payload['DisconnectTimestamp'],
                        'queueduration': de_serialize_payload['Queue']['Duration'],
                        'enqueuetimestamp': de_serialize_payload['Queue']['EnqueueTimestamp'],
                        'hierarchygroups': de_serialize_payload['Agent']['HierarchyGroups'],
                        'initialcontactid': de_serialize_payload['InitialContactId'],
                        'initiationmethod': de_serialize_payload['InitiationMethod'],
                        'initiationtimestamp': de_serialize_payload['InitiationTimestamp'],
                        'instancearn': de_serialize_payload['InstanceARN'],
                        'lastupdatetimestamp': de_serialize_payload['LastUpdateTimestamp'],
                        'longestholdduration': de_serialize_payload['Agent']['LongestHoldDuration'],
                        'nextcontactid': de_serialize_payload['NextContactId'],
                        'numberofholds': de_serialize_payload['Agent']['NumberOfHolds'],
                        'previouscontactid': de_serialize_payload['PreviousContactId'],
                        'queuearn': de_serialize_payload['Queue']['ARN'],
                        'queuename': de_serialize_payload['Queue']['Name'],
                        'recordingdeletionreason': de_serialize_payload['Recording']['DeletionReason'],
                        'recordinglocation': de_serialize_payload['Recording']['Location'],
                        'recordingstatus': de_serialize_payload['Recording']['Status'],
                        'recordingtype': de_serialize_payload['Recording']['Type'],
                        'routingprofilearn': de_serialize_payload['Agent']['RoutingProfile']['ARN'],
                        'routingprofilename': de_serialize_payload['Agent']['RoutingProfile']['Name'],
                        'scheduledtimestamp': de_serialize_payload['ScheduledTimestamp'],
                        'systemendpointaddress': de_serialize_payload['SystemEndpoint']['Address'],
                        'systemendpointtype': de_serialize_payload['SystemEndpoint']['Type'],
                        'transfercompletedtimestamp': de_serialize_payload['TransferCompletedTimestamp'],
                        'transferredtoendpoint': de_serialize_payload['TransferredToEndpoint']['Address'],
                        'username': de_serialize_payload['Agent']['Username'],
                        'voiceidresult': de_serialize_payload['VoiceIdResult'],
                        'id': de_serialize_payload['ContactId']
                    }
                    columns = list(required_data.keys())
                    values = [required_data[column] for column in columns]
                    insert_statement = "insert into public.ctr (%s) values %s;"
                    params = (AsIs(','.join(columns)), tuple(values))
                    print(cur.mogrify(insert_statement, params))
                    cur.execute(insert_statement, params)
                    conn.commit()
            except (Exception, psycopg2.Error) as error:
                # Roll back so the failed statement does not leave the
                # transaction aborted for the remaining records in the batch.
                conn.rollback()
                print("Failed to insert record into the table", error)
    finally:
        conn.close()
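The handler guards `Agent`, `CustomerEndpoint`, `Recording` and `TransferredToEndpoint` against `None`, but it also dereferences `Queue` and `SystemEndpoint` without a check, so a CTR where either is null raises a `TypeError` that the `except` block swallows, and that record is silently not inserted. A minimal sketch of a null-tolerant lookup, assuming a missing nested field should become SQL NULL rather than drop the whole record (the helper name `dig` and the sample data are hypothetical):

```python
def dig(data, *keys):
    """Follow nested dict keys; return None if any level is missing or null."""
    for key in keys:
        if not isinstance(data, dict):
            return None
        data = data.get(key)
    return data


# Example: a CTR for a contact that never entered a queue (hypothetical data).
ctr = {"Agent": {"RoutingProfile": {"ARN": "arn:example"}}, "Queue": None}
print(dig(ctr, "Agent", "RoutingProfile", "ARN"))  # arn:example
print(dig(ctr, "Queue", "Name"))  # None
```

Inside `required_data`, an entry such as `'queuename': dig(de_serialize_payload, 'Queue', 'Name')` would then store NULL instead of failing, provided the corresponding column in `public.ctr` allows NULL.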

I would really appreciate it if someone could guide me and help sort out this issue.

2 answers

Accepted answer

Hi

Very important!!! You've included a password in your question - I'd strongly recommend that you rotate this.

In terms of why your code is called twice, I would check the CloudWatch logs to see whether the first Lambda invocation ends in success. If an invocation fails, Lambda calls the function again with the entire batch of records, i.e. the same payload. Can you change the insert statement to an upsert so that the write is idempotent?
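A minimal sketch of the idempotent write suggested above, assuming `public.ctr` has a primary key or unique constraint on `id` (the CTR `ContactId`). It builds an `INSERT ... ON CONFLICT (id) DO UPDATE` statement, which PostgreSQL accepts from version 9.5. To stay runnable without a database server, the example exercises it against the standard-library `sqlite3` module, which accepts the same upsert syntax from SQLite 3.24. The function names `build_upsert_sql` and `upsert` are illustrative, not part of any library.

```python
import sqlite3


def build_upsert_sql(table, columns, key="id", placeholder="?"):
    """Build an INSERT that updates the existing row when `key` already exists.

    Table and column names are interpolated directly, so they must come from
    trusted code (like the fixed CTR field list), never from event data.
    """
    cols = ", ".join(columns)
    marks = ", ".join([placeholder] * len(columns))
    updates = ", ".join(f"{c} = excluded.{c}" for c in columns if c != key)
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return f"INSERT INTO {table} ({cols}) VALUES ({marks}) ON CONFLICT ({key}) {action}"


def upsert(conn, table, row, key="id", placeholder="?"):
    """Insert or update one row through a DB-API connection, then commit."""
    columns = list(row)
    cur = conn.cursor()
    cur.execute(build_upsert_sql(table, columns, key, placeholder),
                [row[c] for c in columns])
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE ctr (id TEXT PRIMARY KEY, channel TEXT)")
    record = {"id": "contact-1", "channel": "VOICE"}
    upsert(conn, "ctr", record)
    upsert(conn, "ctr", record)  # a retried batch replays the same record
    print(conn.execute("SELECT COUNT(*) FROM ctr").fetchone()[0])  # 1
```

With psycopg2 the call would be `upsert(conn, 'public.ctr', required_data, placeholder='%s')`. A replayed batch then rewrites the same row instead of adding a duplicate.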

Thanks

Nick

Nick (AWS)
Answered 2 years ago
  • Hi Nick, first of all, thanks for the warning about the password, but it is fake. Secondly, where in the code do you want me to make changes?

  • Glad the password is fake!

    I don't want you to make changes to your code (yet) - can you go into CloudWatch Logs and check the output from the function when it runs. The first step of the investigation is to find out if all invocations of the function end in success.
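To make that check easier, the function can log each record's Kinesis `sequenceNumber` next to the CTR's `ContactId`. If the same sequence numbers show up in two invocations, Lambda is retrying a batch; if a new sequence number arrives with a `ContactId` already seen, it is most likely a separate CTR for the same contact (or a producer-side duplicate), which comparing `LastUpdateTimestamp` can help tell apart. This is a sketch: the helper name `summarize_records` is hypothetical, while `sequenceNumber`, `data` and `context.aws_request_id` are fields of the standard Kinesis event record and Lambda context.

```python
import base64
import json


def summarize_records(event):
    """Return (sequenceNumber, ContactId, LastUpdateTimestamp) for each record."""
    summary = []
    for record in event["Records"]:
        kinesis = record["kinesis"]
        ctr = json.loads(base64.b64decode(kinesis["data"]))
        summary.append((kinesis["sequenceNumber"],
                        ctr.get("ContactId"),
                        ctr.get("LastUpdateTimestamp")))
    return summary


def lambda_handler(event, context):
    # Prefix each line with the request ID so CloudWatch Logs lines can be
    # grouped by the invocation that wrote them.
    for seq, contact_id, updated in summarize_records(event):
        print(f"request={context.aws_request_id} seq={seq} "
              f"contact={contact_id} updated={updated}")
```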

I see you have a Kinesis stream as the trigger that invokes Lambda. An important point to note here is that Amazon Connect generates multiple events for a single call.

Please refer to this.

Answered 9 months ago
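If Amazon Connect publishes more than one CTR for the same call, one option is to keep only the most recently updated CTR per `ContactId` within each batch before writing. This sketch assumes `LastUpdateTimestamp` values are ISO 8601 strings in one consistent UTC format, so comparing them as strings orders them chronologically; the function name and sample timestamps are hypothetical. It only deduplicates within one batch, so CTRs for the same contact arriving in different invocations still need an idempotent write on the database side.

```python
def latest_per_contact(ctrs):
    """Keep only the most recently updated CTR for each ContactId.

    Assumes LastUpdateTimestamp strings share one ISO 8601 UTC format, so
    string comparison matches chronological order.
    """
    latest = {}
    for ctr in ctrs:
        contact_id = ctr["ContactId"]
        kept = latest.get(contact_id)
        if kept is None or ctr["LastUpdateTimestamp"] > kept["LastUpdateTimestamp"]:
            latest[contact_id] = ctr
    return list(latest.values())


batch = [
    {"ContactId": "a", "LastUpdateTimestamp": "2023-05-01T10:00:00Z"},
    {"ContactId": "a", "LastUpdateTimestamp": "2023-05-01T10:05:00Z"},
    {"ContactId": "b", "LastUpdateTimestamp": "2023-05-01T09:00:00Z"},
]
print(len(latest_per_contact(batch)))  # 2
```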
