Comment me connecter à mon cluster Amazon MSK à l'aide du connecteur Kafka-Kinesis ?
Lorsque j'essaie d'utiliser le connecteur Kafka-Kinesis pour me connecter à Amazon Managed Streaming for Apache Kafka (Amazon MSK), je reçois un message d'erreur. Comment me connecter à mon cluster Amazon MSK à l'aide du connecteur Kafka-Kinesis ?
Brève description
Pour vous connecter à votre cluster MSK à l'aide du connecteur Kafka-Kinesis, votre configuration doit répondre aux exigences suivantes :
- Un abonnement AWS actif.
- Un Virtual Private Cloud (VPC) visible à la fois de l'ordinateur client et du cluster MSK. Le cluster MSK et le client doivent résider dans le même VPC.
- Connectivité aux serveurs MSK et Apache Zookeeper.
- Deux sous-réseaux associés à votre VPC.
- Rubriques créées dans MSK pour envoyer et recevoir des messages du serveur.
Résolution
Création de votre fichier de projet
1. Clonez le projet kafka-kinesis-connector pour télécharger Kafka-Kinesis-Connector.
2. Utilisez la commande mvn package pour créer le fichier amazon-kinesis-kafka-connector-X.X.X.jar dans le répertoire cible :
[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package .. ...... [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 28.822 s [INFO] Finished at: 2020-02-19T13:01:31Z [INFO] Final Memory: 26M/66M [INFO] ------------------------------------------------------------------------
Le connecteur Kafka-Kinesis recherche les informations d'identification dans l'ordre suivant: les variables d'environnement, les propriétés système Java et le fichier de profil d'informations d'identification.
3. Mettez à jour votre configuration vers le paramètre DefaultAWSCredentailsProviderChain :
[ec2-user@ip-10-0-0-71 target]$ aws configure
Cette commande garantit que la clé d'accès attachée à l'utilisateur AWS Identity and Access Management (IAM) dispose des autorisations minimales requises. La commande aws configure garantit également qu'une stratégie est disponible pour accéder à Amazon Kinesis Data Streams ou Amazon Kinesis Data Firehose. Pour plus d'informations sur la configuration des informations d'identification AWS, consultez Utilisation des informations d'identification AWS.
Remarque : si vous utilisez un kit de développement Java (JDK), vous pouvez également utiliser la classe EnvironmentVariableCredentialsProvider pour fournir des informations d'identification.
4. Si vous utilisez Kinesis Data Streams, mettez à jour votre stratégie comme suit :
{ "Version": "2012-10-17", "Statement": [{ "Sid": "Stmt123", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards", "kinesis:DescribeStreamSummary", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName" ] }] }
Si vous utilisez Kinesis Data Firehose, mettez à jour votre stratégie pour qu'elle ressemble à l'exemple suivant :
{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "firehose:DeleteDeliveryStream", "firehose:PutRecord", "firehose:PutRecordBatch", "firehose:UpdateDestination" ], "Resource": [ "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName" ] }] }
Pour en savoir plus sur les paramètres de flux de diffusion de Kinesis Data Firehose, consultez Paramètres du fichier de configuration et d'identification.
Configuration du connecteur
Remarque : vous pouvez configurer le connecteur Kafka-Kinesis pour publier des messages à partir de MSK. Les messages peuvent être publiés vers les destinations suivantes : Amazon Simple Storage Service (Amazon S3), Amazon Redshift ou Amazon OpenSearch Service.
1. Si vous configurez Kinesis Data Streams, vous pouvez configurer le connecteur avec les valeurs suivantes :
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 streamName=YOUR_STREAM_NAME usePartitionAsHashKey=false flushSync=true # Use new Kinesis Producer for each Partition singleKinesisProducerPerPartition=true # Whether to block new records from putting onto Kinesis Producer if # threshold for outstanding records have reached pauseConsumption=true outstandingRecordsThreshold=500000 # If outstanding records on producers are beyond threshold sleep for following period (in ms) sleepPeriod=1000 # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks sleepCycles=10 # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties # All kinesis producer configuration have not been exposed maxBufferedTime=1500 maxConnections=1 rateLimit=100 ttl=60000 metricsLevel=detailed metricsGranuality=shard metricsNameSpace=KafkaKinesisStreamsConnector aggregation=true
-ou-
Si vous configurez un autre type de flux, configurez les propriétés du flux de diffusion Kinesis Data Firehose comme suit :
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 batch=true batchSize=500 batchSizeInBytes=3670016 deliveryStream=YOUR_DELIVERY_STREAM_NAME
2. Configurez les propriétés de travail pour le mode autonome ou distribué :
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter #internal.value.converter=org.apache.kafka.connect.storage.StringConverter #internal.key.converter=org.apache.kafka.connect.storage.StringConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter.schemas.enable=true internal.value.converter.schemas.enable=true offset.storage.file.filename=offset.log
Pour plus d'informations sur le mode autonome ou distribué de Kafka-Kinesis-Connector, consultez Kafka Connect sur le site web Apache.
3. Copiez le fichier amazon-kinesis-kafka-connector-0.0.X.jar vers votre répertoire et exportez chemin de classe.
Remarque : vous pouvez également ajouter le fichier amazon-kinesis-kafka-connector-0.0.X.jar au répertoire JAVA_HOME/lib/ext.
4. Exécutez le connecteur kinesis-kafka connector à l'aide de la syntaxe de commande suivante :
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ kinesis-kafka-streams-connecter.properties
Informations connexes
Contenus pertinents
- demandé il y a 10 moislg...
- demandé il y a 8 moislg...
- demandé il y a 2 moislg...
- demandé il y a 2 moislg...
- AWS OFFICIELA mis à jour il y a un an
- AWS OFFICIELA mis à jour il y a un an
- Comment résoudre les erreurs lorsque j'essaie de créer un connecteur à l'aide d'Amazon MSK Connect ?AWS OFFICIELA mis à jour il y a un an
- AWS OFFICIELA mis à jour il y a un an