使用KCL和Java协助处理Kinesis Records的帮助

0

【以下的问题经过翻译处理】 我应该如何使用 KCL 处理 Java 中的实际记录?

我正在按照提供的指导 https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html,我可以连接到数据流,我可以得到可用记录数,但是该示例未显示的是如何实际获取记录(Json 字符串)。从示例中我可以看到我可以使用 r.data() 来获取记录的数据,它作为只读的 ByteBuffer 出现,我可以使用 StandardCharsets.US_ASCII.decode(r. data()).toString(),但是生成的字符串肯定是经过编码的,我尝试进行 Base64 解码,但出现错误 java.lang.IllegalArgumentException: Illegal base64 character 3f。那么获取有效载荷的最简单方法是什么?下面是我的 processRecords 方法:

public void processRecords(ProcessRecordsInput processRecordsInput) {
    try {
        System.out.println("Processing " + processRecordsInput.records().size() + " record(s)");
        processRecordsInput.records().forEach((r) -> {
            try {
                Decoder dec = Base64.getDecoder();
                String myString = StandardCharsets.US_ASCII.decode(r.data()).toString();
                byte[] bt = dec.decode(myString);
 
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    } catch (Throwable t) {
        System.out.println("Caught throwable while processing records. Aborting.");
        Runtime.getRuntime().halt(1);
    } finally {
    }
}

从这里我可以得到“myString”,但是当我到达“bt”时,我得到了显示的异常。我还没有找到解释如何获取记录的单一资源。我使用aws kinesis put-record --stream-name testStream --partition-key 1 --data {"somedata":"This Data"}将记录发布到kinesis

profile picture
专家
已提问 5 个月前53 查看次数
1 回答
0

【以下的回答经过翻译处理】 问题出在生产者没有生成正确的数据,使用String myString = StandardCharsets.US_ASCII.decode(r.data()).toString();可以得到预期的字符串。

profile picture
专家
已回答 5 个月前

您未登录。 登录 发布回答。

一个好的回答可以清楚地解答问题和提供建设性反馈,并能促进提问者的职业发展。

回答问题的准则