Wie stelle ich mithilfe des Kafka-Kinesis-Connectors eine Verbindung zu meinem Amazon MSK-Cluster her?
Wenn ich versuche, den Kafka-Kinesis-Connector zu verwenden, um eine Verbindung mit Amazon Managed Streaming for Apache Kafka (Amazon MSK) herzustellen, erhalte ich eine Fehlermeldung. Wie stelle ich mithilfe des Kafka-Kinesis-Connectors eine Verbindung zu meinem Amazon MSK-Cluster her?
Kurzbeschreibung
Um über den Kafka-Kinesis-Connector eine Verbindung zu Ihrem MSK-Cluster herzustellen, muss Ihr Setup die folgenden Anforderungen erfüllen:
- Ein aktives AWS-Abonnement.
- Eine virtuelle private Cloud (VPC), die sowohl für den Client-Computer als auch für den MSK-Cluster sichtbar ist. Der MSK-Cluster und der Client müssen sich in derselben VPC befinden.
- Konnektivität zu MSK- und Apache Zookeeper-Servern.
- Zwei Subnetze, die Ihrer VPC zugeordnet sind.
- In MSK erstellte Themen zum Senden und Empfangen von Nachrichten vom Server.
Lösung
Erstellen Sie Ihre Projektdatei
1. Klonen Sie das Projekt kafka-kinesis-connector, um den Kafka-Kinesis-Connector herunterzuladen.
2. Verwenden Sie den Befehl mvn package, um die Datei amazon-kinesis-kafka-connector-x.x.jar im Zielverzeichnis zu erstellen:
[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package .. ...... [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 28.822 s [INFO] Finished at: 2020-02-19T13:01:31Z [INFO] Final Memory: 26M/66M [INFO] ------------------------------------------------------------------------
Der Kafka-Kinesis-Connector sucht in der folgenden Reihenfolge nach Anmeldeinformationen: Umgebungsvariablen, Java-Systemeigenschaften und die Anmeldeinformationsprofildatei.
3. Aktualisieren Sie Ihre Konfiguration auf die Einstellung DefaultAWSCredentailsProviderChain:
[ec2-user@ip-10-0-0-71 target]$ aws configure
Dieser Befehl stellt sicher, dass der dem AWS Identity and Access Management (IAM)-Benutzer zugeordnete Zugriffsschlüssel über die erforderlichen Mindestberechtigungen verfügt. Der Befehl aws configure stellt außerdem sicher, dass eine Richtlinie für den Zugriff auf Amazon Kinesis Data Streams oder Amazon Kinesis Data Firehose verfügbar ist. Weitere Informationen zum Festlegen von AWS-Anmeldeinformationen finden Sie unter Arbeiten mit AWS-Anmeldeinformationen.
Hinweis: Wenn Sie ein Java Development Kit (JDK) verwenden, können Sie auch die Klasse EnvironmentVariableCredentialsProvider verwenden, um Anmeldeinformationen bereitzustellen.
4. Wenn Sie Kinesis Data Streams verwenden, aktualisieren Sie Ihre Richtlinie wie folgt:
{ "Version": "2012-10-17", "Statement": [{ "Sid": "Stmt123", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards", "kinesis:DescribeStreamSummary", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName" ] }] }
Wenn Sie Kinesis Data Firehose verwenden, aktualisieren Sie Ihre Richtlinie so, dass sie wie im folgenden Beispiel aussieht:
{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "firehose:DeleteDeliveryStream", "firehose:PutRecord", "firehose:PutRecordBatch", "firehose:UpdateDestination" ], "Resource": [ "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName" ] }] }
Weitere Informationen zu den Kinesis Data Firehose Lieferstream-Einstellungen finden Sie unter Konfiguration und Einstellungen für Anmeldeinformationsdateien.
Konfiguration des Connectors
Hinweis: Sie können den Kafka-Kinesis-Connector so konfigurieren, dass er Nachrichten von MSK veröffentlicht. Nachrichten können an den folgenden Zielen veröffentlicht werden: Amazon Simple Storage Service (Amazon S3), Amazon Redshift oder Amazon OpenSearch Service.
1. Wenn Sie Kinesis Data Streams einrichten, können Sie den Connector mit den folgenden Werten konfigurieren:
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 streamName=YOUR_STREAM_NAME usePartitionAsHashKey=false flushSync=true # Use new Kinesis Producer for each Partition singleKinesisProducerPerPartition=true # Whether to block new records from putting onto Kinesis Producer if # threshold for outstanding records have reached pauseConsumption=true outstandingRecordsThreshold=500000 # If outstanding records on producers are beyond threshold sleep for following period (in ms) sleepPeriod=1000 # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks sleepCycles=10 # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties # All kinesis producer configuration have not been exposed maxBufferedTime=1500 maxConnections=1 rateLimit=100 ttl=60000 metricsLevel=detailed metricsGranuality=shard metricsNameSpace=KafkaKinesisStreamsConnector aggregation=true
-oder-
Wenn Sie einen anderen Streamtyp einrichten, konfigurieren Sie die Eigenschaften des Kinesis Data Firehose Lieferstreams wie folgt:
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 batch=true batchSize=500 batchSizeInBytes=3670016 deliveryStream=YOUR_DELIVERY_STREAM_NAME
2. Konfigurieren Sie die Worker-Eigenschaften entweder für den eigenständigen oder den verteilten Modus:
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter #internal.value.converter=org.apache.kafka.connect.storage.StringConverter #internal.key.converter=org.apache.kafka.connect.storage.StringConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter.schemas.enable=true internal.value.converter.schemas.enable=true offset.storage.file.filename=offset.log
Weitere Informationen zum eigenständigen oder verteilten Modus von Kafka-Kinesis-Connector finden Sie unter Kafka Connect auf der Apache-Website.
3. Kopieren Sie die Datei amazon-kinesis-kafka-connector-0.0.x.jar in Ihr Verzeichnis und exportieren Sie den Klassenpfad.
Hinweis: Sie können die Datei amazon-kinesis-kafka-connector-0.0.x.jar auch zum Verzeichnis JAVA\ _HOME/lib/ext hinzufügen.
4. Führen Sie den kafka-kinesis-connector aus, indem Sie die folgende Befehlssyntax verwenden:
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ kinesis-kafka-streams-connecter.properties
Ähnliche Informationen
Relevanter Inhalt
- AWS OFFICIALAktualisiert vor einem Jahr
- AWS OFFICIALAktualisiert vor 2 Jahren
- AWS OFFICIALAktualisiert vor 4 Monaten
- AWS OFFICIALAktualisiert vor 10 Monaten