WebSocket message sent twice by a Lambda triggered by a DynamoDB Stream, with a buffer offset
I send data to a front-end client through an API Gateway WebSocket API, from a Lambda written in JavaScript and triggered by a DynamoDB Stream. The code that posts a message to the front end is:
const params = {
  ConnectionId: clientId,
  Data: JSON.stringify(message) // message sent to the client
};
await client.postToConnection(params).promise()
When I use this code in a Lambda that backs some API Gateway routes, it works perfectly. When I use another Lambda that is triggered by a DynamoDB Stream, the Lambda runs when I insert new data and the post appears in CloudWatch for the Lambda, but it does not appear in CloudWatch for the API Gateway WebSocket API, and nothing reaches the front end. If I then insert new, different data into DynamoDB, CloudWatch shows the post of the first inserted data and API Gateway sends it twice. This repeats on every insert: the messages received by the front end are always one insert behind. I can't understand why.
It looks as if your Lambda execution environment is frozen before the data is actually sent. Could it be that you are not awaiting the right thing, so the request only gets queued in the Node.js event loop and is not performed until the next invocation?
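A minimal sketch of the pattern described above, not the asker's code. `fakeSend` is a hypothetical stand-in for a network call such as `postToConnection`, and the 20 ms delay is arbitrary. It shows that an async function which starts promises inside `forEach` returns before any of them has finished, whereas one that awaits `Promise.all` does not.

```javascript
'use strict';

// Hypothetical stand-in for a network call: records the id after a short delay.
function fakeSend(log, id) {
  return new Promise((resolve) => {
    setTimeout(() => {
      log.push(id);
      resolve();
    }, 20);
  });
}

// Fire-and-forget: forEach discards the returned promises,
// so this function resolves while the sends are still pending.
async function notAwaited(ids, log) {
  ids.forEach((id) => {
    fakeSend(log, id);
  });
  return log.length;
}

// Awaited: resolves only after every send has completed.
async function awaited(ids, log) {
  await Promise.all(ids.map((id) => fakeSend(log, id)));
  return log.length;
}
```

In a Lambda with an async handler, the invocation ends when the handler's promise resolves, so work left pending as in `notAwaited` can stay suspended until the same execution environment handles a later event.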
This is the code of my Lambda triggered by the DynamoDB Stream:
'use strict';
var AWS = require("aws-sdk");
const ENDPOINT = 'https://8tfhywx7dk.execute-api.eu-west-2.amazonaws.com/dev/';
const DB_TABLE = 'ConnectionDB';
let client = new AWS.ApiGatewayManagementApi({ endpoint: ENDPOINT });
let dynamo = new AWS.DynamoDB.DocumentClient();
exports.handler = async (event, context, callback) => {
  // fetch the client connections
  const connections = await getAllConnections();
  event.Records.forEach((record) => {
    // console.log('Stream record: ', record)
    if (record.eventName == 'INSERT') {
      var id = JSON.stringify(record.dynamodb.NewImage.id.S);
      var nom = JSON.stringify(record.dynamodb.NewImage.nom.S);
      const message = {
        action: 'insert',
        message: `{"id":${id},"nom":${nom}}`
      }
      for (var connectID of connections) {
        sendMessage(connectID, message);
      }
    }
  });
};
async function getAllConnections() {
  const { Items } = await dynamo.scan({
    TableName: DB_TABLE,
    AttributesToGet: ['connectionID']
  }).promise();
  const connections = Items.map(({ connectionID }) => connectionID)
  return connections;
}
async function sendMessage(clientId, message) {
  const params = {
    ConnectionId: clientId,
    Data: JSON.stringify(message) // message sent to the client
  };
  console.log('postToConnection: ', clientId, " message : ", message)
  await client.postToConnection(params).promise()
  console.log('Message émis') // "Message sent"
}
Now if I add a new item to DynamoDB:
{ "id": { "S": "UC0005" }, "nom": { "S": "mon mac5" }}
1) The value is added to the DB.
2) CloudWatch for this Lambda shows the POST at the right time, without any error:
| 1646755297533 | 2022-03-08T16:01:37.533Z 99bd4d1d-a40f-4357-9979-317711de0d74 INFO postToConnection: OqyaRcjprPEAcvw= message : { action: 'insert', message: '{"id":"UC0005","nom":"mon mac5"}' }
| 1646755297595 | END RequestId: 99bd4d1d-a40f-4357-9979-317711de0d74
etc.
3) CloudWatch for the API Gateway (/aws/apigateway/8tfhywx7dk/dev) does NOT show any log!
4) And of course the front end does not receive anything!
Now I add a new value:
{ "id": { "S": "UC0006" }, "nom": { "S": "mon mac6" }}
1) The value is added to the DB.
2) CloudWatch for this Lambda shows the POST at the right time:
2022-03-08T16:04:41.894Z a43ee8ba-ecad-4d16-956d-496f89e8b41e INFO postToConnection: OqyaRcjprPEAcvw= message : { action: 'insert', message: '{"id":"UC0006","nom":"mon mac6"}' }
END RequestId: a43ee8ba-ecad-4d16-956d-496f89e8b41e
3) CloudWatch for the API Gateway (/aws/apigateway/xxxxxxxxx/dev) now shows a new log:
...
| 1646755480663 | (b6478f03-36ad-4bb3-b27b-19ea70ab7254) WebSocket Request Route: [POST /@connections/{connectionId}]
| 1646755480663 | (b6478f03-36ad-4bb3-b27b-19ea70ab7254) Method request path: {connectionId={connectionId}}
(b6478f03-36ad-4bb3-b27b-19ea70ab7254) Method request body before transformations: {
"action": "insert",
"message": "{\"id\":\"UC0005\",\"nom\":\"mon mac5\"}"
}
etc...
4) And of course the front end (I used https://www.piesocket.com/) received the old value, and received it twice!
- RECEIVED: {"action":"insert","message":"{\"id\":\"UC0005\",\"nom\":\"mon mac5\"}"}
- RECEIVED: {"action":"insert","message":"{\"id\":\"UC0005\",\"nom\":\"mon mac5\"}"}
I am lost!
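One possible way to act on the suggestion about awaiting, sketched under assumptions: in the handler above, `sendMessage` is called inside `forEach` without `await`, so the handler can return while the posts are still pending. The sketch below collects every promise and waits for all of them before returning. `buildMessages` and `makeHandler` are hypothetical helper names, not part of the original code; `getAllConnections` and `sendMessage` are passed in so the waiting logic can be exercised without AWS. Whether this also explains the duplicate delivery is not confirmed here.

```javascript
'use strict';

// Build one message per INSERT record, in the same shape as the original handler.
function buildMessages(records) {
  return records
    .filter((record) => record.eventName === 'INSERT')
    .map((record) => {
      const { id, nom } = record.dynamodb.NewImage;
      return {
        action: 'insert',
        message: JSON.stringify({ id: id.S, nom: nom.S }),
      };
    });
}

// Hypothetical factory: takes the two async functions from the original code
// and returns a handler that waits for every post before it finishes.
function makeHandler({ getAllConnections, sendMessage }) {
  return async (event) => {
    const connections = await getAllConnections();
    const pending = [];
    for (const message of buildMessages(event.Records)) {
      for (const connectionId of connections) {
        // Keep the promise instead of dropping it.
        pending.push(sendMessage(connectionId, message));
      }
    }
    // Do not return until every post has settled (fulfilled or rejected).
    const results = await Promise.allSettled(pending);
    const failed = results.filter((r) => r.status === 'rejected');
    failed.forEach((r) => console.error('postToConnection failed:', r.reason));
    return { sent: results.length - failed.length, failed: failed.length };
  };
}
```

With the `getAllConnections` and `sendMessage` functions from the question, this would be wired as `exports.handler = makeHandler({ getAllConnections, sendMessage });`. `Promise.allSettled` is used here instead of `Promise.all` as a design choice, so that one failed connection is logged rather than making the whole invocation fail, which for a stream trigger would normally cause the batch to be retried.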