¿Cómo me conecto a mi clúster de Amazon MSK mediante el Kafka-Kinesis-Connector?

4 minutos de lectura
0

Cuando intento usar el Kafka-Kinesis-Connector para conectarme a Amazon Managed Streaming para Apache Kafka (Amazon MSK), recibo un mensaje de error. ¿Cómo me conecto a mi clúster de Amazon MSK mediante el Kafka-Kinesis-Connector?

Breve descripción

Para conectarse a su clúster de MSK mediante el Kafka-Kinesis-Connector, su configuración debe cumplir los siguientes requisitos:

  • Una suscripción de AWS activa.
  • Una nube virtual privada (VPC) que sea visible tanto para la máquina cliente como para el clúster de MSK. El clúster de MSK y el cliente deben residir en la misma VPC.
  • Conectividad a los servidores MSK y Apache Zookeeper.
  • Dos subredes asociadas a su VPC.
  • Temas creados en MSK para enviar y recibir mensajes desde el servidor.

Resolución

Crear el archivo de su proyecto

1.    Clone el proyecto kafka-kinesis-connector para descargar el Kafka-Kinesis-Connector.

2.    Utilice el comando mvn package para crear el archivo amazon-kinesis-kafka-connector-X.X.X.jar en el directorio de destino:

[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
..
......

[INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.822 s
[INFO] Finished at: 2020-02-19T13:01:31Z
[INFO] Final Memory: 26M/66M
[INFO] ------------------------------------------------------------------------

El Kafka-Kinesis-Connector busca las credenciales en el siguiente orden: variables de entorno, propiedades del sistema java y el archivo de perfil de credenciales.

3.    Actualice su configuración con los ajustes DefaultAWSCredentailsProviderChain:

[ec2-user@ip-10-0-0-71 target]$ aws configure

Este comando garantiza que la clave de acceso adjunta al usuario de AWS Identity and Access Management (IAM) tenga los permisos mínimos requeridos. El comando aws configure también garantiza que haya una política disponible para acceder a Amazon Kinesis Data Streams o a Amazon Kinesis Data Firehose. Para obtener más información sobre cómo configurar las credenciales de AWS, consulte Trabajar con credenciales de AWS.

Nota: Si utiliza un kit de desarrollo de Java (JDK), también puede usar la clase EnvironmentVariableCredentialsProvider para proporcionar credenciales.

4.    Si utiliza Kinesis Data Streams, entonces actualice su política de la siguiente manera:

{
     "Version": "2012-10-17",
     "Statement": [{
          "Sid": "Stmt123",
          "Effect": "Allow",
          "Action": [
               "kinesis:DescribeStream",
               "kinesis:PutRecord",
               "kinesis:PutRecords",
               "kinesis:GetShardIterator",
               "kinesis:GetRecords",
               "kinesis:ListShards",
               "kinesis:DescribeStreamSummary",
               "kinesis:RegisterStreamConsumer"
          ],
          "Resource": [
               "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
          ]
     }]
}

Si utiliza Kinesis Data Firehose, actualice su política para que se parezca al siguiente ejemplo:

{
     "Version": "2012-10-17",
     "Statement": [{
          "Effect": "Allow",
          "Action": [
               "firehose:DeleteDeliveryStream",
               "firehose:PutRecord",
               "firehose:PutRecordBatch",
               "firehose:UpdateDestination"
          ],
          "Resource": [
               "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
          ]
     }]
}

Para obtener más información sobre la configuración de la secuencia de entrega de Kinesis Data Firehose, consulte Ajustes de la configuración y del archivo de credenciales.

Configuración del conector

Nota: Puede configurar el Kafka-Kinesis-Connector para publicar mensajes desde MSK. Los mensajes se pueden publicar en los siguientes destinos: Amazon Simple Storage Service (Amazon S3), Amazon Redshift o Amazon OpenSearch Service.

1.    Si está configurando Kinesis Data Streams, puede configurar el conector con los siguientes valores:

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from putting onto Kinesis Producer if
# threshold for outstanding records have reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms)
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
# All kinesis producer configuration have not been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregation=true

-o-

Si está configurando un tipo diferente de transmisión, configure las propiedades de la secuencia de entrega de Kinesis Data Firehose de la siguiente manera:

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME

2.    Configure las propiedades del trabajador para el modo independiente o distribuido:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log

Para obtener más información sobre el modo independiente o el modo distribuido de Kafka-Kinesis-Connector, consulte Kafka Connect en el sitio web de Apache.

3.    Copie el archivo amazon-kinesis-kafka-connector-0.0.X.jar a su directorio y exporte classpath.

Nota: También puede añadir el archivo amazon-kinesis-kafka-connector-0.0.X.jar al directorio JAVA_HOME/lib/ext.

4.    Ejecute kafka-kinesis-connector mediante la siguiente sintaxis de comandos:

[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

kinesis-kafka-streams-connecter.properties

Información relacionada

Creación de un clúster de Amazon MSK

OFICIAL DE AWS
OFICIAL DE AWSActualizada hace 3 años