Como me conecto ao meu cluster do Amazon MSK usando o Kafka-Kinesis-Connector?

4 minuto de leitura
0

Quando tento usar o Kafka-Kinesis-Connector para me conectar ao Amazon Managed Streaming for Apache Kafka (Amazon MSK), recebo uma mensagem de erro.

Breve descrição

Pré-requisitos:

  • Você tem uma assinatura ativa da AWS.
  • Uma nuvem privada virtual (VPC) visível tanto para a máquina cliente quanto para o cluster do MSK. O cluster e o cliente do MSK devem residir na mesma VPC.
  • Você tem conectividade com os servidores Amazon MSK e Apache Zookeeper.
  • Há duas sub-redes associadas a sua VPC.
  • Tópicos foram criados no MSK para enviar e receber mensagens do servidor.

Resolução

Crie seu arquivo de projeto

  1. Para baixar o Kafka-Kinesis-Connector, clone o projeto kafka-kinesis-connector no site do GitHub.
  2. Para criar o arquivo amazon-kinesis-kafka-connector-X.X.X.jar no diretório de destino, execute o comando mvn package:
    [ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package..
    ......
    
    [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 28.822 s
    [INFO] Finished at: 2020-02-19T13:01:31Z[INFO] Final Memory: 26M/66M
    [INFO] ------------------------------------------------------------------------
    O Kafka-Kinesis-Connector procura credenciais na seguinte ordem: variáveis de ambiente, propriedades do sistema java e o arquivo de perfil de credenciais.
  3. Para atualizar sua configuração para a definição DefaultAWSCredentailsProviderChain, execute o seguinte comando:
    [ec2-user@ip-10-0-0-71 target]$ aws configure
    Esse comando garante que a chave de acesso anexada ao usuário do AWS Identity and Access Management (IAM) tenha as permissões mínimas necessárias. O comando AWS Config também garante que haja uma política disponível para acessar o Amazon Kinesis Data Streams ou o Amazon Kinesis Data Firehose. Para obter mais informações sobre a configuração de credenciais da AWS, consulte Fornecer credenciais temporárias para o AWS SDK para Java.
    **Observação:**Se estiver usando um Java Development Kit (JDK), também poderá usar a classe EnvironmentVariableCredentialsProvider para fornecer credenciais.
  4. Se você usa o Kinesis Data Streams, atualize sua política para que seja semelhante ao exemplo a seguir:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Sid": "Stmt123",
              "Effect": "Allow",
              "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:PutRecord",
                   "kinesis:PutRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:GetRecords",
                   "kinesis:ListShards",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:RegisterStreamConsumer"
              ],
              "Resource": [
                   "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
              ]
         }]
    }
    Se você usa o Kinesis Data Firehose, atualize sua política para se parecer com o exemplo a seguir:
    {
         "Version": "2012-10-17",
         "Statement": [{
              "Effect": "Allow",
              "Action": [
                   "firehose:DeleteDeliveryStream",
                   "firehose:PutRecord",
                   "firehose:PutRecordBatch",
                   "firehose:UpdateDestination"
              ],
              "Resource": [
                   "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
              ]
         }]
    }
    Para mais informações sobre as configurações do fluxo de entrega do Kinesis Data Firehose, consulte Configuration and credential file settings.

Configurar o conector

**Observação:**Você pode configurar o Kafka-Kinesis-Connector para publicar mensagens do MSK. Você pode publicar mensagens nos seguintes destinos: Amazon Simple Storage Service (Amazon S3), Amazon Redshift ou Amazon OpenSearch Service.

  1. Se estiver configurando o Kinesis Data Streams, poderá configurar o conector com os seguintes valores:

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    streamName=YOUR_STREAM_NAME
    usePartitionAsHashKey=false
    flushSync=true
    # Use new Kinesis Producer for each Partition
    singleKinesisProducerPerPartition=true
    # Whether to block new records from putting onto Kinesis Producer if
    # threshold for outstanding records have reached
    pauseConsumption=true
    outstandingRecordsThreshold=500000
    # If outstanding records on producers are beyond threshold sleep for following period (in ms)
    sleepPeriod=1000
    # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
    sleepCycles=10
    # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
    # All kinesis producer configuration have not been exposed
    maxBufferedTime=1500
    maxConnections=1
    rateLimit=100
    ttl=60000
    metricsLevel=detailed
    metricsGranuality=shard
    metricsNameSpace=KafkaKinesisStreamsConnector
    aggregation=true

    Se estiver configurando um tipo diferente de stream, configure as propriedades do stream de entrega do Kinesis Data Firehose com os seguintes valores:

    name=YOUR_CONNECTER_NAMEconnector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
    tasks.max=1
    topics=YOUR_TOPIC_NAME
    region=us-east-1
    batch=true
    batchSize=500
    batchSizeInBytes=3670016
    deliveryStream=YOUR_DELIVERY_STREAM_NAME
  2. Configure as propriedades do trabalhador para o modo autônomo ou distribuído:

    bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.value.converter=org.apache.kafka.connect.storage.StringConverter
    #internal.key.converter=org.apache.kafka.connect.storage.StringConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    offset.storage.file.filename=offset.log

    Para mais informações sobre o modo autônomo ou distribuído do Kafka-Kinesis-Connector, consulte Kafka Connect no site da Apache.

  3. Copie o arquivo amazon-kinesis-kafka-connector-0.0.X.jar para o seu diretório e exporte o classpath.
    Observação: Você também pode adicionar o arquivo amazon-kinesis-kafka-connector-0.0.X.jar ao diretório JAVA_HOME/lib/ext.

  4. Execute o kafka-kinesis-connector usando a seguinte sintaxe de comando:

    [ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/
    
    kinesis-kafka-streams-connecter.properties

Informações relacionadas

Criar um cluster do Amazon MSK

AWS OFICIAL
AWS OFICIALAtualizada há um ano