Multiple additions to datastore from single JSON


My IoT device sends out a JSON via MQTT to IoT Core, which is passed to a single IoT Analytics channel and then to a pipeline.

An example JSON received is as follows:

{
    "mesh_name": "mesh_network_1",
    "datetime": "2019-07-17 09:00",
    "eui64": "BB797FFFFE897420",
    "sensors": [
        {
            "name": "office_1",
            "value": 24.6,
            "category": "temperature"
        },
        {
            "name": "office_2",
            "value": 55,
            "category": "humidity"
        },
        {
            "name": "outdoor_1",
            "value": 22.3,
            "category": "temperature"
        }
        ... (every sensor occupies a separate object within this list)
    ]
}

For each of the sensors I want a separate entry in the datastore. For example, this input JSON would give:

mesh|datetime|name|value|category

"mesh_network_1"|"2019-07-17 09:00"|"office_1"|24.6|"temperature"

"mesh_network_1"|"2019-07-17 09:00"|"office_2"|55|"humidity"

"mesh_network_1"|"2019-07-17 09:00"|"outdoor_1"|22.3|"temperature"
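In plain Python terms, the flattening I'm after would look something like this (the function name is just for illustration):

```python
# Sketch of the transformation: one output row per sensor object,
# with the top-level fields repeated on every row.
def flatten_message(message):
    rows = []
    for sensor in message["sensors"]:
        rows.append({
            "mesh": message["mesh_name"],
            "datetime": message["datetime"],
            "name": sensor["name"],
            "value": sensor["value"],
            "category": sensor["category"],
        })
    return rows

message = {
    "mesh_name": "mesh_network_1",
    "datetime": "2019-07-17 09:00",
    "eui64": "BB797FFFFE897420",
    "sensors": [
        {"name": "office_1", "value": 24.6, "category": "temperature"},
        {"name": "office_2", "value": 55, "category": "humidity"},
    ],
}

for row in flatten_message(message):
    print(row)
```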

However, I'm not sure how to do this within an IoT Analytics pipeline, since the pipeline seems to assume one datastore entry per incoming JSON message.

Does anyone know a good way to do this, or will I just have to send three separate MQTT messages to convey this example instead? I'd really prefer not to do that.

Ashw
asked 5 years ago · 216 views
2 Answers


I think the simplest way to achieve this would be to leverage a Lambda Activity on your Pipeline (https://docs.aws.amazon.com/iotanalytics/latest/userguide/pipeline-activities.html#aws-iot-analytics-pipeline-activities-lambda), and have it parse the single JSON payload into the desired structure. This depends somewhat on the 'raw' structure of the messages sent to the Channel.

So, for instance, we can send data to the Channel via CLI batch-put-message (https://docs.aws.amazon.com/cli/latest/reference/iotanalytics/batch-put-message.html), like so:

aws iotanalytics batch-put-message --channel-name sample_channel --messages '[{"messageId": "message1", "payload": "{\"sensors\": [{\"name\": \"office_1\", \"value\": 24.6, \"category\": \"temperature\"},{\"name\": \"office_2\", \"value\": 55, \"category\": \"temperature\"}]}"}]'

The Channel would then have a single message structured like this:

{
  "messageId": "message1",
  "payload": {
    "sensors": [
      {
        "name": "office_1",
        "value": 24.6,
        "category": "temperature"
      },
      {
        "name": "office_2",
        "value": 55,
        "category": "temperature"
      }
    ]
  }
}

If your Pipeline has a Lambda Activity, then the message(s) from the Channel will be passed to your Lambda function in the event argument.

I created a simple Lambda function (using Python 3.7) using the AWS Lambda console inline editor, and named it sample_lambda:

import json
import sys
import logging

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)


def lambda_handler(event, context):
    # Handy for inspecting the raw structure of the incoming event;
    # logs to the matching CloudWatch log group:
    #   /aws/lambda/<name_of_the_lambda>
    # logger.info("raw event: {}".format(event))

    parsed_rows = []

    # Depending on the batchSize setting of the Lambda Pipeline Activity,
    # you may receive multiple messages in a single event
    for message_payload in event:
        if 'sensors' in message_payload:
            for row in message_payload['sensors']:
                # Each sensor object becomes its own row in the DataStore
                parsed_rows.append(dict(row))

    return parsed_rows
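If you also want the top-level `mesh_name` and `datetime` repeated on every row, as in the question's desired output, a variant of the handler could copy them into each parsed row. This is a sketch under that assumption, not tested against a live Pipeline:

```python
def lambda_handler(event, context):
    # One event may carry several messages, depending on the
    # batchSize setting of the Lambda Pipeline Activity.
    parsed_rows = []
    for message_payload in event:
        # Top-level fields to repeat on every sensor row.
        common = {
            key: message_payload[key]
            for key in ("mesh_name", "datetime")
            if key in message_payload
        }
        for row in message_payload.get("sensors", []):
            parsed = dict(common)
            parsed.update(row)
            parsed_rows.append(parsed)
    return parsed_rows
```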

I then added the necessary permission so that IoT Analytics can invoke the Lambda function, via the CLI:

aws lambda add-permission --function-name sample_lambda --statement-id statm01 --principal iotanalytics.amazonaws.com --action lambda:InvokeFunction
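For reference, the Lambda Activity is declared in the Pipeline definition roughly as below. The pipeline, channel, and datastore names here are placeholders matching this example; `batchSize` controls how many Channel messages are passed to the function per invocation:

```json
{
    "pipelineName": "sample_pipeline",
    "pipelineActivities": [
        {
            "channel": {
                "name": "channel_activity",
                "channelName": "sample_channel",
                "next": "lambda_activity"
            }
        },
        {
            "lambda": {
                "name": "lambda_activity",
                "lambdaName": "sample_lambda",
                "batchSize": 1,
                "next": "datastore_activity"
            }
        },
        {
            "datastore": {
                "name": "datastore_activity",
                "datastoreName": "sample_datastore"
            }
        }
    ]
}
```

A definition like this can be passed to `aws iotanalytics create-pipeline --cli-input-json file://pipeline.json`.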

After reprocessing the Pipeline, the parsed rows are placed in the DataStore; executing the DataSet, I get this net result:

"sensors","name","value","category","__dt"
,"office_1",24.6,"temperature","2019-07-19 00:00:00.000"
,"office_2",55,"temperature","2019-04-26 00:00:00.000"

Edited by: Frank-AWS on Jul 22, 2019 6:39 AM


AWS
answered 5 years ago

Thanks, this worked perfectly.

Ashw
answered 5 years ago
