Routing MQTT messages to self-managed Kafka using the AWS IoT Core Rules Engine
This blog covers how to send device messages from AWS IoT Core to a self-managed Kafka cluster hosted on Amazon Elastic Compute Cloud (Amazon EC2) using the AWS IoT Rules Engine.
Introduction
This blog covers how to send device messages from AWS IoT Core to a self-managed Kafka cluster hosted on Amazon Elastic Compute Cloud (Amazon EC2) using the AWS IoT Rules Engine. AWS IoT Core provides secure, bi-directional communication for Internet-connected devices (such as sensors, actuators, embedded devices, wireless devices, and smart appliances) to connect to the AWS Cloud over MQTT, HTTPS, and LoRaWAN. The AWS IoT Rules Engine allows routing of data from AWS IoT Core to various downstream services without additional implementation effort. Apache Kafka, a popular technology for data streaming, is one of the integration options supported by the AWS IoT Rules Engine. Setting up this integration with a self-managed Kafka cluster requires specific steps, which are outlined in this guide. For those interested in a managed solution, Amazon Managed Streaming for Apache Kafka (Amazon MSK) is also available, with setup instructions provided in this blog. This post focuses exclusively on self-managed Kafka.
Overview
This architecture demonstrates how devices can send messages to a self-managed Kafka server deployed on Amazon Elastic Compute Cloud (Amazon EC2). Devices communicate with AWS IoT Core using the MQTT or HTTPS protocols. An AWS IoT Core rule is configured to route these messages to the appropriate Kafka topic on the server. The rule accesses the Kafka server securely by using credentials stored in AWS Secrets Manager and a VPC destination configuration, ensuring secure communication within the Amazon Virtual Private Cloud (Amazon VPC). In this post, you'll use the MQTT test client to produce messages, which a Kafka consumer will then read from the specified Kafka topic.
Prerequisites
- AWS account with console access
- Familiarity with Linux commands
- Familiarity with the AWS Management Console, Amazon EC2, and AWS IoT Core
Walkthrough
To set up this architecture, follow the steps outlined below:
- Setting up a Kafka server on Amazon EC2
- Setting up a client to consume Kafka messages
- Creating credentials in AWS Secrets Manager
- Setting up an AWS IAM role and policy for the AWS IoT rule to send messages to Kafka
- Configuring an AWS IoT rule action to send messages to the Kafka cluster
- Testing the data pipeline using the MQTT client
Step 1: Setting up Kafka Server on Amazon EC2
Create an Amazon EC2 instance:
- From the Amazon EC2 console, choose Launch instance.
- Enter a Name for your client machine, such as my-kafka-cluster.
- Under Application and OS Images (Amazon Machine Image), do the following:
- Choose Quick Start, and then choose Ubuntu.
- From Amazon Machine Image (AMI), select the “Ubuntu Server 24.04 LTS (HVM), EBS General Purpose (SSD) Volume Type”.
- Choose the instance type as t2.micro.
- Under Key pair (login), for Key pair name, select Proceed without a key pair (Not recommended) as EC2 Instance Connect will be used to access the instance. EC2 Instance Connect pushes a key pair to the instance's metadata, which is then used for the connection.
Note: There are multiple ways to connect to your EC2 instance; refer here for more details.
- Under Network Settings, select your Amazon VPC and Subnet, or leave them as default. For Firewall (security groups), choose Allow SSH traffic from "com.amazonaws.region.ec2-instance-connect" (replace region with your Region code, for example, com.amazonaws.us-west-2.ec2-instance-connect). Also, ensure that Auto-assign Public IP is set to Enable.
- Keep other options as-is and choose Launch instance.
Connect to your instance:
-
After the instance is running, note the Private IPv4 addresses and Private IP DNS name, then click Connect.
-
On the Connect to instance page, choose the “EC2 Instance Connect” tab and click “Connect”.
Note: If you encounter any permission issues, verify that the required policies for EC2 Instance Connect are attached, as outlined here.
Installing and Setting Up Apache Kafka:
In this step, you will install the required Java Development Kit, download the Apache Kafka package, extract its contents, and prepare the Kafka directory for use by renaming it and navigating to it.
-
After connecting to the instance, execute the following commands in sequence. Export the previously captured "Private IPv4 address" and "Private IP DNS name" as environment variables (replacing the placeholders with your values) so they can be reused in subsequent commands.
export KAFKA_IP="<Private IPV4 addresses>"
export KAFKA_IP_DNS="<Private IP DNS name>"
sudo apt-get update
sudo apt-get install openjdk-17-jdk
wget https://dlcdn.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
tar -xvzf kafka_2.13-3.8.0.tgz
rm kafka_2.13-3.8.0.tgz
mv kafka_2.13-3.8.0 kafka
cd kafka
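If you want a quick sanity check before moving on, the following optional commands confirm the Java installation and the extracted Kafka directory:
java -version
ls bin config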
Setting Up SSL/TLS for Secure Kafka Communication
To set up SSL/TLS for secure communication between the Kafka server and client, you'll follow these steps:
-
Create a new file (such as v3.ext) to define the necessary certificate extensions, including the Subject Alternative Name, ensuring proper SSL/TLS configuration for the Kafka server. To dynamically replace the Kafka IP, start by creating a template file.
sudo nano v3.ext.template
Copy the following content into the file and save it.
authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment
subjectAltName = @alt_names
[alt_names]
IP.1 = ${KAFKA_IP}
When using sudo nano to edit a file, you can save it by following these steps:
- Press Ctrl + O (the shortcut for "Write Out", that is, save).
- Press Enter to confirm the filename (or modify it if needed).
- To exit nano, press Ctrl + X.
Run the following command to substitute the IP address into the template:
envsubst < v3.ext.template > v3.ext
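You can optionally confirm the substitution by viewing the generated file; the IP.1 entry should now show your instance's private IP address:
cat v3.ext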
-
Run the following command to create a new self-signed CA certificate. Use "password" as the PEM passphrase (or choose and remember your own passphrase), and customize the parameters according to your preferences. Ensure the CN value is set to ${KAFKA_IP_DNS}. You can replace the values for C, ST, L, and O with your own country, state, city, and organization.
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -subj "/C=US/ST=NewYork/L=NewYork/O=AnyCompany/CN=${KAFKA_IP_DNS}"
-
Run the following command to import the newly created CA certificate into the Kafka server truststore. When prompted, enter "password" as the keystore password (to match the server configuration used later) and confirm that you trust the certificate.
keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert
-
Run the following command to generate a key pair for the Kafka server, including the server's IP address in the certificate as a Subject Alternative Name (SAN). This step ensures proper SSL/TLS encryption for secure communication between the Kafka server and its clients. When prompted, use "password" as the keystore password so that it matches the server configuration used later. You can replace the values for OU, O, L, ST, and C with your own organizational unit, organization, city, state, and country if needed.
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkeypair -keyalg RSA -ext "SAN=IP:${KAFKA_IP}" -dname "CN=${KAFKA_IP_DNS}, OU=IT, O=AnyCompany, L=NewYork, ST=NewYork, C=US"
-
Run the following commands to enable secure SSL/TLS communication between the Kafka server and its clients by generating and signing certificates, ensuring that the Kafka server and client trust each other using the CA certificate.
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file -ext "SAN=IP:${KAFKA_IP}"
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:password -sha256 -extfile v3.ext
keytool -keystore kafka.server.keystore.jks -alias CARoot -keyalg RSA -importcert -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -importcert -file cert-signed
keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
-
Run the following command to verify the SAN in the keystore.
keytool -list -v -keystore kafka.server.keystore.jks
Confirm that the output contains a SubjectAlternativeName entry similar to the following (the IP address will be your instance's private IPv4 address):
#4: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [
  IPAddress: 172.31.34.95
]
Upload Kafka client truststore to Amazon S3
In this section, you'll upload the Kafka client truststore to Amazon S3 so that the client can download it and communicate with the Kafka server.
Note: You can choose your own method or storage location for this file, which will be used later when setting up the Kafka client.
-
Install the AWS CLI by following the instructions provided in this guide.
-
Refer to this article for granting Amazon EC2 access to Amazon S3.
Make sure the role has the AmazonS3FullAccess and SecretsManagerReadWrite policies attached, as these will be required later in this walkthrough.
-
Create an Amazon S3 bucket.
Store your current account region and the name of the S3 bucket you want to create for storing the client truststore in environment variables. Be sure to replace <AccountId> and <Region> (e.g., us-west-2).
export Region="<Region>"
export S3bucketname=kafkatruststorebucket-<AccountId>
Run the following command to create an S3 bucket:
aws s3 mb s3://${S3bucketname} --region $Region
-
Run the following command to upload the client truststore to Amazon S3.
aws s3 cp kafka.client.truststore.jks s3://${S3bucketname}/kafka.client.truststore.jks
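Optionally, you can verify that the truststore was uploaded:
aws s3 ls s3://${S3bucketname}/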
Configuring Kafka Server Properties
This section outlines the essential steps for configuring Kafka server properties to enable secure communication using SASL_SSL.
-
Edit the server.properties file using the following command:
sudo nano config/server.properties
Under the “Server basics” section, paste the following configuration and save the file. Be sure to replace the IP addresses with your instance's private IPv4 address.
listeners=PLAINTEXT://172.31.26.208:9092,SASL_SSL://172.31.26.208:9093
sasl.enabled.mechanisms=PLAIN
ssl.truststore.location=/home/ubuntu/kafka/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=/home/ubuntu/kafka/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.client.auth=required
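If you prefer not to edit the file by hand, the following sketch appends the same settings using the KAFKA_IP variable exported earlier (this assumes the variable is still set in your shell and that these keys are not already set elsewhere in the file):
cat >> config/server.properties <<EOF
listeners=PLAINTEXT://${KAFKA_IP}:9092,SASL_SSL://${KAFKA_IP}:9093
sasl.enabled.mechanisms=PLAIN
ssl.truststore.location=/home/ubuntu/kafka/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=/home/ubuntu/kafka/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.client.auth=required
EOF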
-
Create a server_jaas.conf file to configure SASL authentication for the Kafka server:
sudo nano server_jaas.conf
Paste the following configuration and save it.
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="uname"
    password="password"
    user_uname="password";
};
Run the following command to load the JAAS file into the environment:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/ubuntu/kafka/server_jaas.conf"
Run Zookeeper and Kafka Server
- Run Zookeeper using the following command:
bin/zookeeper-server-start.sh config/zookeeper.properties
- Open a new session, navigate to the Kafka directory, and run the following commands:
Load the JAAS configuration into the environment:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/ubuntu/kafka/server_jaas.conf"
If you're using a smaller instance, run the following command to prevent heap errors:
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
Start the Kafka server:
bin/kafka-server-start.sh config/server.properties
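Optionally, from another session you can confirm that the broker is listening on both the PLAINTEXT (9092) and SASL_SSL (9093) ports; this sketch assumes the ss utility that ships with Ubuntu:
ss -ltn | grep -E ':(9092|9093)'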
Step 2: Set up a client to consume Kafka messages
In this section, you'll use the same Amazon EC2 instance as the consumer. However, you can follow the same steps on a different machine after installing the Kafka packages.
Set up the consumer on the Amazon EC2 instance
-
Open a new session, ensure you're in the Kafka directory, create the "for-remote/pvt" directory structure, and navigate to the pvt directory.
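A minimal sketch of those commands, assuming the Kafka installation is at /home/ubuntu/kafka as in the earlier steps:
cd ~/kafka
mkdir -p for-remote/pvt
cd for-remote/pvt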
-
Copy the client truststore file from Amazon S3, replacing "kafkatruststorebucket-<AccountId>" with your bucket name.
aws s3 cp s3://kafkatruststorebucket-<AccountId>/kafka.client.truststore.jks kafka.client.truststore.jks
-
Create a new file named client.properties to specify the properties needed to authenticate and communicate with the Kafka cluster securely.
sudo nano client.properties
Paste the following configuration and save it.
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=/home/ubuntu/kafka/for-remote/pvt/kafka.client.truststore.jks
ssl.truststore.password=password
-
Create a new file named client_jaas.conf to configure SASL authentication for the Kafka client.
sudo nano client_jaas.conf
Paste the following configuration and save it.
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="uname"
    password="password";
};
-
Run the following commands. First, navigate to the Kafka working directory:
cd ~/kafka
Export the "Private IPv4 addresses" as environment variable.
export KAFKA_IP="<Private IPV4 addresses>"
Run the following command to load the JAAS file into the environment:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/ubuntu/kafka/for-remote/pvt/client_jaas.conf"
-
Create a topic named “kafka_test_topic”:
bin/kafka-topics.sh --bootstrap-server ${KAFKA_IP}:9092 --topic kafka_test_topic --create --partitions 1 --replication-factor 1
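Optionally, confirm that the topic was created by listing the topics on the broker:
bin/kafka-topics.sh --bootstrap-server ${KAFKA_IP}:9092 --list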
-
Start the consumer (you will not see any messages yet):
bin/kafka-console-consumer.sh --topic kafka_test_topic --bootstrap-server ${KAFKA_IP}:9093 --from-beginning --consumer.config /home/ubuntu/kafka/for-remote/pvt/client.properties
Note: Ensure that your Kafka server and Zookeeper services are running.
-
Optional: Verify the setup by sending messages from the Kafka server. Open a new session, ensure you're in the Kafka directory, and run the following commands:
Export the "Private IPv4 addresses" as environment variable.
export KAFKA_IP="<Private IPV4 addresses>"
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/ubuntu/kafka/for-remote/pvt/client_jaas.conf"
bin/kafka-console-producer.sh --topic kafka_test_topic --bootstrap-server ${KAFKA_IP}:9093 --producer.config /home/ubuntu/kafka/for-remote/pvt/client.properties
Send any message, and you should see it received in the consumer session.
Step 3: Create credentials in AWS Secrets Manager
-
Open a new session and navigate to the Kafka directory.
-
Run the following command to store your account Region (for example, "us-west-2"):
export Region="<Region>"
-
Run the following command to create a secret named "Kafka_Keystore".
aws secretsmanager create-secret --name Kafka_Keystore --secret-binary fileb://kafka.client.truststore.jks --region $Region
-
Run the following commands to create the SASL credentials.
aws secretsmanager create-secret --name kafka-sasl-username --secret-string '{"kafka-sasl-username":"uname"}' --region $Region
aws secretsmanager create-secret --name kafka-sasl-password --secret-string '{"kafka-sasl-password":"password"}' --region $Region
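Optionally, you can read one of the secrets back to confirm it was stored correctly:
aws secretsmanager get-secret-value --secret-id kafka-sasl-username --region $Region --query SecretString --output text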
Step 4: Set up AWS IAM role and policy for AWS IoT Rule to send a message to Kafka
Create a role to allow the AWS IoT rule to access the secrets
-
Sign in to AWS IAM, go to Policies, choose Create policy, select JSON, and paste the following policy (replacing <Region> and <AccountId> with your Region and account ID).
{ "Version": "2012-10-17", "Statement": [ { "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:<Region>:<AccountId>:secret:Kafka_Keystore-*", "arn:aws:secretsmanager:<Region>:<AccountId>:secret:kafka-sasl-username-*", "arn:aws:secretsmanager:<Region>:<AccountId>:secret:kafka-sasl-password-*" ], "Effect": "Allow" } ] }
-
On the next page, name the policy "AwsIoTSecretManagerAccess" and choose Create policy.
-
Go to Roles, create a new role, select EC2 as the use case, and attach the AwsIoTSecretManagerAccess policy. (Selecting EC2 here is only a starting point; you will update the trust relationship to AWS IoT in a later step.)
-
Enter “IoTkafkaSASLSecretsRole” as the Role name and choose Create role.
-
After the role is created, search for the newly created role by name to view its summary.
-
In the Trust relationships tab, choose Edit trust policy, replace the policy with the following, and choose Update policy.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "iot.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
Create a role to allow AWS IoT to reach your Kafka server on Amazon EC2.
-
Follow the same steps outlined above to create a policy with the permissions listed below, naming it 'IoTVpcDestinationPolicy', and create a role named 'IoTCreateVpcENIRole' (including updating the trust relationship so that iot.amazonaws.com can assume the role).
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" }, { "Effect": "Allow", "Action": "ec2:CreateNetworkInterfacePermission", "Resource": "*", "Condition": { "StringEquals": { "ec2:ResourceTag/VPCDestinationENI": "true" } } }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "*", "Condition": { "StringEquals": { "ec2:CreateAction": "CreateNetworkInterface", "aws:RequestTag/VPCDestinationENI": "true" } } } ] }
Step 5: Configure an AWS IoT Rule action to send a message to Kafka Cluster
Create VPC Destination:
The Apache Kafka rule action routes data to a Kafka cluster in a VPC. The VPC configuration is automatically enabled when you specify the VPC destinations, so let's create the VPC destination.
-
Sign in to the AWS IoT Core console. Choose Message routing from the left-side menu, and select Destinations.
-
Choose Create destination and select “Create VPC destination”.
-
Enter values for the following fields:
VPC ID & Subnet IDs: the Kafka server's VPC and subnet IDs
Security Group: select the security group of the Amazon VPC, which has a rule allowing all traffic within the VPC (navigate to Amazon VPC => Security Groups)
IAM Role: select IoTCreateVpcENIRole, and then choose Create
-
Next, update the inbound rules of your Amazon EC2 instance's security group to include the security group you selected for the VPC destination. To do this, go to the Security tab of your EC2 instance, click the security group, and then select Edit inbound rules.
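If you prefer the AWS CLI, an equivalent rule might look like the sketch below; the security group IDs are placeholders for your instance's security group and the VPC security group used by the destination, and the rule opens port 9093 (the SASL_SSL listener the rule action connects to):
aws ec2 authorize-security-group-ingress \
  --group-id sg-INSTANCE-SG-ID \
  --protocol tcp \
  --port 9093 \
  --source-group sg-VPC-DESTINATION-SG-ID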
Create IoT Kafka Rule:
-
Sign in to AWS IoT Core, go to Message routing, and select Rules.
-
Choose Create rule, name it, and enter the SQL query: SELECT * FROM 'kafka_test_topic'
-
In Rule actions, choose Apache Kafka cluster and select the same VPC destination as used in the previous step. Enter “kafka_test_topic” as the Kafka topic name. Keep Key and Partition at their default values.
-
Set the Client properties as follows (replace the EC2 instance IP address and AccountID):
bootstrap.servers = <Private IPV4 addresses>:9093
security.protocol = SASL_SSL
ssl.truststore = ${get_secret('Kafka_Keystore', 'SecretBinary', 'arn:aws:iam::<AccountID>:role/IoTkafkaSASLSecretsRole')}
ssl.truststore.password = password
sasl.plain.username = ${get_secret('kafka-sasl-username', 'SecretString','kafka-sasl-username', 'arn:aws:iam::<AccountID>:role/IoTkafkaSASLSecretsRole')}
sasl.plain.password = ${get_secret('kafka-sasl-password', 'SecretString','kafka-sasl-password', 'arn:aws:iam::<AccountID>:role/IoTkafkaSASLSecretsRole')}
-
Leave all other values unchanged, select Next, and then click Create on the following page.
Step 6: Testing the data pipeline using the MQTT Client
-
Open the MQTT test client from the AWS IoT Core console and publish a message to the 'kafka_test_topic' topic (a sample payload is shown after this list).
-
You will see the data published on the MQTT topic streamed to the Kafka consumer. Ensure your Kafka server, Zookeeper, and consumer are running.
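Any JSON payload works for this test; for example, a hypothetical sensor reading such as:
{
  "deviceId": "sensor-001",
  "temperature": 23.5,
  "timestamp": 1718000000
}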
Common troubleshooting checks:
- Ensure the security group of your Amazon VPC is added to the inbound rules of the Amazon EC2 instance's security group.
- Verify that the Amazon S3 and AWS Secrets Manager permissions are attached to your Amazon EC2 instance role.
- If you encounter a "node disconnected" error, verify that both your Zookeeper and Kafka servers are running.
Cleaning up:
To avoid incurring future charges, delete the following resources:
- Amazon EC2 instance
- AWS IoT Core rule and VPC destination
- AWS Secrets Manager secrets
- Amazon S3 bucket
Conclusion:
In this post, you have learned how to configure an IoT Rule Action to deliver device messages to a self-managed Apache Kafka server using AWS IoT Core. With this setup, you can now securely deliver MQTT messages to a self-managed Kafka server, enabling the creation of a real-time streaming data pipeline.