Why does my Kinesis stream invoke my Lambda function more than once?
I am consuming Amazon Connect CTRs (contact trace records) through Amazon Kinesis and inserting the data into Postgres. I am seeing very unexpected behavior from Kinesis and my Lambda function. Whenever a CTR record comes through Kinesis, my Lambda function is invoked, and after it inserts that record into Postgres, it is invoked again. Here is my code. If anything is wrong with it, please correct me:
```python
import json
import boto3
import base64
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.extensions import AsIs

hostt = 'ec2-8585854236252958.compute-1.amazonaws.com'
username = 'sa'
passwordd = '212312asd@'
databasee = 'postgres'

def lambda_handler(event, context):
    print(event['Records'])
    print(event)
    for record in event['Records']:
        conn = psycopg2.connect(
            host = hostt,
            user = username,
            password = passwordd,
            database = databasee
        )
        cur = conn.cursor(cursor_factory = RealDictCursor)
        payload = base64.b64decode(record['kinesis']['data'])
        de_serialize_payload = json.loads(payload)
        print(len(de_serialize_payload))
        print(de_serialize_payload)
        try:
            for dsp in de_serialize_payload:
                if de_serialize_payload['Agent'] != None and de_serialize_payload['CustomerEndpoint'] != None and de_serialize_payload['Recording'] != None and de_serialize_payload['TransferredToEndpoint'] != None:
                    required_data = {
                        'arn' : de_serialize_payload['Agent']['ARN'],
                        'aftercontactworkduration' : de_serialize_payload['Agent']['AfterContactWorkDuration'],
                        'aftercontactworkendtimestamp' : de_serialize_payload['Agent']['AfterContactWorkEndTimestamp'],
                        'aftercontactworkstarttimestamp' : de_serialize_payload['Agent']['AfterContactWorkStartTimestamp'],
                        'agentconnectionattempts' : de_serialize_payload['AgentConnectionAttempts'],
                        'agentinteractionduration' : de_serialize_payload['Agent']['AgentInteractionDuration'],
                        'answeringmachinedetectionstatus' : de_serialize_payload['AnsweringMachineDetectionStatus'],
                        'channel' : de_serialize_payload['Channel'],
                        'connectedtoagenttimestamp' : de_serialize_payload['Agent']['ConnectedToAgentTimestamp'],
                        'connectedtosystemtimestamp' : de_serialize_payload['ConnectedToSystemTimestamp'],
                        'customerendpointaddress' : de_serialize_payload['CustomerEndpoint']['Address'],
                        'customerendpointtype' : de_serialize_payload['CustomerEndpoint']['Type'],
                        'customerholdduration' : de_serialize_payload['Agent']['CustomerHoldDuration'],
                        'dequeuetimestamp' : de_serialize_payload['Queue']['DequeueTimestamp'],
                        'disconnectreason' : de_serialize_payload['DisconnectReason'],
                        'disconnecttimestamp' : de_serialize_payload['DisconnectTimestamp'],
                        'queueduration' : de_serialize_payload['Queue']['Duration'],
                        'enqueuetimestamp' : de_serialize_payload['Queue']['EnqueueTimestamp'],
                        'hierarchygroups' : de_serialize_payload['Agent']['HierarchyGroups'],
                        'initialcontactid' : de_serialize_payload['InitialContactId'],
                        'initiationmethod' : de_serialize_payload['InitiationMethod'],
                        'initiationtimestamp' : de_serialize_payload['InitiationTimestamp'],
                        'instancearn' : de_serialize_payload['InstanceARN'],
                        'lastupdatetimestamp' : de_serialize_payload['LastUpdateTimestamp'],
                        'longestholdduration' : de_serialize_payload['Agent']['LongestHoldDuration'],
                        'nextcontactid' : de_serialize_payload['NextContactId'],
                        'numberofholds' : de_serialize_payload['Agent']['NumberOfHolds'],
                        'previouscontactid': de_serialize_payload['PreviousContactId'],
                        'queuearn' : de_serialize_payload['Queue']['ARN'],
                        'queuename' : de_serialize_payload['Queue']['Name'],
                        'recordingdeletionreason' : de_serialize_payload['Recording']['DeletionReason'],
                        'recordinglocation' : de_serialize_payload['Recording']['Location'],
                        'recordingstatus' : de_serialize_payload['Recording']['Status'],
                        'recordingtype' : de_serialize_payload['Recording']['Type'],
                        'routingprofilearn' : de_serialize_payload['Agent']['RoutingProfile']['ARN'],
                        'routingprofilename' : de_serialize_payload['Agent']['RoutingProfile']['Name'],
                        'scheduledtimestamp' : de_serialize_payload['ScheduledTimestamp'],
                        'systemendpointaddress' : de_serialize_payload['SystemEndpoint']['Address'],
                        'systemendpointtype' : de_serialize_payload['SystemEndpoint']['Type'],
                        'transfercompletedtimestamp' : de_serialize_payload['TransferCompletedTimestamp'],
                        'transferredtoendpoint' : de_serialize_payload['TransferredToEndpoint']['Address'],
                        'username' : de_serialize_payload['Agent']['Username'],
                        'voiceidresult' : de_serialize_payload['VoiceIdResult'],
                        'id' : de_serialize_payload['ContactId']
                    }
                    columns = required_data.keys()
                    print(columns)
                    values = [required_data[column] for column in columns]
                    print(values)
                    insert_statement = "insert into public.ctr (%s) values %s;"
                    cur.execute(insert_statement, (AsIs(','.join(columns)), tuple(values)))
                    print(cur.mogrify(insert_statement, (AsIs(','.join(columns)), tuple(values))))
                    conn.commit()
                    count = cur.rowcount
        except (Exception, psycopg2.Error) as error:
            print("Failed to insert record into the table", error)
```
I would really appreciate it if someone could guide me and help sort this out.
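Separately from the retry question, one line in the handler above is worth checking: `for dsp in de_serialize_payload:`. Because `json.loads` returns a dict here (the code reads `de_serialize_payload['Agent']` directly), iterating over it yields the dict's keys, and `dsp` is never used, so the `if`/INSERT body runs once per top-level CTR key rather than once per record. Depending on the constraints on `public.ctr`, that means either duplicate rows or an error after the first insert, which the `except` only prints. The sketch below reproduces the loop on a hypothetical three-key payload; assuming each Kinesis record carries exactly one CTR, the likely fix is to drop the inner loop.

```python
import json

# Hypothetical, heavily cut-down CTR; a real one has many more top-level keys.
ctr = json.loads('{"ContactId": "abc-123", "Channel": "VOICE", "Agent": null}')

# What the original inner loop does: iterating a dict yields its KEYS,
# so the loop body (the INSERT) runs once per top-level key.
attempted_inserts = 0
for dsp in ctr:
    attempted_inserts += 1

# Without the inner loop: one decoded Kinesis record -> one CTR -> one INSERT.
rows_to_insert = [ctr]

print(attempted_inserts, len(rows_to_insert))  # 3 1
```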
Hi
Very important: you've included a password in your question. I'd strongly recommend that you rotate it.
In terms of why your code is called twice, I would check the CloudWatch logs to see whether the first Lambda invocation ends in success. If the function fails, Lambda invokes it again with the entire batch of records, i.e. the same payload. Can you change the insert statement to an upsert to make the change idempotent?
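The upsert suggestion could look roughly like the sketch below. It assumes `public.ctr` has a primary key or unique constraint on `id` (which `ON CONFLICT (id)` requires, available since PostgreSQL 9.5) and keeps the `%s` parameter style that psycopg2 already uses. `build_upsert` is a hypothetical helper, not part of psycopg2. With a statement like this, a retried batch rewrites the existing row instead of inserting a second copy.

```python
from typing import Any, Dict, List, Tuple


def build_upsert(table: str, row: Dict[str, Any], key: str = "id") -> Tuple[str, List[Any]]:
    """Build a PostgreSQL INSERT ... ON CONFLICT statement for a single row.

    Table and column names are pasted into the SQL text, so they must come from
    a fixed, trusted list (such as the keys of required_data), never from input.
    Values are returned separately so the driver can bind them to %s placeholders.
    """
    columns = list(row)
    placeholders = ", ".join(["%s"] * len(columns))
    updates = ", ".join(f"{col} = EXCLUDED.{col}" for col in columns if col != key)
    # On a duplicate key, update the existing row (or skip it if only the key was given).
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    sql = (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders}) "
        f"ON CONFLICT ({key}) {action};"
    )
    return sql, [row[col] for col in columns]


sql, params = build_upsert(
    "public.ctr", {"id": "abc-123", "channel": "VOICE", "queuename": "Support"}
)
print(sql)
# With psycopg2 this would then be: cur.execute(sql, params); conn.commit()
```

If the column list could ever come from outside the code, `psycopg2.sql.Identifier` is the safer way to quote the names.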
Thanks
Nick
I see you have a Kinesis stream as the trigger that invokes your Lambda function. An important point to note here is that Amazon Connect generates multiple events for a single call.
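If the extra events are updated versions of the same contact's record (same `ContactId`), one option is to keep only the newest version within each batch before writing. The sketch below assumes each decoded CTR carries `ContactId` and an ISO 8601 `LastUpdateTimestamp` ending in `Z` (both fields the question's code already reads); `latest_per_contact` is a hypothetical helper. It only deduplicates within a single batch, so an idempotent upsert is still needed across batches and retries.

```python
from datetime import datetime
from typing import Dict, Iterable, List


def _parse_ts(value: str) -> datetime:
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def latest_per_contact(ctrs: Iterable[dict]) -> List[dict]:
    """Keep only the most recently updated CTR for each ContactId."""
    latest: Dict[str, dict] = {}
    for ctr in ctrs:
        current = latest.get(ctr["ContactId"])
        if current is None or _parse_ts(ctr["LastUpdateTimestamp"]) > _parse_ts(
            current["LastUpdateTimestamp"]
        ):
            latest[ctr["ContactId"]] = ctr
    return list(latest.values())


# Hypothetical batch: contact c-1 arrives twice, the second time with agent data.
batch = [
    {"ContactId": "c-1", "LastUpdateTimestamp": "2021-08-04T17:17:53Z", "Agent": None},
    {"ContactId": "c-2", "LastUpdateTimestamp": "2021-08-04T17:18:02Z", "Agent": None},
    {"ContactId": "c-1", "LastUpdateTimestamp": "2021-08-04T17:20:10Z", "Agent": {"Username": "jdoe"}},
]
result = latest_per_contact(batch)
print([(r["ContactId"], r["LastUpdateTimestamp"]) for r in result])
# [('c-1', '2021-08-04T17:20:10Z'), ('c-2', '2021-08-04T17:18:02Z')]
```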
Hi Nick, first of all, thanks for the warning about the password, but it is fake. Secondly, where do you want me to make changes in the code?
Glad the password is fake!
I don't want you to make changes to your code (yet). Can you go into CloudWatch Logs and check the output from the function when it runs? The first step of the investigation is to find out whether all invocations of the function end in success.