¿Cómo me conecto a mi clúster de Amazon MSK mediante el Kafka-Kinesis-Connector?
Cuando intento usar el Kafka-Kinesis-Connector para conectarme a Amazon Managed Streaming para Apache Kafka (Amazon MSK), recibo un mensaje de error. ¿Cómo me conecto a mi clúster de Amazon MSK mediante el Kafka-Kinesis-Connector?
Breve descripción
Para conectarse a su clúster de MSK mediante el Kafka-Kinesis-Connector, su configuración debe cumplir los siguientes requisitos:
- Una suscripción de AWS activa.
- Una nube virtual privada (VPC) que sea visible tanto para la máquina cliente como para el clúster de MSK. El clúster de MSK y el cliente deben residir en la misma VPC.
- Conectividad a los servidores MSK y Apache Zookeeper.
- Dos subredes asociadas a su VPC.
- Temas creados en MSK para enviar y recibir mensajes desde el servidor.
Resolución
Crear el archivo de su proyecto
1. Clone el proyecto kafka-kinesis-connector para descargar el Kafka-Kinesis-Connector.
2. Utilice el comando mvn package para crear el archivo amazon-kinesis-kafka-connector-X.X.X.jar en el directorio de destino:
[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package .. ...... [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 28.822 s [INFO] Finished at: 2020-02-19T13:01:31Z [INFO] Final Memory: 26M/66M [INFO] ------------------------------------------------------------------------
El Kafka-Kinesis-Connector busca las credenciales en el siguiente orden: variables de entorno, propiedades del sistema java y el archivo de perfil de credenciales.
3. Actualice su configuración con los ajustes DefaultAWSCredentailsProviderChain:
[ec2-user@ip-10-0-0-71 target]$ aws configure
Este comando garantiza que la clave de acceso adjunta al usuario de AWS Identity and Access Management (IAM) tenga los permisos mínimos requeridos. El comando aws configure también garantiza que haya una política disponible para acceder a Amazon Kinesis Data Streams o a Amazon Kinesis Data Firehose. Para obtener más información sobre cómo configurar las credenciales de AWS, consulte Trabajar con credenciales de AWS.
Nota: Si utiliza un kit de desarrollo de Java (JDK), también puede usar la clase EnvironmentVariableCredentialsProvider para proporcionar credenciales.
4. Si utiliza Kinesis Data Streams, entonces actualice su política de la siguiente manera:
{ "Version": "2012-10-17", "Statement": [{ "Sid": "Stmt123", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards", "kinesis:DescribeStreamSummary", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName" ] }] }
Si utiliza Kinesis Data Firehose, actualice su política para que se parezca al siguiente ejemplo:
{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "firehose:DeleteDeliveryStream", "firehose:PutRecord", "firehose:PutRecordBatch", "firehose:UpdateDestination" ], "Resource": [ "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName" ] }] }
Para obtener más información sobre la configuración de la secuencia de entrega de Kinesis Data Firehose, consulte Ajustes de la configuración y del archivo de credenciales.
Configuración del conector
Nota: Puede configurar el Kafka-Kinesis-Connector para publicar mensajes desde MSK. Los mensajes se pueden publicar en los siguientes destinos: Amazon Simple Storage Service (Amazon S3), Amazon Redshift o Amazon OpenSearch Service.
1. Si está configurando Kinesis Data Streams, puede configurar el conector con los siguientes valores:
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 streamName=YOUR_STREAM_NAME usePartitionAsHashKey=false flushSync=true # Use new Kinesis Producer for each Partition singleKinesisProducerPerPartition=true # Whether to block new records from putting onto Kinesis Producer if # threshold for outstanding records have reached pauseConsumption=true outstandingRecordsThreshold=500000 # If outstanding records on producers are beyond threshold sleep for following period (in ms) sleepPeriod=1000 # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks sleepCycles=10 # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties # All kinesis producer configuration have not been exposed maxBufferedTime=1500 maxConnections=1 rateLimit=100 ttl=60000 metricsLevel=detailed metricsGranuality=shard metricsNameSpace=KafkaKinesisStreamsConnector aggregation=true
-o-
Si está configurando un tipo diferente de transmisión, configure las propiedades de la secuencia de entrega de Kinesis Data Firehose de la siguiente manera:
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 batch=true batchSize=500 batchSizeInBytes=3670016 deliveryStream=YOUR_DELIVERY_STREAM_NAME
2. Configure las propiedades del trabajador para el modo independiente o distribuido:
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter #internal.value.converter=org.apache.kafka.connect.storage.StringConverter #internal.key.converter=org.apache.kafka.connect.storage.StringConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter.schemas.enable=true internal.value.converter.schemas.enable=true offset.storage.file.filename=offset.log
Para obtener más información sobre el modo independiente o el modo distribuido de Kafka-Kinesis-Connector, consulte Kafka Connect en el sitio web de Apache.
3. Copie el archivo amazon-kinesis-kafka-connector-0.0.X.jar a su directorio y exporte classpath.
Nota: También puede añadir el archivo amazon-kinesis-kafka-connector-0.0.X.jar al directorio JAVA_HOME/lib/ext.
4. Ejecute kafka-kinesis-connector mediante la siguiente sintaxis de comandos:
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ kinesis-kafka-streams-connecter.properties
Información relacionada
Contenido relevante
- OFICIAL DE AWSActualizada hace un año
- OFICIAL DE AWSActualizada hace un año
- OFICIAL DE AWSActualizada hace un año
- OFICIAL DE AWSActualizada hace un año