Questions tagged with Amazon Kinesis

AWS Greengrass doesn't send data to AWS Kinesis

The main purpose of my program is to connect to an incoming MQTT channel and send the data it receives to my AWS Kinesis stream called "MyKinesisStream". Here is my code:

```
import argparse
import logging
import random

from paho.mqtt import client as mqtt_client
from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
    ReadMessagesOptions,
)

broker = 'localhost'
port = 1883
topic = "clients/test/hello/world"
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = '...'
password = '...'

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

args = ""


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def sendDataToKinesis(
        stream_name: str,
        kinesis_stream_name: str,
        payload,
        batch_size: int = None,
):
    try:
        print("Debug: sendDataToKinesis with params:", stream_name + " | ", kinesis_stream_name, " | ", batch_size)
        print("payload:", payload)
        print("type payload:", type(payload))
    except Exception as e:
        print("Error while printing out the parameters", str(e))
        logger.exception(e)

    try:
        # Create a client for the StreamManager
        kinesis_client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            kinesis_client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        )

        kinesis_client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )

        sequence_no = kinesis_client.append_message(stream_name=stream_name, data=payload)
        print(
            "Successfully appended message to stream with sequence number ", sequence_no
        )

        readValue = kinesis_client.read_messages(stream_name, ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000))
        print("DEBUG read test: ", readValue)
    except Exception as e:
        print("Exception while running: " + str(e))
        logger.exception(e)
    finally:
        # Always close the client to avoid resource leaks
        print("closing connection")
        if kinesis_client:
            kinesis_client.close()


def subscribe(client: mqtt_client, args):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
        sendDataToKinesis(args.greengrass_stream, args.kinesis_stream, msg.payload, args.batch_size)

    client.subscribe(topic)
    client.on_message = on_message


def run(args):
    mqtt_client_instance = connect_mqtt()
    subscribe(mqtt_client_instance, args)
    mqtt_client_instance.loop_forever()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--greengrass-stream', required=False, default='...')
    parser.add_argument('--kinesis-stream', required=False, default='MyKinesisStream')
    parser.add_argument('--batch-size', required=False, type=int, default=500)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    run(args)
```

(The parts shown as ... are redacted because they are sensitive information, but the real values are correct.)

The problem is that it just won't send any data to our Kinesis stream. I get the following STDOUT from the run:

```
2022-11-25T12:13:47.640Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Connected to MQTT Broker!. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Received `{"machineId":2, .... "timestamp":"2022-10-24T12:21:34.8777249Z","value":true}` from `clients/test/hello/world` topic. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Debug: sendDataToKinesis with params: test | MyKinesisStream | 100. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. payload: b'{"machineId":2,... ,"timestamp":"2022-10-24T12:21:34.8777249Z","value":true}'. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. type payload: <class 'bytes'>. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Successfully appended message to stream with sequence number 0. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. DEBUG read test: [<Class Message. stream_name: 'test', sequence_number: 0, ingest_time: 1669376980985, payload: b'{"machineId":2,"mach'>]. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. closing connection. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
```

So we can see that the data arrives from MQTT, the Python code appends the message, and the local stream seems to hold the data, since the next step can read it back. The connection is then closed without any error. The problem is that, on the AWS side, we cannot see the data arriving on the stream:

[![Screenshot of the AWS console][1]][1]

What can be the problem here? Our Greengrass core is configured properly and can be accessed from AWS, and the component is running and healthy as well:

[![Screenshot of IoT Core status][2]][2]

[![Screenshot of the state of the StreamManager component][3]][3]

[1]: https://i.stack.imgur.com/wN5I4.png
[2]: https://i.stack.imgur.com/JMJmn.png
[3]: https://i.stack.imgur.com/2qnfn.png
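
One detail in the code above that may be worth ruling out: `sendDataToKinesis` deletes and recreates the stream, and closes the client, on every MQTT message, while the export to Kinesis happens in the background after `append_message` returns. The following is only a minimal sketch, assuming that this per-message teardown is what interrupts the export (not confirmed). `StreamForwarder` and its method names are hypothetical; `client` is assumed to be a `StreamManagerClient`, and the stream is assumed to have been created once at startup with the same `MessageStreamDefinition` as above.

```python
class StreamForwarder:
    """Hypothetical helper: reuse one client and one stream for every message."""

    def __init__(self, client, stream_name: str):
        # `client` is assumed to behave like stream_manager.StreamManagerClient,
        # and the stream `stream_name` is assumed to exist already.
        self._client = client
        self._stream_name = stream_name

    def forward(self, payload: bytes) -> int:
        # Only append; the stream is never deleted or recreated here.
        return self._client.append_message(
            stream_name=self._stream_name, data=payload
        )

    def close(self) -> None:
        # Close once at shutdown instead of after every message.
        self._client.close()
```

With something like this, `on_message` would only call `forwarder.forward(msg.payload)`, and `close()` would run when `loop_forever()` returns.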
1 answer, 0 votes, 19 views
ForestG asked 9 days ago

Greengrass - Stream Manager - Failed to establish connection with the server

Hi there,

I've installed the stream manager via pip, also installed the stream manager component via the AWS console, and everything looks healthy. I am trying to run these samples from GitHub: https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/tree/main/samples and I am getting this error message (using sudo):

```
ERROR:StreamManagerClient:Received ConnectResponse with unexpected status ResponseStatusCode.Unauthorized.
ERROR:root:Exception while running
Traceback (most recent call last):
  File "s3.py", line 41, in main
    client = StreamManagerClient()
  File "/usr/local/lib/python3.8/dist-packages/stream_manager-1.1.1-py3.8.egg/stream_manager/streammanagerclient.py", line 114, in __init__
    UtilInternal.sync(self.__connect(), loop=self.__loop)
  File "/usr/local/lib/python3.8/dist-packages/stream_manager-1.1.1-py3.8.egg/stream_manager/utilinternal.py", line 39, in sync
    return asyncio.run_coroutine_threadsafe(coro, loop=loop).result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/dist-packages/stream_manager-1.1.1-py3.8.egg/stream_manager/streammanagerclient.py", line 152, in __connect
    await asyncio.wait_for(self.__connect_request_response(), timeout=self.request_timeout)
  File "/usr/lib/python3.8/asyncio/tasks.py", line 494, in wait_for
    return fut.result()
  File "/usr/local/lib/python3.8/dist-packages/stream_manager-1.1.1-py3.8.egg/stream_manager/streammanagerclient.py", line 309, in __connect_request_response
    raise ConnectFailedException("Failed to establish connection with the server")
stream_manager.exceptions.ConnectFailedException: Failed to establish connection with the server
```

So the error says that it failed to establish a connection with the server, but I am not sure which server it is referring to. Did I miss a step somewhere?

Cheers,
Mark
0 answers, 0 votes, 27 views
asked a month ago

Kinesis Analytics throws OutOfMemoryError during testing of small, single event

The following code is uploaded to a Kinesis Analytics application:

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.typeinfo import Types
from pyflink.common import Row

env = StreamExecutionEnvironment.get_execution_environment()


def my_map_func(row):
    items = [Row('1'), Row('2'), Row('3')]
    return Row(items)


ds = env.from_collection(
    collection=[(1, 'aaa'), (2, 'bbb')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

ds2 = ds.map(my_map_func, output_type=Types.ROW([
    Types.LIST(
        Types.ROW([
            Types.STRING(),
        ])
    ),
]))

with ds2.execute_and_collect() as results:
    for result in results:
        print(result)
```

It results in the following error when executing on the task managers:

```
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseChars(SegmentsUtil.java:89)
    at org.apache.flink.table.runtime.util.StringUtf8Utils.decodeUTF8(StringUtf8Utils.java:126)
    at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:90)
    at org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:41)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:345)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:72)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:137)
```

I have also reproduced this same scenario in Kinesis Analytics Studio.
1 answer, 0 votes, 88 views
mattyf asked a month ago