How to Create an AWS Kinesis Stream and Put Data into the Stream Using Python Boto3?

0

I've been working with AWS Kinesis and I want to automate the creation of a Kinesis stream and the insertion of data into the stream using Python and Boto3. I have setup AWS SDKs and have necessary access. Could someone provide me with a step-by-step code snippet on how to achieve this?

2 Antworten
1
Akzeptierte Antwort

The examples in the following documentation may be helpful for creating a Kinesis Stream and writing records.
https://docs.aws.amazon.com/code-library/latest/ug/python_3_kinesis_code_examples.html

The stream is created using create_stream() and the record is written using put_record().
Also, although not mentioned in this document, put_records() can be used to register multiple records at once.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/create_stream.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_record.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_records.html

profile picture
EXPERTE
beantwortet vor einem Jahr
1

install boto3 then you can use the following code as an example

import time
import boto3
import json

# Create a session
session = boto3.Session(
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY',
    region_name='us-west-2'  # or your preferred region
)

# Create a Kinesis resource
kinesis = session.resource('kinesis')

# Name of your stream
stream_name = 'MyNewStream'

# Create a stream
stream = kinesis.create_stream(
    StreamName=stream_name,
    ShardCount=1  # Modify as per your requirements
)

# Waiting for the stream to become active
while True:
    stream.reload()  # Refresh stream status
    if stream.status == 'ACTIVE':
        break
    print('Waiting for stream to become active...')
    time.sleep(1)

# Sample data to put into stream
data = {
    'message': 'Hello from AWS Kinesis!'
}

# Convert data to JSON format
data_as_json = json.dumps(data)

# Put data into stream
response = kinesis.meta.client.put_record(
    StreamName=stream_name,
    Data=data_as_json,
    PartitionKey='partitionkey'  # Replace with your own partition key
)

print('Data inserted into stream: ', response)
profile picture
EXPERTE
beantwortet vor einem Jahr

Du bist nicht angemeldet. Anmelden um eine Antwort zu veröffentlichen.

Eine gute Antwort beantwortet die Frage klar, gibt konstruktives Feedback und fördert die berufliche Weiterentwicklung des Fragenstellers.

Richtlinien für die Beantwortung von Fragen