Kafka-Kinesis-Connector를 사용하여 Amazon MSK 클러스터에 연결하려면 어떻게 해야 합니까?
Kafka-Kinesis-Connector를 사용하여 Amazon Managed Streaming for Apache Kafka(Amazon MSK)에 연결하려고 하면 오류 메시지가 표시됩니다. Kafka-Kinesis-Connector를 사용하여 Amazon MSK 클러스터에 연결하려면 어떻게 해야 합니까?
간략한 설명
Kafka-Kinesis-Connector를 사용하여 MSK 클러스터에 연결하려면 다음 요구 사항에 따라 설정해야 합니다.
- 활성 AWS 구독.
- 클라이언트 머신과 MSK 클러스터 모두에 표시되는 VPC(가상 프라이빗 네트워크). MSK 클러스터와 클라이언트는 동일한 VPC에 상주해야 합니다.
- MSK 및 Apache Zookeeper 서버에 대한 연결.
- VPC에 연결된 서브넷 2개.
- 서버의 메시지를 송수신하기 위해 MSK에 생성된 주제.
해결 방법
프로젝트 파일 빌드
1. kafka-kinesis-connector 프로젝트를 복제하여 Kafka-Kinesis-Connector를 다운로드합니다.
2. mvn package 명령을 사용하여 대상 디렉터리에 amazon-kinesis-kafka-connector-X.X.X.jar 파일을 빌드합니다.
[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package .. ...... [INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 28.822 s [INFO] Finished at: 2020-02-19T13:01:31Z [INFO] Final Memory: 26M/66M [INFO] ------------------------------------------------------------------------
Kafka-Kinesis-Connector는 환경 변수, Java 시스템 속성 및 자격 증명 프로파일 파일 순서로 자격 증명을 찾습니다.
3. 구성을 DefaultAWSCredentailsProviderChain 설정으로 업데이트합니다.
[ec2-user@ip-10-0-0-71 target]$ aws configure
이 명령은 AWS Identity and Access Management(IAM) 사용자에 연결된 액세스 키에 필요한 최소 권한이 있는지 확인합니다. 또한 aws configure 명령은 Amazon Kinesis Data Streams 또는 Amazon Kinesis Data Firehose에 액세스할 때 사용할 수 있는 정책이 있는지 확인합니다. AWS 자격 증명 설정에 대한 자세한 내용은 AWS 자격 증명 작업을 참조하세요.
참고: JDK(Java Development Kit)를 사용하는 경우 EnvironmentVariableCredentialsProvider 클래스를 사용하여 자격 증명을 제공할 수도 있습니다.
4. Kinesis Data Streams를 사용하는 경우 정책을 다음으로 업데이트합니다.
{ "Version": "2012-10-17", "Statement": [{ "Sid": "Stmt123", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards", "kinesis:DescribeStreamSummary", "kinesis:RegisterStreamConsumer" ], "Resource": [ "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName" ] }] }
Kinesis Data Firehose를 사용하는 경우 다음 예와 같이 정책을 업데이트합니다.
{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "firehose:DeleteDeliveryStream", "firehose:PutRecord", "firehose:PutRecordBatch", "firehose:UpdateDestination" ], "Resource": [ "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName" ] }] }
Kinesis Data Firehose 전송 스트림 설정에 대한 자세한 내용은 구성 및 자격 증명 파일 설정을 참조하세요.
커넥터 구성
참고: MSK에서 메시지를 게시하도록 Kafka-Kinesis-Connector를 구성할 수 있습니다. 메시지를 다음 대상에 게시할 수 있습니다. Amazon Simple Storage Service(Amazon S3), Amazon Redshift, 또는 Amazon OpenSearch Service.
1. Kinesis Data Streams를 설정하는 경우 다음 값을 사용하여 커넥터를 구성할 수 있습니다.
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 streamName=YOUR_STREAM_NAME usePartitionAsHashKey=false flushSync=true # Use new Kinesis Producer for each Partition singleKinesisProducerPerPartition=true # Whether to block new records from putting onto Kinesis Producer if # threshold for outstanding records have reached pauseConsumption=true outstandingRecordsThreshold=500000 # If outstanding records on producers are beyond threshold sleep for following period (in ms) sleepPeriod=1000 # If outstanding records on producers are not cleared sleep for following cycle before killing the tasks sleepCycles=10 # Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties # All kinesis producer configuration have not been exposed maxBufferedTime=1500 maxConnections=1 rateLimit=100 ttl=60000 metricsLevel=detailed metricsGranuality=shard metricsNameSpace=KafkaKinesisStreamsConnector aggregation=true
-또는-
다른 유형의 스트림을 설정하는 경우 다음과 같이 Kinesis Data Firehose 전송 스트림 속성을 구성합니다.
name=YOUR_CONNECTER_NAME connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector tasks.max=1 topics=YOUR_TOPIC_NAME region=us-east-1 batch=true batchSize=500 batchSizeInBytes=3670016 deliveryStream=YOUR_DELIVERY_STREAM_NAME
2. 독립 실행형 또는 분산형 모드에 대한 작업자 속성을 구성합니다.
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter #internal.value.converter=org.apache.kafka.connect.storage.StringConverter #internal.key.converter=org.apache.kafka.connect.storage.StringConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter.schemas.enable=true internal.value.converter.schemas.enable=true offset.storage.file.filename=offset.log
Kafka-Kinesis-Connector의 독립형 또는 분산 모드에 대한 자세한 내용은 Apache 웹 사이트에서 Kafka 연결을 참조하십시오.
3. amazon-kinesis-kafka-connector-0.0.X.jar 파일을 디렉터리에 복사하고 classpath를 내보냅니다.
참고: amazon-kinesis-kafka-connector-0.0.X.jar 파일을 JAVA_HOME/lib/ext 디렉터리에 추가할 수도 있습니다.
4. 다음 명령 구문을 사용하여 kafka-kinesis-connector를 실행합니다.
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/ kinesis-kafka-streams-connecter.properties
관련 정보
관련 콘텐츠
- 질문됨 3일 전lg...
- 질문됨 3달 전lg...
- 질문됨 일 년 전lg...
- 질문됨 7달 전lg...
- 질문됨 2달 전lg...
- AWS 공식업데이트됨 일 년 전
- AWS 공식업데이트됨 일 년 전
- AWS 공식업데이트됨 일 년 전