
Questions tagged with Amazon Managed Streaming for Apache Kafka (Amazon MSK)



MSK Connect timed out while

Hi there, I set up MSK Connect to push topics from my MSK cluster to an S3 bucket using the Lenses plugin. The connector is currently running, but I see the error below in the CloudWatch logs. May I ask what the possible cause is?

```
Resetting offset for partition telemetry-dev-2 to position FetchPosition{offset=736334, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-2.non-prod-cluster.xjgva0.c3.kafka.ap-southeast-1.amazonaws.com:9092 (id: 2 rack: apse1-az2)], epoch=6}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:396)
[Worker-0fb7a8004d7427620] [2022-03-28 11:25:07,748] INFO [lenseio-msk-non-prod-trans|task-1] [Consumer clientId=connector-consumer-lenseio-msk-non-prod-trans-1, groupId=connect-lenseio-msk-non-prod-trans] Resetting offset for partition telemetry-dev-2 to position FetchPosition{offset=736334, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-2.non-prod-cluster.xjgva0.c3.kafka.ap-southeast-1.amazonaws.com:9092 (id: 2 rack: apse1-az2)], epoch=6}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:396)
```

This is followed by another log entry:

```
sending LeaveGroup request to coordinator *******.kafka.ap-southeast-1.amazonaws.com:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired.
```

And below is the cluster configuration:

```
auto.create.topics.enable=true
default.replication.factor=2
min.insync.replicas=1
num.io.threads=8
num.network.threads=5
num.partitions=2
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
log.retention.hours=-1
```
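
The `consumer poll timeout has expired` message generally means the sink task spent longer than the consumer's `max.poll.interval.ms` (5 minutes by default) between polls. A minimal sketch of a connector-level override, assuming the worker's `connector.client.config.override.policy` permits client overrides (whether MSK Connect exposes that worker property is not covered in the post, and the values are illustrative, not recommendations):

```
# Connector configuration (sketch): give the sink task more time between polls
# and hand it fewer records per poll. Property names are standard Kafka Connect
# consumer overrides; the numbers below are illustrative only.
consumer.override.max.poll.interval.ms=600000
consumer.override.max.poll.records=250
```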
0 answers · 0 votes · 0 views · asked 2 months ago

MSK Custom Configuration using Cloudformation

Hi AWS users, I am trying to spin up an MSK cluster with a custom MSK configuration using my serverless app. I wrote the CloudFormation template for the MSK cluster and was able to bring it up successfully. I recently saw that AWS added a CloudFormation resource for `AWS::MSK::Configuration` [1] and have been trying it out to create a custom configuration. The configuration requires a `ServerProperties` key, which is usually plain text in the AWS console. An example of server properties:

```
auto.create.topics.enable=true
default.replication.factor=2
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=10
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
```

`AWS::MSK::Configuration` accepts base64 (API functionality), and I have been trying to implement this with the CloudFormation `Fn::Base64` function, e.g.:

```
Resources:
  ServerlessMSKConfiguration:
    Type: AWS::MSK::Configuration
    Properties:
      ServerProperties:
        Fn::Base64: auto.create.topics.enable=true
```

This gives me back a 400 error during deploy:

```
Resource handler returned message: "[ClientRequestToken: xxxxx] Invalid request body (Service: Kafka, Status Code: 400, Request ID: 1139d840-c02d-4fdb-b68c-cee93673d89d, Extended Request ID: null)" (RequestToken: xxxx HandlerErrorCode: InvalidRequest)
```

Can someone please help me format `ServerProperties` properly? I am not sure how to provide the proper base64 string in the template. Any help is much appreciated.

[1] - [AWS::MSK::Configuration](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-configuration.html)
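
One shape worth trying, purely as a sketch: feed `Fn::Base64` a multi-line YAML block so the encoded payload contains the full newline-separated `key=value` list rather than a single property. The configuration name is hypothetical, and whether the value must be base64-encoded at all may depend on the resource handler version:

```
Resources:
  ServerlessMSKConfiguration:
    Type: AWS::MSK::Configuration
    Properties:
      Name: serverless-msk-configuration   # hypothetical name, not from the post
      ServerProperties:
        Fn::Base64: |
          auto.create.topics.enable=true
          default.replication.factor=2
          min.insync.replicas=2
          num.partitions=10
```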
0 answers · 0 votes · 5 views · asked 2 months ago

MSK Connect Debezium Connector using AWS Glue Schema Registry for AVRO serialization

We are trying to integrate change data capture into our MSK cluster using the Debezium SQL Server plugin with AVRO serialization and AWS Glue Schema Registry. The connector appears to be working correctly: we can see the messages flowing into the Kafka topic, and the AVRO schema is registered in our schema registry. However, when we attempt to deserialize the message from Kafka in a Lambda function (MSK trigger), we get an error saying that the schema cannot be found. We are using [this](https://github.com/awslabs/aws-glue-schema-registry) library to do the AVRO serialization/deserialization in both the connector and our Lambda function. We've added some additional logging to the library and have found that the schema version ID that gets written into the message during serialization is different from the one that gets pulled out of the message during deserialization. Has anyone successfully used Debezium with MSK Connect and AWS Glue Schema Registry? The pertinent configuration for our MSK connector is provided below:

```
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"key.converter.avroRecordType": "GENERIC_RECORD",
"key.converter.region": "AWS_REGION",
"key.converter.registry.name": "SCHEMA_REGISTRY_NAME",
"key.converter.schemaAutoRegistrationEnabled": "true",
"value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.avroRecordType": "GENERIC_RECORD",
"value.converter.region": "AWS_REGION",
"value.converter.registry.name": "SCHEMA_REGISTRY_NAME",
"value.converter.schemaAutoRegistrationEnabled": "true",
```
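
A minimal sketch of the matching consumer-side settings for the Lambda, assuming the same awslabs library is used there. The deserializer class name follows that library's README and may differ between library versions, and the region/registry values are the same placeholders as in the connector config above:

```
# Consumer/Lambda-side configuration (sketch): must point at the same registry
# and region the converter wrote to. Class name per the awslabs README; may
# differ between library versions.
value.deserializer=com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer
region=AWS_REGION
registry.name=SCHEMA_REGISTRY_NAME
avroRecordType=GENERIC_RECORD
```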
0 answers · 0 votes · 4 views · asked 3 months ago

Can MSK Connect be made rack-aware?

We have a Kafka cluster in Amazon MSK that has 3 brokers in different availability zones of the same region. I want to set up a Kafka Connect connector that backs up all data from our Kafka brokers to Amazon S3, and I'm trying to do it with MSK Connect. I set up Confluent's S3 Sink Connector on MSK Connect and it works - everything is uploaded to S3 as expected.

The problem is that it costs a fortune in data transfer charges - our AWS bills for MSK nearly double whenever the connector is running, with `EU-DataTransfer-Regional-Bytes` accounting for the entire increase. It seems that the connector is pulling messages from all three of our brokers, i.e. from three different AZs, so we're getting billed for inter-AZ data transfer. This makes sense, because by default it will read a partition from that partition's leader, which could be any of the three brokers. But if we were creating a normal consumer, not a connector, it would be possible to restrict the consumer to read all partitions from a specific broker:

```
"client.rack" : "euw1-az3"
```

☝️ For a consumer in the euw1-az3 AZ, this setting makes the consumer read all partitions from the local broker, regardless of the partitions' leader - which avoids the need for inter-AZ data transfer and brings the bills down massively.

My question is: is it possible to do something similar for a Kafka connector? What config setting do I have to pass to the connector, or the worker, to make it only read from one specific broker/AZ? Is this possible with MSK Connect?
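
Kafka Connect itself supports per-connector client overrides via the `consumer.override.` prefix, gated by the worker's `connector.client.config.override.policy`, so a sketch of forwarding the same setting through the connector configuration could look like the following. Whether MSK Connect's managed workers accept the override, and which AZ a given worker actually runs in, is not established by the post:

```
# Connector configuration (sketch): forwards client.rack to the connector's consumer,
# assuming the worker's connector.client.config.override.policy permits it.
consumer.override.client.rack=euw1-az3
```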
1 answer · 0 votes · 6 views · asked 4 months ago

MSK Connect on t3.small fails due to non-retryable SaslAuthenticationException - reconnect.backoff.ms worker configuration will not help - can AWS remove the connection limit?

Hello, we are encountering the same issue as e.g. https://github.com/aws/aws-msk-iam-auth/issues/28 regarding the `SaslAuthenticationException` while using MSK Connect with a kafka.t3.small instance. Setting `reconnect.backoff.ms` to e.g. 10000 ms does not resolve the issue, because the exception being thrown (`SaslAuthenticationException`) is not retryable (see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L808) and ultimately leads to the creation of a new client, not a reconnect. So when would the reconnect take place? Going through the implementation, what I see is:

1. `startConnect()` in `ConnectDistributed` calls the constructor of `Worker`.
2. The constructor of `Worker` calls `ConnectUtils.lookupKafkaClusterId(config)`.
3. That method calls `Admin.create(config.originals())`, which opens a new connection.
4. If you follow the calls from there, you will see that you end up not retrying after a `SaslAuthenticationException` (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L808).

Even if the retry worked, several AdminClients are created, all of which connect to the MSK cluster. Since this is not a reconnect, the `reconnect.backoff.ms` setting cannot serve as a remediation, and there is no mechanism in the Kafka code that would globally restrict these connections to happen only every x seconds. Unless I have overlooked something, MSK Connect only works by chance with t3.small instances. This forces us to either:

* not use IAM and go for SASL/SCRAM, or
* use a kafka.m5.large instance and go from about 32 USD/month to 151 USD/month per instance - meaning 90 USD vs 450 USD in our case.

The connection limit on the t3.small instance really restricts what we want to achieve. The workaround presented [here](https://aws.amazon.com/premiumsupport/knowledge-center/msk-connector-connect-errors/) is not working and thus forces us to buy the larger instance. We have no need for a large instance, and we don't want to incur additional costs simply for using IAM with MSK Connect.

**Can AWS remove the limit on the t3.small instance or present a different workaround? That would be great :)**

I cannot open a support case for this, since we don't have the required subscription, and I believe this could be of general interest. See parts of our logs from AWS MSK Connect:

```
[Worker-05ea3408948fa0a4c] [2022-01-01 22:41:53,059] INFO Creating Kafka admin client (org.apache.kafka.connect.util.ConnectUtils:49)
[Worker-05ea3408948fa0a4c] [2022-01-01 22:41:53,061] INFO AdminClientConfig values:
...
[Worker-05ea3408948fa0a4c] reconnect.backoff.max.ms = 10000
[Worker-05ea3408948fa0a4c] reconnect.backoff.ms = 10000
[Worker-05ea3408948fa0a4c] request.timeout.ms = 30000
[Worker-05ea3408948fa0a4c] retries = 2147483647
[Worker-05ea3408948fa0a4c] retry.backoff.ms = 10000
...
[Worker-05ea3408948fa0a4c] [2022-01-01 22:41:54,269] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:86)
[Worker-05ea3408948fa0a4c] org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
[Worker-05ea3408948fa0a4c] at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
[Worker-05ea3408948fa0a4c] at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
[Worker-05ea3408948fa0a4c] at org.apache.kafka.connect.runtime.Worker.<init>(Worker.java:140)
[Worker-05ea3408948fa0a4c] at org.apache.kafka.connect.runtime.Worker.<init>(Worker.java:127)
[Worker-05ea3408948fa0a4c] at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:118)
[Worker-05ea3408948fa0a4c] at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
[Worker-05ea3408948fa0a4c] Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: [e4afe53f-73b5-4b94-9ac3-30d737071e56]: Too many connects
[Worker-05ea3408948fa0a4c] at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
[Worker-05ea3408948fa0a4c] at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
[Worker-05ea3408948fa0a4c] at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
[Worker-05ea3408948fa0a4c] at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
[Worker-05ea3408948fa0a4c] at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
[Worker-05ea3408948fa0a4c] ... 5 more
[Worker-05ea3408948fa0a4c] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: [e4afe53f-73b5-4b94-9ac3-30d737071e56]: Too many connects
[Worker-05ea3408948fa0a4c] [2022-01-01 22:41:54,281] INFO Stopped http_0.0.0.08083@68631b1d{HTTP/1.1, (http/1.1)}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:381)
[Worker-05ea3408948fa0a4c] [2022-01-01 22:41:54,283] INFO Stopped https_0.0.0.08443@611d0763{SSL, (ssl, http/1.1)}{0.0.0.0:8443} (org.eclipse.jetty.server.AbstractConnector:381)
[Worker-05ea3408948fa0a4c] MSK Connect encountered errors and failed.
```
1 answer · 4 votes · 52 views · asked 5 months ago

MSK cluster timing out

I reported this before: we are trying to migrate to MSK, but at the point where I put some low load on the cluster, representing the _idle_ state of our application with just a few messages per _minute_ in total coming from about 30 clients, the MSK brokers just start to fail with timeouts.

ARN of the cluster: arn:aws:kafka:eu-west-1:499577160181:cluster/eks-20190111-production-201902-20190708/6e3e6674-9c9b-40bb-98b4-506edf76a6b2-4

Output from `kafka-broker-api-versions`:

```
b-1.....kafka.eu-west-1.amazonaws.com:9092 (id: 1 rack: subnet-0e2b1a418025f9b10) -> ERROR: org.apache.kafka.common.errors.DisconnectException
b-3.....kafka.eu-west-1.amazonaws.com:9092 (id: 3 rack: subnet-000880b697986a0d8) -> ERROR: org.apache.kafka.common.errors.DisconnectException
b-2.....kafka.eu-west-1.amazonaws.com:9092 (id: 2 rack: subnet-0f50b40038ca2bf62) -> ( Produce(0): 0 to 7 [usable: 7], Fetch(1): 0 to 10 [usable: 10], ListOffsets(2): 0 to 4 [usable: 4], Metadata(3): 0 to 7 [usable: 7], LeaderAndIsr(4): 0 to 1 [usable: 1], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 4], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 6 [usable: 6], OffsetFetch(9): 0 to 5 [usable: 5], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 3 [usable: 3], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 2], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 [usable: 0], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1] )
```

I do not see anything particular in the CloudWatch metrics, except that network traffic went up when I started my services and went down when, presumably, the timeouts started.
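
For reference, that output comes from the Kafka CLI tool of the same name; a typical invocation looks like the following (the broker hostname is a placeholder, not the actual cluster endpoint):

```
# Sketch: query each broker's supported API versions against one bootstrap broker
kafka-broker-api-versions.sh --bootstrap-server b-1.xxxx.kafka.eu-west-1.amazonaws.com:9092
```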
7 answers · 0 votes · 20 views · asked 3 years ago

New clusters have "BootstrapBrokerStringTls"

I'm trying to create MSK clusters, and since yesterday somewhen early afternoon in CEST a newly created cluster no longer has a "BootstrapBrokerString", but rather `aws kafka get-bootstrap-brokers` returns a response with only "BootstrapBrokerStringTls". This is clearly an unexpected API change, in a GA product. I would have expected that any move towards TLS-support (yeah! great! awesome!) would be announced, and would not affect existing documented things. Switching to TLS will be quite some work, so right now I rather would _not_ want to do that. How can I get back to the previous behavior? EDIT: I'm also looking at the AWS console now. The cluste says "TLS client authentication" is "Disabled", and enryption in transit between clients and brokers is "Only TLS encrypted traffic allowed". So I guess that makes sense that the client information only returns "TLS" entries. I looked at bit further, and it seems that the API behavior indeed changed, and the TLS options appeared. The default is said to be "TLS/Plaintext" at https://aws.amazon.com/msk/faqs/ (which probably would still produce a `BootstrapBrokerStringTls`), the actual default I saw looks like "TLS" though. I'm now trying to adapt at least my creation scripts to explicitly configure ClientBroker encryption as 'PLAINTEXT', and then will have to work out how to move towards a "both" situation. Edited by: ankon on Jun 21, 2019 11:48 AM: Added information for console output, and my next steps. Edited by: ankon on Jun 21, 2019 12:11 PM: Updated with more information from documentation where I could find references to the change.
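
For the creation scripts, the relevant field is `EncryptionInfo.EncryptionInTransit.ClientBroker` on `aws kafka create-cluster`; a sketch of how that could be set explicitly (cluster name is hypothetical, and the other required arguments are omitted):

```
# Sketch only: other required create-cluster arguments (broker-node-group-info,
# kafka-version, number-of-broker-nodes, ...) are omitted; the cluster name is hypothetical.
# ClientBroker accepts TLS, TLS_PLAINTEXT, or PLAINTEXT; TLS_PLAINTEXT is the "both" option
# mentioned above.
aws kafka create-cluster \
  --cluster-name my-cluster \
  --encryption-info '{"EncryptionInTransit": {"ClientBroker": "PLAINTEXT", "InCluster": true}}'
```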
3 answers · 0 votes · 0 views · asked 3 years ago