Building Kafka producers with AWS Lambda: a multi-language integration guide

4 minute read
Content level: Advanced

When using AWS Lambda as a Kafka producer, you need to include a Kafka client library. This guide demonstrates how to integrate Kafka client libraries for Python and Node.js in your Lambda functions to produce messages to Apache Kafka.

In today's data-driven world, real-time data streaming has become crucial for modern applications. AWS Lambda combined with Amazon Managed Streaming for Apache Kafka (MSK) offers a powerful solution for building serverless event streaming applications. Let's explore how to implement Kafka producers using AWS Lambda across different programming languages.

Prerequisites

Before diving in, ensure you have:

  • An AWS account with appropriate permissions
  • A running Amazon MSK cluster
  • Properly configured IAM roles and policies
  • VPC configuration for Lambda-MSK connectivity
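Before wiring everything together, it helps to confirm that the Lambda's VPC configuration can actually reach the brokers. A minimal stdlib-only reachability check (a hypothetical helper, run from inside the VPC, e.g. in a throwaway Lambda or on a bastion host) might look like:

```python
import socket

def can_reach(host, port, timeout=3):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers DNS failures, timeouts, and connection refusals
        return False

# e.g. can_reach("b-1.yourcluster.kafka.us-east-1.amazonaws.com", 9092)
```

If this returns False from inside the VPC, check the security groups and subnet routing before debugging the Kafka client itself.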

Python implementation

Python's simplicity makes it an excellent choice for building Kafka producers. Here's how to get started:

  • First, set up your project structure:
mkdir kafka-lambda-python
cd kafka-lambda-python
echo kafka-python > requirements.txt
pip install -r requirements.txt -t .
  • Create your Lambda function using the kafka-python library. The sample below generates device metrics data (device model, device ID, interface status, CPU usage, memory usage, and event timestamp) and sends it to a Kafka topic:
from kafka import KafkaProducer
import json
import random
from datetime import datetime

TOPIC_NAME = 'YOUR_TOPIC_NAME'
# Comma-separated host:port pairs -- no spaces between entries.
BROKERS = "ADD_YOUR_BROKER_NAME-1:9092,ADD_YOUR_BROKER_NAME-2:9092,ADD_YOUR_BROKER_NAME-3:9092"

# Created at module scope so warm Lambda invocations reuse the connection.
producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='PLAINTEXT')  # use TLS or SASL auth in production


def get_model():
    products = ["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000",
                "Sagemcom 5366 TN", "ASUS AX5400"]
    return random.choice(products)

def get_interface_status():
    # "connected" 12 times out of 14, "down" otherwise
    return random.choice(["connected"] * 12 + ["down"] * 2)

def get_cpu():
    return str(random.randint(50, 100))

def get_memory():
    return str(random.randint(1000, 1500))

def gen_data():
    return {
        "model": get_model(),
        "deviceid": "dvc" + str(random.randint(1000, 10000)),
        "interface": "eth4.1",
        "interfacestatus": get_interface_status(),
        "cpuusage": get_cpu(),
        "memoryusage": get_memory(),
        "event_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }

def lambda_handler(event, context):
    # Send a bounded batch of records; an unbounded while-loop would run
    # until the Lambda timeout and the handler would never return.
    count = event.get("count", 10) if isinstance(event, dict) else 10
    sent = 0
    for _ in range(count):
        data = gen_data()
        print(data)
        try:
            future = producer.send(TOPIC_NAME, value=data)
            record_metadata = future.get(timeout=10)
            print("sent event to Kafka! topic {} partition {} offset {}".format(
                record_metadata.topic, record_metadata.partition, record_metadata.offset))
            sent += 1
        except Exception as e:
            print(e)  # log and continue; the client retries transient errors itself
    producer.flush()
    return {"statusCode": 200, "body": json.dumps({"records_sent": sent})}
        

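One detail worth double-checking is the bootstrap broker string: kafka-python expects a comma-separated list of host:port pairs, and a stray space after a comma (as easily happens when copy-pasting from the MSK console) can cause confusing connection errors. A small stdlib-only helper (hypothetical, not part of kafka-python) to normalize and sanity-check the string:

```python
def parse_brokers(broker_string):
    """Split a comma-separated bootstrap string into clean host:port pairs."""
    brokers = []
    for entry in broker_string.split(","):
        entry = entry.strip()  # tolerate stray spaces around commas
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"malformed broker entry: {entry!r}")
        brokers.append(f"{host}:{int(port)}")
    return brokers

# A string with an accidental space after a comma still parses cleanly:
print(parse_brokers("b-1.example:9092, b-2.example:9092,b-3.example:9092"))
# → ['b-1.example:9092', 'b-2.example:9092', 'b-3.example:9092']
```

The resulting list can be passed directly to `bootstrap_servers`, which accepts either a string or a list of strings.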
Node.js implementation

For JavaScript developers, the kafkajs library provides a robust solution:

  • Initialize your project:
mkdir kafka-lambda-node
cd kafka-lambda-node
npm init -y
npm install kafkajs
  • Create index.js with the following sample code:
const { Kafka } = require('kafkajs');

const TOPIC_NAME = 'YOUR_TOPIC_NAME';
const BROKERS = [
    'ADD_YOUR_BROKER_NAME-1:9092',
    'ADD_YOUR_BROKER_NAME-2:9092',
    'ADD_YOUR_BROKER_NAME-3:9092'
];

// Configure Kafka Producer
const kafka = new Kafka({
    clientId: 'device-metrics-producer',
    brokers: BROKERS,
    retry: {
        initialRetryTime: 500,
        retries: 3
    },
    requestTimeout: 20000
});

const producer = kafka.producer();

// Helper Functions
function getRandomInt(min, max) {
    return Math.floor(Math.random() * (max - min + 1)) + min;
}

function getModel() {
    const products = [
        "Ultra WiFi Modem",
        "Ultra WiFi Booster",
        "EVG2000",
        "Sagemcom 5366 TN",
        "ASUS AX5400"
    ];
    return products[getRandomInt(0, products.length - 1)];
}

function getInterfaceStatus() {
    const status = Array(12).fill("connected").concat(["down", "down"]);
    return status[getRandomInt(0, status.length - 1)];
}

function getCPU() {
    return getRandomInt(50, 100).toString();
}

function getMemory() {
    return getRandomInt(1000, 1500).toString();
}

function formatDateTime() {
    const now = new Date();
    return now.toISOString().slice(0, 19).replace('T', ' ');
}

function genData() {
    return {
        model: getModel(),
        deviceid: `dvc${getRandomInt(1000, 10000)}`,
        interface: 'eth4.1',
        interfacestatus: getInterfaceStatus(),
        cpuusage: getCPU(),
        memoryusage: getMemory(),
        event_time: formatDateTime()
    };
}

// Lambda handler function
exports.handler = async (event, context) => {
    try {
        // Connect to Kafka
        await producer.connect();
        console.log('Producer connected successfully');

        // Generate and send data
        const data = genData();
        console.log('Sending data:', data);

        // Send message to Kafka
        const response = await producer.send({
            topic: TOPIC_NAME,
            messages: [
                {
                    value: JSON.stringify(data)
                }
            ]
        });

        // Disconnect producer
        await producer.disconnect();

        return {
            statusCode: 200,
            body: JSON.stringify({
                message: 'Message sent successfully',
                data: data,
                kafkaResponse: response
            })
        };

    } catch (error) {
        console.error('Error:', error);
        return {
            statusCode: 500,
            body: JSON.stringify({
                message: 'Error sending message',
                error: error.message
            })
        };
    }
};

Deployment steps

For both implementations:

  • Package your function code and its dependencies from the project directory:

zip -r function.zip .

  • Create a new Lambda function
  • Select appropriate runtime (Python or Node.js)
  • Configure VPC settings to match your MSK cluster
  • Upload the function.zip file
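If you prefer to script the packaging step rather than shell out to zip, the same archive can be produced with Python's zipfile module. A sketch, assuming the flat directory layout produced by `pip install -r requirements.txt -t .` in the setup step:

```python
import os
import zipfile

def package(directory, output="function.zip"):
    """Zip the contents of a directory (code plus vendored deps) for Lambda upload."""
    with zipfile.ZipFile(output, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _, files in os.walk(directory):
            for name in files:
                path = os.path.join(root, name)
                # Store paths relative to the directory root, as Lambda expects
                # the handler file at the top level of the archive.
                zf.write(path, os.path.relpath(path, directory))

# e.g. package("kafka-lambda-python", "function.zip")
```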

When implementing Kafka producers in AWS Lambda, consider these key points:

  • Connection Management: Initialize the Kafka client outside the handler so warm invocations reuse the existing connection instead of reconnecting on every call.
  • Error Handling: Wrap sends in try/catch blocks, log failures, and rely on the client's retry settings (retry_backoff_ms in kafka-python, the retry block in kafkajs) for transient broker errors.
  • VPC Configuration: Attach the Lambda function to subnets that can route to the MSK cluster, and ensure the security groups allow traffic on the broker ports.
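The connection-management point above can be sketched generically: keep the client in module scope and create it lazily, so warm invocations skip the connection handshake. The `factory` argument below is a stand-in for constructing a real KafkaProducer with your bootstrap servers:

```python
_producer = None  # module-level cache survives across warm Lambda invocations

def get_producer(factory):
    """Create the client once via factory(); return the cached one afterwards."""
    global _producer
    if _producer is None:
        _producer = factory()
    return _producer

# Demonstration with a stand-in factory; calls records how often it runs.
calls = []
p1 = get_producer(lambda: calls.append(1) or "producer")
p2 = get_producer(lambda: calls.append(1) or "producer")
print(p1 is p2, len(calls))  # the factory ran only once
```

The same pattern applies to kafkajs: hoist `kafka.producer()` and the `connect()` promise out of the handler and await them inside it.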

Conclusion

AWS Lambda provides a flexible platform for building Kafka producers. By following these examples and best practices, you can build reliable, scalable streaming solutions that integrate seamlessly with Amazon MSK.
