ValidationException error while writing into Timestream table


I've been trying to use a Lambda function to write to a Timestream database table, but I keep getting the error below.

THE ERROR

Error writing data to Timestream: An error occurred (ValidationException) when calling the WriteRecords operation: Invalid time for record.

I haven't been able to resolve it. Can someone help me work out what is causing it?

THE LAMBDA FUNCTION

import boto3
import json
import time
from datetime import datetime

# Initialize the S3 client
s3 = boto3.client('s3')

# Initialize the TimestreamWrite client
timestream_write = boto3.client('timestream-write')

# S3 bucket and key for storing the last readings
bucket_name = 'bcpbucket1'
key = 'lastReadings.json'

# Timestream database and table ARNs
database_arn = 'lambda_injected_db'
table_arn = 'lambda_injected_table'

# Function to read the last readings from S3
def read_last_readings():
    try:
        response = s3.get_object(Bucket=bucket_name, Key=key)
        data = response['Body'].read().decode('utf-8')
        return json.loads(data)
    except Exception as e:
        print("Error reading last readings from S3:", e)
        return {'lastX': None, 'lastY': None}

# Function to write the last readings to S3
def write_last_readings(lastX, lastY):
    try:
        s3.put_object(Bucket=bucket_name, Key=key, Body=json.dumps({'lastX': lastX, 'lastY': lastY}), ContentType='application/json')
    except Exception as e:
        print("Error writing last readings to S3:", e)

# Function to write data to Amazon Timestream
def write_to_timestream(x_axis, y_axis, total_velocity, device_id, timestamp):
    try:
        # Format the record
        record = {
            'Time': str(int(timestamp * 1000000)),  # Convert time to microseconds
            'Dimensions': [{'Name': 'device_id', 'Value': device_id}],
            'MeasureName': 'x_axis',
            'MeasureValue': str(x_axis),
            'MeasureValueType': 'DOUBLE',
            'TimeUnit': 'MILLISECONDS'
        }
        
        # Add y_axis as another measure value
        record['MeasureName'] = 'y_axis'
        record['MeasureValue'] = str(y_axis)
        
        # Write the record to Timestream
        timestream_write.write_records(DatabaseName=database_arn, TableName=table_arn, Records=[record])
    except Exception as e:
        print("Error writing data to Timestream:", e)

def lambda_handler(event, context):
    # Ensure that the "x_axis" and "y_axis" properties exist in the incoming data
    if 'x_axis' in event and 'y_axis' in event and 'device_id' in event:
        # Convert the current time to milliseconds since epoch
        current_time_ms = int(time.time() * 1000)

        # Calculate the total velocity (magnitude of the velocity vector)
        total_velocity = ((event['x_axis'] ** 2 + event['y_axis'] ** 2) ** 0.5) / 1  # Assuming time interval is 1 second

        # Write the last readings to S3
        write_last_readings(event['x_axis'], event['y_axis'])

        # Write the event data to Amazon Timestream
        write_to_timestream(event['x_axis'], event['y_axis'], total_velocity, event['device_id'], current_time_ms)
            
        # Return only the specified fields in the desired format
        return {
            'x_axis': event['x_axis'],
            'y_axis': event['y_axis'],
            'totalVelocity': total_velocity,
            'device_id': event['device_id'],
            'time': datetime.utcfromtimestamp(current_time_ms // 1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        }
    else:
        raise ValueError("Invalid data format. 'x_axis', 'y_axis', and 'device_id' properties are required.")

THE OUTPUT OF THE LAMBDA FUNCTION TO BE WRITTEN INTO TIMESTREAM

I get the response below, together with the ValidationException error, every time I run the Lambda function.

{ "x_axis": 5, "y_axis": 10, "totalVelocity": 11.180339887498949, "device_id": "sensor_1", "time": "2024-03-18 21:14:48.000" }

asked a month ago, 122 views
1 Answer

Looking at the code, lambda_handler gets the current time in milliseconds:

current_time_ms = int(time.time() * 1000)

and passes that value into write_to_timestream().

write_to_timestream() then multiplies it by 1,000,000 in an attempt to convert it to microseconds. I believe this calculation is incorrect.

'Time': str(int(timestamp * 1000000)), # Convert time to microseconds
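
To get a sense of how far off the value is, here is a quick check of the arithmetic. It is only a sketch: epoch_seconds is a fixed example value chosen to match the time shown in the question's output, not something read from the Lambda.

```python
from datetime import datetime, timezone

# Example epoch time in seconds for 2024-03-18 21:14:48 UTC
# (assumed here so the numbers are reproducible).
epoch_seconds = 1710796488

# What lambda_handler passes in: milliseconds since the epoch.
current_time_ms = epoch_seconds * 1000

# What write_to_timestream actually sends as 'Time'.
time_sent = current_time_ms * 1000000

# The record declares TimeUnit='MILLISECONDS', so time_sent is read as
# milliseconds. Convert it to whole years since 1970 to see the scale.
ms_per_year = 1000 * 60 * 60 * 24 * 365
years_since_epoch = time_sent // ms_per_year

# The unmodified millisecond value maps back to the expected moment.
expected_moment = datetime.fromtimestamp(current_time_ms / 1000, tz=timezone.utc)

print(years_since_epoch)
print(expected_moment.isoformat())
```

Read as milliseconds, the value sent lands on the order of 54 million years after 1970, which is consistent with the "Invalid time for record" error.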

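As the input is already in milliseconds, multiplying by 1,000,000 does not give microseconds; it gives a value a thousand times larger than that. Note also that the record sets 'TimeUnit': 'MILLISECONDS', so Timestream expects Time in milliseconds. Even a correct microsecond value would be misread under this unit as a date tens of thousands of years in the future. I would suggest passing the timestamp through unchanged:

'Time': str(int(timestamp)),  # Already in milliseconds, matching TimeUnit

If you need microsecond precision instead, multiply by 1000 and also change TimeUnit to 'MICROSECONDS'.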
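
Whichever unit is used, the Time value and TimeUnit have to agree. Below is a minimal sketch that keeps both in milliseconds. build_records is a hypothetical helper, not part of the original code. It also returns one record per measure, because the function in the question overwrites x_axis with y_axis before writing; treating that as unintended is my assumption.

```python
import time


def build_records(x_axis, y_axis, device_id, timestamp_ms):
    """Return one Timestream record per measure, with Time in milliseconds.

    Hypothetical helper for illustration; timestamp_ms is assumed to be
    milliseconds since the epoch, as produced by int(time.time() * 1000).
    """
    common = {
        'Dimensions': [{'Name': 'device_id', 'Value': str(device_id)}],
        'Time': str(int(timestamp_ms)),   # no further scaling
        'TimeUnit': 'MILLISECONDS',       # same unit as the Time value
        'MeasureValueType': 'DOUBLE',
    }
    return [
        {**common, 'MeasureName': 'x_axis', 'MeasureValue': str(x_axis)},
        {**common, 'MeasureName': 'y_axis', 'MeasureValue': str(y_axis)},
    ]


records = build_records(5, 10, 'sensor_1', int(time.time() * 1000))

# In the Lambda, the list would then be passed to the existing client:
# timestream_write.write_records(
#     DatabaseName=database_arn, TableName=table_arn, Records=records)
```

Building the list separately from the write call also makes it easy to print or unit test the records before sending them.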

AWS
Steve
answered a month ago
EXPERT
reviewed a month ago
