In today's data-driven world, real-time streaming is essential for businesses to gain instant insights and respond swiftly to market changes. Technologies like Apache Flink, Kafka, Postgres, Elasticsearch, and Kibana combine to create a powerful stack for processing, storing, searching, and visualizing streaming data in real time.
This article explores the integration of these technologies to build a robust real-time streaming architecture for an e-commerce business.
GitHub :: https://github.com/snepar/flink-ecom
Introducing the Building Blocks
Here is a concise overview of what each of the following components does.
Apache Kafka :: A distributed streaming platform that excels in handling real-time data feeds. It enables the seamless transmission of data across systems, ensuring high-throughput, low-latency, fault-tolerant communication.
Apache Flink :: A powerful real-time stream-processing engine (with APIs in Java, Scala, and Python) for low-latency analysis and transformation of massive data streams.
PostgreSQL (Postgres) :: An advanced open-source relational database known for its robustness, scalability, and SQL compliance. It supports complex queries, ACID transactions, and extensibility with custom functions and types.
Elasticsearch :: A distributed, open-source search and analytics engine. It excels at full-text search, real-time data indexing, and querying. It's highly scalable and ideal for log and event data analysis.
Kibana :: A powerful visualization tool for Elasticsearch. It provides real-time insights into data with interactive charts, graphs, and dashboards, making it easy to explore and analyze large datasets visually.
Data Flow Diagram (Architecture)
Goal
- Data Ingestion: using Kafka
- Streaming Extract: using Flink Source APIs
- Transform and Aggregate: using Flink SerDe and transformations
- Load: using Flink Sink APIs to Postgres and Elasticsearch
- Visualise: using Kibana
Let Us Begin With the Infrastructure
- Install the following:
- Python 3, pip
- Scala 2.12, sbt
- Docker Desktop
- Docker Compose
Refer to this repository for the complete setup:
https://github.com/snepar/flink-ecom-infra
Execute the command
docker compose up -d
Run the Python file main.py to generate events to Kafka.
- Execute the Kafka console consumer from your Docker service to verify that events are flowing as expected:
kafka-console-consumer --topic financial_transactions --bootstrap-server broker:9092
Set Up Apache Flink
I have used Flink 1.16.3.
Run Flink locally using ->
https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/
The Flink Project Deep Dive
build.sbt manages all the dependencies for the project, including the connectors for Kafka, Postgres, and Elasticsearch.
https://github.com/snepar/flink-ecom/blob/master/build.sbt
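For orientation, the dependency block looks roughly like the sketch below. Treat the artifact versions as illustrative assumptions and use the linked build.sbt as the source of truth.

// Illustrative sketch only; the exact artifacts and versions are in the repository's build.sbt.
val flinkVersion = "1.16.3"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"          % flinkVersion,
  "org.apache.flink" %  "flink-clients"                  % flinkVersion,
  "org.apache.flink" %  "flink-connector-kafka"          % flinkVersion,
  "org.apache.flink" %  "flink-connector-jdbc"           % flinkVersion,
  "org.apache.flink" %  "flink-connector-elasticsearch7" % "3.0.1-1.16", // externalized connector
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.15.2",
  "org.postgresql" % "postgresql" % "42.6.0"
)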
The Flink Kafka connector uses org.apache.flink.connector.kafka.source.KafkaSource.
Example Snippet::
val source = KafkaSource.builder[Transaction]()
  .setBootstrapServers("localhost:9092")                          // Kafka broker
  .setTopics(topic)                                               // e.g. "financial_transactions"
  .setGroupId("ecom-group")
  .setStartingOffsets(OffsetsInitializer.earliest())              // replay the topic from the beginning
  .setValueOnlyDeserializer(new JSONValueDeserializationSchema()) // JSON -> Transaction
  .build()
The JSON deserializer converts the Kafka payload into a Transaction case class.
It can be referred to here :: https://github.com/snepar/flink-ecom/blob/master/src/main/scala/ecom/deserializer/JSONValueDeserializationSchema.scala
Important: since we are dealing with Scala case classes, register Jackson's DefaultScalaModule:
import com.fasterxml.jackson.module.scala.DefaultScalaModule
mapper.registerModule(DefaultScalaModule)
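For reference, here is a minimal sketch of what such a deserialization schema can look like; the actual implementation lives in the linked file and may differ in details.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation

class JSONValueDeserializationSchema extends DeserializationSchema[Transaction] {

  // ObjectMapper is not serializable, so build it lazily on the task manager.
  @transient private lazy val mapper: ObjectMapper = {
    val m = new ObjectMapper()
    m.registerModule(DefaultScalaModule) // required for Scala case classes
    m
  }

  override def deserialize(message: Array[Byte]): Transaction =
    mapper.readValue(message, classOf[Transaction])

  override def isEndOfStream(nextElement: Transaction): Boolean = false

  override def getProducedType: TypeInformation[Transaction] =
    TypeInformation.of(classOf[Transaction])
}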
You can see the data consumed from Kafka in Flink using transactionStream.print().
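A short usage sketch, assuming the KafkaSource built above (source) and the project's Transaction case class:

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val transactionStream: DataStream[Transaction] =
  env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
transactionStream.print() // dumps incoming transactions to stdout for a quick sanity check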
Flink-Postgres Sink
DDLs are Defined here : https://github.com/snepar/flink-ecom/tree/master/src/main/scala/ecom/generators/DDL
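As an illustration only, the monthly aggregate might be defined roughly as below; the column names and the upsert are assumptions, and the real statements live in the linked DDL package.

object SalesPerMonthSQL {
  // Hypothetical table layout; check the repository's DDL for the real one.
  val createTable: String =
    """CREATE TABLE IF NOT EXISTS sales_per_month (
      |  year INTEGER,
      |  month INTEGER,
      |  total_sales DOUBLE PRECISION,
      |  PRIMARY KEY (year, month)
      |)""".stripMargin

  // Upsert-style insert so each running total emitted by the reduce overwrites the previous value.
  val insertStmt: String =
    """INSERT INTO sales_per_month (year, month, total_sales)
      |VALUES (?, ?, ?)
      |ON CONFLICT (year, month) DO UPDATE SET total_sales = EXCLUDED.total_sales""".stripMargin
}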
Aggregation Examples (Monthly)
def writeToDBsalesPerMonth(transactionStream: DataStream[Transaction]) = {
  // The first sink only executes the CREATE TABLE statement so the target table exists;
  // the statement builder is intentionally empty because the DDL takes no parameters.
  transactionStream.addSink(JdbcSink.sink(
    DDL.SalesPerMonthSQL.createTable,
    new JdbcStatementBuilder[Transaction] {
      override def accept(t: PreparedStatement, u: Transaction): Unit = {}
    },
    execOptions,
    connOptions
  ))

  // Bucket each transaction into (year, month), keep a running total per bucket,
  // and write the totals to Postgres.
  transactionStream.map { transaction =>
    val transactionDate = new Date(transaction.transactionDate.getTime)
    val year = transactionDate.toLocalDate.getYear
    val month = transactionDate.toLocalDate.getMonth.getValue
    SalesPerMonth(year, month, totalSales = transaction.totalAmount)
  }
    .keyBy(spm => (spm.year, spm.month))
    .reduce((acc, curr) => acc.copy(totalSales = acc.totalSales + curr.totalSales))
    .addSink(JdbcSink.sink(
      DDL.SalesPerMonthSQL.insertStmt,
      new JdbcStatementBuilder[SalesPerMonth] {
        override def accept(preparedStatement: PreparedStatement, salesPerMonth: SalesPerMonth): Unit = {
          preparedStatement.setInt(1, salesPerMonth.year)
          preparedStatement.setInt(2, salesPerMonth.month)
          preparedStatement.setDouble(3, salesPerMonth.totalSales)
        }
      },
      execOptions,
      connOptions
    ))
}
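execOptions and connOptions (referenced above but not shown) configure JDBC batching and the Postgres connection. A minimal sketch, with the connection details as placeholders to adapt to your docker-compose setup:

import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions}

// Batch writes: flush every 1000 records or every 200 ms, retrying transient failures.
val execOptions: JdbcExecutionOptions = JdbcExecutionOptions.builder()
  .withBatchSize(1000)
  .withBatchIntervalMs(200)
  .withMaxRetries(5)
  .build()

// Placeholder URL and credentials; match them to your own Postgres container.
val connOptions: JdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  .withUrl("jdbc:postgresql://localhost:5432/postgres")
  .withDriverName("org.postgresql.Driver")
  .withUsername("postgres")
  .withPassword("postgres")
  .build()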
Flink-Elastic Sink
Important :: When defining the emitter function, an explicit type annotation is required (the documentation examples somehow omit it).
def writeToElastic(transactionStream: DataStream[Transaction]) = {
  val sink: ElasticsearchSink[Transaction] = new Elasticsearch7SinkBuilder[Transaction]
    .setHosts(new HttpHost("localhost", 9200, "http"))
    // Flush after every 2 buffered actions or every 10 ms, whichever comes first.
    .setBulkFlushMaxActions(2)
    .setBulkFlushInterval(10L)
    // The explicit [Transaction] type parameter is required here.
    .setEmitter[Transaction] {
      (transaction, context, indexer) => {
        // Serialize the case class to JSON; DefaultScalaModule handles Scala types.
        val mapper = new ObjectMapper()
        mapper.registerModule(DefaultScalaModule)
        val json: String = mapper.writeValueAsString(transaction)
        // Index the document under its transactionId so replays overwrite instead of duplicating.
        val indexRequest = Requests.indexRequest()
          .index("transactions")
          .id(transaction.transactionId)
          .source(json, XContentType.JSON)
        indexer.add(indexRequest)
      }
    }.build()
  transactionStream.sinkTo(sink)
}
Flink Job Execution
KafkaPGESIntegrationEcom.scala can be run directly from the IDE, or you can install a Flink cluster and deploy the packaged jar using
$ flink run -c ecom.KafkaPGESIntegrationEcom flink-ecom_2.12-0.1.jar
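Putting it all together, the main method wires the source, the debug print, and the two sinks before calling execute. A rough sketch, assuming the helper methods shown above are in scope; the file in the repository is the reference:

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

object KafkaPGESIntegrationEcom {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka source reading the generated e-commerce transactions.
    val source = KafkaSource.builder[Transaction]()
      .setBootstrapServers("localhost:9092")
      .setTopics("financial_transactions")
      .setGroupId("ecom-group")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new JSONValueDeserializationSchema())
      .build()

    val transactionStream: DataStream[Transaction] =
      env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")

    transactionStream.print()                  // sanity check on stdout
    writeToDBsalesPerMonth(transactionStream)  // monthly aggregates into Postgres
    writeToElastic(transactionStream)          // raw transactions into Elasticsearch

    env.execute("Flink Ecom Realtime Streaming")
  }
}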
A Glimpse into Postgres SQL
Data on Elasticsearch
Indexing: here's the structure of the transaction index on Elasticsearch.
You can get this by running GET transactions
in the Dev Tools console.
You can query the documents by running GET transactions/_search
Reindexing Data on Elasticsearch
To get a readable transaction date, we need to reindex into a different index. To reindex data on Elasticsearch, we use the _reindex API.
POST _reindex
{
  "source": {"index": "transactions"},
  "dest": {"index": "transaction_part1"},
  "script": {
    "source": """
      ctx._source.transactionDate = new Date(ctx._source.transactionDate).toString();
    """
  }
}
GET transaction_part1/_search
However, using toString() does not give us much room to wiggle around the format, so we need a more robust way to format the date.
POST _reindex
{
  "source": {"index": "transactions"},
  "dest": {"index": "transaction_part2"},
  "script": {
    "source": """
      SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
      formatter.setTimeZone(TimeZone.getTimeZone('UTC'));
      ctx._source.transactionDate = formatter.format(new Date(ctx._source.transactionDate));
    """
  }
}
GET transaction_part2/_search
Dashboarding in Real Time With Kibana
Index on transaction_part2
Creating Donut Chart
Number Of Transactions
Top Brands