KCL 1.x for Java is not consuming records


I am using KCL 1.14.8 for Java. My Kinesis stream has only one shard, and I put one record into it every 30 seconds. The RecordProcessor processes each record, which may take several seconds (less than 10 seconds). This is the worker configuration:

configuration.withInitialPositionInStream(InitialPositionInStream.LATEST)
        .withRegionName("eu-west-1")
        .withMaxRecords(10)
        .withIdleTimeBetweenReadsInMillis(1000L)
        .withCallProcessRecordsEvenForEmptyRecordList(false)
        .withRetryGetRecordsInSeconds(1)
        .withFailoverTimeMillis(60_000);

When the application starts, it does not consume any records. The logs show this:

Aug 21, 2022 @ 21:24:14.943	Skipping shard sync due to the reason - Hash range is complete.	
Aug 21, 2022 @ 21:24:14.785	Number of pending leases to clean before the scan : 0	
Aug 21, 2022 @ 21:24:00.144	Sleeping ...	
Aug 21, 2022 @ 21:24:00.144	Current stream shard assignments: shardId-000000000000	
Aug 21, 2022 @ 21:23:14.911	Elected leaders: xxxxx-187a-4ceb-9714-e39ab1c7bb71	
Aug 21, 2022 @ 21:23:14.785	Number of pending leases to clean before the scan : 0	
Aug 21, 2022 @ 21:22:59.128	Sleeping ...	
Aug 21, 2022 @ 21:22:59.128	Current stream shard assignments: shardId-000000000000	
Aug 21, 2022 @ 21:22:14.937	Skipping shard sync due to the reason - Hash range is complete.	
Aug 21, 2022 @ 21:22:14.785	Number of pending leases to clean before the scan : 0	
Aug 21, 2022 @ 21:21:58.114	Current stream shard assignments: shardId-000000000000	
Aug 21, 2022 @ 21:21:58.114	Sleeping ...	
Aug 21, 2022 @ 21:21:14.785	Number of pending leases to clean before the scan : 0

Please help.

  • ".withLogWarningForTaskAfterMillis" helped me find the issue. My processor was blocked, waiting for another task to finish.
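
To illustrate the idea behind that kind of warning, here is a minimal sketch that uses only java.util.concurrent. It is not KCL code and does not show how the KCL implements the setting. The names SlowTaskWatchdog and runWithWarning are hypothetical. It runs a task on a worker thread and reports a warning every time the task has been pending for another threshold interval, which is how a processor stuck waiting on another task becomes visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical helper, for illustration only; not part of the KCL.
public class SlowTaskWatchdog {

    /**
     * Runs the task on a worker thread and waits for its result.
     * Each time another warnAfterMillis elapses without a result,
     * the warn callback receives a message; waiting then continues.
     */
    static <T> T runWithWarning(Callable<T> task, long warnAfterMillis,
                                Consumer<String> warn) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(task);
            long waitedMillis = 0;
            while (true) {
                try {
                    return future.get(warnAfterMillis, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    waitedMillis += warnAfterMillis;
                    warn.accept("Task still pending after " + waitedMillis + " ms");
                }
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> warnings = new ArrayList<>();
        CountDownLatch otherTaskDone = new CountDownLatch(1);

        // Simulates the "other task" that the processor depends on.
        Thread other = new Thread(() -> {
            try {
                Thread.sleep(350);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            otherTaskDone.countDown();
        });
        other.start();

        // Simulates a record processor that blocks until the other task finishes.
        String result = runWithWarning(() -> {
            otherTaskDone.await();
            return "processed";
        }, 100, warnings::add);

        System.out.println(result + ", warnings logged: " + warnings.size());
        warnings.forEach(System.out::println);
    }
}
```

In a real worker, the equivalent signal comes from the configuration option quoted above. Once the warning appears, a thread dump of the application usually shows what the processor is blocked on.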

shawn
asked 2 years ago, 108 views
No answers
