ValidationException error while writing into Timestream table

I've been trying to use a Lambda function to write to a Timestream database table, but I keep getting the error below.

THE ERROR

Error writing data to Timestream: An error occurred (ValidationException) when calling the WriteRecords operation: Invalid time for record.

I haven't been able to resolve it. Can someone help me work out what the issue could be?

THE LAMBDA FUNCTION

import boto3
import json
import time
from datetime import datetime

# Initialize the S3 client
s3 = boto3.client('s3')

# Initialize the TimestreamWrite client
timestream_write = boto3.client('timestream-write')

# S3 bucket and key for storing the last readings
bucket_name = 'bcpbucket1'
key = 'lastReadings.json'

# Timestream database and table ARNs
database_arn = 'lambda_injected_db'
table_arn = 'lambda_injected_table'

# Function to read the last readings from S3
def read_last_readings():
    try:
        response = s3.get_object(Bucket=bucket_name, Key=key)
        data = response['Body'].read().decode('utf-8')
        return json.loads(data)
    except Exception as e:
        print("Error reading last readings from S3:", e)
        return {'lastX': None, 'lastY': None}

# Function to write the last readings to S3
def write_last_readings(lastX, lastY):
    try:
        s3.put_object(Bucket=bucket_name, Key=key, Body=json.dumps({'lastX': lastX, 'lastY': lastY}), ContentType='application/json')
    except Exception as e:
        print("Error writing last readings to S3:", e)

# Function to write data to Amazon Timestream
def write_to_timestream(x_axis, y_axis, total_velocity, device_id, timestamp):
    try:
        # Format the record
        record = {
            'Time': str(int(timestamp * 1000000)),  # Convert time to microseconds
            'Dimensions': [{'Name': 'device_id', 'Value': device_id}],
            'MeasureName': 'x_axis',
            'MeasureValue': str(x_axis),
            'MeasureValueType': 'DOUBLE',
            'TimeUnit': 'MILLISECONDS'
        }
        
        # Add y_axis as another measure value
        record['MeasureName'] = 'y_axis'
        record['MeasureValue'] = str(y_axis)
        
        # Write the record to Timestream
        timestream_write.write_records(DatabaseName=database_arn, TableName=table_arn, Records=[record])
    except Exception as e:
        print("Error writing data to Timestream:", e)

def lambda_handler(event, context):
    # Ensure that the "x_axis" and "y_axis" properties exist in the incoming data
    if 'x_axis' in event and 'y_axis' in event and 'device_id' in event:
        # Convert the current time to milliseconds since epoch
        current_time_ms = int(time.time() * 1000)

        # Calculate the total velocity (magnitude of the velocity vector)
        total_velocity = ((event['x_axis'] ** 2 + event['y_axis'] ** 2) ** 0.5) / 1  # Assuming time interval is 1 second

        # Write the last readings to S3
        write_last_readings(event['x_axis'], event['y_axis'])

        # Write the event data to Amazon Timestream
        write_to_timestream(event['x_axis'], event['y_axis'], total_velocity, event['device_id'], current_time_ms)
            
        # Return only the specified fields in the desired format
        return {
            'x_axis': event['x_axis'],
            'y_axis': event['y_axis'],
            'totalVelocity': total_velocity,
            'device_id': event['device_id'],
            'time': datetime.utcfromtimestamp(current_time_ms // 1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        }
    else:
        raise ValueError("Invalid data format. 'x_axis', 'y_axis', and 'device_id' properties are required.")

THE OUTPUT OF THE LAMBDA FUNCTION TO BE WRITTEN INTO TIMESTREAM

I get the response below, together with the ValidationException error, every time I run the Lambda function:

{ "x_axis": 5, "y_axis": 10, "totalVelocity": 11.180339887498949, "device_id": "sensor_1", "time": "2024-03-18 21:14:48.000" }

Asked 2 months ago, viewed 157 times
1 Answer

Looking at the code, lambda_handler gets the current time in milliseconds

current_time_ms = int(time.time() * 1000)

and passing that value into write_to_timestream()

write_to_timestream() then converts this to microseconds, but I believe the calculation is incorrect: it multiplies by 1,000,000, which would only be right if the input were in seconds.

'Time': str(int(timestamp * 1000000)), # Convert time to microseconds
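To get a feel for how far off the resulting value is, here is a small arithmetic sketch. It assumes the timestamp shown in the question's output (2024-03-18 21:14:48, taken to be UTC) and that Timestream reads 'Time' in the unit given by 'TimeUnit', which the record sets to 'MILLISECONDS'. The names sent_value and years_after_epoch are mine, for illustration only.

```python
from datetime import datetime, timezone

# Timestamp from the question's output, assumed UTC, as epoch milliseconds.
current_time_ms = int(
    datetime(2024, 3, 18, 21, 14, 48, tzinfo=timezone.utc).timestamp() * 1000
)

# What the original code puts in 'Time' while 'TimeUnit' is 'MILLISECONDS'.
sent_value = current_time_ms * 1000000

# Read as milliseconds, how many years after 1970 does that value point to?
ms_per_year = 1000 * 60 * 60 * 24 * 365.25
years_after_epoch = sent_value / ms_per_year

print("epoch milliseconds:", current_time_ms)
print("value sent as Time:", sent_value)
print("years after 1970 if read as milliseconds:", round(years_after_epoch))
```

The result is on the order of tens of millions of years in the future, which would explain why the service rejects the record with "Invalid time for record".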

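As the input is already in milliseconds, this code should only multiply by 1000 to get microseconds. I would suggest changing it to

'Time': str(int(timestamp * 1000)), # Convert milliseconds to microseconds

Note that the record also sets 'TimeUnit': 'MILLISECONDS'. The unit has to match the value in 'Time', so with this change 'TimeUnit' should become 'MICROSECONDS'. Alternatively, pass the millisecond value through unchanged with str(int(timestamp)) and keep 'TimeUnit': 'MILLISECONDS'.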
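A minimal sketch of a record builder in which the 'Time' value and 'TimeUnit' agree, keeping everything in milliseconds. The helper name build_records is hypothetical and not part of the original function. It also builds one record per axis, because the original overwrites 'MeasureName' and 'MeasureValue' before writing, so only y_axis would be stored; that is my reading of the intent rather than something the question states.

```python
import time


def build_records(x_axis, y_axis, device_id, timestamp_ms):
    """Build one Timestream record per axis, with 'Time' in epoch milliseconds."""
    common = {
        'Dimensions': [{'Name': 'device_id', 'Value': str(device_id)}],
        'MeasureValueType': 'DOUBLE',
        'Time': str(int(timestamp_ms)),  # already in milliseconds, no scaling
        'TimeUnit': 'MILLISECONDS',      # matches the unit of the 'Time' value
    }
    return [
        {**common, 'MeasureName': 'x_axis', 'MeasureValue': str(x_axis)},
        {**common, 'MeasureName': 'y_axis', 'MeasureValue': str(y_axis)},
    ]


# Sample values taken from the question's output.
records = build_records(5, 10, 'sensor_1', int(time.time() * 1000))
print(records)

# In the Lambda function these would then go to the existing client, e.g.:
# timestream_write.write_records(
#     DatabaseName=database_arn, TableName=table_arn, Records=records)
```

Building the records in a pure function like this makes it possible to check the payload locally before calling write_records.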

AWS
Steve
Answered 2 months ago
Expert
Reviewed 2 months ago
