Connect AWS IoT Core Kafka rule to a managed Confluent Kafka cluster


I am testing the AWS IoT Core integration with Kafka via the Kafka rule action. The Kafka cluster is a Confluent managed Kafka cluster in Confluent Cloud (which runs on AWS, but as a SaaS service). The documentation is very sparse on how to connect to external Kafka clusters.

What I have tried and verified so far:

  • Used a VPC destination and set up various combinations of ENIs with EIPs, a NAT gateway, and an Internet Gateway.
  • Opened all security groups, inbound and outbound, with nothing blocked.
  • Verified that the routes send external traffic through the IGW.
  • Used the reachability tool in VPC to verify end to end that the IGW and the IoT ENIs are reachable from each other (they are).
  • Verified with a test Java program that the Kafka cluster is publicly reachable from my laptop, and verified the same from an EC2 instance in the VPC.
  • Used VPC flow logs and saw traffic in and out of the VPC on the Kafka port to the correct IP addresses.
  • On the Kafka side, verified that the Java tests succeeded in posting messages.
  • Tested both the default VPC and a new VPC with no other components, with the same result.

No matter what I do, I get the same error and no messages arrive in Kafka. The configuration uses SASL_SSL with PLAIN authentication, and the SASL PLAIN username and password are stored as secrets. Error:

{
    "ruleName": "KafkaNewDest",
    "topic": "xxxx/iot-user/george",
    "cloudwatchTraceId": "c9b00bd9-473a-f106-b70b-c7d1cca1b0b9",
    "clientId": "iotconsole-dea798d2-833a-4924-97d7-b954e169218c",
    "base64OriginalPayload": "ewogICJjb3VudCI6IDEKfQ==",
    "failures": [
        {
            "failedAction": "KafkaAction",
            "failedResource": "dev.priv.dmm.test",
            "errorMessage": "KafkaAction failed to send a message to the specified bootstrap servers. Topic dev.priv.dmm.test not present in metadata after 1000 ms.. Message arrived on: xxxx/iot-user/george, Action: kafka, topic: dev.priv.dmm.test, bootstrap.servers: pkc-xxxx.us-east-2.aws.confluent.cloud:9092"
        }
    ]
}

Any suggestions on what to try next?
Odd data point: using a totally invalid user or password does not change the result. I get the same error.
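
When comparing failures across test runs, it can help to pull the key fields out of the error entry and decode the original payload to confirm what the rule actually tried to forward. The following is only a minimal Python sketch using the standard library; the `summarize` helper is a hypothetical name, and the entry is an abridged copy of the error quoted above.

```python
import base64
import json

# Abridged copy of the error entry quoted in the question.
ERROR_LOG = """
{
  "ruleName": "KafkaNewDest",
  "base64OriginalPayload": "ewogICJjb3VudCI6IDEKfQ==",
  "failures": [
    {
      "failedAction": "KafkaAction",
      "failedResource": "dev.priv.dmm.test",
      "errorMessage": "KafkaAction failed to send a message to the specified bootstrap servers. Topic dev.priv.dmm.test not present in metadata after 1000 ms.. Message arrived on: xxxx/iot-user/george, Action: kafka, topic: dev.priv.dmm.test, bootstrap.servers: pkc-xxxx.us-east-2.aws.confluent.cloud:9092"
    }
  ]
}
"""


def summarize(entry_json):
    """Return the decoded original payload and a list of (action, resource) failures."""
    entry = json.loads(entry_json)
    # The original MQTT payload is stored base64-encoded; here it is JSON.
    payload = json.loads(base64.b64decode(entry["base64OriginalPayload"]))
    failures = [(f["failedAction"], f["failedResource"]) for f in entry["failures"]]
    return payload, failures


payload, failures = summarize(ERROR_LOG)
print(payload)
print(failures)
```

This only confirms that the message reached the rule and which action and Kafka topic failed; it does not say anything about why the producer could not fetch metadata.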

Asked 2 years ago, viewed 548 times
1 Answer

I just started a test cluster on Confluent Cloud and was able to publish messages using the AWS IoT Kafka action. Here are the steps I used to set it up:

  • Created a VPC with two subnets
  • Enabled DNS resolution on my VPC
  • Attached a VPC Internet Gateway and created a default route to the IGW on both subnets
  • Created an AWS IoT VPC destination using both subnets created before, and referenced a security group allowing all outbound traffic
  • Attached an EIP to both ENIs created by the AWS IoT destination
  • Configured the Kafka action:
aws iot get-topic-rule --rule-name KafkaConfluentTest
{
    "ruleArn": "arn:aws:iot:eu-west-1:12345678901:rule/KafkaConfluentTest",
    "rule": {
        "ruleName": "KafkaConfluentTest",
        "sql": "SELECT * FROM 'kafka' ",
        "description": "",
        "createdAt": "2022-08-04T10:24:09+02:00",
        "actions": [
            {
                "kafka": {
                    "destinationArn": "arn:aws:iot:eu-west-1:12345678901:ruledestination/vpc/8758ddb2-a2d1-4d6d-bfa2-10658b9511d1",
                    "topic": "test1",
                    "key": "",
                    "partition": "",
                    "clientProperties": {
                        "acks": "1",
                        "bootstrap.servers": "pkc-xxxxxx.eu-west-1.aws.confluent.cloud:9092",
                        "compression.type": "none",
                        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                        "sasl.mechanism": "PLAIN",
                        "sasl.plain.password": "${get_secret('kafka-sasl-password', 'SecretString', 'arn:aws:iam::12345678901:role/AWSIoTServiceRole')}",
                        "sasl.plain.username": "${get_secret('kafka-sasl-username', 'SecretString', 'arn:aws:iam::12345678901:role/AWSIoTServiceRole')}",
                        "security.protocol": "SASL_SSL",
                        "value.serializer": "org.apache.kafka.common.serialization.ByteBufferSerializer"
                    }
                }
            }
        ],
        "ruleDisabled": false,
        "awsIotSqlVersion": "2016-03-23"
    }
}
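
For reference, a rule like the one shown above could also be assembled programmatically. This is a minimal Python sketch, not the way the rule above was actually created: `build_kafka_rule_payload` and `create_rule` are hypothetical helper names, the ARNs and host name are the placeholders from the output above, and `create_rule` assumes boto3 is installed with credentials configured.

```python
import json


def build_kafka_rule_payload(destination_arn, bootstrap_servers, topic, secret_role_arn):
    """Build a topic rule payload with a Kafka action using SASL_SSL and PLAIN auth."""

    def secret(name):
        # Substitution template resolved by AWS IoT when the rule runs.
        return f"${{get_secret('{name}', 'SecretString', '{secret_role_arn}')}}"

    return {
        "sql": "SELECT * FROM 'kafka'",
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [
            {
                "kafka": {
                    "destinationArn": destination_arn,
                    "topic": topic,
                    "clientProperties": {
                        "acks": "1",
                        "bootstrap.servers": bootstrap_servers,
                        "compression.type": "none",
                        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                        "value.serializer": "org.apache.kafka.common.serialization.ByteBufferSerializer",
                        "security.protocol": "SASL_SSL",
                        "sasl.mechanism": "PLAIN",
                        # Secret names are taken from the rule shown above.
                        "sasl.plain.username": secret("kafka-sasl-username"),
                        "sasl.plain.password": secret("kafka-sasl-password"),
                    },
                }
            }
        ],
    }


def create_rule(rule_name, payload):
    """Create the rule in AWS IoT (requires boto3 and valid AWS credentials)."""
    import boto3  # imported lazily so the builder works without boto3 installed

    boto3.client("iot").create_topic_rule(ruleName=rule_name, topicRulePayload=payload)


payload = build_kafka_rule_payload(
    destination_arn="arn:aws:iot:eu-west-1:12345678901:ruledestination/vpc/8758ddb2-a2d1-4d6d-bfa2-10658b9511d1",
    bootstrap_servers="pkc-xxxxxx.eu-west-1.aws.confluent.cloud:9092",
    topic="test1",
    secret_role_arn="arn:aws:iam::12345678901:role/AWSIoTServiceRole",
)
print(json.dumps(payload, indent=4))
# create_rule("KafkaConfluentTest", payload)  # uncomment to actually create the rule
```

Keeping the payload builder separate from the API call makes it easy to print and compare the client properties against a working rule before anything is created.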

After publishing messages to my rule using the AWS IoT test client, I can see a new entry in the Confluent Cloud dashboard under Data Integration -> Clients with client ID: aws-iot-rules-engine-a1921787-6654-42aa-9a67-00f9...

If you have things you want me to test in my environment, let me know.

Jan_B (AWS, Expert)
Answered 2 years ago
