MSK Connect not authenticating

Hi all,

I'm getting an "Access denied" error when using Splunk Connect for Kafka to send data from my AWS MSK cluster to a custom-built Splunk application running on an EC2 instance.

Using: splunk-kafka-connect-v209, downloaded from Splunk

Worker config:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.flush.interval.ms=10000

Connector config:

connector.class=com.splunk.kafka.connect.SplunkSinkConnector
splunk.hec.raw=true
splunk.hec.ssl.validate.certs=false
topics=msk-serverless-tutorial
tasks.max=1
splunk.hec.ack.enabled=false
splunk.indexes=<Splunk index name>
splunk.hec.token=<Splunk HEC token>
splunk.hec.uri=https://<Splunk ec2 ip address>:8088

IAM role policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka:*",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeRouteTables",
                "ec2:DescribeVpcEndpoints",
                "ec2:DescribeVpcAttribute",
                "kms:DescribeKey",
                "kms:CreateGrant",
                "logs:CreateLogDelivery",
                "logs:GetLogDelivery",
                "logs:UpdateLogDelivery",
                "logs:DeleteLogDelivery",
                "logs:ListLogDeliveries",
                "logs:PutResourcePolicy",
                "logs:DescribeResourcePolicies",
                "logs:DescribeLogGroups",
                "S3:GetBucketPolicy",
                "firehose:TagDeliveryStream"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:CreateVpcEndpoint"
            ],
            "Resource": [
                "arn:*:ec2:*:*:vpc/*",
                "arn:*:ec2:*:*:subnet/*",
                "arn:*:ec2:*:*:security-group/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:CreateVpcEndpoint"
            ],
            "Resource": [
                "arn:*:ec2:*:*:vpc-endpoint/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:RequestTag/AWSMSKManaged": "true"
                },
                "StringLike": {
                    "aws:RequestTag/ClusterArn": "*"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:CreateTags"
            ],
            "Resource": "arn:*:ec2:*:*:vpc-endpoint/*",
            "Condition": {
                "StringEquals": {
                    "ec2:CreateAction": "CreateVpcEndpoint"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DeleteVpcEndpoints"
            ],
            "Resource": "arn:*:ec2:*:*:vpc-endpoint/*",
            "Condition": {
              

Logs from the connector:

...
[Worker-08244672269d6f804] [2022-07-17 06:49:02,474] INFO Successfully logged in. (org.apache.kafka.common.security.authenticator.AbstractLogin:61)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,773] WARN The configuration 'producer.sasl.jaas.config' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,773] WARN The configuration 'group.id' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,774] WARN The configuration 'listeners.https.ssl.truststore.password' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,774] WARN The configuration 'plugin.path' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,776] WARN The configuration 'producer.sasl.client.callback.handler.class' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,776] WARN The configuration 'consumer.sasl.mechanism' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,776] WARN The configuration 'consumer.ssl.truststore.location' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,776] WARN The configuration 'rest.extension.classes' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,776] WARN The configuration 'listeners.https.ssl.key.password' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,777] WARN The configuration 'producer.ssl.truststore.location' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,777] WARN The configuration 'status.storage.replication.factor' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,777] WARN The configuration 'sasl.jaas.config' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,777] WARN The configuration 'sasl.client.callback.handler.class' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,777] WARN The configuration 'offset.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,777] WARN The configuration 'consumer.security.protocol' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,777] WARN The configuration 'rest.advertised.listener' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,778] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,778] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,778] WARN The configuration 'consumer.sasl.jaas.config' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,778] WARN The configuration 'config.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,778] WARN The configuration 'listeners' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,778] WARN The configuration 'producer.security.protocol' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,779] WARN The configuration 'rest.advertised.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,781] WARN The configuration 'status.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,781] WARN The configuration 'listeners.https.ssl.keystore.location' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,781] WARN The configuration 'listeners.https.ssl.keystore.password' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,785] WARN The configuration 'producer.sasl.mechanism' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,786] WARN The configuration 'config.storage.replication.factor' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,787] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,787] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,787] WARN The configuration 'ssl.truststore.location' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,787] WARN The configuration 'listeners.https.ssl.truststore.location' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,788] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,788] WARN The configuration 'offset.storage.replication.factor' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,788] WARN The configuration 'consumer.sasl.client.callback.handler.class' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,789] INFO Kafka version: 2.7.1 (org.apache.kafka.common.utils.AppInfoParser:119)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,789] INFO Kafka commitId: unknown (org.apache.kafka.common.utils.AppInfoParser:120)
[Worker-08244672269d6f804] [2022-07-17 06:49:02,789] INFO Kafka startTimeMs: 1658040542789 (org.apache.kafka.common.utils.AppInfoParser:121)
[Worker-08244672269d6f804] [2022-07-17 06:49:05,478] INFO [AdminClient clientId=adminclient-1] Failed authentication with <bootstrap cluster url>/INTERNAL_IP ([4c85d6b5-7f33-451a-b6d3-a49218c6f3ff]: Access denied) (org.apache.kafka.common.network.Selector:616)
[Worker-08244672269d6f804] [2022-07-17 06:49:05,482] ERROR [AdminClient clientId=adminclient-1] Connection to node -1 (<bootstrap cluster url>/INTERNAL_IP) failed authentication due to: [4c85d6b5-7f33-451a-b6d3-a49218c6f3ff]: Access denied (org.apache.kafka.clients.NetworkClient:771)
[Worker-08244672269d6f804] [2022-07-17 06:49:05,483] WARN [AdminClient clientId=adminclient-1] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager:232)
[Worker-08244672269d6f804] org.apache.kafka.common.errors.SaslAuthenticationException: [4c85d6b5-7f33-451a-b6d3-a49218c6f3ff]: Access denied
[Worker-08244672269d6f804] [2022-07-17 06:49:05,497] INFO App info kafka.admin.client for adminclient-1 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[Worker-08244672269d6f804] [2022-07-17 06:49:05,497] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
[Worker-08244672269d6f804] org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1658040572795, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
[Worker-08244672269d6f804] Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: fetchMetadata
[Worker-08244672269d6f804] [2022-07-17 06:49:05,498] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
[Worker-08244672269d6f804] org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1658040575485, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
...
[Worker-08dcfd7ddef0e8ded] [2022-07-17 06:43:49,802] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[Worker-08dcfd7ddef0e8ded] [2022-07-17 06:43:49,803] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[Worker-08dcfd7ddef0e8ded] [2022-07-17 06:43:49,803] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[Worker-08dcfd7ddef0e8ded] [2022-07-17 06:43:49,805] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:86)
[Worker-08dcfd7ddef0e8ded] org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
[Worker-08dcfd7ddef0e8ded] 	at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
[Worker-08dcfd7ddef0e8ded] 	at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
[Worker-08dcfd7ddef0e8ded] 	at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:97)
[Worker-08dcfd7ddef0e8ded] 	at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
[Worker-08dcfd7ddef0e8ded] Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: [e031d219-c0dd-497b-b176-a87da3b17d8a]: Access denied
[Worker-08dcfd7ddef0e8ded] 	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
[Worker-08dcfd7ddef0e8ded] 	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
[Worker-08dcfd7ddef0e8ded] 	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
[Worker-08dcfd7ddef0e8ded] 	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
[Worker-08dcfd7ddef0e8ded] 	at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
[Worker-08dcfd7ddef0e8ded] 	... 3 more
[Worker-08dcfd7ddef0e8ded] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: [e031d219-c0dd-497b-b176-a87da3b17d8a]: Access denied
[Worker-08dcfd7ddef0e8ded] MSK Connect encountered errors and failed.
...
Asked 2 years ago · 3117 views
1 Answer

Is your MSK cluster public or within a VPC? Make sure you have reviewed the sub-sections of the client access documentation: https://docs.aws.amazon.com/msk/latest/developerguide/client-access.html. Have you ensured that the security group of the EC2 machine where Splunk is running is allowed access to the right ports in the security group of your MSK cluster? Also look at the "Unable to access cluster ...." sections in the troubleshooting guide: https://docs.aws.amazon.com/msk/latest/developerguide/troubleshooting.html

Also, have you attached the role that allows access to Kafka to the EC2 machine where Splunk is running?
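On the role question: if the cluster uses IAM access control (the SASL settings in the connector logs suggest it might), the role the connector runs as usually needs `kafka-cluster:*` data-plane permissions. These are separate from the `kafka:*` control-plane actions in the policy shown in the question. Below is a minimal sketch of such a statement set, not a verified fix. The `<region>`, `<account-id>`, `<cluster-name>` and `<cluster-uuid>` values are placeholders, and the wildcard topic and group resources are an assumption that should be narrowed for real use.

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": "arn:aws:kafka:<region>:<account-id>:cluster/<cluster-name>/<cluster-uuid>"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:CreateTopic",
                "kafka-cluster:ReadData",
                "kafka-cluster:WriteData"
            ],
            "Resource": "arn:aws:kafka:<region>:<account-id>:topic/<cluster-name>/<cluster-uuid>/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:DescribeGroup",
                "kafka-cluster:AlterGroup"
            ],
            "Resource": "arn:aws:kafka:<region>:<account-id>:group/<cluster-name>/<cluster-uuid>/*"
        }
    ]
}
```

The write and create permissions are included because a Kafka Connect worker keeps its own offset, config, and status topics on the cluster, in addition to reading the source topic.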

AWS Expert, answered 2 years ago
