How do I use the Kafka-Kinesis-Connector to connect to my Amazon MSK cluster?
When I try to use the Kafka-Kinesis-Connector to connect to Amazon Managed Streaming for Apache Kafka (Amazon MSK), I receive an error message.
Short description
Prerequisites:
- You have an active AWS subscription.
- Your virtual private cloud (VPC) is visible to both the client machine and the MSK cluster. The MSK cluster and the client must be in the same VPC.
- You have connectivity to the Amazon MSK and Apache ZooKeeper servers.
- Two subnets are associated with your VPC.
- You created a topic in MSK to send and receive messages from the server.
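Before building the connector, it can help to confirm that the client machine can actually reach the broker and ZooKeeper ports. A minimal sketch, assuming placeholder endpoint names (replace them with the values from your cluster's bootstrap-broker and ZooKeeper connection strings):

```python
import socket

def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Hypothetical endpoints for illustration only; substitute your own
# bootstrap broker (port 9092) and ZooKeeper (port 2181) addresses.
endpoints = [
    ("b-1.example.kafka.us-east-1.amazonaws.com", 9092),
    ("z-1.example.kafka.us-east-1.amazonaws.com", 2181),
]
for host, port in endpoints:
    state = "reachable" if is_reachable(host, port) else "unreachable"
    print(f"{host}:{port} {state}")
```

If a port shows as unreachable, check the security groups and subnet routing between the client and the cluster before troubleshooting the connector itself.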
Resolution
Build your project files
- To download the Kafka-Kinesis-Connector, clone the kafka-kinesis-connector project from the GitHub website.
- To build the amazon-kinesis-kafka-connector-X.X.X.jar file in the target directory, run the mvn package command:
```
[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
......
[INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.822 s
[INFO] Finished at: 2020-02-19T13:01:31Z
[INFO] Final Memory: 26M/66M
[INFO] ------------------------------------------------------------------------
```
The Kafka-Kinesis-Connector looks for credentials in the following order: environment variables, Java system properties, and the credentials profile file.
- To update your configuration to the DefaultAWSCredentialsProviderChain setting, run the following command:
```
[ec2-user@ip-10-0-0-71 target]$ aws configure
```
The preceding command makes sure that the access key attached to the AWS Identity and Access Management (IAM) user has the minimum required permissions. The aws configure command also makes sure that there's a policy available to access Amazon Kinesis Data Streams or Amazon Kinesis Data Firehose. For more information about setting AWS credentials, see Provide temporary credentials to the AWS SDK for Java.
**Note:** If you use the Java Development Kit (JDK), then you can also use the EnvironmentVariableCredentialsProvider class to provide credentials.
- If you use Kinesis Data Streams, update your policy to look like the following example:
```json
{
  "Version": "2012-10-17",
  "Statement": [{
    "Sid": "Stmt123",
    "Effect": "Allow",
    "Action": [
      "kinesis:DescribeStream",
      "kinesis:PutRecord",
      "kinesis:PutRecords",
      "kinesis:GetShardIterator",
      "kinesis:GetRecords",
      "kinesis:ListShards",
      "kinesis:DescribeStreamSummary",
      "kinesis:RegisterStreamConsumer"
    ],
    "Resource": [
      "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
    ]
  }]
}
```
If you use Kinesis Data Firehose, update your policy to look like the following example:
```json
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": [
      "firehose:DeleteDeliveryStream",
      "firehose:PutRecord",
      "firehose:PutRecordBatch",
      "firehose:UpdateDestination"
    ],
    "Resource": [
      "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
    ]
  }]
}
```
For more information about Kinesis Data Firehose delivery stream settings, see Configuration and credential file settings.
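As a quick sanity check before attaching a policy, you can verify that it grants every action the connector needs. A minimal sketch, assuming the required-action list from the Kinesis Data Streams example above (adjust the set for the Firehose case):

```python
import json

# Actions taken from the example Kinesis Data Streams policy above.
REQUIRED_KINESIS_ACTIONS = {
    "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords",
    "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards",
    "kinesis:DescribeStreamSummary", "kinesis:RegisterStreamConsumer",
}

def missing_actions(policy_json: str, required: set) -> set:
    """Return the required actions that no Allow statement grants."""
    policy = json.loads(policy_json)
    allowed = set()
    for stmt in policy.get("Statement", []):
        if stmt.get("Effect") == "Allow":
            actions = stmt.get("Action", [])
            allowed.update([actions] if isinstance(actions, str) else actions)
    return required - allowed

# Hypothetical, deliberately incomplete policy for illustration.
policy = """{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
    "Resource": ["arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"]
  }]
}"""
print(sorted(missing_actions(policy, REQUIRED_KINESIS_ACTIONS)))
```

This check only inspects the policy document itself; it doesn't account for permissions granted or denied by other attached policies.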
Configure the connector
**Note:** You can configure the Kafka-Kinesis-Connector to publish messages from MSK. You can publish messages to the following destinations: Amazon Simple Storage Service (Amazon S3), Amazon Redshift, or Amazon OpenSearch Service.
- If you're setting up Kinesis Data Streams, configure the connector with the following values:
```
name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from putting onto Kinesis Producer if
# threshold for outstanding records have reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms)
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
# All kinesis producer configuration have not been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregation=true
```
If you're setting up a Kinesis Data Firehose delivery stream instead, configure the connector with the following delivery stream property values:
```
name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME
```
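The Firehose batch values above sit just under the PutRecordBatch per-call limits. A minimal sketch that parses a properties fragment and checks the batch settings, assuming limits of 500 records and 4 MiB per call (verify these against the current Firehose quotas for your Region):

```python
# Assumed PutRecordBatch limits; confirm against current Firehose quotas.
MAX_BATCH_RECORDS = 500
MAX_BATCH_BYTES = 4 * 1024 * 1024  # 4 MiB

def parse_properties(text: str) -> dict:
    """Parse a Java-style key=value properties string, skipping comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

def batch_settings_ok(props: dict) -> bool:
    """True if batchSize and batchSizeInBytes are within the assumed limits."""
    return (int(props.get("batchSize", 0)) <= MAX_BATCH_RECORDS
            and int(props.get("batchSizeInBytes", 0)) <= MAX_BATCH_BYTES)

config = """
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME
"""
print(batch_settings_ok(parse_properties(config)))
```

Note that 3670016 bytes is 3.5 MiB, which leaves headroom under the 4 MiB cap for per-record overhead.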
- Configure the worker properties for standalone or distributed mode:
```
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log
```
For more information about standalone or distributed mode for the Kafka-Kinesis-Connector, see Kafka Connect on the Apache website.
- Copy the amazon-kinesis-kafka-connector-0.0.x.jar file to your directory, and then export the classpath.
**Note:** You can also add the amazon-kinesis-kafka-connector-0.0.x.jar file to the JAVA_HOME/lib/ext directory.
- To run the Kafka-Kinesis-Connector, use the following command syntax:
```
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/kinesis-kafka-streams-connecter.properties
```
Related information
