The checkpoint or savepoint in my Amazon Kinesis Data Analytics application is failing.
Short description
Checkpointing is the method used to implement fault tolerance in Amazon Kinesis Data Analytics for Apache Flink. If your application isn't optimized or properly provisioned, checkpoints might fail.
Some of the major causes for checkpoint failures are the following:
- For RocksDB, Apache Flink reads files from local storage and writes them to remote persistent storage, which is Amazon Simple Storage Service (Amazon S3). The performance of the local disk and the upload rate can affect checkpointing and result in checkpoint failures.
- Savepoint and checkpoint states are stored in a service-owned Amazon S3 bucket that's fully managed by AWS. These states are accessed whenever an application fails over. Transient server errors or latency in this S3 bucket might lead to checkpoint failures.
- A process function that communicates with an external resource, such as Amazon DynamoDB, during checkpointing might cause checkpoint failures (see the sketch after this list).
- A state serialization failure, such as a serializer mismatch with the incoming data, can cause checkpoint failures.
- The number of Kinesis Processing Units (KPUs) provisioned for the application might not be sufficient. To find the allocated KPUs, use the following calculation:
Allocated KPUs for the application = Parallelism / ParallelismPerKPU
For example, an application with Parallelism of 8 and ParallelismPerKPU of 2 is allocated 4 KPUs.
- Larger application state sizes might increase checkpoint latency because the task manager takes more time to save the checkpoint, which can result in an out-of-memory exception.
- A skewed distribution of state can result in one task manager handling more data than the other task managers. Even if sufficient KPUs (resources) are provisioned, one or more overloaded task managers can cause an out-of-memory exception.
- A high cardinality indicates that there is a large number of unique keys in the incoming data. If the job uses the KeyBy operator for partitioning the incoming data, and the key on which the data is partitioned has high cardinality, slow checkpointing might occur. This might eventually result in checkpoint failures.
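The following is a minimal sketch of the process-function pattern described above, shown here as the pattern to avoid rather than a recommendation: it makes a synchronous Amazon DynamoDB call inside snapshotState(), so the external call runs on the checkpoint path and any DynamoDB latency or throttling delays the checkpoint. The "checkpoint-audit" table and the state name are hypothetical.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

import java.util.Map;

// Pattern to avoid: the external call in snapshotState() runs during checkpointing,
// so DynamoDB latency or throttling directly slows down or fails the checkpoint.
public class ExternalCallOnCheckpoint
        extends ProcessFunction<String, String>
        implements CheckpointedFunction {

    private transient DynamoDbClient dynamoDb;
    private transient ListState<String> bufferedEvents;

    @Override
    public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
        bufferedEvents.add(event); // keep events in Flink state
        out.collect(event);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Synchronous call to an external resource during checkpointing.
        // "checkpoint-audit" and its attribute are hypothetical names.
        dynamoDb.putItem(PutItemRequest.builder()
                .tableName("checkpoint-audit")
                .item(Map.of("checkpointId",
                        AttributeValue.builder().n(Long.toString(context.getCheckpointId())).build()))
                .build());
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        dynamoDb = DynamoDbClient.create();
        bufferedEvents = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffered-events", String.class));
    }
}
```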
Resolution
- The size of your application state might increase rapidly, causing an increase in the checkpoint size and duration. You can monitor these values with the Amazon CloudWatch metrics lastCheckpointDuration and lastCheckpointSize, as shown in the CloudWatch sketch after this list. For more information, see Application metrics.
- Increase the parallelism of the operator that processes more data. You can define the parallelism for an individual operator, data source, or data sink by calling the setParallelism() method, as shown in the parallelism sketch after this list.
- Tune the values of Parallelism and ParallelismPerKPU for optimum utilization of KPUs. Be sure that automatic scaling isn't turned off for your Amazon Kinesis Data Analytics application. The value of the maxParallelism parameter allows you to scale to a desired number of KPUs. For more information, see Application Scaling in Kinesis Data Analytics for Apache Flink.
- Define a time to live (TTL) on the state to make sure that the state is cleaned up periodically, as shown in the state TTL sketch after this list.
- Optimize the code to allow for a better partitioning strategy. You can use rebalance partitioning to help distribute the data evenly. This method uses a round-robin approach for distribution.
- Optimize the code to reduce the window size so that the number of unique keys within each window is reduced. The last sketch after this list shows both of these changes.
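The following is a minimal sketch of reading the lastCheckpointDuration metric with the AWS SDK for Java v2. It assumes the AWS/KinesisAnalytics CloudWatch namespace and the Application dimension that Kinesis Data Analytics metrics use; "my-flink-app" is a hypothetical application name.

```java
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsResponse;
import software.amazon.awssdk.services.cloudwatch.model.Statistic;

import java.time.Duration;
import java.time.Instant;

public class CheckpointMetrics {
    public static void main(String[] args) {
        try (CloudWatchClient cloudWatch = CloudWatchClient.create()) {
            GetMetricStatisticsResponse response = cloudWatch.getMetricStatistics(
                    GetMetricStatisticsRequest.builder()
                            .namespace("AWS/KinesisAnalytics")
                            .metricName("lastCheckpointDuration")
                            .dimensions(Dimension.builder()
                                    .name("Application")
                                    .value("my-flink-app") // hypothetical application name
                                    .build())
                            .startTime(Instant.now().minus(Duration.ofHours(1)))
                            .endTime(Instant.now())
                            .period(60)                    // one-minute granularity
                            .statistics(Statistic.MAXIMUM)
                            .build());

            response.datapoints().forEach(dp ->
                    System.out.println(dp.timestamp() + " lastCheckpointDuration (max): " + dp.maximum()));
        }
    }
}
```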
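The following is a minimal sketch of operator-level parallelism with setParallelism(). The socket source, uppercase map operator, print sink, and the parallelism values are hypothetical placeholders for your own pipeline.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.socketTextStream("localhost", 9999); // hypothetical source

        events
                .map((MapFunction<String, String>) String::toUpperCase)
                .name("heavy-operator")
                .setParallelism(8)   // more subtasks for the operator that processes more data
                .print()             // hypothetical sink
                .setParallelism(2);  // fewer subtasks for the lighter sink

        env.execute("operator-parallelism-example");
    }
}
```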
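The following is a minimal sketch of defining TTL on keyed state with Flink's StateTtlConfig; the one-day TTL and the "event-count" state name are hypothetical choices.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class StateTtlExample {
    public static ValueStateDescriptor<Long> countStateDescriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(1))                                  // hypothetical TTL of one day
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // refresh the TTL on writes
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("event-count", Long.class);
        descriptor.enableTimeToLive(ttlConfig); // expired entries can then be dropped by Flink

        return descriptor;
    }
}
```

Register the descriptor in a rich function, for example with getRuntimeContext().getState(descriptor) in open(), so that the TTL applies to the keyed state your operator uses.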
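The following is a minimal sketch that combines rebalance partitioning with a smaller window; the socket source, the comma-separated key selector, and the one-minute tumbling window are hypothetical.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class RebalanceAndWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.socketTextStream("localhost", 9999); // hypothetical source

        events
                .rebalance()                         // round-robin distribution across subtasks
                .map(line -> line.toLowerCase())     // hypothetical transform step that benefits from even distribution
                .keyBy(event -> event.split(",")[0]) // hypothetical key selector
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) // smaller window reduces keys per window
                .reduce((a, b) -> a + "|" + b)
                .print();

        env.execute("rebalance-and-window-example");
    }
}
```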
Related information
Apache Flink documentation for Checkpoints
Implementing fault tolerance in Kinesis Data Analytics for Apache Flink