How to Create an AWS Kinesis Stream and Put Data into the Stream Using Python Boto3?

0

I've been working with AWS Kinesis and I want to automate the creation of a Kinesis stream and the insertion of data into the stream using Python and Boto3. I have setup AWS SDKs and have necessary access. Could someone provide me with a step-by-step code snippet on how to achieve this?

alex
질문됨 일 년 전747회 조회
2개 답변
1
수락된 답변

The examples in the following documentation may be helpful for creating a Kinesis Stream and writing records.
https://docs.aws.amazon.com/code-library/latest/ug/python_3_kinesis_code_examples.html

The stream is created using create_stream() and the record is written using put_record().
Also, although not mentioned in this document, put_records() can be used to register multiple records at once.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/create_stream.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_record.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis/client/put_records.html

profile picture
전문가
답변함 일 년 전
1

install boto3 then you can use the following code as an example

import time
import boto3
import json

# Create a session
session = boto3.Session(
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY',
    region_name='us-west-2'  # or your preferred region
)

# Create a Kinesis resource
kinesis = session.resource('kinesis')

# Name of your stream
stream_name = 'MyNewStream'

# Create a stream
stream = kinesis.create_stream(
    StreamName=stream_name,
    ShardCount=1  # Modify as per your requirements
)

# Waiting for the stream to become active
while True:
    stream.reload()  # Refresh stream status
    if stream.status == 'ACTIVE':
        break
    print('Waiting for stream to become active...')
    time.sleep(1)

# Sample data to put into stream
data = {
    'message': 'Hello from AWS Kinesis!'
}

# Convert data to JSON format
data_as_json = json.dumps(data)

# Put data into stream
response = kinesis.meta.client.put_record(
    StreamName=stream_name,
    Data=data_as_json,
    PartitionKey='partitionkey'  # Replace with your own partition key
)

print('Data inserted into stream: ', response)
profile picture
전문가
답변함 일 년 전

로그인하지 않았습니다. 로그인해야 답변을 게시할 수 있습니다.

좋은 답변은 질문에 명확하게 답하고 건설적인 피드백을 제공하며 질문자의 전문적인 성장을 장려합니다.

질문 답변하기에 대한 가이드라인

관련 콘텐츠