ValidationException error while writing into Timestream table

0

I've been trying to use a lambda function to write into a timestream db table, but I've been getting the below error.

THE ERROR

Error writing data to Timestream: An error occurred (ValidationException) when calling the WriteRecords operation: Invalid time for record.

And I've not been able to get the issue resolved. Please can someone help me with what could be the issue?

THE LAMBDA FUNCTION

import boto3
import json
import time
from datetime import datetime

# Initialize the S3 client
s3 = boto3.client('s3')

# Initialize the TimestreamWrite client
timestream_write = boto3.client('timestream-write')

# S3 bucket and key for storing the last readings
bucket_name = 'bcpbucket1'
key = 'lastReadings.json'

# Timestream database and table ARNs
database_arn = 'lambda_injected_db'
table_arn = 'lambda_injected_table'

# Function to read the last readings from S3
def read_last_readings():
    try:
        response = s3.get_object(Bucket=bucket_name, Key=key)
        data = response['Body'].read().decode('utf-8')
        return json.loads(data)
    except Exception as e:
        print("Error reading last readings from S3:", e)
        return {'lastX': None, 'lastY': None}

# Function to write the last readings to S3
def write_last_readings(lastX, lastY):
    try:
        s3.put_object(Bucket=bucket_name, Key=key, Body=json.dumps({'lastX': lastX, 'lastY': lastY}), ContentType='application/json')
    except Exception as e:
        print("Error writing last readings to S3:", e)

# Function to write data to Amazon Timestream
def write_to_timestream(x_axis, y_axis, total_velocity, device_id, timestamp):
    try:
        # Format the record
        record = {
            'Time': str(int(timestamp * 1000000)),  # Convert time to microseconds
            'Dimensions': [{'Name': 'device_id', 'Value': device_id}],
            'MeasureName': 'x_axis',
            'MeasureValue': str(x_axis),
            'MeasureValueType': 'DOUBLE',
            'TimeUnit': 'MILLISECONDS'
        }
        
        # Add y_axis as another measure value
        record['MeasureName'] = 'y_axis'
        record['MeasureValue'] = str(y_axis)
        
        # Write the record to Timestream
        timestream_write.write_records(DatabaseName=database_arn, TableName=table_arn, Records=[record])
    except Exception as e:
        print("Error writing data to Timestream:", e)

def lambda_handler(event, context):
    # Ensure that the "x_axis" and "y_axis" properties exist in the incoming data
    if 'x_axis' in event and 'y_axis' in event and 'device_id' in event:
        # Convert the current time to milliseconds since epoch
        current_time_ms = int(time.time() * 1000)

        # Calculate the total velocity (magnitude of the velocity vector)
        total_velocity = ((event['x_axis'] ** 2 + event['y_axis'] ** 2) ** 0.5) / 1  # Assuming time interval is 1 second

        # Write the last readings to S3
        write_last_readings(event['x_axis'], event['y_axis'])

        # Write the event data to Amazon Timestream
        write_to_timestream(event['x_axis'], event['y_axis'], total_velocity, event['device_id'], current_time_ms)
            
        # Return only the specified fields in the desired format
        return {
            'x_axis': event['x_axis'],
            'y_axis': event['y_axis'],
            'totalVelocity': total_velocity,
            'device_id': event['device_id'],
            'time': datetime.utcfromtimestamp(current_time_ms // 1000).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        }
    else:
        raise ValueError("Invalid data format. 'x_axis', 'y_axis', and 'device_id' properties are required.")

THE OUTPUT OF THE LAMBDA FUNCTION TO BE WRITTEN INTO TIMESTREAM

I get the below response together with the ValidationException error every time I run the lambda function

{ "x_axis": 5, "y_axis": 10, "totalVelocity": 11.180339887498949, "device_id": "sensor_1", "time": "2024-03-18 21:14:48.000" }

已提问 2 个月前157 查看次数
1 回答
0

Looking at the code, the lamda_handler is getting the current time in milliseconds

current_time_ms = int(time.time() * 1000)

and passing that value into write_to_timestream()

write_to_timestream is then converting this to microseconds. I believe this calculation is incorrect.

'Time': str(int(timestamp * 1000000)), # Convert time to microseconds

As the input is already in milliseconds, this code should only multiply by 1000 to get microseconds. I would suggest changing it to

'Time': str(int(timestamp * 1000)), # Convert time to microseconds

AWS
Steve
已回答 2 个月前
profile picture
专家
已审核 2 个月前

您未登录。 登录 发布回答。

一个好的回答可以清楚地解答问题和提供建设性反馈,并能促进提问者的职业发展。

回答问题的准则