Introduction
When you work with Kafka, there are plenty of ways to move data between topics and other storage systems. One of the simplest is Kafka Connect, which lets you set up these data pipelines with minimal configuration and an (almost) no-code setup. Whether you are replicating topics between clusters or pushing data to S3, all of that is achievable with tools such as MirrorMaker 2 (part of Apache Kafka) and Confluent’s S3 Sink Connector, among others for corresponding systems.
However, I recently ran into a situation where I needed to move volumes of streaming data from S3 into Kafka using MSK Connect (a managed solution from AWS for Kafka Connect). I needed to pull data from S3 selectively rather than ingest the whole data set. There are plugins available for this, but they come with a catch: they are not free to use. I did manage to find some options from other providers, but none as viable as the ones Confluent publishes. I also needed to perform some data transformations, as the data in S3 was not in the format the consumers expected.
When you hit a wall (paywall in this case), you get creative. Enter Flink.
Flink for S3 to Kafka ETL
Apache Flink is a distributed processing engine capable of handling both bounded and unbounded streams. If you haven’t worked with Flink before or need a refresher on the fundamentals, I’d recommend checking out this article, where I walk through the core concepts.
Using Flink as an ETL layer between S3 and Kafka offers several advantages over MSK Connect with commercial connectors:
- Selective data ingestion: Choose exactly which files to process from a specified date range and specific folders.
- Data transformation: Handle multiple data formats and standardize them before pushing to Kafka.
- Unified processing: Leverage Flink’s rich API for complex transformations if needed.
The Flow
The flow is straightforward. Files land in an S3 bucket, Flink reads from S3 using the FileSource API, applies transformations and filtering logic, and pushes the processed events to the intended Kafka topics.
Here’s what the setup looks like:
- Source: S3 bucket containing your data files (JSON, CSV, Parquet, etc.)
- Processing layer: Flink job running on Amazon Managed Service for Apache Flink (MSF)
- Sink: Kafka topic on Amazon MSK
Implementation
Let’s assume there is data related to an e-commerce store that needs to be pushed into Kafka topics for a replay. There are two specific data feeds: order & payment. The order stream is gzip-compressed, and the payment data stream needs to be transformed to only pull a subset of the attributes from the JSON.
The data in S3 was already partitioned by year, month, day and hour, so the FileSource connector can pull it selectively. Whatever filtering you could achieve with commercial S3 source connectors, Flink’s FileSource gives you that control and more.
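As a rough sketch of how a date-range regex for those partitions could be built, the helper below turns an inclusive range of days into a single pattern. The class name `DatePartitionFilter`, the method `forRange`, and the `<prefix>/yyyy/MM/dd/HH/<file>` key layout are assumptions made for illustration; adjust them to however your bucket is actually laid out.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper: builds a regex that matches day partitions in an inclusive date range. */
final class DatePartitionFilter {
    // Assumed S3 key layout: <prefix>/yyyy/MM/dd/HH/<file>
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy/MM/dd");

    private DatePartitionFilter() {}

    static Pattern forRange(LocalDate start, LocalDate endInclusive) {
        if (endInclusive.isBefore(start)) {
            throw new IllegalArgumentException("end date is before start date");
        }
        List<String> days = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(endInclusive); d = d.plusDays(1)) {
            // Quote each day so it is matched literally
            days.add(Pattern.quote(d.format(DAY)));
        }
        // Require a path separator on both sides so a day only matches whole path segments
        return Pattern.compile("/(?:" + String.join("|", days) + ")/");
    }
}
```

Calling `DatePartitionFilter.forRange(LocalDate.of(2024, 1, 30), LocalDate.of(2024, 2, 1))` yields a pattern whose `find()` succeeds only for paths under those three days. Narrowing to specific hours would follow the same idea with an extra path segment.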
The resulting DataStream is then passed through the custom transformer logic via a map operator. Applying specific transformation logic per feed is what this setup excels at. The order feed just needs to be gzip-decompressed, whereas the payment feed requires pulling only a subset of the data elements.
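To make the per-feed transformer idea concrete, here is a minimal sketch of what such a contract and an order-feed implementation might look like. Only `DataTransformer` and `getName()` appear in the job code; the `transform` method, the `GzipBase64Transformer` class, and the assumption that each line holds a Base64-encoded gzip payload are hypothetical. If your files are simply whole `.gz` objects, Flink’s file source can generally decompress them based on the file extension, and a transformer like this would not be needed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;

/** Assumed shape of the transformer contract; transform() is a hypothetical method name. */
interface DataTransformer extends Serializable {
    String getName();

    String transform(String record);
}

/** Hypothetical order-feed transformer: assumes each line is a Base64-encoded gzip payload. */
final class GzipBase64Transformer implements DataTransformer {
    private static final long serialVersionUID = 1L;

    @Override
    public String getName() {
        return "gzip-base64";
    }

    @Override
    public String transform(String record) {
        if (record == null || record.trim().isEmpty()) {
            return null; // left for a downstream null/empty filter to drop
        }
        byte[] compressed = Base64.getDecoder().decode(record.trim());
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not gunzip record", e);
        }
    }
}
```

Keeping the contract this small means a payment-feed transformer (for example, one that keeps only selected JSON attributes) can be swapped in without touching the rest of the job.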
The transformed DataStream is then written to the designated Kafka topic through a KafkaSink.
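// Imports used by this snippet
import java.io.IOException;
import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;

// Create FileSource for reading from S3
FileSource<String> fileSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path(s3BasePath))
        .setFileEnumerator(
                () -> new NonSplittingRecursiveEnumerator() {
                    // Keep only the files whose path matches the date filter.
                    // The filter is applied to the enumerated splits because the base
                    // class never calls helper methods that a subclass merely adds.
                    @Override
                    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
                            throws IOException {
                        return super.enumerateSplits(paths, minDesiredSplits).stream()
                                .filter(split -> dateFilterPattern.matcher(split.path().getPath()).find())
                                .collect(Collectors.toList());
                    }
                })
        .build();

// Create data stream from S3 (bounded read, so no watermarks are needed)
DataStream<String> s3Stream = env.fromSource(
        fileSource,
        WatermarkStrategy.noWatermarks(),
        "S3-Historical-Data-Source-" + feedName.toUpperCase());

// Transform the payloads before pushing to Kafka
DataStream<String> transformedStream;
if (transformerEnabled) {
    DataTransformer transformer = createTransformer();
    System.out.println("Using transformer: " + transformer.getName());
    // Apply transformation, then drop null or blank results
    transformedStream = s3Stream
            .map(new TransformMapFunction(transformer))
            .name("Transform-" + feedName.toUpperCase())
            .filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) throws Exception {
                    return value != null && !value.trim().isEmpty();
                }
            })
            .name("Filter-Null-Records");
} else {
    // No transformer configured: forward the raw records unchanged
    transformedStream = s3Stream;
}

// Create KafkaSink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers(bootstrapServers)
        .setKafkaProducerConfig(kafkaSinkProperties)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(topicName)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

// Write to Kafka
transformedStream.sinkTo(kafkaSink).name("Kafka-Sink-" + topicName);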
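The contents of `kafkaSinkProperties` are not shown in the job code. As one possible sketch, assuming the MSK cluster uses IAM access control and the `aws-msk-iam-auth` library is on the job’s classpath, the producer properties could be assembled like this. The class name `MskProducerConfig` and the `acks` and `compression.type` values are illustrative assumptions, not settings taken from the original job.

```java
import java.util.Properties;

/** Hypothetical helper that assembles producer settings for an IAM-authenticated MSK cluster. */
final class MskProducerConfig {
    private MskProducerConfig() {}

    static Properties iamAuth() {
        Properties props = new Properties();
        // Assumption: IAM access control is enabled and aws-msk-iam-auth is on the classpath
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "AWS_MSK_IAM");
        props.setProperty("sasl.jaas.config",
                "software.amazon.msk.auth.iam.IAMLoginModule required;");
        props.setProperty("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        // Standard Kafka producer settings; the values are illustrative, not tuned
        props.setProperty("acks", "all");
        props.setProperty("compression.type", "lz4");
        return props;
    }
}
```

The result of `MskProducerConfig.iamAuth()` would then be what gets passed to `setKafkaProducerConfig(...)`. A cluster that uses a different authentication mode (for example SASL/SCRAM or mutual TLS) needs different security settings.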
Local testing
You can get the two jobs up and running in your local environment to simulate the whole behaviour using Docker.
Kafka UI gives you a convenient view of the messages arriving on the target topics.
With LocalStack, the job can pull data from an emulated S3 bucket, so you can test the whole flow locally and evaluate how things would work in the cloud environment.
Here is a link to the GitHub repo.
What it also solves
Multiple data formats were the other pain point I had to deal with, because the data ingested into S3 came from multiple sources, none of them with a consistent structure. Some feeds might be in JSON, others in CSV or compressed with GZIP. Kafka Connect does offer Single Message Transforms (SMTs), but they are meant for lightweight per-record changes and are a poor fit for feed-specific logic such as decompressing payloads or reshaping JSON.
I was able to transform each of the data streams with specific transformer logic. Going one step further, you can even leverage Flink’s state management if you need to maintain context across multiple files or perform stateful transformations. This level of flexibility is hard to achieve with connector-based solutions.
Cost considerations
While MSK Connect itself is relatively affordable, costs can climb quickly when running at scale. Add connector licensing on top of that, and you may end up paying far more than you had planned.
With Flink, you pay for the compute resources (KPUs) that MSF provisions. Depending on your workload, this can result in significant cost savings as you fine-tune your processing logic.
When to use Flink vs MSK Connect
Flink as an ETL layer makes sense when:
- You need advanced filtering or transformation capabilities.
- You’re dealing with multiple data formats that require normalization.
- Commercial licensing costs are prohibitive.
- You want the flexibility to extend processing logic over time.
MSK Connect might still be the better choice for:
- Simple, straightforward S3-to-Kafka ingestion with no transformations.
- Teams without Flink expertise who prefer managed connector solutions.
- Workloads where the free open-source connectors meet all requirements.
Wrapping up
Before making a decision, map out your requirements first. The Flink approach gives you regex-based filtering, multi-format transformation support, and the ability to scale processing logic as your requirements evolve. That extra control does come with additional effort for code and infrastructure maintenance, though. Personally, I’d prefer a managed solution that requires no code management, even if it means paying a little extra.
If you’re already using Flink for other stream processing tasks, adding S3-to-Kafka ETL into your existing infrastructure is a natural fit. And if you’re new to Flink, this could be a great entry point to explore its capabilities.
If you run into any questions or want to share your experience, feel free to reach out!


