Apache Flink is a distributed stream processing framework that provides fault tolerance through a mechanism called "checkpointing".
I've briefly written about checkpointing in the following post:
https://dev.to/kination/analytics-dont-want-duplicated-data-so-get-it-exactly-once-with-flinkkafka-ga4
and here is a more detailed description of how Flink keeps fault tolerance.
Checkpointing allows Flink to recover from failures by periodically saving the state of the streaming application. Here's an explanation of how Flink's fault tolerance works in detail, along with a code example to illustrate the concept.
How Flink Fault Tolerance Works
Checkpointing:
Flink periodically creates checkpoints that capture the entire state of the streaming application. A checkpoint is a kind of "consistent snapshot" of the state of all operators in the job, including the read positions in the input streams.
Checkpoints are stored in a durable storage system (e.g., HDFS, S3, or another distributed file system), and you can change the path and other options through configuration.
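For example, checkpointing behavior can be tuned on the execution environment. This is only a minimal sketch: the interval, timeout, and storage path below are placeholder values, and the CheckpointConfig calls assume a reasonably recent Flink release.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// take a consistent snapshot every 10 seconds, with exactly-once semantics
env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);

CheckpointConfig config = env.getCheckpointConfig();
// where completed checkpoints are persisted (durable storage)
config.setCheckpointStorage("hdfs:///flink/checkpoints");
// fail a checkpoint if it does not complete within 2 minutes
config.setCheckpointTimeout(120_000);
// leave at least 5 seconds of processing between two checkpoints
config.setMinPauseBetweenCheckpoints(5_000);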
Barriers:
Flink uses a mechanism called barriers to align state snapshots across all operators in the logical graph (dataflow graph).
This graph is a directed graph where the nodes are operators and the edges define input/output relationships between operators, corresponding to data streams or data sets. A logical graph is created by submitting a job from a Flink application.
Barriers are special records injected into the data stream that flow along with the regular data.
When an operator receives a barrier, it finishes processing all records that arrived before the barrier, snapshots its state, and then forwards the barrier downstream.
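The checkpointing mode controls how strictly barriers are aligned. As a rough sketch (reusing the env variable from above, and assuming a Flink version where unaligned checkpoints are available):

// EXACTLY_ONCE: an operator waits until the barrier has arrived on all of its
// inputs before snapshotting its state (barrier alignment)
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

// optionally let barriers overtake in-flight records to reduce alignment time
// under backpressure (the in-flight data then becomes part of the snapshot)
env.getCheckpointConfig().enableUnalignedCheckpoints();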
State Backend:
Flink uses a state backend to store and manage the state of operators. These are the backend types Flink supports (see the sketch after this list):
- MemoryStateBackend: stores state in memory (not durable)
- FsStateBackend: stores state in memory but writes checkpoints to a file system (HDFS, S3, etc.)
- RocksDBStateBackend: stores state in RocksDB (on disk) and checkpoints to a file system
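Selecting a backend is a one-line configuration on the environment. The paths below are placeholders, RocksDBStateBackend additionally needs the flink-statebackend-rocksdb dependency, and newer Flink releases rename these classes to HashMapStateBackend / EmbeddedRocksDBStateBackend:

// in-memory state and checkpoints, fine for local testing only
env.setStateBackend(new MemoryStateBackend());

// working state in memory, checkpoints written to a durable file system
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

// working state in local RocksDB, checkpoints written to a durable file system
// (the constructor throws IOException, so call it from a method that declares throws)
env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink/checkpoints"));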
Recovery:
In case of a failure, Flink restarts the job from the last completed checkpoint. The state of all operators is restored, and processing resumes from the positions recorded in that checkpoint.
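How often Flink retries after a failure is governed by the restart strategy. A minimal sketch, where the retry count and delay are arbitrary placeholder values:

// retry the job up to 3 times, waiting 10 seconds between attempts;
// each restart restores operator state from the last completed checkpoint
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));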
On code
Now let's see how this works in code.
...
public class CommonStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing every 5 seconds with exactly-once semantics
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // Store checkpoints durably in HDFS
        env.setStateBackend(new FsStateBackend("hdfs:///path/to/checkpoints"));

        // Kafka broker address (adjust for your cluster)
        final String brokers = "localhost:9092";

        // Configure sample kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(brokers)
            .setTopics("input-topic")
            .setGroupId("my-group")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        // Add the Kafka consumer as a source
        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Process the stream (e.g., word count)
        DataStream<Tuple2<String, Integer>> wordCounts = stream
            .flatMap(new Tokenizer())
            .keyBy(0)
            .sum(1);

        // Configure kafka sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers(brokers)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
            )
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

        // Serialize the (word, count) tuples to strings before writing to Kafka
        wordCounts
            .map(tuple -> tuple.f0 + "," + tuple.f1)
            .sinkTo(sink);

        // Execute the job
        env.execute("Execute Streaming Job");
    }

    // Tokenizer function to split sentences into words
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] words = value.toLowerCase().split("\\W+");
            for (String word : words) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
In this code, you can first find the checkpoint definition in env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE),
which enables checkpointing every 5 seconds with exactly-once semantics.
And from env.setStateBackend(new FsStateBackend("hdfs:///path/to/checkpoints"))
you can see that the state backend is configured to store checkpoints in HDFS.
This application has Kafka connections for source and sink, which means it consumes streaming input from the Kafka topic "input-topic" and produces results to the topic "output-topic".
Between consuming and producing, the Tokenizer function splits sentences into words and emits (word, 1) tuples, and the keyBy(0).sum(1) operation performs the word count.
And of course, with these settings, Flink restarts the job from the last completed checkpoint when a failure happens, which ensures no data loss and exactly-once processing of state inside Flink. Note that the sink above uses at-least-once delivery; to get end-to-end exactly-once output into Kafka, the sink needs the exactly-once delivery guarantee.
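As a sketch of that last point (the transactional id prefix below is a placeholder value), the KafkaSink can be switched to transactional, exactly-once delivery like this, provided the checkpoint interval is shorter than the broker's transaction timeout:

KafkaSink<String> exactlyOnceSink = KafkaSink.<String>builder()
    .setBootstrapServers(brokers)
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-topic")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    // commit records only when the enclosing checkpoint completes
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    // required for EXACTLY_ONCE: prefix for Kafka transactional ids
    .setTransactionalIdPrefix("word-count-job")
    .build();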