How to Create an AWS Kinesis Stream and Put Data into the Stream Using Python Boto3?

0

I've been working with AWS Kinesis and I want to automate the creation of a Kinesis stream and the insertion of data into the stream using Python and Boto3. I have setup AWS SDKs and have necessary access. Could someone provide me with a step-by-step code snippet on how to achieve this?

2 Answers
1
Accepted Answer

The examples in the following documentation may be helpful for creating a Kinesis Stream and writing records.
https://docs.aws.amazon.com/code-library/latest/ug/python_3_kinesis_code_examples.html

The stream is created using create_stream() and the record is written using put_record().
Also, although not mentioned in this document, put_records() can be used to register multiple records at once.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/create_stream.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_record.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_records.html

profile picture
EXPERT
answered a year ago
1

install boto3 then you can use the following code as an example

import time
import boto3
import json

# Create a session
session = boto3.Session(
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY',
    region_name='us-west-2'  # or your preferred region
)

# Create a Kinesis resource
kinesis = session.resource('kinesis')

# Name of your stream
stream_name = 'MyNewStream'

# Create a stream
stream = kinesis.create_stream(
    StreamName=stream_name,
    ShardCount=1  # Modify as per your requirements
)

# Waiting for the stream to become active
while True:
    stream.reload()  # Refresh stream status
    if stream.status == 'ACTIVE':
        break
    print('Waiting for stream to become active...')
    time.sleep(1)

# Sample data to put into stream
data = {
    'message': 'Hello from AWS Kinesis!'
}

# Convert data to JSON format
data_as_json = json.dumps(data)

# Put data into stream
response = kinesis.meta.client.put_record(
    StreamName=stream_name,
    Data=data_as_json,
    PartitionKey='partitionkey'  # Replace with your own partition key
)

print('Data inserted into stream: ', response)
profile picture
EXPERT
answered a year ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions