Help processing Kinessis Records with KCL and Java

0

How am I supposed to process the actual record in Java using KCL?

I'm following the guidance provided https://docs.aws.amazon.com/streams/latest/dev/kcl2-standard-consumer-java-example.html, I can connect to the data stream, I can get the number of records available, however what the example doesn't show is how to actually get the record (Json string). From the example I can see that I can use r.data() to get the record's data, it comes as a read only ByteBuffer, I can convert this to string by using StandardCharsets.US_ASCII.decode(r.data()).toString(), however the resulting string is definitely encoded, I have tried doing Base64 decoding but I get error java.lang.IllegalArgumentException: Illegal base64 character 3f. So what is the simplest way to get the payload? Below is my processRecords method:

public void processRecords(ProcessRecordsInput processRecordsInput) {
    try {
        System.out.println("Processing " + processRecordsInput.records().size() + " record(s)");
        processRecordsInput.records().forEach((r) -> {
            try {
                Decoder dec = Base64.getDecoder();
                String myString = StandardCharsets.US_ASCII.decode(r.data()).toString();
                byte[] bt = dec.decode(myString);
 
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    } catch (Throwable t) {
        System.out.println("Caught throwable while processing records. Aborting.");
        Runtime.getRuntime().halt(1);
    } finally {
    }
}

From here I can get myString but when I get to bt I get the exception shown. I have not found a single resource explaining how to get the record. I post the record to kinesis using aws kinesis put-record --stream-name testStream --partition-key 1 --data {"somedata":"This Data"}

rhaces
preguntada hace 2 años700 visualizaciones
1 Respuesta
0
Respuesta aceptada

The problem was in the producer which was not producing the correct data, using String myString = StandardCharsets.US_ASCII.decode(r.data()).toString(); would give you the String as expected.

rhaces
respondido hace 2 años

No has iniciado sesión. Iniciar sesión para publicar una respuesta.

Una buena respuesta responde claramente a la pregunta, proporciona comentarios constructivos y fomenta el crecimiento profesional en la persona que hace la pregunta.

Pautas para responder preguntas