This article is for those who want to implement a real-time analytics platform but have their data initially stored in a database. There are many documents on the web describing how to build a streaming platform, but most of them assume the data is being ingested into Kafka in the first place. Unfortunately, that's not the case for some of us.
Our example architectural scenario is as follows:
Our data is initially stored in an Oracle database; in other words, the application layer persists user transactions directly to the database instead of sending them to a queue like Kafka. So how do we turn this static data into a stream? Luckily for us, there are methods that utilize CDC (change data capture). Since the database in our example scenario is Oracle, we will only talk about the options for Oracle. Fortunately there are even more options for capturing real-time data from other databases.
Currently there are two options for capturing data from an Oracle database, and a third one is on the way, which we will talk about briefly later in this article.
GoldenGate
Oracle acquired GoldenGate back in 2009 and positioned the software in its Fusion Middleware tier. Basically, GoldenGate is a CDC tool that reads the Oracle redo logs and replicates the data. Since it reads from the redo logs, it puts no extra load on the database. Before GG, some developers were using triggers on the source systems and replicating the data over a dblink to another Oracle database; as expected, this method had serious drawbacks, including performance, security and stability. A couple of years ago Oracle released the Oracle GoldenGate Big Data Adapter, with which you can configure a Kafka adapter and integrate GG with Kafka. That being said, this adapter is tightly coupled with their Big Data Appliance platform.
XStream API
The XStream API was released as a new feature with Oracle Database 11g Release 2 (11.2) and has been included in every subsequent version since. The XStream API basically consists of Oracle database components, such as inbound and outbound servers, and an application programming interface (API). This interface enables client applications to receive data changes from an Oracle database and to send data changes to an Oracle database. Received changes can be shared with other systems, including non-Oracle products, file systems or third-party software applications of any kind.
So which one should we use? The answer depends on your current architecture. If you are using Oracle BDA you may consider using the GG Big Data adapter. Otherwise you may check out the XStream API and learn how it works. One important thing should be noted here: sadly, both approaches require an Oracle GoldenGate licence. Yes, you read that correctly, even if you use XStream you must have a GG licence.
Debezium
Alright, let's get back to our sample scenario and ask ourselves: I don't have BDA and don't want to install GG, so which one should I use? Actually, we do not use either of them directly. Coming to our rescue, there is Debezium, an open source Red Hat platform for change data capture from various databases, including Oracle. Debezium has a connector for Oracle which uses the XStream API and simplifies many things for us. IMHO it is much simpler than setting up GG and integrating it with Kafka. But remember that, since it uses the XStream API, you still need a GG licence. As we said before, there is an upcoming third option, aside from GG and the XStream API, inside Debezium, which utilizes Oracle LogMiner and does not require a GoldenGate licence. This feature is being implemented as of the writing of this article; you may check the GitHub repo and Gitter channel for more information. I especially recommend the Gitter channel for any kind of issues and questions about Debezium. You can contact the product owners and developers directly, and they are very friendly and helpful.
Integration
So let's get started integrating Oracle, Debezium and Confluent. First of all we need to prepare our database so that we can receive change records.
Enable GoldenGate replication and archive log mode; you may need to change the paths below according to your installation.
# create the recovery area directory on the OS first, then connect with sqlplus
mkdir -p /u01/oracle/oradata/recovery_area
sqlplus sys as sysdba

alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/u01/oracle/oradata/recovery_area' scope=spfile;
alter system set ENABLE_GOLDENGATE_REPLICATION=true;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should show "Database log mode: Archive Mode"
archive log list
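If you want to double-check from outside SQL*Plus, a quick sanity check can be scripted with the cx_Oracle driver. This is just a sketch; the credentials and DSN below are placeholders for your own environment.

import cx_Oracle

# Placeholder credentials/DSN; adjust to your installation
conn = cx_Oracle.connect("sys", "oracle", "localhost:1530/ORCL", mode=cx_Oracle.SYSDBA)
cur = conn.cursor()

# Both of these must be enabled for XStream-based capture to work
cur.execute("SELECT log_mode FROM v$database")
print("log_mode:", cur.fetchone()[0])  # expected: ARCHIVELOG

cur.execute("SELECT value FROM v$parameter WHERE name = 'enable_goldengate_replication'")
print("goldengate replication:", cur.fetchone()[0])  # expected: TRUE

cur.close()
conn.close()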
Create the xstrm and xstrmadmin users for managing connections to Oracle.
CREATE TABLESPACE xstream_adm_tbs DATAFILE '/u01/oracle/oradata/xstream_adm_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER xstrmadmin IDENTIFIED BY xsa
DEFAULT TABLESPACE xstream_adm_tbs
QUOTA UNLIMITED ON xstream_adm_tbs;
GRANT CREATE SESSION TO xstrmadmin;
CREATE TABLESPACE xstream_tbs DATAFILE '/u01/oracle/oradata/xstream_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER xstrm IDENTIFIED BY xs DEFAULT TABLESPACE xstream_tbs QUOTA UNLIMITED ON xstream_tbs;
GRANT CREATE SESSION TO xstrm;
GRANT SELECT ON V_$DATABASE to xstrm;
GRANT FLASHBACK ANY TABLE TO xstrm;
BEGIN
DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
grantee => 'xstrmadmin',
privilege_type => 'CAPTURE',
grant_select_privileges => TRUE
);
END;
/
Create an Oracle XStream outbound server with the tables you want to track.
DECLARE
tables DBMS_UTILITY.UNCL_ARRAY;
schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
schemas(1) := 'my_schema';
tables(1) := 'ABC_CUSTOMER_PROPOSAL';
tables(2) := 'ABC_INPUT_PARSE';
tables(3) := 'ABC_PAR_VALUE';
DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
server_name => 'dbzxout',
table_names => tables,
schema_names => schemas);
END;
/
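You can verify that the outbound server was created by querying the XStream dictionary views, for example like this (again with placeholder connection details):

import cx_Oracle

conn = cx_Oracle.connect("sys", "oracle", "localhost:1530/ORCL", mode=cx_Oracle.SYSDBA)
cur = conn.cursor()

# The outbound server created above should show up here
cur.execute("SELECT server_name, capture_name FROM dba_xstream_outbound")
for row in cur:
    print(row)  # e.g. ('DBZXOUT', <generated capture name>)

cur.close()
conn.close()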
Alright, that is it for the Oracle part. Now let's download the required software.
- Download the Confluent platform
- Download the Oracle Instant Client
- Download the JDK if you don't already have one
- Download the Insomnia REST client and install it
- Download the Oracle JDBC driver
- Download the following jar files from Maven:
- ANTLR 4 Runtime
- Debezium Connector For Oracle
- Debezium Core
- Debezium ANTLR DDL Parsers
Now create a directory that will contain all the work we'll do here. By the way, you can extract the downloaded files anywhere you like, but for simplicity we will put them all under the stream-demo folder.
mkdir stream-demo
- Extract the JDK under stream-demo/jdk
- Extract Confluent to stream-demo/confluent
- Extract the Instant Client to stream-demo/instant-client
- Create a directory stream-demo/confluent/share/java/kafka-connect-debezium
- Place the following jars under stream-demo/confluent/share/java/kafka-connect-debezium
  - ANTLR 4 Runtime
  - Debezium Connector For Oracle
  - Debezium Core
  - Debezium ANTLR DDL Parsers
  - Oracle JDBC Driver
Also copy xstreams.jar from stream-demo/instant-client to stream-demo/confluent/share/java/kafka-connect-debezium.
Add LD_LIBRARY_PATH and JAVA_HOME to your .profile like this:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/full/path/to/stream-demo/instant-client
export JAVA_HOME=/full/path/to/stream-demo/jdk
Save and quit, then execute:
source ~/.profile
Go to stream-demo/confluent/bin and execute ./confluent start
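Before moving on, you may want to verify that the Kafka Connect worker is up and that it can see the Debezium Oracle connector class. Here is a minimal sketch using Python's requests library, assuming Connect listens on its default port 8083:

import requests

# Kafka Connect REST API (default port 8083)
base_url = "http://localhost:8083"

# The worker is up if this returns the list of registered connectors (empty for now)
print(requests.get(f"{base_url}/connectors").json())

# The Debezium Oracle connector class should appear among the installed plugins
plugins = requests.get(f"{base_url}/connector-plugins").json()
print([p["class"] for p in plugins if "oracle" in p["class"].lower()])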
OK, we are almost done.
Open the Insomnia REST client and create a workspace.
Add a new POST request with the following payload:
{
"name": "gds",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"tasks.max" : "1",
"database.server.name" : "ORCL",
"database.hostname" : "localhost",
"database.port" : "1530",
"database.user" : "xstrmadm",
"database.password" : "oracle",
"database.dbname" : "ORCL",
"database.tablename.case.insensitive": false,
"database.oracle.version": "11", "table.whitelist":"ORCL.MY_SCHEMA.ABC_INPUT_PARSE,ORCL.MY_SCHEMA.ABC_PAR_VALUE,ORCL.MY_SCHEMA.ABC_CUSTOMER_PROPOSAL",
"database.out.server.name" : "dbzxout",
"database.history.kafka.bootstrap.servers" : "localhost:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"decimal.handling.mode": "precise"
}
}
Change the above parameters according to your installation.
The URL for the POST request is:
http://localhost:8083/connectors
Send the POST request.
Send a GET request to
http://localhost:8083/connectors/gds/status
You should see something like:
{
"name": "gds",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
}
],
"type": "source"
}
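If you prefer scripting these calls instead of using a GUI client, the same two requests can be issued with Python's requests library. This is just a sketch with an abridged version of the payload above; adjust the connection details to your own environment:

import requests

connect_url = "http://localhost:8083/connectors"

config = {
    "name": "gds",
    "config": {
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        "tasks.max": "1",
        "database.server.name": "ORCL",
        "database.hostname": "localhost",
        "database.port": "1530",
        "database.user": "xstrmadm",
        "database.password": "oracle",
        "database.dbname": "ORCL",
        "database.out.server.name": "dbzxout",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
    },
}

# Register the connector
resp = requests.post(connect_url, json=config)
print(resp.status_code, resp.json())

# Check its status; both the connector and its task should report RUNNING
status = requests.get(f"{connect_url}/gds/status").json()
print(status["connector"]["state"], [t["state"] for t in status["tasks"]])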
OK, now open the KSQL client:
cd stream-demo/confluent/bin
./ksql
If your tables are already receiving some DML, you should see the topics by executing:
LIST TOPICS;
If you do not see anything, try to insert, update or delete some records in your tables and then execute LIST TOPICS; again.
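Debezium names the change topics after the logical server name, schema and table, which in this setup gives topics such as ORCL.MY_SCHEMA.ABC_INPUT_PARSE. If you want to peek at the raw change events outside of KSQL, a minimal sketch with the Confluent Python client could look like this (the topic name and group id are just examples):

from confluent_kafka import Consumer

# Plain consumer just to inspect the raw Debezium change events
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "debug-peek",          # example group id
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["ORCL.MY_SCHEMA.ABC_INPUT_PARSE"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("consumer error:", msg.error())
            continue
        # The value is a serialized Debezium change event; its exact encoding
        # depends on the converter configured for your Connect worker
        print(msg.value())
finally:
    consumer.close()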
We have integrated Oracle and Kafka. The rest of the work is relatively easier. If we look at our example architecture, there is a Python layer in which we read Kafka topics with the Confluent Python client and create streams from the topics. An example stream is created like this:
import logging
from streams.ksql_client import KSQLClient
from admin import ConfluentAdmin


class CustomerMaxSysId(KSQLClient, ConfluentAdmin):
    def __init__(self, *args, **kwargs):
        self.logger = logging.getLogger('streams.gr_large_none')
        super().__init__(*args, **kwargs)

    def create(self):
        name = 'S_CUSTOMER_MAX_SYS_ID'
        self.logger.info(f"Creating stream {name}")
        if self.is_stream_exists(name):
            self.logger.info(f"Stream already exists: {name}")
            return
        q = f"""
            create stream {name} (
                process_time bigint,
                customer_no varchar,
                sys_id varchar,
                proposal_no varchar,
                customer_type varchar,
                flow_id integer
            )
            WITH (kafka_topic='CUSTOMER_MAX_SYS_ID', value_format='json', timestamp='process_time')
        """
        self.ksql_client.ksql(q)
        self.logger.info(f"Created stream {name}")
We also use a Python KSQL client to interact with the KSQL REST API. You could use the REST API directly, but the client library makes the code more expressive.
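If you have not used it before, the third-party ksql-python package wraps the KSQL REST API in a small client. A minimal sketch of using it directly, assuming the KSQL server listens on its default port 8088 (the stream name in the query is just an example):

from ksql import KSQLAPI  # pip install ksql

# KSQL server's REST endpoint (default port 8088)
client = KSQLAPI('http://localhost:8088')

# Any KSQL statement can be sent through the ksql() call
print(client.ksql("LIST TOPICS;"))
print(client.ksql("SHOW STREAMS;"))

# Queries are returned row by row
for row in client.query("SELECT * FROM S_CUSTOMER_MAX_SYS_ID LIMIT 5;"):
    print(row)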
Without getting into full detail: in this layer we create streams both from Kafka topics and by joining existing streams. An example stream-stream join can look like this:
import logging
from streams.ksql_client import KSQLClient
from admin import ConfluentAdmin


class GeneralLimitOutZero(KSQLClient, ConfluentAdmin):
    def __init__(self, *args, **kwargs):
        self.logger = logging.getLogger('streams.gr_large_none')
        super().__init__(*args, **kwargs)

    def create(self):
        name = 'S_GENERAL_LIMIT_OUT_ZERO'
        self.logger.info(f"Creating stream {name}")
        if self.is_stream_exists(name):
            self.logger.info(f"Stream already exists: {name}")
            return
        q = f"""
            create stream {name}
            WITH (kafka_topic='{name}', value_format='JSON')
            as
            select
                s.customer_no,
                s.sys_id,
                s.proposal_id,
                o.out_value
            from
                S_GR_LARGE_MODEL_ACTION s
                inner join S_OUTPUT_PARSE o within 15 days
                    on s.sys_id = o.ref_no
            where
                s.out_value = '0' and
                o.out_param = 'LIMIT'
        """
        self.ksql_client.ksql(q)
        self.logger.info(f"Created stream {name}")
This layer is like an ETL process in a standard DWH, except that we use data streams instead of static data.
Next, after creating the streams, we can listen for data and take the necessary actions, or just report them in a reporting layer.
In our scenario we store the received data in Redis using the Redis Python client, and we also publish the received events to a Redis channel.
key = f"gds:alert:gr_large_none:{customer_no}:{sys_id}:{proposal_id}"
self.__set_ttl(key)
self.logger.debug(f'Setting key {key}')
self.connection.hmset(key, record)
channel = 'channel:gds:alert:gr_large_none'
self.logger.debug(f'Publishing key to {channel}')
self.connection.publish(channel, key)
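For completeness, here is a self-contained sketch of the same idea using the redis-py client directly; the key, the record dict and the TTL value are hypothetical examples:

import redis

connection = redis.Redis(host="localhost", port=6379, db=0)

# Hypothetical alert record produced by the stream layer
record = {"customer_no": "123", "sys_id": "987", "proposal_id": "555", "out_value": "0"}
key = "gds:alert:gr_large_none:123:987:555"
channel = "channel:gds:alert:gr_large_none"

# Store the alert as a hash with a TTL, then announce its key on the channel
connection.hset(key, mapping=record)
connection.expire(key, 24 * 60 * 60)  # keep the alert for one day (example TTL)
connection.publish(channel, key)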
You can publish events to a Kafka topic instead of a Redis channel. We have chosen a Redis channel here so that the next layers only interact with Redis instead of both Redis and Kafka.
The next and final part is listening to the Redis channels and producing actions. In this layer we have used Go, and before you ask, there is no special purpose in choosing Go over Python; we just wanted to demonstrate a heterogeneous, flexible environment with different technologies.
func Run() {
	logrus.Debug("Attaching redis channels ...")
	defer client.Close()
	pubsub := client.Subscribe(channelNames()...)
	defer pubsub.Close()
	_, err := pubsub.Receive()
	if err != nil {
		panic(err)
	}
	ch := pubsub.Channel()
	// Flush any anomalies collected before we started listening
	var anomalies = collectAnomalies()
	for channel, keys := range anomalies {
		if len(keys) >= anomalyLimit(channel) {
			notify(channel, keys)
			anomalies[channel] = []string{}
		}
	}
	for message := range ch {
		ca := channelAlias(message.Channel)
		key := message.Payload
		logrus.Debug(ca, " : Received message ", key)
		// Anomalies are tracked per channel
		keys, ok := anomalies[message.Channel]
		if !ok {
			keys = []string{key}
			anomalies[message.Channel] = keys
		}
		anomalies[message.Channel] = util.AppendUniqueStr(anomalies[message.Channel], key)
		logrus.Debug(ca, " : Number of anomalies ", len(anomalies[message.Channel]))
		if len(anomalies[message.Channel]) >= anomalyLimit(message.Channel) {
			notify(message.Channel, anomalies[message.Channel])
			anomalies[message.Channel] = []string{}
		}
	}
}
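If you would rather keep this layer in Python as well, the same subscribe-and-notify pattern can be sketched with redis-py's pub/sub support; anomaly_limit and notify below are hypothetical stand-ins for your own logic:

import collections
import redis


def anomaly_limit(channel: str) -> int:
    # Hypothetical per-channel threshold
    return 5


def notify(channel: str, keys: list) -> None:
    # Hypothetical action: send a mail, call a webhook, etc.
    print(f"{channel}: {len(keys)} anomalies -> {keys}")


r = redis.Redis(host="localhost", port=6379, db=0)
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe("channel:gds:alert:gr_large_none")

anomalies = collections.defaultdict(list)
for message in pubsub.listen():
    channel = message["channel"].decode()
    key = message["data"].decode()
    if key not in anomalies[channel]:
        anomalies[channel].append(key)
    if len(anomalies[channel]) >= anomaly_limit(channel):
        notify(channel, anomalies[channel])
        anomalies[channel] = []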
Conclusion
The basic part of this scenario is actually integrating Oracle and Kafka using Debezium. The later steps can be implemented in many different ways depending on the case.
Thanks for reading.