Failure Recovery in Flink
Let's explore the algorithm Flink uses to recover from failures and the guarantees it provides.
As mentioned previously, stream processing applications in Flink are supposed to be long-lived. So there must be an efficient way to recover from failures without repeating a lot of work. For this purpose, Flink periodically checkpoints the operators’ state and the position of the consumed stream to generate this state. In case of a failure, an application can be restarted from the latest checkpoint and continue processing from there.
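For concreteness, here is a minimal sketch, assuming Flink's Java DataStream API (roughly the 1.x line), of how an application might enable periodic checkpointing and a restart strategy so that it can resume from the latest checkpoint after a failure. The interval, retry count, and job name are arbitrary placeholders.

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint of all operator state (and source positions) roughly every 60 seconds.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // On failure, restart the job (up to 3 times, 10 seconds apart);
        // each restart resumes from the latest completed checkpoint.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

        // A trivial placeholder pipeline; a real job would define sources, transformations, and sinks.
        env.fromElements("a", "b", "c").print();

        env.execute("checkpointing-example");
    }
}
```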
All this is achieved via an algorithm similar to the Chandy-Lamport algorithm for distributed snapshots, called Asynchronous Barrier Snapshotting (ABS).
Asynchronous Barrier Snapshotting (ABS)
The ABS algorithm operates slightly differently for acyclic and cyclic graphs, so we will examine the acyclic case here, which is a bit simpler.
Working
The algorithm works in the following way:
- The Job Manager periodically injects control records, referred to as stage barriers, into the stream. These records divide the stream into stages. At the end of a stage, the set of operator states reflects the whole execution history up to the associated barrier, so it can be used for a snapshot.
- When a source task receives a barrier, it takes a snapshot of its current state and then broadcasts the barrier to all its outputs.
- When a non-source task receives a barrier from one of its inputs, it blocks that input until it has received a barrier from all of its inputs. It then takes a snapshot of its current state and broadcasts the barrier to its outputs. Finally, it unblocks its inputs. This blocking guarantees that the checkpoint contains the state after processing all the elements before the barrier and no elements after it. (A sketch of this alignment logic appears after this list.)
Note: The snapshot taken while the inputs are blocked is logical, while the actual, physical snapshot happens asynchronously in the background. One way to achieve this is through copy-on-write techniques. This keeps the blocking phase short so that data processing can resume as quickly as possible.
- Once the background copy process completes, each task acknowledges the checkpoint back to the Job Manager. The checkpoint is considered complete after the Job Manager has received acknowledgements from all the tasks, and it can then be used for recovery if a failure happens later. At this point, the Job Manager notifies all the tasks that the checkpoint is complete so that they can perform any cleanup or bookkeeping logic required.
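To make the barrier-alignment step more concrete, here is a purely illustrative, single-threaded Java sketch of what a non-source task does when barriers arrive on its input channels. The Event, Record, Barrier, and BarrierAligner names are invented for this example; Flink's actual implementation blocks channels at the network layer and performs the physical snapshot asynchronously.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch of barrier alignment at a non-source task (not Flink's real code).
public class BarrierAligner {

    interface Event {}
    static final class Record implements Event { final String data; Record(String d) { data = d; } }
    static final class Barrier implements Event { final long checkpointId; Barrier(long id) { checkpointId = id; } }

    private final int numInputs;
    private final Set<Integer> blockedInputs = new HashSet<>();
    private final List<Record> buffered = new ArrayList<>();

    BarrierAligner(int numInputs) { this.numInputs = numInputs; }

    // Called whenever an event arrives on the given input channel.
    void onEvent(int channel, Event event) {
        if (event instanceof Barrier) {
            Barrier barrier = (Barrier) event;
            blockedInputs.add(channel);               // stop consuming this channel
            if (blockedInputs.size() == numInputs) {  // barriers received from all inputs
                snapshotState(barrier.checkpointId);  // logical snapshot of operator state
                broadcastBarrier(barrier);            // forward the barrier downstream
                blockedInputs.clear();                // unblock all inputs
                for (Record r : buffered) {           // now process records buffered while blocked
                    process(r);
                }
                buffered.clear();
            }
        } else if (blockedInputs.contains(channel)) {
            buffered.add((Record) event);             // channel already aligned: buffer, don't process
        } else {
            process((Record) event);                  // pre-barrier record: process normally
        }
    }

    private void process(Record r) { System.out.println("processing " + r.data); }
    private void snapshotState(long checkpointId) { System.out.println("snapshot for checkpoint " + checkpointId); }
    private void broadcastBarrier(Barrier b) { System.out.println("forwarding barrier " + b.checkpointId); }
}
```

Records arriving on an already-aligned channel belong to the next stage, so they are buffered rather than processed immediately, which is exactly what makes the resulting snapshot consistent.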
Subtle points in the checkpoint algorithm
There are a few subtle points in the checkpoint algorithm and the recovery process.

During recovery, tasks are reset to the last checkpoint and start processing again from the first element after the checkpoint was taken. This means that any state that might have been produced by elements after the last checkpoint is essentially discarded, so that each element is processed exactly once. However, this raises the following questions:
- How is the state produced after the last checkpoint discarded in practice if it has been persisted in the operator's state?
- What happens to sink tasks (explained later in this lesson) that interact with external systems and to records that might have been emitted after the last checkpoint in case of a failure?
- What happens with sources that do not support the replay of records?
The answers to all these questions partially rely on a core characteristic of the checkpoint algorithm.
Phases of ABS
The ABS algorithm essentially follows a two-phase commit protocol:
- In the first phase, the Job Manager instructs all the tasks to create a checkpoint.
- In the second phase, the Job Manager informs all the tasks that every task successfully created its checkpoint.
Storing operator’s state
The state of an operator can be stored in different ways, such as in the operator’s memory, in an embedded key-value store, or in an external datastore.
If that datastore supports multi-version concurrency control (MVCC), then all the updates to the state are stored under a version that corresponds to the next checkpoint. During recovery, updates that were performed after the last checkpoint are automatically ignored since reads will return the version corresponding to the last checkpoint.
If the datastore does not support MVCC, all the state changes are maintained temporarily in local storage as a write-ahead-log (WAL), which will be committed to the datastore during the second phase of the checkpoint protocol.
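As a rough illustration of the write-ahead-log approach (this is not Flink's actual state backend code, and the class and method names are invented), the sketch below stages state updates per checkpoint and only writes them to the external datastore once the checkpoint-complete notification of the second phase arrives.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of a WAL-style state backend for a store without MVCC.
public class WalStateBackend {

    private final Map<Long, Map<String, String>> pendingByCheckpoint = new HashMap<>();
    private long currentCheckpointId = 1;

    // Record a state update; it is only staged locally for now.
    void put(String key, String value) {
        pendingByCheckpoint
            .computeIfAbsent(currentCheckpointId, id -> new LinkedHashMap<>())
            .put(key, value);
    }

    // Phase 1: the barrier for this checkpoint has been processed; stage further updates under the next id.
    void onCheckpointBarrier(long checkpointId) {
        currentCheckpointId = checkpointId + 1;
    }

    // Phase 2: the Job Manager confirmed completion; flush the staged updates to the external store.
    void onCheckpointComplete(long checkpointId) {
        Map<String, String> updates = pendingByCheckpoint.remove(checkpointId);
        if (updates != null) {
            updates.forEach(this::writeToExternalStore);
        }
    }

    // On recovery, anything not yet confirmed is discarded; it was never made visible externally.
    void onRecovery() {
        pendingByCheckpoint.clear();
    }

    private void writeToExternalStore(String key, String value) {
        System.out.println("COMMIT " + key + "=" + value);
    }
}
```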
Integration of Flink with other systems
Flink can also integrate with various other systems, such as Kafka and RabbitMQ, to retrieve input data from (sources) or send output data to (sinks). Each of these systems provides different capabilities.
Integration with Kafka
Kafka provides an offset-based interface, which makes it very easy to replay data records when recovering from a failure. Source tasks store the offsets they have reached as part of each checkpoint and start reading from those offsets during recovery. However, other message queues, such as RabbitMQ, do not provide such an interface, so Flink uses alternative methods to provide the same guarantees.
Note: In the case of RabbitMQ, messages are acknowledged and removed from the queue only after the associated checkpoint is complete, i.e., during the second phase of the protocol.
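Returning to Kafka, here is a minimal sketch of wiring a Kafka source into a checkpointed job. It assumes the older FlinkKafkaConsumer connector from the Flink 1.x Kafka connector module (newer releases provide a KafkaSource builder instead); the topic name, broker address, and group id are placeholders.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-consumer");

        // With checkpointing enabled, the consumed offsets are stored in each checkpoint,
        // so after a failure the source resumes reading from the last checkpointed offsets.
        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);

        DataStream<String> stream = env.addSource(source);
        stream.print();

        env.execute("kafka-source-example");
    }
}
```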
Similarly, a sink needs to coordinate with the checkpoint protocol to provide exactly-once guarantees. Kafka is a system that can support this through the use of its transactional client.
When the sink performs a checkpoint, the flush() operation is called as part of the checkpoint. After the checkpoint completion notification has been received from the Job Manager by all operators, the sink calls Kafka's commitTransaction method.
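Below is a minimal sketch of such a transactional sink, assuming the FlinkKafkaProducer connector from Flink 1.x and its EXACTLY_ONCE semantic (newer releases provide a KafkaSink with a delivery-guarantee option instead); the topic name, broker address, and timeout value are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaExactlyOnceSink {

    // Serializes each record into the (placeholder) output topic.
    public static class StringSchema implements KafkaSerializationSchema<String> {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
            return new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Kafka transactions must stay open across a whole checkpoint interval;
        // the broker caps this via transaction.max.timeout.ms, so the two settings must agree.
        props.setProperty("transaction.timeout.ms", "900000");

        // EXACTLY_ONCE: records are written inside a Kafka transaction that is flushed
        // at pre-commit time and committed only after the checkpoint-complete notification.
        FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
                "output-topic", new StringSchema(), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("kafka-exactly-once-sink");
    }
}
```

Note that downstream consumers of the output topic need to read with isolation.level set to read_committed to actually observe the exactly-once results.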
Note: Flink provides an abstract class called TwoPhaseCommitSinkFunction that provides the basic methods that need to be implemented by a sink that wants to provide these guarantees (i.e., beginTransaction, preCommit, commit, abort).
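To make that interface concrete, here is a rough sketch of a file-based sink built on TwoPhaseCommitSinkFunction. The FileTransaction type, the staging paths, and the overall design are invented for illustration; this is not a production-ready sink.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.UUID;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Sketch: records are staged in a temporary file per transaction and only made
// visible (renamed) after the checkpoint-complete notification.
public class TransactionalFileSink
        extends TwoPhaseCommitSinkFunction<String, TransactionalFileSink.FileTransaction, Void> {

    // Transaction handle; a public no-arg POJO so Flink can (de)serialize it in checkpoints.
    public static class FileTransaction {
        public String tempPath;
        public FileTransaction() {}
        public FileTransaction(String tempPath) { this.tempPath = tempPath; }
    }

    public TransactionalFileSink() {
        super(TypeInformation.of(FileTransaction.class).createSerializer(new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected FileTransaction beginTransaction() throws Exception {
        // One staging file per transaction (i.e., per checkpoint interval).
        return new FileTransaction("/tmp/staging-" + UUID.randomUUID());
    }

    @Override
    protected void invoke(FileTransaction txn, String value, Context context) throws Exception {
        Files.write(Paths.get(txn.tempPath), (value + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    @Override
    protected void preCommit(FileTransaction txn) throws Exception {
        // Phase 1: called when the checkpoint barrier reaches the sink.
        // Everything is already on disk in the staging file, so there is nothing to flush here.
    }

    @Override
    protected void commit(FileTransaction txn) {
        // Phase 2: called after the Job Manager confirms the checkpoint completed.
        // Commit may be retried during recovery, so it must be idempotent.
        try {
            Path staged = Paths.get(txn.tempPath);
            if (Files.exists(staged)) {
                Files.move(staged, Paths.get(txn.tempPath + ".committed"));
            }
        } catch (Exception e) {
            throw new RuntimeException("Could not commit transaction " + txn.tempPath, e);
        }
    }

    @Override
    protected void abort(FileTransaction txn) {
        // Recovery path: throw away staged data that was never committed.
        try {
            Files.deleteIfExists(Paths.get(txn.tempPath));
        } catch (Exception e) {
            throw new RuntimeException("Could not abort transaction " + txn.tempPath, e);
        }
    }
}
```

The key design point mirrors the protocol above: data becomes visible to the outside world only in commit, which runs after the Job Manager has confirmed the checkpoint.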
Guarantees provided by Flink
- Flink provides exactly-once processing semantics, even across failures, depending on the types of sources and sinks used.
- The user can also optionally downgrade to at-least-once processing semantics, which can provide increased performance (see the snippet after this list).
- The exactly-once guarantees apply to stream records and local state produced using the Flink APIs. If an operator performs additional side effects on systems external to Flink, then no guarantees are provided for them.
- Flink does not provide ordering guarantees after any form of repartitioning or broadcasting, and the responsibility of dealing with out-of-order records is left to the operator implementation.
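For example, and assuming the same env variable as in the checkpointing sketch earlier in this lesson, the downgrade is a one-line configuration change:

```java
// At-least-once: barriers are not aligned, so blocking is avoided and latency drops,
// but some records may be processed more than once after a recovery.
env.enableCheckpointing(10_000, CheckpointingMode.AT_LEAST_ONCE);
```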
Now that you’ve gone through the important concepts of distributed data processing systems, test your knowledge by interacting with the AI widget below. The AI will ask a total of six questions focused on the workings of MapReduce, Apache Spark, and Apache Flink. To get started, say hello to Edward in the widget below, and it will lead the way.