Connect IOT Core, Kafka Rule to managed Confluent Kafka cluster

I am testing the AWS IoT Core integration with Kafka via the Kafka rule action. The Kafka cluster is a Confluent managed Kafka cluster in Confluent Cloud (which runs on AWS, but as a SaaS service). The documentation is very sparse on how to connect to external Kafka clusters. I used a VPC destination and set up various combinations of ENIs with EIPs, a NAT gateway, and an internet gateway. Here is what I have checked so far:

  • Opened all security groups, inbound and outbound, with nothing blocked.
  • Verified that the routes send external traffic through the IGW.
  • Used the accessibility tool in VPC to verify that, end to end, the IGW and the IoT ENIs are reachable from each other (they are).
  • Verified with a test Java program that the Kafka cluster is publicly reachable from my laptop.
  • Verified the same from an EC2 instance in the VPC.
  • Used VPC flow logs and saw traffic in and out of the VPC on the Kafka port to the correct IP addresses.
  • On the Kafka side, verified that the Java tests succeeded in posting messages.
  • Tested both the default VPC and a new VPC with no other components, with the same result.

No matter what I do, I get the same error and no messages arrive in Kafka. The configuration uses SASL_SSL with PLAIN authentication, and the SASL PLAIN user name and password are stored as secrets. Error:

{
    "ruleName": "KafkaNewDest",
    "topic": "xxxx/iot-user/george",
    "cloudwatchTraceId": "c9b00bd9-473a-f106-b70b-c7d1cca1b0b9",
    "clientId": "iotconsole-dea798d2-833a-4924-97d7-b954e169218c",
    "base64OriginalPayload": "ewogICJjb3VudCI6IDEKfQ==",
    "failures": [
        {
            "failedAction": "KafkaAction",
            "failedResource": "dev.priv.dmm.test",
            "errorMessage": "KafkaAction failed to send a message to the specified bootstrap servers. Topic dev.priv.dmm.test not present in metadata after 1000 ms.. Message arrived on: xxxx/iot-user/george, Action: kafka, topic: dev.priv.dmm.test, bootstrap.servers: pkc-xxxx.us-east-2.aws.confluent.cloud:9092"
        }
    ]
}

Any suggestions on what to try next?
One odd data point: using a completely invalid user name or password does not change the result. I get the same error.
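
For anyone inspecting a similar error entry, the following is a minimal Python sketch of how the log record could be picked apart. It uses a trimmed copy of the entry above; the helper names `decode_original_payload` and `failed_resources` are made up for illustration and are not part of any AWS tooling.

```python
import base64
import json

# Trimmed copy of the error log entry shown above; only the fields
# used by this sketch are kept.
error_entry = json.loads("""
{
    "ruleName": "KafkaNewDest",
    "base64OriginalPayload": "ewogICJjb3VudCI6IDEKfQ==",
    "failures": [
        {
            "failedAction": "KafkaAction",
            "failedResource": "dev.priv.dmm.test"
        }
    ]
}
""")


def decode_original_payload(entry):
    """Decode the base64-encoded MQTT payload and parse it as JSON (hypothetical helper)."""
    raw = base64.b64decode(entry["base64OriginalPayload"])
    return json.loads(raw)


def failed_resources(entry):
    """Return (action, resource) pairs for every failure in the entry (hypothetical helper)."""
    return [(f["failedAction"], f["failedResource"]) for f in entry["failures"]]


payload = decode_original_payload(error_entry)
print(payload)
print(failed_resources(error_entry))
```

This only confirms that the rule received the test message and which action and Kafka topic failed; it does not explain why the metadata request timed out.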

1 Answer

I just started a test cluster on Confluent Cloud and was able to publish messages using the AWS IoT Kafka action. Here are the steps I used to set it up:

  • Created a VPC with two subnets
  • Enabled DNS resolution on my VPC
  • Attached an internet gateway (IGW) to the VPC and created a default route to the IGW on both subnets
  • Created an AWS IoT VPC destination using both subnets created before and referenced a security group allowing all outbound traffic
  • Attached an EIP to both ENIs created by the AWS IoT destination
  • Configured the Kafka action:
aws iot get-topic-rule --rule-name KafkaConfluentTest
{
    "ruleArn": "arn:aws:iot:eu-west-1:12345678901:rule/KafkaConfluentTest",
    "rule": {
        "ruleName": "KafkaConfluentTest",
        "sql": "SELECT * FROM 'kafka' ",
        "description": "",
        "createdAt": "2022-08-04T10:24:09+02:00",
        "actions": [
            {
                "kafka": {
                    "destinationArn": "arn:aws:iot:eu-west-1:12345678901:ruledestination/vpc/8758ddb2-a2d1-4d6d-bfa2-10658b9511d1",
                    "topic": "test1",
                    "key": "",
                    "partition": "",
                    "clientProperties": {
                        "acks": "1",
                        "bootstrap.servers": "pkc-xxxxxx.eu-west-1.aws.confluent.cloud:9092",
                        "compression.type": "none",
                        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                        "sasl.mechanism": "PLAIN",
                        "sasl.plain.password": "${get_secret('kafka-sasl-password', 'SecretString', 'arn:aws:iam::12345678901:role/AWSIoTServiceRole')}",
                        "sasl.plain.username": "${get_secret('kafka-sasl-username', 'SecretString', 'arn:aws:iam::12345678901:role/AWSIoTServiceRole')}",
                        "security.protocol": "SASL_SSL",
                        "value.serializer": "org.apache.kafka.common.serialization.ByteBufferSerializer"
                    }
                }
            }
        ],
        "ruleDisabled": false,
        "awsIotSqlVersion": "2016-03-23"
    }
}
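
As a rough illustration, the same rule could be assembled programmatically. The Python sketch below builds a topic rule payload with the client properties from the output above. The helper names (`secret_ref`, `build_kafka_rule_payload`) are hypothetical, the ARNs and bootstrap server are the redacted placeholders from this answer, and the optional `key` and `partition` fields are left out on the assumption that they are not needed.

```python
import json


def secret_ref(secret_id, role_arn):
    """Build a get_secret substitution template string (hypothetical helper)."""
    return "${get_secret('%s', 'SecretString', '%s')}" % (secret_id, role_arn)


def build_kafka_rule_payload(destination_arn, topic, bootstrap_servers, role_arn):
    """Assemble a topic rule payload with a single Kafka action (hypothetical helper)."""
    client_properties = {
        "acks": "1",
        "bootstrap.servers": bootstrap_servers,
        "compression.type": "none",
        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer": "org.apache.kafka.common.serialization.ByteBufferSerializer",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        # Credentials are resolved from secrets at runtime, never stored inline.
        "sasl.plain.username": secret_ref("kafka-sasl-username", role_arn),
        "sasl.plain.password": secret_ref("kafka-sasl-password", role_arn),
    }
    return {
        "sql": "SELECT * FROM 'kafka'",
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [
            {
                "kafka": {
                    "destinationArn": destination_arn,
                    "topic": topic,
                    "clientProperties": client_properties,
                }
            }
        ],
    }


# Placeholder values copied from the redacted output above.
rule_payload = build_kafka_rule_payload(
    destination_arn="arn:aws:iot:eu-west-1:12345678901:ruledestination/vpc/8758ddb2-a2d1-4d6d-bfa2-10658b9511d1",
    topic="test1",
    bootstrap_servers="pkc-xxxxxx.eu-west-1.aws.confluent.cloud:9092",
    role_arn="arn:aws:iam::12345678901:role/AWSIoTServiceRole",
)
print(json.dumps(rule_payload, indent=4))
```

The printed JSON could be saved to a file and passed to `aws iot create-topic-rule --rule-name KafkaConfluentTest --topic-rule-payload file://rule.json`, where `rule.json` is just an example file name.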

After publishing messages to my rule using the AWS IoT test client, I can see a new entry in the Confluent Cloud dashboard under Data Integration -> Clients with client ID: aws-iot-rules-engine-a1921787-6654-42aa-9a67-00f9...

If you have things you want me to test in my environment, let me know.

answered 12 days ago