Wie stelle ich mithilfe des Kafka-Kinesis-Connectors eine Verbindung zu meinem Amazon MSK-Cluster her?

Lesedauer: 4 Minute
0

Wenn ich versuche, den Kafka-Kinesis-Connector zu verwenden, um eine Verbindung mit Amazon Managed Streaming for Apache Kafka (Amazon MSK) herzustellen, erhalte ich eine Fehlermeldung. Wie stelle ich mithilfe des Kafka-Kinesis-Connectors eine Verbindung zu meinem Amazon MSK-Cluster her?

Kurzbeschreibung

Um über den Kafka-Kinesis-Connector eine Verbindung zu Ihrem MSK-Cluster herzustellen, muss Ihr Setup die folgenden Anforderungen erfüllen:

  • Ein aktives AWS-Abonnement.
  • Eine virtuelle private Cloud (VPC), die sowohl für den Client-Computer als auch für den MSK-Cluster sichtbar ist. Der MSK-Cluster und der Client müssen sich in derselben VPC befinden.
  • Konnektivität zu MSK- und Apache Zookeeper-Servern.
  • Zwei Subnetze, die Ihrer VPC zugeordnet sind.
  • In MSK erstellte Themen zum Senden und Empfangen von Nachrichten vom Server.

Lösung

Erstellen Sie Ihre Projektdatei

1.    Klonen Sie das Projekt kafka-kinesis-connector, um den Kafka-Kinesis-Connector herunterzuladen.

2.    Verwenden Sie den Befehl mvn package, um die Datei amazon-kinesis-kafka-connector-x.x.jar im Zielverzeichnis zu erstellen:

[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
..
......

[INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.822 s
[INFO] Finished at: 2020-02-19T13:01:31Z
[INFO] Final Memory: 26M/66M
[INFO] ------------------------------------------------------------------------

Der Kafka-Kinesis-Connector sucht in der folgenden Reihenfolge nach Anmeldeinformationen: Umgebungsvariablen, Java-Systemeigenschaften und die Anmeldeinformationsprofildatei.

3.    Aktualisieren Sie Ihre Konfiguration auf die Einstellung DefaultAWSCredentailsProviderChain:

[ec2-user@ip-10-0-0-71 target]$ aws configure

Dieser Befehl stellt sicher, dass der dem AWS Identity and Access Management (IAM)-Benutzer zugeordnete Zugriffsschlüssel über die erforderlichen Mindestberechtigungen verfügt. Der Befehl aws configure stellt außerdem sicher, dass eine Richtlinie für den Zugriff auf Amazon Kinesis Data Streams oder Amazon Kinesis Data Firehose verfügbar ist. Weitere Informationen zum Festlegen von AWS-Anmeldeinformationen finden Sie unter Arbeiten mit AWS-Anmeldeinformationen.

Hinweis: Wenn Sie ein Java Development Kit (JDK) verwenden, können Sie auch die Klasse EnvironmentVariableCredentialsProvider verwenden, um Anmeldeinformationen bereitzustellen.

4.    Wenn Sie Kinesis Data Streams verwenden, aktualisieren Sie Ihre Richtlinie wie folgt:

{
     "Version": "2012-10-17",
     "Statement": [{
          "Sid": "Stmt123",
          "Effect": "Allow",
          "Action": [
               "kinesis:DescribeStream",
               "kinesis:PutRecord",
               "kinesis:PutRecords",
               "kinesis:GetShardIterator",
               "kinesis:GetRecords",
               "kinesis:ListShards",
               "kinesis:DescribeStreamSummary",
               "kinesis:RegisterStreamConsumer"
          ],
          "Resource": [
               "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
          ]
     }]
}

Wenn Sie Kinesis Data Firehose verwenden, aktualisieren Sie Ihre Richtlinie so, dass sie wie im folgenden Beispiel aussieht:

{
     "Version": "2012-10-17",
     "Statement": [{
          "Effect": "Allow",
          "Action": [
               "firehose:DeleteDeliveryStream",
               "firehose:PutRecord",
               "firehose:PutRecordBatch",
               "firehose:UpdateDestination"
          ],
          "Resource": [
               "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
          ]
     }]
}

Weitere Informationen zu den Kinesis Data Firehose Lieferstream-Einstellungen finden Sie unter Konfiguration und Einstellungen für Anmeldeinformationsdateien.

Konfiguration des Connectors

Hinweis: Sie können den Kafka-Kinesis-Connector so konfigurieren, dass er Nachrichten von MSK veröffentlicht. Nachrichten können an den folgenden Zielen veröffentlicht werden: Amazon Simple Storage Service (Amazon S3), Amazon Redshift oder Amazon OpenSearch Service.

1.    Wenn Sie Kinesis Data Streams einrichten, können Sie den Connector mit den folgenden Werten konfigurieren:

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from putting onto Kinesis Producer if
# threshold for outstanding records have reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms)
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
# All kinesis producer configuration have not been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregation=true

-oder-

Wenn Sie einen anderen Streamtyp einrichten, konfigurieren Sie die Eigenschaften des Kinesis Data Firehose Lieferstreams wie folgt:

name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME

2.    Konfigurieren Sie die Worker-Eigenschaften entweder für den eigenständigen oder den verteilten Modus:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log

Weitere Informationen zum eigenständigen oder verteilten Modus von Kafka-Kinesis-Connector finden Sie unter Kafka Connect auf der Apache-Website.

3.    Kopieren Sie die Datei amazon-kinesis-kafka-connector-0.0.x.jar in Ihr Verzeichnis und exportieren Sie den Klassenpfad.

Hinweis: Sie können die Datei amazon-kinesis-kafka-connector-0.0.x.jar auch zum Verzeichnis JAVA\ _HOME/lib/ext hinzufügen.

4.    Führen Sie den kafka-kinesis-connector aus, indem Sie die folgende Befehlssyntax verwenden:

[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/

kinesis-kafka-streams-connecter.properties

Ähnliche Informationen

Einen Amazon MSK-Cluster erstellen

AWS OFFICIAL
AWS OFFICIALAktualisiert vor 3 Jahren