How do I create an Amazon MSK event source mapping to invoke a Lambda function using a NAT Gateway internet connection?
This guide outlines how to integrate Amazon Managed Streaming for Apache Kafka (Amazon MSK) with AWS Lambda. It covers setting up a VPC and its components, security groups, and an MSK cluster, and then creating and configuring a Lambda function that processes data from an MSK topic. A key focus is the Lambda trigger (an event source mapping), which automatically invokes the function when new messages arrive in the MSK topic, so streaming data is processed in near real time.
STEP I :- Create VPC

Access VPC creation:
- Navigate to the AWS Management Console.
- Go to the VPC section.
- Click "Your VPCs" and then "Create VPC".

VPC settings:
- Choose "VPC and more" to create the VPC together with its additional networking resources.

Name configuration:
- Enter "mskToLambda_Demo" as the base name for the resource tags.

IP configuration:
- Keep the default IPv4 CIDR block.
- Select "No IPv6 CIDR block".

Tenancy:
- Choose "Default" from the dropdown menu.

Availability Zone (AZ) configuration:
- Select "3" for the number of Availability Zones.

Subnet configuration:
- Set the number of public subnets to "3".
- Set the number of private subnets to "3".

NAT gateways:
- Select "In 1 AZ" to create a single NAT gateway.

VPC endpoints:
- Choose "S3 Gateway".

Review settings:
- Verify that the VPC name will be "mskToLambda_Demo-vpc".
- Confirm that 6 subnets will be created across 3 AZs (for example, us-east-1a, us-east-1b, and us-east-1c in us-east-1).
- Each AZ will have one public and one private subnet.

Create VPC:
- After reviewing all settings, click "Create VPC".

Review VPC:
- After your VPC is created, click its VPC ID on the main page to review the resources that were created.
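To picture how the address space is shared across the six subnets, here is a minimal local sketch that splits a VPC CIDR into equal-size blocks. It assumes a 10.0.0.0/16 VPC carved into consecutive /20 subnets; the console wizard may assign different ranges, so treat the values in your own VPC preview as authoritative. `splitCidr` and its helpers are hypothetical names, not part of any AWS API.

```javascript
// Illustrative IPv4 subnet splitter (a local sketch, not an AWS API).
// Assumption: a 10.0.0.0/16 VPC carved into consecutive /20 subnets.

// Convert dotted-quad notation to an integer (multiplication avoids 32-bit sign issues).
function ipToInt(ip) {
  return ip.split('.').reduce((acc, octet) => acc * 256 + Number(octet), 0)
}

// Convert an integer back to dotted-quad notation.
function intToIp(n) {
  return [24, 16, 8, 0].map((shift) => Math.floor(n / 2 ** shift) % 256).join('.')
}

// Return the first `count` subnets of size /newPrefix inside `cidr`.
function splitCidr(cidr, newPrefix, count) {
  const [base, prefixText] = cidr.split('/')
  const prefix = Number(prefixText)
  if (newPrefix < prefix || newPrefix > 32) throw new Error('invalid subnet prefix')
  const available = 2 ** (newPrefix - prefix)
  if (count > available) throw new Error(`only ${available} subnets fit`)
  const blockSize = 2 ** (32 - newPrefix)
  const start = ipToInt(base)
  const subnets = []
  for (let i = 0; i < count; i++) {
    subnets.push(`${intToIp(start + i * blockSize)}/${newPrefix}`)
  }
  return subnets
}

// One public and one private subnet per AZ (AZ names assume us-east-1).
const azs = ['us-east-1a', 'us-east-1b', 'us-east-1c']
const blocks = splitCidr('10.0.0.0/16', 20, 6)
const layout = azs.map((az, i) => ({ az, public: blocks[i], private: blocks[i + 3] }))
console.log(layout)
```

Taking consecutive blocks keeps the arithmetic obvious; any non-overlapping assignment works as long as each subnet stays inside the VPC range.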
STEP II :- Create an IAM Authentication-Enabled MSK Cluster
- For detailed instructions on creating an Amazon MSK cluster, refer to the official AWS documentation at https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html. When following these instructions, enable IAM role-based authentication and select the VPC created in Step I, placing the brokers in its private subnets. Keeping every component in the same VPC is what allows them to communicate with each other.
- Name the cluster's security group "MSK_SG". We will configure the rules for this security group in Step VI, once the security group IDs of all components in this setup are available.
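With IAM access control, clients inside the VPC connect to the brokers on port 9098, so the IAM bootstrap broker string for the cluster should list that port for every broker. The sketch below parses a comma-separated bootstrap string and checks the port; the hostnames are made up and `parseBootstrap` / `usesIamPort` are hypothetical helpers.

```javascript
// Parse a comma-separated MSK bootstrap string into host/port pairs and
// check that every broker uses the IAM port for clients inside the VPC.
const MSK_IAM_PORT = 9098

function parseBootstrap(bootstrap) {
  return bootstrap.split(',').map((entry) => {
    const trimmed = entry.trim()
    const idx = trimmed.lastIndexOf(':')
    if (idx <= 0) throw new Error(`missing port in "${trimmed}"`)
    const port = Number(trimmed.slice(idx + 1))
    if (!Number.isInteger(port)) throw new Error(`bad port in "${trimmed}"`)
    return { host: trimmed.slice(0, idx), port }
  })
}

function usesIamPort(bootstrap) {
  return parseBootstrap(bootstrap).every((broker) => broker.port === MSK_IAM_PORT)
}

// Hypothetical broker hostnames, for illustration only.
const example =
  'b-1.demo.abc123.c2.kafka.us-east-1.amazonaws.com:9098,' +
  'b-2.demo.abc123.c2.kafka.us-east-1.amazonaws.com:9098'
console.log(parseBootstrap(example))
console.log('IAM port:', usesIamPort(example))
```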
STEP III :- Create an IAM Role for Your EC2 Instance to Access MSK
- To create the IAM role and policy that give the client proper access to the MSK cluster created in Step II, follow the official AWS guidelines at https://docs.aws.amazon.com/msk/latest/developerguide/create-client-iam-role.html. This step establishes authentication and authorization for clients of your MSK cluster.
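The client policy scopes `kafka-cluster` actions to cluster, topic, and group resources, whose ARNs follow the pattern `arn:aws:kafka:<region>:<account-id>:<cluster|topic|group>/<cluster-name>/<cluster-uuid>[/<name>]`. The sketch below builds those ARNs and a policy of the same general shape as the one in the AWS guide. The region, account ID, cluster name, and the exact action list are assumptions; the guide's policy remains the authority.

```javascript
// Build Amazon MSK IAM resource ARNs and a client policy (illustrative sketch).
function mskArns({ region, accountId, clusterName, clusterUuid }) {
  const prefix = `arn:aws:kafka:${region}:${accountId}`
  return {
    cluster: `${prefix}:cluster/${clusterName}/${clusterUuid}`,
    topic: (topic) => `${prefix}:topic/${clusterName}/${clusterUuid}/${topic}`,
    group: (group) => `${prefix}:group/${clusterName}/${clusterUuid}/${group}`,
  }
}

function clientPolicy(arns, topic) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Effect: 'Allow',
        Action: ['kafka-cluster:Connect', 'kafka-cluster:AlterCluster', 'kafka-cluster:DescribeCluster'],
        Resource: [arns.cluster],
      },
      {
        Effect: 'Allow',
        Action: ['kafka-cluster:*Topic*', 'kafka-cluster:WriteData', 'kafka-cluster:ReadData'],
        Resource: [arns.topic(topic)],
      },
      {
        Effect: 'Allow',
        Action: ['kafka-cluster:AlterGroup', 'kafka-cluster:DescribeGroup'],
        Resource: [arns.group('*')],
      },
    ],
  }
}

// Hypothetical values; '*' as the UUID matches any cluster with this name.
const arns = mskArns({ region: 'us-east-1', accountId: '123456789012', clusterName: 'demo-cluster', clusterUuid: '*' })
console.log(JSON.stringify(clientPolicy(arns, 'MSKTutorialTopic'), null, 2))
```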
STEP IV :- Create an EC2 Instance
- For detailed instructions on creating an EC2 instance to serve as the client machine for your Amazon MSK setup, refer to the official AWS documentation at https://docs.aws.amazon.com/msk/latest/developerguide/create-client-machine.html. When following these instructions, select the VPC created in Step I and attach the IAM role created in Step III to the instance. This configuration lets the instance communicate securely with your MSK cluster to perform operations such as creating topics and producing messages.
- Name the instance's security group "EC2_SG". We will configure the rules for this security group in Step VI, once the security group IDs of all components in this setup are available.
STEP V :- Create a Lambda function

Open the AWS Lambda console:
- Sign in to the AWS Management Console.
- Navigate to the Lambda service.

Create a new function:
- Click "Create function".
- Choose "Author from scratch".

Configure basic settings:
- Function name: Enter a name for your function.
- Runtime: Leave the default (a Node.js runtime, which the code below is written for).
- Architecture: Leave the default (x86_64).

Advanced settings:
- Expand the "Advanced settings" section.
- Enable VPC: Select this check box.
- VPC: Select the VPC you created in Step I.
- Subnets: Select at least two private subnets.
- Security groups: Choose the security group for the function. This guide refers to it as "Lambda_SG"; if you use the VPC's default security group, you can give it a Name tag with that value.

Create the function:
- Click "Create function" at the bottom of the page.

Function code:
- On the function's Code tab, you'll see the code editor.
- Replace the default code with the following Lambda function code:
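```javascript
// Lambda handler for an Amazon MSK trigger.
// event.records maps each "topic-partition" key to an array of records;
// each record's value is base64-encoded.
export const handler = async (event) => {
  for (const [key, records] of Object.entries(event.records ?? {})) {
    console.log('Key:', key)
    for (const record of records) {
      console.log('Record:', record)
      // Decode the base64 payload into a UTF-8 string
      const msg = Buffer.from(record.value, 'base64').toString('utf8')
      console.log('Message:', msg)
    }
  }
}
```
- Click "Deploy".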
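Before wiring up the trigger, you can exercise the decoding logic locally with a hand-built event. The event below imitates the shape Lambda passes for MSK (records grouped under `topic-partition` keys, each with a base64 `value`); the ARN, offset, and timestamp are made-up sample values, and `decodeRecords` is a hypothetical helper that mirrors the handler's loop but returns the messages instead of only logging them.

```javascript
// Mirror of the handler's decoding loop, returning messages instead of only logging.
function decodeRecords(event) {
  const messages = []
  for (const [key, records] of Object.entries(event.records || {})) {
    for (const record of records) {
      messages.push({
        key,
        offset: record.offset,
        message: Buffer.from(record.value, 'base64').toString('utf8'),
      })
    }
  }
  return messages
}

// Hand-built sample event (values are illustrative, not captured from AWS).
const sampleEvent = {
  eventSource: 'aws:kafka',
  eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/abc-123',
  records: {
    'MSKTutorialTopic-0': [
      {
        topic: 'MSKTutorialTopic',
        partition: 0,
        offset: 15,
        timestamp: 1700000000000,
        timestampType: 'CREATE_TIME',
        value: Buffer.from('Sending Test data to Topic MSKTutorialTopic').toString('base64'),
      },
    ],
  },
}

console.log(decodeRecords(sampleEvent))
```

Returning data rather than logging makes the logic easy to assert on; the deployed handler can keep logging so the output shows up in CloudWatch.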
STEP VI :- Create Security Group Rules
- Assuming you have named the security groups "MSK_SG" (MSK cluster), "Lambda_SG" (Lambda function), and "EC2_SG" (EC2 instance), make sure each group has the inbound and outbound rules that let the EC2 client and the Lambda trigger reach the MSK brokers.
Note: The security group rules are designed to follow the principle of least privilege, with one exception: SSH access to the EC2 instance is open to 0.0.0.0/0 (anywhere), because the IP address you connect from may change and you still need management access. This is less restrictive than the other rules, although it is common for test EC2 instances. For better security in production environments, consider:
- AWS Systems Manager Session Manager for remote access without opening port 22.
- A bastion host or VPN for controlled SSH access.
- Restricting SSH access to your organization's IP range, if it is static.
For testing purposes, you may temporarily open all traffic on all ports for each security group. In production, however, implement the most restrictive rules that still allow the required functionality.
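As a rough model of what the rules need to allow, the sketch below encodes one possible least-privilege inbound rule set as data and checks whether a source can reach a port. It assumes IAM authentication (broker port 9098), that the event source mapping uses the MSK cluster's own security group (hence the self-referencing rule), and SSH on port 22 for the EC2 client. The group names are this guide's labels, not real security group IDs; compare the model against the rules you actually configure.

```javascript
// Illustrative inbound-rule model (group names from this guide; ports assumed).
// Outbound: MSK_SG also needs HTTPS (443) egress so the trigger can reach the
// Lambda and STS endpoints through the NAT gateway; default outbound rules allow this.
const rules = {
  MSK_SG: {
    inbound: [
      { port: 9098, source: 'EC2_SG', why: 'Kafka client on EC2 (IAM auth)' },
      { port: 9098, source: 'MSK_SG', why: 'event source mapping (self-reference)' },
    ],
  },
  EC2_SG: {
    inbound: [{ port: 22, source: '0.0.0.0/0', why: 'SSH for management' }],
  },
  // A Lambda function does not accept inbound connections, so no inbound rules.
  Lambda_SG: { inbound: [] },
}

// True if `group` has an inbound rule admitting `source` on `port`.
// A 0.0.0.0/0 rule is treated as admitting any source.
function allowsInbound(group, source, port) {
  const inbound = (rules[group] && rules[group].inbound) || []
  return inbound.some((r) => r.port === port && (r.source === source || r.source === '0.0.0.0/0'))
}

console.log('EC2 -> MSK on 9098:', allowsInbound('MSK_SG', 'EC2_SG', 9098))
console.log('EC2 -> MSK on 22:', allowsInbound('MSK_SG', 'EC2_SG', 22))
```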
STEP VII :- Install Kafka on EC2 and Create Topic
- To install Kafka on your EC2 instance and create the required topic, follow the guide at https://docs.aws.amazon.com/msk/latest/developerguide/create-topic.html. Install Kafka, then create a topic named "MSKTutorialTopic". Use the correct bootstrap server string and client properties file for your MSK cluster, and replace any placeholder values in the commands with your cluster's actual details.
- If the create-topic command succeeds, you see the following message: Created topic MSKTutorialTopic.
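For an IAM-enabled cluster, the client properties file passed to the Kafka CLI tools typically contains the four settings below, which rely on the `aws-msk-iam-auth` library being on the client's classpath as described in the AWS guide. The sketch generates that file with Node's `fs` module; writing it to the system temp directory is only for illustration, and `buildClientProperties` is a hypothetical helper.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')

// IAM-auth client settings for Amazon MSK (requires aws-msk-iam-auth on the classpath).
const IAM_CLIENT_SETTINGS = {
  'security.protocol': 'SASL_SSL',
  'sasl.mechanism': 'AWS_MSK_IAM',
  'sasl.jaas.config': 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'sasl.client.callback.handler.class': 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
}

// Render key=value lines in Java properties format.
function buildClientProperties(settings = IAM_CLIENT_SETTINGS) {
  return Object.entries(settings).map(([k, v]) => `${k}=${v}`).join('\n') + '\n'
}

// Example only: on the EC2 client, place the file where your kafka-topics.sh and
// kafka-console-producer.sh commands expect it (for example, the Kafka bin directory).
const outFile = path.join(os.tmpdir(), 'client.properties')
fs.writeFileSync(outFile, buildClientProperties())
console.log(`Wrote ${outFile}:\n${fs.readFileSync(outFile, 'utf8')}`)
```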
STEP VIII :- Create Lambda Trigger
1. Edit the IAM role of the Lambda function
- Go to the Configuration tab: In your function's dashboard, select the "Configuration" tab.
- Access permissions: In the left sidebar of the Configuration view, click "Permissions".
- Edit the execution role: In the "Execution role" section, click the link under "Role name". This opens the IAM console in a new tab, focused on your function's execution role.
- Add an inline policy: In the IAM console, on the "Permissions" tab, click "Add inline policy" (in newer versions of the console: "Add permissions", then "Create inline policy").
- Create the custom policy: Choose the JSON tab and paste the custom policy below, replacing the placeholders with your specific values:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:DescribeCluster",
        "kafka-cluster:DescribeTopicDynamicConfiguration",
        "kafka-cluster:AlterTopic",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:DescribeGroup",
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DescribeClusterDynamicConfiguration",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData",
        "kafka-cluster:CreateTopic",
        "kafka:DescribeCluster",
        "kafka:DescribeClusterV2",
        "kafka:GetBootstrapBrokers",
        "kafka:ListClusters"
      ],
      "Resource": "arn:aws:kafka:{REGION}:{ACCOUNT_ID}:*/{CLUSTER_NAME}/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "ec2:CreateNetworkInterface",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DeleteNetworkInterface",
        "ec2:DescribeVpcs",
        "ec2:DescribeSubnets",
        "ec2:DescribeSecurityGroups"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:{REGION}:{ACCOUNT_ID}:*"
    }
  ]
}
```
- Review and create the policy: Click "Review policy", give the policy a name (for example, "LambdaMSKCustomPolicy"), and click "Create policy".
- Verify the policy: Back on the role summary page, you should see your new custom policy listed under "Permissions policies".
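Filling in the `{REGION}`, `{ACCOUNT_ID}`, and `{CLUSTER_NAME}` placeholders by hand is easy to get wrong. A small sketch that substitutes them, fails on any placeholder without a value, and checks that the result is valid JSON; the template is abbreviated from the policy above, and `fillPolicy` and the sample values are hypothetical.

```javascript
// Substitute {PLACEHOLDER} tokens in a policy template and validate the result.
function fillPolicy(template, values) {
  const filled = template.replace(/\{([A-Z_]+)\}/g, (match, name) => {
    if (!(name in values)) throw new Error(`no value for ${name}`)
    return values[name]
  })
  return JSON.parse(filled) // throws if the result is not valid JSON
}

// Abbreviated template (two statements in the style of the policy above).
const template = `{
  "Version": "2012-10-17",
  "Statement": [
    { "Effect": "Allow",
      "Action": ["kafka-cluster:Connect", "kafka-cluster:ReadData"],
      "Resource": "arn:aws:kafka:{REGION}:{ACCOUNT_ID}:*/{CLUSTER_NAME}/*" },
    { "Effect": "Allow",
      "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
      "Resource": "arn:aws:logs:{REGION}:{ACCOUNT_ID}:*" }
  ]
}`

const policy = fillPolicy(template, {
  REGION: 'us-east-1',
  ACCOUNT_ID: '123456789012',
  CLUSTER_NAME: 'demo-cluster',
})
console.log(JSON.stringify(policy, null, 2))
```

The regular expression only matches uppercase names wrapped in braces, so the JSON object braces in the template are left alone.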
2. Add the trigger:
- In the Function overview, click "Add trigger".
- In the Trigger configuration dropdown, select "MSK".
3. Configure the MSK trigger:
- MSK cluster: Select the MSK cluster you created in Step II.
- Authentication: Leave blank.
- Secrets Manager key: Leave blank.
- Batch size: 100
- Starting position: Latest
- Batch window: Leave blank.
- Topic name: MSKTutorialTopic (must match the name of the topic you created in Step VII).
- Click "Add". Once the trigger is created, verify that its State is "Enabled" and that the Last processing result is "OK".
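The console trigger corresponds to a Lambda event source mapping. If you prefer to script it, the settings above map onto `CreateEventSourceMapping` request fields (`EventSourceArn`, `FunctionName`, `Topics`, `BatchSize`, `StartingPosition`), and `GetEventSourceMapping` reports `State` and `LastProcessingResult`. The sketch only builds and checks the request object; sending it (for example with `CreateEventSourceMappingCommand` from `@aws-sdk/client-lambda`) is left out. The function name and cluster ARN are placeholders, and the 10,000 batch-size ceiling is the documented MSK maximum at the time of writing.

```javascript
// Build a CreateEventSourceMapping request for an MSK trigger (not sent here).
function buildMskTrigger({ clusterArn, functionName, topic, batchSize = 100, startingPosition = 'LATEST' }) {
  if (!clusterArn.startsWith('arn:aws:kafka:')) throw new Error('expected an MSK cluster ARN')
  if (!Number.isInteger(batchSize) || batchSize < 1 || batchSize > 10000) {
    throw new Error('BatchSize must be an integer between 1 and 10000')
  }
  // This sketch only accepts the two starting positions relevant to this guide.
  if (!['LATEST', 'TRIM_HORIZON'].includes(startingPosition)) {
    throw new Error('StartingPosition must be LATEST or TRIM_HORIZON in this sketch')
  }
  return {
    EventSourceArn: clusterArn,
    FunctionName: functionName,
    Topics: [topic],
    BatchSize: batchSize,
    StartingPosition: startingPosition,
  }
}

// Health check on a GetEventSourceMapping-style response object.
function isTriggerHealthy(mapping) {
  return mapping.State === 'Enabled' && mapping.LastProcessingResult === 'OK'
}

const request = buildMskTrigger({
  clusterArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/abc-123', // placeholder
  functionName: 'mskToLambda_Demo-function', // placeholder
  topic: 'MSKTutorialTopic',
})
console.log(request)
```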
STEP IX :- Testing
- With the trigger in place, produce data to the MSKTutorialTopic topic from the EC2 instance.
- Command (replace BootstrapServerString with your cluster's bootstrap broker string):
```shell
<path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapServerString --producer.config client.properties --topic MSKTutorialTopic
```
- At the prompt, enter some sample data, for example: "Sending Test data to Topic MSKTutorialTopic".
- Verify that sending this data invoked the Lambda function by checking the function's CloudWatch logs:
- Open the Monitor tab: In your function's dashboard, click the "Monitor" tab.
- Access CloudWatch Logs: Scroll down to the "Logs" section and click "View CloudWatch logs".
- View log streams: You'll be taken directly to the CloudWatch Logs page for your function, which lists its log streams with the most recent at the top. Each log stream corresponds to one instance of your function, so a single stream can contain several invocations.
- Select a log stream: Click the most recent log stream (usually at the top of the list).
- Examine the logs: You should see the message you sent to the topic, printed on a "Message:" line.
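If you copy or export the log lines, checking for the test message can be automated. The sketch scans log message strings for the `Message:` lines that the handler prints. The sample lines are made up; they mimic the typical Node.js runtime text format, in which each `console.log` line is prefixed with a timestamp, request ID, and level. `findMessages` is a hypothetical helper.

```javascript
// Extract decoded messages from Lambda log lines written by the handler's console.log calls.
function findMessages(logLines) {
  const marker = 'Message: '
  return logLines
    .filter((line) => line.includes(marker))
    .map((line) => line.slice(line.indexOf(marker) + marker.length).trim())
}

// Made-up sample lines in the typical Node.js runtime log format.
const sampleLines = [
  'START RequestId: 11111111-2222-3333-4444-555555555555 Version: $LATEST',
  '2024-01-01T00:00:00.000Z\t11111111-2222-3333-4444-555555555555\tINFO\tKey: MSKTutorialTopic-0',
  '2024-01-01T00:00:00.001Z\t11111111-2222-3333-4444-555555555555\tINFO\tMessage: Sending Test data to Topic MSKTutorialTopic',
  'END RequestId: 11111111-2222-3333-4444-555555555555',
]

const found = findMessages(sampleLines)
console.log(found.includes('Sending Test data to Topic MSKTutorialTopic') ? 'test message found' : 'test message not found')
```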
CONCLUSION :- By integrating Amazon MSK with AWS Lambda, you've built a serverless architecture for near-real-time stream processing. The trigger polls your Kafka topic and invokes the function with batches of new messages as they arrive, which suits time-sensitive workloads such as real-time analytics, fraud detection, or IoT event handling. A key element of this guide is the NAT (Network Address Translation) gateway: the trigger runs in the MSK cluster's private subnets and needs outbound access to the AWS Lambda and AWS STS endpoints, which the NAT gateway provides while the cluster itself stays in private subnets. Alternatively, you can use VPC endpoints instead of a NAT gateway, as described in https://docs.aws.amazon.com/lambda/latest/dg/services-msk-tutorial.html