如何使用 Kafka-Kinesis-Connector 連線至我的 Amazon MSK 叢集?

2 分的閱讀內容
0

當我嘗試使用 Kafka-Kinesis-Connector 與 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 連線時,我收到錯誤訊息。

簡短說明

先決條件:

  • 您擁有有效的 AWS 訂閱。
  • 您擁有對用戶端機器和 MSK 叢集都可見的虛擬私有雲端 (VPC)。MSK 叢集和用戶端必須位於相同的 VPC 中。
  • 您可以連接到 Amazon MSK 與 Apache Zookeeper 伺服器。
  • 有兩個與您的 VPC 相關聯的子網路。
  • 在 MSK 中建立了用於向伺服器傳送以及從中接收訊息的主題。

解決方法

建置您的專案檔案

  1. 若要下載 Kafka-Kinesis-Connector,請從 GitHub 網站克隆 kafka-kinesis-connector 專案。
  2. 若要在目標目錄中建置 amazon-kinesis-kafka-connector-X.X.X.jar 檔案,請執行 mvn package 命令:
    [ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package..
    ......
    
    [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 28.822 s
    [INFO] Finished at: 2020-02-19T13:01:31Z[INFO] Final Memory: 26M/66M
    [INFO] ------------------------------------------------------------------------
    Kafka-Kinesis-Connector 會依下列順序尋找憑證:環境變數、Java 系統屬性和憑證設定檔。
  3. 若要將您的組態更新為 DefaultAWSCredentailsProviderChain 設定,請執行下列命令:
    [ec2-user@ip-10-0-0-71 target]$ aws configure
    上述命令可確保連接至 AWS Identity and Access Management (IAM) 使用者的存取金鑰具有最低的必要權限。aws configure 命令也可確保有可用於存取 Amazon Kinesis Data Streams 或 Amazon Kinesis Data Firehose 的政策。如需設定 AWS 憑證的詳細資訊,請參閱為適用於 Java 的 AWS SDK 提供臨時憑證
    **注意:**如果您使用 Java 開發套件 (JDK),也可以使用 EnvironmentVariableCredentialsProvider 類別來提供憑證。
  4. 如果您使用 Kinesis Data Streams,請將政策更新為類似以下範例的內容:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Sid": "Stmt123",
              "Effect": "Allow",
              "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:PutRecord",
                   "kinesis:PutRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:GetRecords",
                   "kinesis:ListShards",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:RegisterStreamConsumer"
              ],
              "Resource": [
                   "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
              ]
         }]
    }
    如果您使用 Kinesis Data Firehose,請將政策更新為類似以下範例的內容:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Effect": "Allow",
              "Action": [
                   "firehose:DeleteDeliveryStream",
                   "firehose:PutRecord",
                   "firehose:PutRecordBatch",
                   "firehose:UpdateDestination"
              ],
              "Resource": [
                   "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
              ]
         }]
    }
    如需 Kinesis Data Firehose 交付串流設定的詳細資訊,請參閱組態與憑證檔案設定

設定連接器

**注意:**您可以設定 Kafka-Kinesis-Connector 以從 MSK 發佈訊息。您可以將訊息發佈到下列目的地: Amazon Simple Storage Service (Amazon S3)、Amazon Redshift 或 Amazon OpenSearch Service。

  1. 如果您要設定 Kinesis Data Streams,則可以使用下列值來設定連接器:

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    streamName=YOUR_STREAM_NAME
    usePartitionAsHashKey=false
    flushSync=true
    # Use new Kinesis Producer for each Partition
    singleKinesisProducerPerPartition=true
    # Whether to block new records from putting onto Kinesis Producer if
    # threshold for outstanding records have reached
    pauseConsumption=true
    outstandingRecordsThreshold=500000
    # If outstanding records on producers are beyond threshold sleep for following period (in ms)
    sleepPeriod=1000
    # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
    sleepCycles=10
    # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
    # All kinesis producer configuration have not been exposed
    maxBufferedTime=1500
    maxConnections=1
    rateLimit=100
    ttl=60000
    metricsLevel=detailed
    metricsGranuality=shard
    metricsNameSpace=KafkaKinesisStreamsConnector
    aggregation=true

    如果您要設定不同類型的串流,請使用下列值來設定 Kinesis Data Firehose 傳遞串流屬性:

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    batch=true
    batchSize=500
    batchSizeInBytes=3670016
    deliveryStream=YOUR_DELIVERY_STREAM_NAME
  2. 設定獨立分散式模式的 Worker 屬性:

    bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.key.converter=org.apache.kafka.connect.storage.StringConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    offset.storage.file.filename=offset.log

    如需 Kafka-Kinesis-Connector 的獨立分散式模式的詳細資訊,請參閱 Apache 網站上的 Kafka Connect

  3. amazon-kinesis-kafka-connector-0.0.X.jar 檔案複製到您的目錄並匯出類別路徑
    **注意:**您也可以將 amazon-kinesis-kafka-connector-0.0.X.jar 檔案新增至 JAVA_HOME/lib/ext 目錄。

  4. 若要執行 Kafka-Kinesis-Connector,請使用下列命令語法:

    [ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    
    kinesis-kafka-streams-connecter.properties

相關資訊

建立 Amazon MS 叢集

AWS 官方
AWS 官方已更新 1 年前