What happened?
In Apache SeaTunnel 2.3.9, the Kafka connector could leak memory. When a streaming job read data from Kafka, memory could grow continuously until an OOM (Out Of Memory) error occurred, even with a read rate limit (read_limit.rows_per_second) configured.
What's the key issue?
In real deployments, users observed the following phenomena:
- Running a Kafka-to-HDFS streaming job on an 8-core, 12 GB SeaTunnel Engine cluster
- Although read_limit.rows_per_second=1 was configured, memory usage soared from 200 MB to 5 GB within 5 minutes
- After stopping the job, memory was not released; upon resuming, memory kept growing until OOM
- Ultimately, worker nodes restarted
Root Cause Analysis
Code review traced the root cause to the createReader method of the KafkaSource class, where elementsQueue was initialized as an unbounded queue:

    elementsQueue = new LinkedBlockingQueue<>();

This implementation had two critical issues:
- Unbounded Queue: A LinkedBlockingQueue created without a capacity defaults to Integer.MAX_VALUE entries, so in practice it can keep growing until the heap is exhausted. When the producer is much faster than the consumer, memory grows continuously.
- Ineffective Rate Limiting: Although users configured read_limit.rows_per_second=1, this limit did not actually apply to Kafka data reading, so data accumulated in the in-memory queue.
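The difference between the two queue types can be seen in isolation. The following is a minimal sketch, not SeaTunnel code: the class name QueueGrowthDemo, the fill helper, and the 16-byte placeholder payloads are hypothetical, and the capacity of 1000 is chosen only for illustration. It offers elements to a queue that nobody drains and counts how many are accepted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueGrowthDemo {

    /** Offers `attempts` small payloads with no consumer; returns how many were accepted. */
    public static int fill(BlockingQueue<byte[]> queue, int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() never blocks: it returns false once a bounded queue is full.
            if (queue.offer(new byte[16])) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        int attempts = 10_000;
        // No capacity argument: the queue accepts everything it is given.
        int unbounded = fill(new LinkedBlockingQueue<>(), attempts);
        // Fixed capacity: the queue stops accepting after 1000 elements.
        int bounded = fill(new ArrayBlockingQueue<>(1000), attempts);
        System.out.println("unbounded accepted: " + unbounded); // 10000
        System.out.println("bounded accepted:   " + bounded);   // 1000
    }
}
```

With no consumer, the unbounded queue holds every element it was offered, while the bounded one caps its footprint at its capacity. In a real reader the elements are record batches rather than 16-byte arrays, so the same pattern translates into heap growth.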
Solution
The community fixed this issue via PR#9041. The main improvements include:
- Introducing a Bounded Queue: Replacing LinkedBlockingQueue with a fixed-size ArrayBlockingQueue
- Configurable Queue Size: Adding a queue.size configuration parameter, allowing users to adjust it as needed
- Safe Default Value: Setting DEFAULT_QUEUE_SIZE=1000 as the default queue capacity
Core implementation changes:

    public class KafkaSource {
        private static final String QUEUE_SIZE_KEY = "queue.size";
        private static final int DEFAULT_QUEUE_SIZE = 1000;

        public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
                SourceReader.Context readerContext) {
            // Read the configured capacity, falling back to the default.
            int queueSize = kafkaSourceConfig.getInt(QUEUE_SIZE_KEY, DEFAULT_QUEUE_SIZE);
            // Bounded queue: the capacity is fixed at construction time.
            BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue =
                    new ArrayBlockingQueue<>(queueSize);
            // ...
        }
    }

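A bounded queue helps because a full queue pushes back on the producer. The sketch below illustrates that backpressure with plain JDK classes. It is not the connector's fetch loop: BackpressureDemo and run are hypothetical names, and it assumes the producer enqueues with a blocking put(), which the excerpt above does not show.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureDemo {

    /** Runs a fast producer against a slower consumer; returns the largest queue size observed. */
    public static int run(int capacity, int totalBatches) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < totalBatches; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int maxSeen = 0;
        try {
            for (int i = 0; i < totalBatches; i++) {
                maxSeen = Math.max(maxSeen, queue.size());
                queue.take();
                if (i % 10 == 0) {
                    Thread.sleep(1); // simulate a consumer that cannot keep up
                }
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining the queue", e);
        }
        return maxSeen;
    }

    public static void main(String[] args) {
        int capacity = 8;
        int maxSeen = run(capacity, 200);
        // maxSeen never exceeds the capacity, however fast the producer is.
        System.out.println("capacity=" + capacity + ", max observed size=" + maxSeen);
    }
}
```

The producer is throttled to the consumer's pace instead of piling data up on the heap, which is the behavior the unbounded queue could not provide.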
Best Practice Recommendations
For users of the SeaTunnel Kafka connector, it is recommended to:
- Upgrade Version: Use the SeaTunnel version containing this fix
- Configure Properly: Set an appropriate queue.size value according to business needs and data characteristics
- Monitor Memory: Even with a bounded queue, monitor system memory usage
- Understand Rate Limiting: The read_limit.rows_per_second parameter applies to downstream processing, not Kafka consumption
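When choosing queue.size, note that each queue element is a batch of records (RecordsWithSplitIds), not a single record, so the worst-case footprint scales with the batch size as well. The following back-of-the-envelope sketch makes that arithmetic explicit. The class QueueMemoryEstimate and the figures of 500 records per batch and 1 KiB per record are illustrative assumptions, not measured values, and the estimate ignores JVM object overhead.

```java
public class QueueMemoryEstimate {

    /** Rough upper bound on payload bytes held by a full queue of record batches. */
    public static long worstCaseBytes(int queueSize, int recordsPerBatch, long avgRecordBytes) {
        return (long) queueSize * recordsPerBatch * avgRecordBytes;
    }

    public static void main(String[] args) {
        int queueSize = 1000;        // the default capacity described above
        int recordsPerBatch = 500;   // assumption: records carried by one queue element
        long avgRecordBytes = 1024;  // assumption: 1 KiB average record payload

        long bytes = worstCaseBytes(queueSize, recordsPerBatch, avgRecordBytes);
        System.out.println("worst case: " + bytes / (1024 * 1024) + " MiB"); // 488 MiB
    }
}
```

Under these assumptions a full queue could hold roughly 488 MiB of payload, which is why memory monitoring is still worthwhile and why a smaller queue.value may suit jobs with large records or limited heap.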
Summary
This fix not only resolved the memory overflow risk but also improved system stability and configurability. By introducing bounded queues and configurable parameters, users can better control system resource usage and avoid OOM caused by data backlog. It also reflects the virtuous cycle of open-source communities continuously improving product quality through user feedback.