Kinesis Analytics python memory leak

0

Hello,

I created the simplest pyflink application to reproduce a memory leak we experience: It reads a Kinesis Stream, performs an aggregate then sinks to another Kinesis Stream. Over one week of running this sample, the "Container Memory" metric goes from 60% to 70% and the "Last Checkpoint size" keeps increasing from 128k to 800k. The full application behaves similarly and restarts the containers after 24 to 48 hours. Any clue if it something I can do on our side or is it a bug on AWS side? I'm running flink 1.18.1

Thanks for any help

This is the sample application:

import sys
import json
from pyflink.datastream import StreamExecutionEnvironment, DataStream, DataStreamSink
from pyflink.datastream.connectors.kinesis import (
    KinesisStreamsSink,
    PartitionKeyGenerator,
    FlinkKinesisConsumer,
)
from pyflink.datastream.window import (
    PurgingTrigger,
    ProcessingTimeTrigger,
    TumblingProcessingTimeWindows,
)
from pyflink.datastream.functions import AggregateFunction
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.common.types import Row
from pyflink.common.time import Time

print(f"Python_version is {sys.version}, {sys.version_info}")


def decode_signal(signal: str) -> Row:
    sg = json.loads(signal)
    result = {
        "watcher_uuid": sg["watcher"]["uuid"],
        "pokemon_value": sg["pokemon"]["value"],
        "alert_received_at": sg["alert_received_at"],
        "scenario": sg["scenario"],
    }
    return Row(**result)


def create_consumer(env: StreamExecutionEnvironment) -> DataStream:
    consumer_config = {
        "aws.region": "eu-west-1",
        "flink.stream.initpos": "LATEST",
        "flink.stream.recordpublisher": "EFO",
        "flink.stream.efo.consumername": "test-flink",
    }

    enriched_ds = env.add_source(
        FlinkKinesisConsumer(
            "enriched_signal_stream", SimpleStringSchema(), consumer_config
        )
    )
    enriched_ds = enriched_ds.map(
        lambda x: decode_signal(x),
        output_type=Types.ROW_NAMED(
            field_names=[
                "watcher_uuid",
                "pokemon_value",
                "alert_received_at",
                "scenario",
            ],
            field_types=[
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
            ],
        ),
    )
    return enriched_ds


# aggregate signals for the same watcher and pokemon
class AggregateSignalsFunction(AggregateFunction):
    def create_accumulator(self) -> dict:
        return Row(total_signals=0)

    def add(self, signal: dict, accumulator: dict) -> dict:
        accumulator["total_signals"] += 1
        return accumulator

    def get_result(self, accumulator: dict) -> dict:
        return accumulator

    def merge(self, acc_a, acc_b) -> dict:
        acc_a["total_signals"] += acc_b["total_signals"]
        return acc_a


def agregate_signals(consumer: DataStream) -> DataStream:
    aggregated_pokemon_watcher_ds = (
        consumer.key_by(
            lambda row: row["pokemon_value"] + "_" + row["watcher_uuid"],
            key_type=Types.STRING(),
        )
        .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
        .trigger(PurgingTrigger(ProcessingTimeTrigger()))
        .aggregate(
            AggregateSignalsFunction(),
            output_type=Types.ROW_NAMED(
                field_names=["total_signals"],
                field_types=[Types.INT()],
            ),
            accumulator_type=Types.ROW_NAMED(
                field_names=["total_signals"],
                field_types=[Types.INT()],
            ),
        )
        .name("aggregate_pokemon_watcher_signals_by_5min")
        .uid("aggregate_pokemon_watcher_signals_by_5min")
    )
    return aggregated_pokemon_watcher_ds


def create_sink() -> DataStreamSink:
    sink_properties = {
        # Required
        "aws.region": "eu-west-1",
    }

    kds_sink = (
        KinesisStreamsSink.builder()
        .set_kinesis_client_properties(sink_properties)
        .set_serialization_schema(SimpleStringSchema())
        .set_partition_key_generator(PartitionKeyGenerator.fixed())
        .set_stream_name("test_flink_sink")
        .set_fail_on_error(False)
        .set_max_batch_size(500)
        .set_max_in_flight_requests(50)
        .set_max_buffered_requests(10000)
        .set_max_batch_size_in_bytes(5 * 1024 * 1024)
        .set_max_time_in_buffer_ms(5000)
        .set_max_record_size_in_bytes(1 * 1024 * 1024)
        .build()
    )
    return kds_sink


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    # create streaming consumer from Kinesis Stream
    consumer = create_consumer(env)
    # agregate signals by pokemon watcher
    agregated_stream = agregate_signals(consumer)
    # create sink to Kinesis Stream
    sink = create_sink()
    # sink agregate to Kinesis Stream
    agregated_stream.map(
        lambda x: json.dumps(x.as_dict()), output_type=Types.STRING()
    ).sink_to(sink)

    try:
        env.execute_async("Analytics")
    except Exception as e:
        print(e)
        raise e


if __name__ == "__main__":
    main()
asked 4 days ago36 views
1 Answer
0

I would suggest you ensure proper resource management and avoid potential pitfalls such as retaining references to objects unnecessarily.

  • Use proper handling of resources, particularly in the main function where the streaming job is executed.
  • Ensure that state and accumulators do not retain unnecessary data.
  • Avoid using global variables that can hold references to large objects.
  • Make sure to log errors appropriately to identify potential issues early.
profile picture
EXPERT
answered 4 days ago
  • Thanks for your answer, however, I have the impression the code that I've shared here respects the principles you have listed and I hardly see how to make it even simpler. And it still leaks memory.

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions