I've been trying to use a lambda function to write into a timestream db table, but I've been getting the below error.
THE ERROR
Error writing data to Timestream: An error occurred (ValidationException) when calling the WriteRecords operation: Invalid time for record.
And I've not been able to get the issue resolved. Please can someone help me with what could be the issue?
THE LAMBDA FUNCTION
import boto3
import json
import time
from datetime import datetime
# Initialize the S3 client
s3 = boto3.client('s3')
# Initialize the TimestreamWrite client
timestream_write = boto3.client('timestream-write')
# S3 bucket and key for storing the last readings
bucket_name = 'bcpbucket1'
key = 'lastReadings.json'
# Timestream database and table ARNs
database_arn = 'lambda_injected_db'
table_arn = 'lambda_injected_table'
# Function to read the last readings from S3
def read_last_readings():
try:
response = s3.get_object(Bucket=bucket_name, Key=key)
data = response['Body'].read().decode('utf-8')
return json.loads(data)
except Exception as e:
print("Error reading last readings from S3:", e)
return {'lastX': None, 'lastY': None}
# Function to write the last readings to S3
def write_last_readings(lastX, lastY):
try:
s3.put_object(Bucket=bucket_name, Key=key, Body=json.dumps({'lastX': lastX, 'lastY': lastY}), ContentType='application/json')
except Exception as e:
print("Error writing last readings to S3:", e)
# Function to write data to Amazon Timestream
def write_to_timestream(x_axis, y_axis, total_velocity, device_id, timestamp):
try:
# Format the record
record = {
'Time': str(int(timestamp * 1000000)), # Convert time to microseconds
'Dimensions': [{'Name': 'device_id', 'Value': device_id}],
'MeasureName': 'x_axis',
'MeasureValue': str(x_axis),
'MeasureValueType': 'DOUBLE',
'TimeUnit': 'MILLISECONDS'
}
# Add y_axis as another measure value
record['MeasureName'] = 'y_axis'
record['MeasureValue'] = str(y_axis)
# Write the record to Timestream
timestream_write.write_records(DatabaseName=database_arn, TableName=table_arn, Records=[record])
except Exception as e:
print("Error writing data to Timestream:", e)
def lambda_handler(event, context):
# Ensure that the "x_axis" and "y_axis" properties exist in the incoming data
if 'x_axis' in event and 'y_axis' in event and 'device_id' in event:
# Convert the current time to milliseconds since epoch
current_time_ms = int(time.time() * 1000)
# Calculate the total velocity (magnitude of the velocity vector)
total_velocity = ((event['x_axis'] ** 2 + event['y_axis'] ** 2) ** 0.5) / 1 # Assuming time interval is 1 second
# Write the last readings to S3
write_last_readings(event['x_axis'], event['y_axis'])
# Write the event data to Amazon Timestream
write_to_timestream(event['x_axis'], event['y_axis'], total_velocity, event['device_id'], current_time_ms)
# Return only the specified fields in the desired format
return {
'x_axis': event['x_axis'],
'y_axis': event['y_axis'],
'totalVelocity': total_velocity,
'device_id': event['device_id'],
'time': datetime.utcfromtimestamp(current_time_ms // 1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
}
else:
raise ValueError("Invalid data format. 'x_axis', 'y_axis', and 'device_id' properties are required.")
THE OUTPUT OF THE LAMBDA FUNCTION TO BE WRITTEN INTO TIMESTREAM
I get the below response together with the ValidationException error every time I run the lambda function
{ "x_axis": 5, "y_axis": 10, "totalVelocity": 11.180339887498949, "device_id": "sensor_1", "time": "2024-03-18 21:14:48.000" }