如何使用 Kafka-Kinesis-Connector 连接我的 Amazon MSK 集群?

2 分钟阅读
0

当我尝试使用 Kafka-Kinesis-Connector 连接 Amazon Managed Streaming for Apache Kafka(Amazon MSK)时,我收到一条错误消息。

简短描述

先决条件:

  • 您有有效的 AWS 订阅。
  • 您的虚拟私有云(VPC)对客户端计算机和 MSK 集群均可见。MSK 集群和客户端必须位于同一 VPC 中。
  • 您可以连接到 Amazon MSK 和 Apache Zookeeper 服务器。
  • 有两个子网与您的 VPC 关联。
  • 您在 MSK 中创建了主题来发送和接收来自服务器的消息。

解决方法

生成您的项目文件

  1. 要下载 Kafka-Kinesis-Connector,请从 GitHub 网站克隆 kafka-kinesis-Connector 项目
  2. 要在目标目录中构建 Amazon-kinesis-Kafka-Connector-X.x.x.jar 文件,请运行 mvn package 命令:
    [ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package..
    ......
    
    [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 28.822 s
    [INFO] Finished at: 2020-02-19T13:01:31Z[INFO] Final Memory: 26M/66M
    [INFO] ------------------------------------------------------------------------
    Kafka-Kinesis-Connector 按以下顺序查找凭证:环境变量、Java 系统属性和凭证配置文件文件。
  3. 要将您的配置更新为 DefaultAWSCredentailsProviderChain 设置,请运行以下命令:
    [ec2-user@ip-10-0-0-71 target]$ aws configure
    前面的命令确保附加到 AWS 身份和访问管理(IAM)用户的访问密钥具有所需的最低权限。AWS Config 命令还确保有可用的策略来访问 Amazon Kinesis Data Streams 或 Amazon Kinesis Data Firehose。有关设置 AWS 凭证的更多信息,请参阅为适用于 Java 的 AWS SDK 提供临时凭证
    **注意:**如果您使用 Java 开发套件(JDK),那么您也可以使用 EnvironmentVariableCredentialsProvider 类来提供凭证。
  4. 如果您使用 Kinesis 数据流,请更新您的策略,使其与以下示例类似:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Sid": "Stmt123",
              "Effect": "Allow",
              "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:PutRecord",
                   "kinesis:PutRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:GetRecords",
                   "kinesis:ListShards",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:RegisterStreamConsumer"
              ],
              "Resource": [
                   "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
              ]
         }]
    }
    如果您使用 Kinesis Data Firehose,请更新您的政策,使其与以下示例类似:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Effect": "Allow",
              "Action": [
                   "firehose:DeleteDeliveryStream",
                   "firehose:PutRecord",
                   "firehose:PutRecordBatch",
                   "firehose:UpdateDestination"
              ],
              "Resource": [
                   "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
              ]
         }]
    }
    有关 Kinesis Data Firehose 传输流设置的更多信息,请参阅Configuration and credential file settings

配置连接器

**注意:**您可以将 Kafka-Kinesis-Connector 配置为从 MSK 发布消息。您可以将消息发布到以下目的地: mazon Simple Storage Service(Amazon S3)、Amazon Redshift 或 Amazon OpenSearch Service。

  1. 如果您正在设置 Kinesis 数据流,请使用以下值配置连接器:

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    streamName=YOUR_STREAM_NAME
    usePartitionAsHashKey=false
    flushSync=true
    # Use new Kinesis Producer for each Partition
    singleKinesisProducerPerPartition=true
    # Whether to block new records from putting onto Kinesis Producer if
    # threshold for outstanding records have reached
    pauseConsumption=true
    outstandingRecordsThreshold=500000
    # If outstanding records on producers are beyond threshold sleep for following period (in ms)
    sleepPeriod=1000
    # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
    sleepCycles=10
    # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
    # All kinesis producer configuration have not been exposed
    maxBufferedTime=1500
    maxConnections=1
    rateLimit=100
    ttl=60000
    metricsLevel=detailed
    metricsGranuality=shard
    metricsNameSpace=KafkaKinesisStreamsConnector
    aggregation=true

    如果您要设置其他类型的直播,请使用以下值配置 Kinesis Data Firehose 传输流属性:

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    batch=true
    batchSize=500
    batchSizeInBytes=3670016
    deliveryStream=YOUR_DELIVERY_STREAM_NAME
  2. 独立分布式模式配置工作器属性:

    bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.key.converter=org.apache.kafka.connect.storage.StringConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    offset.storage.file.filename=offset.log

    有关 Kafka-Kinesis-Connector 的独立分布式模式的更多信息,请参阅 Apache 网站上的 Kafka Connect

  3. Amazon-Kinesis-Kafka-Connector-0.0.x.jar 文件复制到您的目录中,然后导出类路径
    **注意:**您也可以将 Amazon-Kinesis-Kafka-Connector-0.0.x.jar 文件添加到 JAVA_HOME/lib/ext 目录中。

  4. 要运行 Kafka-Kinesis-Connector,请使用以下命令语法:

    [ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    
    kinesis-kafka-streams-connecter.properties

相关信息

Creating an Amazon MSK cluster

AWS 官方
AWS 官方已更新 1 年前