I am consuming Amazon Connect CTRs (contact trace records) through Amazon Kinesis and inserting the data into Postgres. I am seeing unexpected behavior from Kinesis and my Lambda function: whenever a CTR record arrives through Kinesis, my Lambda function is invoked, and after it has inserted that record into Postgres it is invoked again, which I did not expect. Here is my code; if anything is wrong with it, please correct me:
import json
import boto3
import base64
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.extensions import AsIs
import os

# Postgres connection settings used by lambda_handler.
# Each value can be overridden with an environment variable (for example in
# the Lambda function configuration) so that real credentials do not have to
# be committed with the code. The literals below are only fallbacks, kept so
# that existing deployments without these variables behave as before.
hostt = os.environ.get('CTR_DB_HOST', 'ec2-8585854236252958.compute-1.amazonaws.com')
username = os.environ.get('CTR_DB_USER', 'sa')
passwordd = os.environ.get('CTR_DB_PASSWORD', '212312asd@')
databasee = os.environ.get('CTR_DB_NAME', 'postgres')
def _build_ctr_row(ctr):
    """Map one decoded CTR to the column/value dict inserted into public.ctr.

    Args:
        ctr: The CTR as a dict (the JSON-decoded Kinesis payload).

    Returns:
        A dict keyed by column name, in insert order, or ``None`` when the
        ``Agent``, ``CustomerEndpoint``, ``Recording`` or
        ``TransferredToEndpoint`` section is missing or null; such CTRs are
        not stored.

    Raises:
        KeyError: A field read below is absent from the CTR.
        TypeError: A section that is read without a null check (``Queue``,
            ``SystemEndpoint``, ``Agent.RoutingProfile``) is null.
    """
    agent = ctr.get('Agent')
    customer_endpoint = ctr.get('CustomerEndpoint')
    recording = ctr.get('Recording')
    transferred_to = ctr.get('TransferredToEndpoint')
    required_sections = (agent, customer_endpoint, recording, transferred_to)
    if any(section is None for section in required_sections):
        return None

    queue = ctr['Queue']
    system_endpoint = ctr['SystemEndpoint']
    routing_profile = agent['RoutingProfile']

    return {
        'arn': agent['ARN'],
        'aftercontactworkduration': agent['AfterContactWorkDuration'],
        'aftercontactworkendtimestamp': agent['AfterContactWorkEndTimestamp'],
        'aftercontactworkstarttimestamp': agent['AfterContactWorkStartTimestamp'],
        'agentconnectionattempts': ctr['AgentConnectionAttempts'],
        'agentinteractionduration': agent['AgentInteractionDuration'],
        'answeringmachinedetectionstatus': ctr['AnsweringMachineDetectionStatus'],
        'channel': ctr['Channel'],
        'connectedtoagenttimestamp': agent['ConnectedToAgentTimestamp'],
        'connectedtosystemtimestamp': ctr['ConnectedToSystemTimestamp'],
        'customerendpointaddress': customer_endpoint['Address'],
        'customerendpointtype': customer_endpoint['Type'],
        'customerholdduration': agent['CustomerHoldDuration'],
        'dequeuetimestamp': queue['DequeueTimestamp'],
        'disconnectreason': ctr['DisconnectReason'],
        'disconnecttimestamp': ctr['DisconnectTimestamp'],
        'queueduration': queue['Duration'],
        'enqueuetimestamp': queue['EnqueueTimestamp'],
        'hierarchygroups': agent['HierarchyGroups'],
        'initialcontactid': ctr['InitialContactId'],
        'initiationmethod': ctr['InitiationMethod'],
        'initiationtimestamp': ctr['InitiationTimestamp'],
        'instancearn': ctr['InstanceARN'],
        'lastupdatetimestamp': ctr['LastUpdateTimestamp'],
        'longestholdduration': agent['LongestHoldDuration'],
        'nextcontactid': ctr['NextContactId'],
        'numberofholds': agent['NumberOfHolds'],
        'previouscontactid': ctr['PreviousContactId'],
        'queuearn': queue['ARN'],
        'queuename': queue['Name'],
        'recordingdeletionreason': recording['DeletionReason'],
        'recordinglocation': recording['Location'],
        'recordingstatus': recording['Status'],
        'recordingtype': recording['Type'],
        'routingprofilearn': routing_profile['ARN'],
        'routingprofilename': routing_profile['Name'],
        'scheduledtimestamp': ctr['ScheduledTimestamp'],
        'systemendpointaddress': system_endpoint['Address'],
        'systemendpointtype': system_endpoint['Type'],
        'transfercompletedtimestamp': ctr['TransferCompletedTimestamp'],
        'transferredtoendpoint': transferred_to['Address'],
        'username': agent['Username'],
        'voiceidresult': ctr['VoiceIdResult'],
        'id': ctr['ContactId'],
    }


def _insert_ctr_row(conn, row):
    """Insert one row into public.ctr and commit it.

    Args:
        conn: An open psycopg2 connection.
        row: Column-name -> value dict as returned by ``_build_ctr_row``.

    Raises:
        psycopg2.Error: The statement failed; the transaction has been
            rolled back by the connection context manager.
    """
    columns = row.keys()
    print(columns)
    values = list(row.values())
    print(values)

    # Column names come from the fixed keys built in this module, never from
    # the payload, so splicing them in with AsIs does not expose the
    # statement to injection. The values are still passed as parameters.
    insert_statement = "insert into public.ctr (%s) values %s;"
    params = (AsIs(','.join(columns)), tuple(values))

    # `with conn` commits on success and rolls back on error (it does not
    # close the connection); `with cursor` closes the cursor either way.
    with conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(insert_statement, params)
        print(cur.mogrify(insert_statement, params))


def lambda_handler(event, context):
    """Store the Amazon Connect CTRs from a Kinesis event in Postgres.

    Each Kinesis record is base64-decoded, parsed as JSON and, when it
    carries all the sections ``_build_ctr_row`` requires, inserted as
    exactly one row into ``public.ctr``.

    A single database connection is opened lazily for the whole batch (only
    when there is something to insert) and is always closed before
    returning. A CTR that cannot be mapped or inserted is logged and
    skipped so the rest of the batch is still processed.

    Args:
        event: Kinesis event; ``event['Records'][i]['kinesis']['data']`` is
            read as the base64-encoded JSON CTR.
        context: Lambda context object (unused).

    Returns:
        None.

    Raises:
        Exception: Malformed records (bad base64/JSON) and failures to
            connect to the database are not caught here and propagate.
    """
    # NOTE(review): a Kinesis event source can deliver the same record more
    # than once (for example when an invocation fails and the batch is
    # retried). If repeated rows are a problem, consider making the insert
    # idempotent on the contact id - confirm against the table's constraints.
    records = event['Records']
    print(records)
    print(event)

    conn = None
    try:
        for record in records:
            payload = base64.b64decode(record['kinesis']['data'])
            ctr = json.loads(payload)
            print(len(ctr))
            print(ctr)

            # Best-effort boundary: a CTR with an unexpected shape is logged
            # and skipped rather than failing the whole batch.
            try:
                row = _build_ctr_row(ctr)
            except Exception as error:
                print("Failed to insert record into the table", error)
                continue
            if row is None:
                continue

            if conn is None:
                conn = psycopg2.connect(
                    host=hostt,
                    user=username,
                    password=passwordd,
                    database=databasee,
                )

            try:
                _insert_ctr_row(conn, row)
            except psycopg2.Error as error:
                print("Failed to insert record into the table", error)
                if conn.closed:
                    # The connection itself is gone; reconnect for the next
                    # record instead of failing every remaining insert.
                    conn = None
    finally:
        if conn is not None:
            conn.close()
I would really appreciate it if someone could guide me and help me sort out this problem.
Hi Nick, first of all, thanks for pointing out the password, but it is fake. Secondly, where do you want me to make changes in the code?
Glad the password is fake!
I don't want you to make changes to your code (yet) - can you go into CloudWatch Logs and check the output from the function when it runs? The first step of the investigation is to find out if all invocations of the function end in success.