<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Ceyhun Kerti</title>
    <description>The latest articles on DEV Community by Ceyhun Kerti (@ceyhunkerti).</description>
    <link>https://dev.to/ceyhunkerti</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F171351%2F59c64bfc-2cbf-4fd0-abff-d63c29302d56.jpeg</url>
      <title>DEV Community: Ceyhun Kerti</title>
      <link>https://dev.to/ceyhunkerti</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/ceyhunkerti"/>
    <language>en</language>
    <item>
      <title>Many to Many Relations in Ecto</title>
      <dc:creator>Ceyhun Kerti</dc:creator>
      <pubDate>Thu, 23 Apr 2020 13:09:23 +0000</pubDate>
      <link>https://dev.to/ceyhunkerti/many-to-many-relations-in-ecto-33n4</link>
      <guid>https://dev.to/ceyhunkerti/many-to-many-relations-in-ecto-33n4</guid>
      <description>&lt;p&gt;I believe this post will be helpful to the ones, searching for a many to many relationship implementation in phoenix/ecto.&lt;/p&gt;

&lt;p&gt;I can't express enough my thoughts about elixir and surrounding ecosystem like &lt;a href="https://www.phoenixframework.org/" rel="noopener noreferrer"&gt;phoenix&lt;/a&gt; and &lt;a href="https://hexdocs.pm/ecto/Ecto.html" rel="noopener noreferrer"&gt;ecto&lt;/a&gt;, all i can say is it's a great pleasure to learn and be the part of the community even as a beginner. I have additional info about the community and resources at the end of the post, so let's get straight to the point and try to implement a many-to-many relation.&lt;/p&gt;

&lt;p&gt;Suppose we have the classical blog application and our model is like;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fu65na64qezrwov2zox8d.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2Fu65na64qezrwov2zox8d.png" alt="blog model" width="580" height="262"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Sure we have other columns like username, name etc.. in users or comment body in comments. I've skipped these additional attributes for the sake of simplicity.&lt;/p&gt;

&lt;p&gt;Our migrations will look like this;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="c1"&gt;# users&lt;/span&gt;

&lt;span class="k"&gt;defmodule&lt;/span&gt; &lt;span class="no"&gt;MyApp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Repo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Migrations&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;AddUsersTable&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="kn"&gt;use&lt;/span&gt; &lt;span class="no"&gt;Ecto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Migration&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;change&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="n"&gt;create&lt;/span&gt; &lt;span class="n"&gt;table&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:users&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="n"&gt;add&lt;/span&gt; &lt;span class="ss"&gt;:name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
      &lt;span class="n"&gt;add&lt;/span&gt; &lt;span class="ss"&gt;:username&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;null:&lt;/span&gt; &lt;span class="no"&gt;false&lt;/span&gt;
      &lt;span class="n"&gt;add&lt;/span&gt; &lt;span class="ss"&gt;:password_hash&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;null:&lt;/span&gt; &lt;span class="no"&gt;false&lt;/span&gt;
      &lt;span class="n"&gt;add&lt;/span&gt; &lt;span class="ss"&gt;:email&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;null:&lt;/span&gt; &lt;span class="no"&gt;false&lt;/span&gt;

      &lt;span class="n"&gt;timestamps&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;

    &lt;span class="n"&gt;create&lt;/span&gt; &lt;span class="n"&gt;unique_index&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:users&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:username&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="n"&gt;create&lt;/span&gt; &lt;span class="n"&gt;unique_index&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:users&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:email&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="c1"&gt;# posts&lt;/span&gt;

&lt;span class="k"&gt;defmodule&lt;/span&gt; &lt;span class="no"&gt;MyApp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Repo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Migrations&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;CreatePosts&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="kn"&gt;use&lt;/span&gt; &lt;span class="no"&gt;Ecto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Migration&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;change&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="n"&gt;create&lt;/span&gt; &lt;span class="n"&gt;table&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:posts&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="n"&gt;add&lt;/span&gt; &lt;span class="ss"&gt;:title&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
      &lt;span class="n"&gt;add&lt;/span&gt; &lt;span class="ss"&gt;:owner_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;references&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:users&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;on_delete:&lt;/span&gt; &lt;span class="ss"&gt;:nothing&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="ss"&gt;null:&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;

      &lt;span class="n"&gt;timestamps&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="c1"&gt;# comments&lt;/span&gt;

&lt;span class="k"&gt;defmodule&lt;/span&gt; &lt;span class="no"&gt;MyApp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Repo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Migrations&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;CreateComments&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="kn"&gt;use&lt;/span&gt; &lt;span class="no"&gt;Ecto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Migration&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;change&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="n"&gt;create&lt;/span&gt; &lt;span class="n"&gt;table&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:comments&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="n"&gt;add&lt;/span&gt; &lt;span class="ss"&gt;:body&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
      &lt;span class="n"&gt;add&lt;/span&gt; &lt;span class="ss"&gt;:post_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;references&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:posts&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;on_delete:&lt;/span&gt; &lt;span class="ss"&gt;:delete_all&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
      &lt;span class="n"&gt;add&lt;/span&gt; &lt;span class="ss"&gt;:owner_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;references&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:users&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;on_delete:&lt;/span&gt; &lt;span class="ss"&gt;:nothing&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="ss"&gt;null:&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;

      &lt;span class="n"&gt;timestamps&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;

  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="c1"&gt;# comment_likes&lt;/span&gt;

&lt;span class="k"&gt;defmodule&lt;/span&gt; &lt;span class="no"&gt;MyApp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Repo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Migrations&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;AddCommnetLikeTable&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
  &lt;span class="kn"&gt;use&lt;/span&gt; &lt;span class="no"&gt;Ecto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Migration&lt;/span&gt;

  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;change&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="n"&gt;create&lt;/span&gt; &lt;span class="n"&gt;table&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:comment_likes&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;primary_key:&lt;/span&gt; &lt;span class="no"&gt;false&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="n"&gt;add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:comment_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;references&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:comments&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;on_delete:&lt;/span&gt; &lt;span class="ss"&gt;:delete_all&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="ss"&gt;primary_key:&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
      &lt;span class="n"&gt;add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;references&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:users&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;on_delete:&lt;/span&gt; &lt;span class="ss"&gt;:delete_all&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="ss"&gt;primary_key:&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

      &lt;span class="n"&gt;timestamps&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;

    &lt;span class="n"&gt;create&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="n"&gt;unique_index&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:comment_likes&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="ss"&gt;:comment_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:user_id&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="ss"&gt;name:&lt;/span&gt; &lt;span class="ss"&gt;:comment_id_user_id_unique_index&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now we add the schema definitions;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="c1"&gt;# User.ex&lt;/span&gt;

 &lt;span class="n"&gt;schema&lt;/span&gt; &lt;span class="s2"&gt;"users"&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ss"&gt;:name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
    &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ss"&gt;:password&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;virtual:&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;
    &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ss"&gt;:password_hash&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
    &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ss"&gt;:username&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
    &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ss"&gt;:email&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
    &lt;span class="n"&gt;many_to_many&lt;/span&gt; &lt;span class="ss"&gt;:liked_comments&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;Comment&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;join_through:&lt;/span&gt; &lt;span class="no"&gt;CommentLike&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;on_replace:&lt;/span&gt; &lt;span class="ss"&gt;:delete&lt;/span&gt;

    &lt;span class="n"&gt;timestamps&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="c1"&gt;# User.ex&lt;/span&gt;

 &lt;span class="n"&gt;schema&lt;/span&gt; &lt;span class="s2"&gt;"users"&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ss"&gt;:name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
    &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ss"&gt;:password&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;virtual:&lt;/span&gt; &lt;span class="no"&gt;true&lt;/span&gt;
    &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ss"&gt;:password_hash&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
    &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ss"&gt;:username&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
    &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ss"&gt;:email&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
    &lt;span class="n"&gt;many_to_many&lt;/span&gt; &lt;span class="ss"&gt;:liked_comments&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;Comment&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;join_through:&lt;/span&gt; &lt;span class="no"&gt;CommentLike&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;on_replace:&lt;/span&gt; &lt;span class="ss"&gt;:delete&lt;/span&gt;

    &lt;span class="n"&gt;timestamps&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Comment.ex&lt;/span&gt;

  &lt;span class="n"&gt;schema&lt;/span&gt; &lt;span class="s2"&gt;"comments"&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ss"&gt;:body&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
    &lt;span class="n"&gt;belongs_to&lt;/span&gt; &lt;span class="ss"&gt;:post&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;Post&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;references:&lt;/span&gt; &lt;span class="ss"&gt;:id&lt;/span&gt;
    &lt;span class="n"&gt;belongs_to&lt;/span&gt; &lt;span class="ss"&gt;:owner&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;User&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;references:&lt;/span&gt; &lt;span class="ss"&gt;:id&lt;/span&gt;
    &lt;span class="n"&gt;many_to_many&lt;/span&gt; &lt;span class="ss"&gt;:liked_by&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;User&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;join_through:&lt;/span&gt; &lt;span class="no"&gt;CommentLike&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;on_replace:&lt;/span&gt; &lt;span class="ss"&gt;:delete&lt;/span&gt;

    &lt;span class="n"&gt;timestamps&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="c1"&gt;# Post.ex&lt;/span&gt;

  &lt;span class="n"&gt;schema&lt;/span&gt; &lt;span class="s2"&gt;"posts"&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="n"&gt;field&lt;/span&gt; &lt;span class="ss"&gt;:title&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:string&lt;/span&gt;
    &lt;span class="n"&gt;belongs_to&lt;/span&gt; &lt;span class="ss"&gt;:owner&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;User&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;references:&lt;/span&gt; &lt;span class="ss"&gt;:id&lt;/span&gt;
    &lt;span class="n"&gt;has_many&lt;/span&gt; &lt;span class="ss"&gt;:comments&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;Comment&lt;/span&gt;

    &lt;span class="n"&gt;timestamps&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;finally we must also have a schema for the relation itself, i have skipped this part while i was first implementing it, just because i thought it was auto done, and no need for it but this was a mistake and I've learned it is actually necessary. So lets create it like;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="c1"&gt;# comment_like.ex&lt;/span&gt;

  &lt;span class="nv"&gt;@primary_key&lt;/span&gt; &lt;span class="no"&gt;false&lt;/span&gt;
  &lt;span class="n"&gt;schema&lt;/span&gt; &lt;span class="s2"&gt;"comment_likes"&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="n"&gt;belongs_to&lt;/span&gt; &lt;span class="ss"&gt;:user&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;User&lt;/span&gt;
    &lt;span class="n"&gt;belongs_to&lt;/span&gt; &lt;span class="ss"&gt;:comment&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;Comment&lt;/span&gt;

    &lt;span class="n"&gt;timestamps&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;OK the model is set, and the rest is easy; I'll give a sample implementation for liking a comment and getting comment list along with the attribute, &lt;code&gt;is_liked&lt;/code&gt; which is if the current user is liked the comment or not.&lt;/p&gt;

&lt;p&gt;Liking a comment;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="c1"&gt;# comment_controller.ex&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;like&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;%{&lt;/span&gt;&lt;span class="s2"&gt;"comment_id"&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;comment_id&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="no"&gt;Context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;like_comment&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;assigns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;current_user&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;comment_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
      &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="ss"&gt;:ok&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt; &lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:ok&lt;/span&gt;
      &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="ss"&gt;:error&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt; &lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:error&lt;/span&gt;
    &lt;span class="k"&gt;end&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="c1"&gt;# context.ex&lt;/span&gt;


  &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;like_comment&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;current_user&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;comment_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="n"&gt;current_user&lt;/span&gt;
    &lt;span class="o"&gt;|&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;Repo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;preload&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:liked_comments&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;|&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;Ecto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Changeset&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;change&lt;/span&gt;
    &lt;span class="o"&gt;|&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;Ecto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="no"&gt;Changeset&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;put_assoc&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="ss"&gt;:liked_comments&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;get_comment!&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;comment_id&lt;/span&gt;&lt;span class="p"&gt;)])&lt;/span&gt;
    &lt;span class="o"&gt;|&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;Repo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;update&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Getting the list of comments for the post;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight elixir"&gt;&lt;code&gt;&lt;span class="c1"&gt;# context.ex&lt;/span&gt;

 &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="n"&gt;get_commnets&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;current_user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;post_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt;
    &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;from&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="no"&gt;Post&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="ss"&gt;join:&lt;/span&gt; &lt;span class="n"&gt;c&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;assoc&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;:comments&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
      &lt;span class="ss"&gt;left_join:&lt;/span&gt; &lt;span class="n"&gt;cl&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="no"&gt;CommentLike&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="ss"&gt;on:&lt;/span&gt; &lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;cl&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;comment_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="ss"&gt;left_join:&lt;/span&gt; &lt;span class="n"&gt;u&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="no"&gt;User&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="ss"&gt;on:&lt;/span&gt; &lt;span class="n"&gt;cl&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;u&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="ow"&gt;and&lt;/span&gt; &lt;span class="n"&gt;u&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="o"&gt;^&lt;/span&gt;&lt;span class="n"&gt;current_user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="ss"&gt;where:&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="o"&gt;^&lt;/span&gt;&lt;span class="n"&gt;post_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="ss"&gt;select:&lt;/span&gt; &lt;span class="p"&gt;%{&lt;/span&gt;&lt;span class="ss"&gt;comment:&lt;/span&gt; &lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="ss"&gt;is_liked:&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;is_nil&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;u&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;)}&lt;/span&gt;

    &lt;span class="no"&gt;Repo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;all&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;end&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That's it.&lt;/p&gt;

&lt;p&gt;I strongly suggest &lt;code&gt;iex&lt;/code&gt;(elixir interactive shell) for trying, learning or debugging your ecto models. Just issue &lt;code&gt;iex -S mix&lt;/code&gt; in your project folder and you are ready to go, you can interact with your application in console. &lt;/p&gt;

&lt;p&gt;Lastly, elixir has a welcoming community. I've learned a lot from the posts and &lt;a href="https://discord.gg/u62wMu" rel="noopener noreferrer"&gt;discord channel&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;Here are a few resources for learning elixir and phoenix.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://elixir-lang.org/" rel="noopener noreferrer"&gt;Elixir home page&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://elixirschool.com/en/" rel="noopener noreferrer"&gt;Elixir school&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://elixirforum.com/t/elixir-blog-posts/150" rel="noopener noreferrer"&gt;Elixir blog&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://github.com/elixir-lang/elixir" rel="noopener noreferrer"&gt;Elixir repository&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://www.phoenixframework.org/" rel="noopener noreferrer"&gt;Phoenix&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://hexdocs.pm/ecto/Ecto.html" rel="noopener noreferrer"&gt;Ecto docs&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://elixircasts.io/" rel="noopener noreferrer"&gt;Elixir casts&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://discord.gg/u62wMu" rel="noopener noreferrer"&gt;Discord channel&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Thanks for reading i hope it'll help.&lt;/p&gt;

</description>
      <category>elixir</category>
      <category>ecto</category>
      <category>phoenix</category>
    </item>
    <item>
      <title>Streaming Analytics With Confluent Platform and Debezium</title>
      <dc:creator>Ceyhun Kerti</dc:creator>
      <pubDate>Mon, 21 Oct 2019 16:23:30 +0000</pubDate>
      <link>https://dev.to/ceyhunkerti/streaming-analytics-with-confluent-platform-and-debezium-2abb</link>
      <guid>https://dev.to/ceyhunkerti/streaming-analytics-with-confluent-platform-and-debezium-2abb</guid>
      <description>&lt;p&gt;This article is for those who wants to implement a real time analytics platform, but have the data stored initially in database. There are many documents on web describing, building a streaming platform, but most of them assume that the data is being ingested to Kafka in the first place. Unfortunately that's not the case for some of us.&lt;br&gt;
Our example architectural scenario is like this;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F0vc4kck29fk2i14vr5r1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F0vc4kck29fk2i14vr5r1.png" alt="Example Scenario" width="800" height="339"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Our data is initially stored in Oracle database, in other words application layer persists the user transactions directly to database instead of sending them to a queue like Kafka. So how do we convert the static data to a stream? Lucky for us there are some methods that utilizes the CDC(change data capture) logic. Since in our example scenario the database is Oracle we will only talk about the options for Oracle. Hopefully there are even more options for other databases for capturing real time data. &lt;/p&gt;

&lt;p&gt;Currently there are two options for capturing data from Oracle database and the third one is on the way that we will talk about briefly later in this article.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://www.oracle.com/middleware/technologies/goldengate.html" rel="noopener noreferrer"&gt;Oracle GoldenGate&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.oracle.com/cd/E11882_01/server.112/e16545/xstrm_intro.htm#XSTRM72647" rel="noopener noreferrer"&gt;Oracle XStream API&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;h3&gt;
  
  
  GoldenGate
&lt;/h3&gt;

&lt;p&gt;Oracle had acquired GoldenGate back in 2009 and positioned the software in their fusion middleware tier. Basically GoldenGate is a cdc tool that reads the oracle redo logs and replicates the data. Since it is reading from the redo logs it has no extra load on the database. Before GG some of the developers were performing trigger operations on the source systems and replicating the data over dblink to another oracle database, as expected this method had a serious drawbacks including performance, security and stability. A couple of years ago Oracle has released Oracle GoldenGate Big Data Adapter which you can configure a Kafka adapter and integrate GG with Kafka. That being said this adapter is tightly coupled with their BigData Appliance platform.&lt;/p&gt;
&lt;h3&gt;
  
  
  XStream API
&lt;/h3&gt;

&lt;p&gt;XStream API is released as a new feature with Oracle database 11g Release 2 (11.2). Since then this feature is included in every other successor versions. XStream API basically consists of oracle database components, such as inbound and outbound servers, and an application programming interface API. This interface enable client applications to receive data changes from an Oracle database and send data changes to an Oracle database. Received changes can be shared with other other systems, including non Oracle products, file systems or third party software applications of any kind.&lt;/p&gt;

&lt;p&gt;So which one should we use? The answer depends on your current architecture. If you are using Oracle BDA you may consider using GG Big Data adapter. Otherwise you may check out XStream API and learn how it works. One important thing should be noted here is; sadly both of the approaches require Oracle GoldenGate licence. Yes you read it correct, even if you use XStream you should have a GG licence.&lt;/p&gt;
&lt;h3&gt;
  
  
  Debezium
&lt;/h3&gt;

&lt;p&gt;Alright lets get back to our sample scenario and ask ourselves; i don't have BDA and don't want to install GG, so which one should i use? Actually we do not use either of them directly. To come to our rescue, there is an open source &lt;a href="https://debezium.io/" rel="noopener noreferrer"&gt;RedHat Debezium&lt;/a&gt; platform for change data capture from various databases including Oracle. Debezium has a plugin for Oracle database which uses XStream API and simplifies many things for us. IMHO it is much more simpler then setting up GG and integrating with Kafka. But remember that, since it uses XStream API, you should have GG licence in any case. As we said before there is an upcoming third option ,aside from GG and XStream Api, inside Debezium which utilizes Oracle LogMiner that does not require a Oracle licence. This feature is being implemented as of writing this article, you may check &lt;a href="https://github.com/debezium/debezium-incubator/tree/master/debezium-connector-oracle" rel="noopener noreferrer"&gt;github repo&lt;/a&gt; and &lt;a href="https://gitter.im/debezium/dev" rel="noopener noreferrer"&gt;gitter&lt;/a&gt; channel for more information. Especially i recommend gitter channel for any kind of issues and questions for Debezium. You can contact directly to product owners and developers, who are very friendly and helpful.&lt;/p&gt;
&lt;h3&gt;
  
  
  Integration
&lt;/h3&gt;

&lt;p&gt;So let's get start to integrate Oracle, Debezium and Confluent. First of all we need to prepare our database so that we can receive change records. &lt;/p&gt;

&lt;p&gt;Enable GoldenGate replication and archive log mode, you may need to change below paths according to your installation.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;sqlplus sys as sysdba
alter system &lt;span class="nb"&gt;set &lt;/span&gt;db_recovery_file_dest_size &lt;span class="o"&gt;=&lt;/span&gt; 5G&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="nb"&gt;mkdir&lt;/span&gt; &lt;span class="nt"&gt;-p&lt;/span&gt; /u01/oracle/oradata/recovery_area
alter system &lt;span class="nb"&gt;set &lt;/span&gt;db_recovery_file_dest &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'/u01/oracle/oradata/recovery_area'&lt;/span&gt; &lt;span class="nv"&gt;scope&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;spfile&lt;span class="p"&gt;;&lt;/span&gt;
alter system &lt;span class="nb"&gt;set &lt;/span&gt;&lt;span class="nv"&gt;ENABLE_GOLDENGATE_REPLICATION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nb"&gt;true&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
shutdown immediate
startup mount
alter database archivelog&lt;span class="p"&gt;;&lt;/span&gt;
alter database open&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="nt"&gt;--Should&lt;/span&gt; show &lt;span class="s2"&gt;"Database log mode: Archive Mode"&lt;/span&gt;
archive log list
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create &lt;strong&gt;xstrm&lt;/strong&gt; and &lt;strong&gt;xstrmadmin&lt;/strong&gt; users for managing connections to oracle.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;TABLESPACE&lt;/span&gt; &lt;span class="n"&gt;xstream_adm_tbs&lt;/span&gt; &lt;span class="n"&gt;DATAFILE&lt;/span&gt; &lt;span class="s1"&gt;'/u01/oracle/oradata/xstream_adm_tbs.dbf'&lt;/span&gt; &lt;span class="k"&gt;SIZE&lt;/span&gt; &lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="n"&gt;M&lt;/span&gt; &lt;span class="n"&gt;REUSE&lt;/span&gt; &lt;span class="n"&gt;AUTOEXTEND&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;MAXSIZE&lt;/span&gt; &lt;span class="n"&gt;UNLIMITED&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;USER&lt;/span&gt; &lt;span class="n"&gt;xstrmadmin&lt;/span&gt; &lt;span class="n"&gt;IDENTIFIED&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;xsa&lt;/span&gt;
&lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="n"&gt;TABLESPACE&lt;/span&gt; &lt;span class="n"&gt;xstream_adm_tbs&lt;/span&gt;
&lt;span class="n"&gt;QUOTA&lt;/span&gt; &lt;span class="n"&gt;UNLIMITED&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;xstream_adm_tbs&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;GRANT&lt;/span&gt; &lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;SESSION&lt;/span&gt; &lt;span class="k"&gt;TO&lt;/span&gt; &lt;span class="n"&gt;xstrmadmin&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;TABLESPACE&lt;/span&gt; &lt;span class="n"&gt;xstream_tbs&lt;/span&gt; &lt;span class="n"&gt;DATAFILE&lt;/span&gt; &lt;span class="s1"&gt;'/u01/oracle/oradata/xstream_tbs.dbf'&lt;/span&gt; &lt;span class="k"&gt;SIZE&lt;/span&gt; &lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="n"&gt;M&lt;/span&gt; &lt;span class="n"&gt;REUSE&lt;/span&gt; &lt;span class="n"&gt;AUTOEXTEND&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;MAXSIZE&lt;/span&gt; &lt;span class="n"&gt;UNLIMITED&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;USER&lt;/span&gt; &lt;span class="n"&gt;xstrm&lt;/span&gt; &lt;span class="n"&gt;IDENTIFIED&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;xs&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="n"&gt;TABLESPACE&lt;/span&gt; &lt;span class="n"&gt;xstream_tbs&lt;/span&gt; &lt;span class="n"&gt;QUOTA&lt;/span&gt; &lt;span class="n"&gt;UNLIMITED&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;xstream_tbs&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;


&lt;span class="k"&gt;GRANT&lt;/span&gt; &lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;SESSION&lt;/span&gt; &lt;span class="k"&gt;TO&lt;/span&gt; &lt;span class="n"&gt;xstrm&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;GRANT&lt;/span&gt; &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;V_&lt;/span&gt;&lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="k"&gt;DATABASE&lt;/span&gt; &lt;span class="k"&gt;to&lt;/span&gt; &lt;span class="n"&gt;xstrm&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;GRANT&lt;/span&gt; &lt;span class="n"&gt;FLASHBACK&lt;/span&gt; &lt;span class="k"&gt;ANY&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="k"&gt;TO&lt;/span&gt; &lt;span class="n"&gt;xstrm&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;


&lt;span class="k"&gt;BEGIN&lt;/span&gt;
   &lt;span class="n"&gt;DBMS_XSTREAM_AUTH&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GRANT_ADMIN_PRIVILEGE&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="n"&gt;grantee&lt;/span&gt;                 &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="s1"&gt;'xstrmadmin'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="n"&gt;privilege_type&lt;/span&gt;          &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="s1"&gt;'CAPTURE'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
      &lt;span class="n"&gt;grant_select_privileges&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;TRUE&lt;/span&gt;
   &lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="k"&gt;END&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create oracle outbound server with the tables you want to track.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;DECLARE&lt;/span&gt;
  &lt;span class="n"&gt;tables&lt;/span&gt;  &lt;span class="n"&gt;DBMS_UTILITY&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UNCL_ARRAY&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="n"&gt;schemas&lt;/span&gt; &lt;span class="n"&gt;DBMS_UTILITY&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UNCL_ARRAY&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;BEGIN&lt;/span&gt;
  &lt;span class="n"&gt;schemas&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'my_schema'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="n"&gt;tables&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'ABC_CUSTOMER_PROPOSAL'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="n"&gt;tables&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'ABC_INPUT_PARSE'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
  &lt;span class="n"&gt;tables&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'ABC_PAR_VALUE'&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

  &lt;span class="n"&gt;DBMS_XSTREAM_ADM&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CREATE_OUTBOUND&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="k"&gt;server_name&lt;/span&gt;     &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt;  &lt;span class="s1"&gt;'dbzxout'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;table_names&lt;/span&gt;     &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt;  &lt;span class="n"&gt;tables&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;schema_names&lt;/span&gt;    &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt;  &lt;span class="n"&gt;schemas&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="k"&gt;END&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Alright that is it for oracle part. Now lets download the required software.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Download &lt;a href="https://www.confluent.io" rel="noopener noreferrer"&gt;Conflunet&lt;/a&gt; platform&lt;/li&gt;
&lt;li&gt;Download &lt;a href="https://www.oracle.com/database/technologies/instant-client.html" rel="noopener noreferrer"&gt;Oracle Instant Client&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Download &lt;a href="https://www.oracle.com/technetwork/java/javase/downloads/index.html" rel="noopener noreferrer"&gt;JDK&lt;/a&gt; if you don't already have&lt;/li&gt;
&lt;li&gt;Download &lt;a href="https://insomnia.rest/" rel="noopener noreferrer"&gt;Insomnia Rest Client&lt;/a&gt; and install it&lt;/li&gt;
&lt;li&gt;Download &lt;a href="https://www.oracle.com/database/technologies/appdev/jdbc-downloads.html" rel="noopener noreferrer"&gt;Oracle JDBC Driver&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Download following jar files from &lt;a href="https://maven.apache.org/" rel="noopener noreferrer"&gt;maven&lt;/a&gt;; 

&lt;ul&gt;
&lt;li&gt;ANTLR 4 Runtime &lt;/li&gt;
&lt;li&gt;Debezium Connector For Oracle &lt;/li&gt;
&lt;li&gt;Debezium Core&lt;/li&gt;
&lt;li&gt;Debezium ANTLR DDL Parsers&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;/ul&gt;

&lt;p&gt;Now create a directory that will contain all the work we'll do here; BTW you can extract above downloaded files anywhere you like, but for simpliciy we will put them all under stream-demo folder.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;mkdir stream-demo&lt;/code&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Extract JDK under &lt;code&gt;stream-demo/jdk&lt;/code&gt; &lt;/li&gt;
&lt;li&gt;Extract Confuent to &lt;code&gt;stream-demo/confluent&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Extract Instant Clinet to &lt;code&gt;stream-demo/instant-client&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Create a directory &lt;code&gt;stream-demo/confluent/share/java/kafka-connect-debezium&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Place the following jars under &lt;code&gt;stream-demo/confluent/share/java/kafka-connect-debezium&lt;/code&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;4 Runtime &lt;/li&gt;
&lt;li&gt;Debezium Connector For Oracle &lt;/li&gt;
&lt;li&gt;Debezium Core &lt;/li&gt;
&lt;li&gt;Debezium ANTLR DDL Parsers &lt;/li&gt;
&lt;li&gt;Oracle JDBC Driver&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;&lt;p&gt;Also copy &lt;code&gt;xstreams.jar&lt;/code&gt; from &lt;code&gt;stream-demo/instant-client&lt;/code&gt; to &lt;code&gt;stream-demo/confluent/share/java/kafka-connect-debezium&lt;/code&gt;&lt;/p&gt;&lt;/li&gt;

&lt;li&gt;&lt;p&gt;Add &lt;code&gt;LD_LIBRARY_PATH&lt;/code&gt; and &lt;code&gt;JAVA_HOME&lt;/code&gt; to your &lt;code&gt;.profile&lt;/code&gt; like this;&lt;br&gt;&lt;br&gt;
&lt;/p&gt;&lt;/li&gt;

&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;LD_LIBRARY_PATH&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nv"&gt;$LD_LIBRARY_PATH&lt;/span&gt;:/full/path/to/stream-demo/instant-client
&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;JAVA_HOME&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;/full/path/to/stream-demo/jdk
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Save and quit then execute; &lt;code&gt;source ~/.profile&lt;/code&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Go to &lt;code&gt;stream-demo/confluent/bin&lt;/code&gt; and execute &lt;code&gt;./confluent start&lt;/code&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Ok we are almost done; &lt;br&gt;
Open Insomnia rest client and create a work-space.&lt;br&gt;
Add a new post request with the following payload&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"gds"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="nl"&gt;"config"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"connector.class"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.debezium.connector.oracle.OracleConnector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"tasks.max"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"database.server.name"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"ORCL"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"database.hostname"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"localhost"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"database.port"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1530"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"database.user"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"xstrmadm"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"database.password"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"oracle"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"database.dbname"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"ORCL"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"database.tablename.case.insensitive"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"database.oracle.version"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"11"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"table.whitelist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"ORCL.MY_SCHEMA.ABC_INPUT_PARSE,ORCL.MY_SCHEMA.ABC_PAR_VALUE,ORCL.MY_SCHEMA.ABC_CUSTOMER_PROPOSAL"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"database.out.server.name"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"dbzxout"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"database.history.kafka.bootstrap.servers"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"localhost:9092"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"database.history.kafka.topic"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"schema-changes.inventory"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"decimal.handling.mode"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"precise"&lt;/span&gt;&lt;span class="w"&gt;
 &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Change the above parameters according to your installation.&lt;/p&gt;

&lt;p&gt;The URL for the post request is;&lt;br&gt;
&lt;code&gt;http://localhost:8083/connectors&lt;/code&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Send the post request. &lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Send a get request to &lt;code&gt;http://localhost:8083/connectors/gds/status&lt;/code&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;You should see something like;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"gds"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"connector"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"state"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"RUNNING"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"worker_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"127.0.1.1:8083"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"tasks"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"state"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"RUNNING"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"worker_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"127.0.1.1:8083"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"source"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Ok now open ksql client;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;cd &lt;/span&gt;stream-demo/confluent/bin
./ksql
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If your tables are receiving some some &lt;code&gt;dml&lt;/code&gt; already, you should see the topics by executing;&lt;/p&gt;

&lt;p&gt;&lt;code&gt;LIST TOPICS;&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;If you do not see anything try to insert, update or delete some records to your tables and then exec &lt;code&gt;LIST TOPICS;&lt;/code&gt; again.&lt;/p&gt;

&lt;p&gt;We have integrated &lt;code&gt;oracle&lt;/code&gt; and &lt;code&gt;kafka&lt;/code&gt;. Rest of the work is relatively easier. If we look our example architecture there is a python layer in which we read &lt;code&gt;kafka&lt;/code&gt; topics with &lt;a href="https://github.com/confluentinc/confluent-kafka-python" rel="noopener noreferrer"&gt;confluent python client&lt;/a&gt; and create streams from the topics; An example stream is created like this;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;streams.ksql_client&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;KSQLClient&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;admin&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ConfluentAdmin&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CustomerMaxSysId&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;KSQLClient&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ConfluentAdmin&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;kwargs&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getLogger&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;streams.gr_large_none&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;super&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;kwargs&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;create&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;S_CUSTOMER_MAX_SYS_ID&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Creating stream &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;is_stream_exists&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Stream already exists: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;

        &lt;span class="n"&gt;q&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
            create stream &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; (
                process_time    bigint,
                customer_no     varchar,
                sys_id          varchar,
                proposal_no     varchar,
                customer_type   varchar,
                flow_id         integer
            )
            WITH (kafka_topic=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;CUSTOMER_MAX_SYS_ID&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;, value_format=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;json&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;, timestamp=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;process_time&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;)
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ksql_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ksql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;q&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Created stream &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We also use &lt;a href="https://github.com/bryanyang0528/ksql-python" rel="noopener noreferrer"&gt;python ksql client&lt;/a&gt; to interact with ksql-rest api. You can also use rest api directly but using the ksql client api makes it more expressive.&lt;/p&gt;

&lt;p&gt;Without getting into full detail in this layer we create streams both from kafka topics and by joining streams. An example stream-stream join can be like this;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;streams.ksql_client&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;KSQLClient&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;admin&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ConfluentAdmin&lt;/span&gt;

&lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;GeneralLimitOutZero&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;KSQLClient&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ConfluentAdmin&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;kwargs&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getLogger&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;streams.gr_large_none&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;super&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;__init__&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;kwargs&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;create&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;S_GENERAL_LIMIT_OUT_ZERO&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Creating stream &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;is_stream_exists&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
            &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Stream already exists: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;

        &lt;span class="n"&gt;q&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
            create stream &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;
            WITH (kafka_topic=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;, value_format=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;JSON&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;)
            as
            select
                s.customer_no,
                s.sys_id,
                s.proposal_id,
                o.out_value
            from
                S_GR_LARGE_MODEL_ACTION s
            inner join S_OUTPUT_PARSE o within 15 days
                on s.sys_id = o.ref_no
            where
                s.out_value  = &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;0&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt; and
                o.out_param  = &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;LIMIT&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;

        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ksql_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ksql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;q&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Created stream &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This layer is like an &lt;code&gt;ETL&lt;/code&gt; process in a standard &lt;code&gt;DWH&lt;/code&gt; except we use data streams instead of static data.&lt;/p&gt;

&lt;p&gt;Next after creating streams, we can listen for data and take necessary actions or just report them in a reporting layer.&lt;/p&gt;

&lt;p&gt;In our scenario we store the received data in redis. We use redis &lt;a href="https://github.com/andymccurdy/redis-py" rel="noopener noreferrer"&gt;python client&lt;/a&gt;. And also we publish our received events to a redis channel.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;        &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;gds:alert:gr_large_none:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;customer_no&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;sys_id&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;proposal_id&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;__set_ttl&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Setting key &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connection&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;hmset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;channel&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;channel:gds:alert:gr_large_none&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Publishing key to &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;channel&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;self&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;connection&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;channel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can publish events to a kafka topic instead of redis channel. We have chosen redis channel here so the next layers only interacts with redis instead of both redis and kafka.&lt;/p&gt;

&lt;p&gt;The next and the final part is listening redis channels and producing actions; In this layer we have used &lt;code&gt;go&lt;/code&gt; and before you ask there is no special purpose of choosing &lt;code&gt;go&lt;/code&gt; over &lt;code&gt;python&lt;/code&gt;, we have just wanted to demonstrate a heterogeneous, flexible environment with different technologies.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;Run&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;

    &lt;span class="n"&gt;logrus&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Attaching redis channels ..."&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;pubsub&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Subscribe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;channelNames&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;...&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;pubsub&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;pubsub&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Receive&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nb"&gt;panic&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;ch&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;pubsub&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;anomalies&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;collectAnomalies&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;channel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;keys&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;range&lt;/span&gt; &lt;span class="n"&gt;anomalies&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="n"&gt;anomalyLimit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;channel&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;notify&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;channel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;anomalies&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;channel&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;{}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;range&lt;/span&gt; &lt;span class="n"&gt;ch&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;ca&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;channelAlias&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;
        &lt;span class="n"&gt;logrus&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ca&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;" : Received message "&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ok&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;anomalies&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="n"&gt;ok&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;keys&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="n"&gt;anomalies&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;keys&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="n"&gt;anomalies&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;util&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AppendUniqueStr&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;anomalies&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;logrus&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Debug&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ca&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;" : Number of anomalies "&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;anomalies&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;&lt;span class="p"&gt;]))&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;anomalies&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="n"&gt;anomalyLimit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;notify&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;anomalies&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
            &lt;span class="n"&gt;anomalies&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;{}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Conclusion
&lt;/h3&gt;

&lt;p&gt;The basic part of this scenario is actually integrating &lt;code&gt;oracle&lt;/code&gt; and &lt;code&gt;kafka&lt;/code&gt; using &lt;code&gt;debezium&lt;/code&gt;. The later steps can be implemented in many different ways depending the case.&lt;/p&gt;

&lt;p&gt;Thanks for reading.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>debezium</category>
      <category>oracle</category>
    </item>
  </channel>
</rss>
