How to Create an AWS Kinesis Stream and Put Data into the Stream Using Python Boto3?

I've been working with AWS Kinesis and I want to automate creating a Kinesis stream and putting data into it using Python and Boto3. I have set up the AWS SDKs and have the necessary access. Could someone provide a step-by-step code snippet showing how to achieve this?

alex
Asked 1 year ago, 747 views
2 Answers
Accepted Answer

The examples in the following documentation may be helpful for creating a Kinesis Stream and writing records.
https://docs.aws.amazon.com/code-library/latest/ug/python_3_kinesis_code_examples.html

The stream is created with create_stream() and a single record is written with put_record().
Although the documentation above does not mention it, put_records() can be used to write multiple records in one request.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/create_stream.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_record.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_records.html
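
As a rough illustration of the put_records() call mentioned above, here is a minimal sketch. The helper names (build_entries, put_batch) and the "id" field used as the partition key are assumptions made for this example, not part of boto3. It relies on two documented behaviours: a single put_records() request accepts at most 500 records, and the response lists one result per record in request order, with an ErrorCode on those that failed.

```python
import json


def build_entries(items, key_field="id"):
    """Turn dicts into PutRecords entries: a bytes payload plus a partition key."""
    return [
        {
            "Data": json.dumps(item).encode("utf-8"),
            "PartitionKey": str(item[key_field]),
        }
        for item in items
    ]


def put_batch(client, stream_name, items, key_field="id"):
    """Send items in chunks of up to 500 and return the entries that failed."""
    entries = build_entries(items, key_field)
    failed = []
    for start in range(0, len(entries), 500):
        chunk = entries[start:start + 500]
        response = client.put_records(StreamName=stream_name, Records=chunk)
        # Results come back in request order; failed ones carry an ErrorCode.
        for entry, result in zip(chunk, response["Records"]):
            if "ErrorCode" in result:
                failed.append(entry)
    return failed
```

With a real client this would be called as put_batch(boto3.client('kinesis'), 'MyNewStream', items). A put_records() call can succeed overall while individual records fail, so the returned list is what a caller would retry.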

Expert
Answered 1 year ago

Install boto3 (pip install boto3), then use the following code as an example:

import json

import boto3

# Create a session. Credentials are resolved from the default chain
# (environment variables, ~/.aws/credentials, or an IAM role), so they
# do not need to be hardcoded in the script.
session = boto3.Session(region_name='us-west-2')  # or your preferred region

# Create a Kinesis client (boto3 has no resource interface for Kinesis,
# so session.resource('kinesis') would fail)
kinesis = session.client('kinesis')

# Name of your stream
stream_name = 'MyNewStream'

# Create a stream
kinesis.create_stream(
    StreamName=stream_name,
    ShardCount=1  # Modify as per your requirements
)

# Wait for the stream to become active
print('Waiting for stream to become active...')
waiter = kinesis.get_waiter('stream_exists')
waiter.wait(StreamName=stream_name)

# Sample data to put into stream
data = {
    'message': 'Hello from AWS Kinesis!'
}

# Convert data to JSON format
data_as_json = json.dumps(data)

# Put data into stream (the payload is sent as bytes)
response = kinesis.put_record(
    StreamName=stream_name,
    Data=data_as_json.encode('utf-8'),
    PartitionKey='partitionkey'  # Replace with your own partition key
)

print('Data inserted into stream:', response)
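
If the script may run more than once, note that create_stream() raises ResourceInUseException when a stream with that name already exists. The following is a minimal sketch of one way to make the creation step repeatable; ensure_stream is a hypothetical helper name, and it assumes it is given a Kinesis client such as boto3.client('kinesis').

```python
def ensure_stream(client, stream_name, shard_count=1):
    """Create the stream if needed, then block until it is ACTIVE.

    Returns True if this call created the stream, False if it already existed.
    """
    try:
        client.create_stream(StreamName=stream_name, ShardCount=shard_count)
        created = True
    except client.exceptions.ResourceInUseException:
        # A stream with this name already exists (or is still being created).
        created = False
    # The 'stream_exists' waiter polls describe_stream until the status is ACTIVE.
    client.get_waiter("stream_exists").wait(StreamName=stream_name)
    return created
```

This sketch does not check that an existing stream has the requested shard count; that would need a separate describe_stream_summary() call.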
Expert
Answered 1 year ago