<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Mohammad Arab Anvari</title>
    <description>The latest articles on DEV Community by Mohammad Arab Anvari (@anvaari).</description>
    <link>https://dev.to/anvaari</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F921549%2F7dd9882c-cec8-4fb2-bd09-779f3a21f16e.jpeg</url>
      <title>DEV Community: Mohammad Arab Anvari</title>
      <link>https://dev.to/anvaari</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/anvaari"/>
    <language>en</language>
    <item>
      <title>How to Upgrade Kafka from 1.1.1 with Zero-Downtime: An Applicable Approach</title>
      <dc:creator>Mohammad Arab Anvari</dc:creator>
      <pubDate>Fri, 29 Mar 2024 08:25:54 +0000</pubDate>
      <link>https://dev.to/anvaari/how-to-upgrade-kafka-from-111-with-zero-downtime-an-applicable-approach-3n05</link>
      <guid>https://dev.to/anvaari/how-to-upgrade-kafka-from-111-with-zero-downtime-an-applicable-approach-3n05</guid>
      <description>&lt;p&gt;As a data engineer or, more specifically, data platform engineer, a service with high dependency may be handed over to you. Upgrading such a service is a terrifying process. Suppose that service is Kafka, and it's the main component of your data stack at the company. However, the solution isn't ignoring the complexity because every bug fix or new feature can save you from downtime and help you increase the performance of the services. So, what is the solution? How can we ensure all services that depend on Kafka work fine after the upgrade? In this post, I will share my experience through this process.&lt;/p&gt;

&lt;h2&gt;
  
  
  Main concerns
&lt;/h2&gt;

&lt;p&gt;When we talk about a service like Kafka, we know there are many producers and consumers attached to it. What happens to them after an upgrade? Do they keep producing and consuming? What about the schema registry and the other components that depend on Kafka? So, one of the main concerns is the health of these dependent components.&lt;br&gt;
We also want to jump two major Kafka versions; how should we check for deprecated configs? Should we read every changelog one by one? There is a better approach that minimizes both the time spent and the probability of downtime.&lt;/p&gt;

&lt;h2&gt;
  
  
  Proposed approach
&lt;/h2&gt;

&lt;p&gt;Honestly, every time I think about Docker, I'm amazed at what a beautiful tool it is :D With tools like docker-compose, you can spin up an entire stack in its own isolated network, independently of everything else.&lt;/p&gt;

&lt;p&gt;A better approach is to use Docker to simulate production services in a safe environment. We can set up a whole stack with the same configs but fewer resources, simulate upgrades, and check each component's behavior.&lt;/p&gt;

&lt;h2&gt;
  
  
  Applied approach for Kafka
&lt;/h2&gt;

&lt;p&gt;To simulate the upgrade process for Kafka, I created a stack with these components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Zookeeper Instances -&amp;gt; Coordinator for the Kafka cluster&lt;/li&gt;
&lt;li&gt;Kafka Instances -&amp;gt; The main component&lt;/li&gt;
&lt;li&gt;Schema Registry -&amp;gt; Persists the schemas of produced messages&lt;/li&gt;
&lt;li&gt;Kafka UI -&amp;gt; Monitors the Kafka cluster and shows incoming messages in topics&lt;/li&gt;
&lt;li&gt;Producer -&amp;gt; Python code that produces data into a Kafka topic in Avro format&lt;/li&gt;
&lt;li&gt;Consumer -&amp;gt; Python code that consumes the data produced by &lt;code&gt;Producer&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;Clickhouse -&amp;gt; Analytical database that stores the data coming from Kafka&lt;/li&gt;
&lt;li&gt;Postgres -&amp;gt; OLTP database that stores transactional data&lt;/li&gt;
&lt;li&gt;Postgres Producer -&amp;gt; Python code that inserts one record every 0.1 seconds into the &lt;code&gt;Postgres&lt;/code&gt; database&lt;/li&gt;
&lt;li&gt;Debezium -&amp;gt; Captures each change in &lt;code&gt;Postgres&lt;/code&gt; and sends it to the corresponding Kafka topic in Avro format&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Now, it's time to prepare the appropriate &lt;code&gt;docker-compose.yaml&lt;/code&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  Implementation details
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Zookeeper&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Image: &lt;a href="https://hub.docker.com/_/zookeeper"&gt;Official Image&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Configs:

&lt;ul&gt;
&lt;li&gt;Mount &lt;code&gt;zoo.cfg&lt;/code&gt; into the container&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_dataFileManagement"&gt;&lt;code&gt;myid&lt;/code&gt; and data directory&lt;/a&gt; created using &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/zookeeper/zookeeper_conf_creator.py"&gt;&lt;code&gt;zookeeper_conf_creator.py&lt;/code&gt;&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Kafka&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Image: &lt;a href="https://hub.docker.com/r/bitnami/kafka"&gt;Bitnami Image&lt;/a&gt;

&lt;ul&gt;
&lt;li&gt;Image customized by &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/kafka/Dockerfile-Kafka"&gt;&lt;code&gt;Dockerfile-Kafka&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;Configs:

&lt;ul&gt;
&lt;li&gt;Set as environment variables&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;server.properties&lt;/code&gt; converted to &lt;code&gt;server.env&lt;/code&gt; using &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/kafka/kafka_env_creator.py"&gt;&lt;code&gt;kafka_env_creator.py&lt;/code&gt;&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;This image didn't support &lt;code&gt;SCRAM-SHA&lt;/code&gt; authentication, so &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/kafka/libkafka.sh"&gt;&lt;code&gt;libkafka.sh&lt;/code&gt;&lt;/a&gt; (Bitnami's Kafka shell library) was rewritten.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Schema Registry&lt;/strong&gt;:

&lt;ul&gt;
&lt;li&gt;Image: &lt;a href="https://hub.docker.com/r/confluentinc/cp-schema-registry"&gt;Official Image&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Configs:

&lt;ul&gt;
&lt;li&gt;Set as environment variables&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;schema-registry.properties&lt;/code&gt; converted to &lt;code&gt;schema-registry.env&lt;/code&gt; using &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/schema-registry/schema_registry_config_creator.py"&gt;&lt;code&gt;schema_registry_config_creator.py&lt;/code&gt;&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Kafka UI&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Image: &lt;a href="https://hub.docker.com/r/provectuslabs/kafka-ui"&gt;Official Image&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Configs:

&lt;ul&gt;
&lt;li&gt;Set as environment variables directly in &lt;code&gt;docker-compose.yaml&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Producer and Consumer&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Image: &lt;a href="https://hub.docker.com/_/python"&gt;Official Python Image&lt;/a&gt;

&lt;ul&gt;
&lt;li&gt;Image customized by &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/python-producer-consumer/Dockerfile-Python"&gt;&lt;code&gt;Dockerfile-Python&lt;/code&gt;&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;Code: &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/python-producer-consumer/producer.py"&gt;&lt;code&gt;producer.py&lt;/code&gt;&lt;/a&gt; and &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/python-producer-consumer/consumer.py"&gt;&lt;code&gt;consumer.py&lt;/code&gt;&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Configs: Set as environment variables directly in &lt;code&gt;docker-compose.yaml&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Clickhouse&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Image: &lt;a href="https://hub.docker.com/r/clickhouse/clickhouse-server"&gt;Official Image&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Tables: Table DDLs are defined &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/tree/main/clickhouse/schemas"&gt;here&lt;/a&gt; and mounted into &lt;code&gt;/docker-entrypoint-initdb.d&lt;/code&gt;

&lt;ul&gt;
&lt;li&gt;For each table in &lt;code&gt;Postgres&lt;/code&gt;, three tables are defined:

&lt;ul&gt;
&lt;li&gt;Base table -&amp;gt; data persists here&lt;/li&gt;
&lt;li&gt;Kafka table -&amp;gt; reads data from Kafka&lt;/li&gt;
&lt;li&gt;Materialized view -&amp;gt; ships data from the Kafka table into the base table&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Configs: Default configs are used; only &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/clickhouse/configs/kafka.xml"&gt;&lt;code&gt;kafka.xml&lt;/code&gt;&lt;/a&gt; is mounted into &lt;code&gt;/etc/clickhouse-server/config.d/kafka.xml&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Logs: For debugging purposes, logs are mounted to a &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/tree/main/clickhouse/log"&gt;local directory&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Postgres&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Image: &lt;a href="https://quay.io/debezium/example-postgres"&gt;Debezium Example Image&lt;/a&gt;

&lt;ul&gt;
&lt;li&gt;This Postgres contains sample sale data.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Postgres Producer&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Same as &lt;code&gt;Producer&lt;/code&gt; and &lt;code&gt;Consumer&lt;/code&gt;, but &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/python-producer-consumer/postgres-producer.py"&gt;this code&lt;/a&gt; is used&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Debezium&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Image: &lt;a href="https://hub.docker.com/r/debezium/connect"&gt;Official Image&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Configs:

&lt;ul&gt;
&lt;li&gt;Set as environment variables&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;kafka-connect.properties&lt;/code&gt; converted to &lt;code&gt;kafka-connect.env&lt;/code&gt; using &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/debezium/kafka_connect_config_generator.py"&gt;&lt;code&gt;kafka_connect_config_generator.py&lt;/code&gt;&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;
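&lt;p&gt;As a rough illustration of what the properties-to-env converter scripts above do (the real ones are Python; the &lt;code&gt;KAFKA_CFG_&lt;/code&gt; prefix is Bitnami's convention, and the exact mapping in the repo may differ), a shell sketch could look like this:&lt;/p&gt;

```shell
# Hypothetical shell equivalent of the repo's config-converter scripts:
# turn "log.retention.hours=168" into "KAFKA_CFG_LOG_RETENTION_HOURS=168".
props_to_env() {
  awk -F'=' '
    /^[[:space:]]*#/ { next }            # skip comment lines
    NF == 0          { next }            # skip blank lines
    {
      key = $1
      gsub(/\./, "_", key)               # dots -> underscores
      val = substr($0, index($0, "=") + 1)
      print "KAFKA_CFG_" toupper(key) "=" val
    }' "$1"
}

# Usage: props_to_env server.properties > server.env
```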

&lt;p&gt;&lt;strong&gt;Some Extra Containers&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;kafka-setup-user&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;It uses the same image as &lt;code&gt;Kafka&lt;/code&gt; and runs after &lt;code&gt;kafka1&lt;/code&gt; becomes healthy. Once this container has run (exited with status 0), some users have been created. See them &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/kafka-setup-user/entrypoint.sh"&gt;here&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;It needs one Kafka broker and also a Zookeeper cluster, because &lt;code&gt;SCRAM-SHA&lt;/code&gt; credentials are persisted in Zookeeper.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;strong&gt;kafka-setup-topic&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;It uses the same image as &lt;code&gt;Kafka&lt;/code&gt; and creates some topics. See the list &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/tree/main/kafka-setup-topic"&gt;here&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;strong&gt;submit-connector&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;It uses the &lt;a href="https://hub.docker.com/r/curlimages/curl"&gt;curl image&lt;/a&gt; to submit &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/submit-connector/register-postgres.json"&gt;this connector&lt;/a&gt; to &lt;code&gt;Debezium&lt;/code&gt;. The connector captures changes in &lt;code&gt;Postgres&lt;/code&gt; and sends events to &lt;code&gt;Kafka&lt;/code&gt;, and &lt;code&gt;Clickhouse&lt;/code&gt; then consumes the data into the appropriate tables.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Some Extra Notes:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;The versions of all containers are defined in the &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation/blob/main/.env"&gt;&lt;code&gt;.env&lt;/code&gt; file&lt;/a&gt;. You can change them there.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Container dependencies are defined accurately. So, if one container depends on another to come up, appropriate &lt;code&gt;healthcheck&lt;/code&gt; and &lt;code&gt;depends_on&lt;/code&gt; conditions are defined for it.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If you take a look at the &lt;code&gt;healthcheck&lt;/code&gt; of the containers, for example Kafka's, you will see this command:&lt;br&gt;
&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;   &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; /dev/tcp/kafka1/9092&lt;span class="o"&gt;)&lt;/span&gt; &amp;amp;&amp;gt;/dev/null &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="nb"&gt;exit &lt;/span&gt;0 &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="nb"&gt;exit &lt;/span&gt;1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This one-liner checks whether a TCP port is open from inside a container that doesn't have &lt;code&gt;telnet&lt;/code&gt; installed.&lt;/p&gt;

&lt;h3&gt;
  
  
  Simulation Process
&lt;/h3&gt;

&lt;p&gt;To run the simulation, you can follow &lt;a href="https://github.com/snapp-incubator/kafka-upgrade-simulation?tab=readme-ov-file#test-process"&gt;these steps&lt;/a&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  Result
&lt;/h3&gt;

&lt;p&gt;All tests were successful. By successful, I mean the producer could still produce messages without errors, and consumers could consume them without errors. No other criteria were investigated; you can define your own metrics for this simulation. Only one problem was seen in this process.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Problems:&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;In &lt;code&gt;Setup Kafka User&lt;/code&gt;, a &lt;code&gt;java.lang.ClassNotFoundException: kafka.security.auth.SimpleAclAuthorizer&lt;/code&gt; occurred

&lt;ol&gt;
&lt;li&gt;This class was deprecated in Kafka 2.4.0; see &lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-504+-+Add+new+Java+Authorizer+Interface"&gt;KIP-504&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;The docs recommend using &lt;code&gt;kafka.security.authorizer.AclAuthorizer&lt;/code&gt; instead. It's fully compatible with the deprecated class, so it was swapped in docker-compose and everything worked&lt;/li&gt;
&lt;/ol&gt;


&lt;/li&gt;
&lt;/ol&gt;
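&lt;p&gt;Concretely, the fix amounted to swapping one config value. In the Bitnami image, broker properties are passed as &lt;code&gt;KAFKA_CFG_*&lt;/code&gt; environment variables; the exact variable name below is my assumption, so adapt it to however your compose file passes configs:&lt;/p&gt;

```shell
# Deprecated since Kafka 2.4.0 (KIP-504) and later removed:
#   authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Drop-in replacement set in docker-compose:
export KAFKA_CFG_AUTHORIZER_CLASS_NAME="kafka.security.authorizer.AclAuthorizer"
```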

&lt;p&gt;&lt;strong&gt;Conclusion&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Since official documentation exists for upgrading from any version to 3.6.1 (and to earlier versions), there is no obstacle in this process. Our test also shows the process works, so we can upgrade our Kafka to whatever version we want.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;This article suggested an approach for upgrading services that many other systems depend on. We talked through the details of implementing this process, and, as we saw in the Result section, one problem was found before the real upgrade, so we can upgrade our Kafka cluster seamlessly, with zero downtime :)&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>devops</category>
      <category>upgrade</category>
      <category>dataplatform</category>
    </item>
    <item>
      <title>Empowering Your Kafka Connectors: A Guide to Connector Guardian</title>
      <dc:creator>Mohammad Arab Anvari</dc:creator>
      <pubDate>Tue, 27 Feb 2024 18:15:06 +0000</pubDate>
      <link>https://dev.to/anvaari/empowering-your-kafka-connectors-a-guide-to-connector-guardian-2bfb</link>
      <guid>https://dev.to/anvaari/empowering-your-kafka-connectors-a-guide-to-connector-guardian-2bfb</guid>
      <description>&lt;p&gt;Hi there :)&lt;/p&gt;

&lt;p&gt;In this post, I want to introduce you to Connector Guardian. If you've ever found yourself grappling with the management of Kafka Connect connectors, you're in for a treat. Connector Guardian is tailor-made to simplify your life as a developer or operator, providing efficient tools for the seamless management and maintenance of your Kafka Connectors.&lt;/p&gt;

&lt;h2&gt;
  
  
  How It Works
&lt;/h2&gt;

&lt;p&gt;Connector Guardian interacts with your Kafka Connect cluster through its &lt;a href="https://docs.confluent.io/platform/current/connect/references/restapi.html"&gt;REST API&lt;/a&gt;. The initial release, version &lt;a href="https://github.com/snapp-incubator/connector-guardian/releases/tag/0.1.0"&gt;0.1.0&lt;/a&gt;, relied on &lt;a href="https://github.com/jqlang/jq"&gt;jq&lt;/a&gt; for JSON parsing; from version &lt;a href="https://github.com/snapp-incubator/connector-guardian/releases/tag/0.2.0"&gt;0.2.0&lt;/a&gt; onward, Connector Guardian uses Python's built-in JSON library instead.&lt;/p&gt;
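&lt;p&gt;To give a feel for what this looks like against the REST API, here is my own minimal shell sketch, not the project's actual code; &lt;code&gt;CONNECT_URL&lt;/code&gt; and the crude JSON scraping are assumptions (version 0.1.0 used jq for the parsing instead):&lt;/p&gt;

```shell
CONNECT_URL="http://localhost:8083"   # assumed local Kafka Connect worker

# Extract ids of FAILED tasks from a connector-status JSON on stdin.
# Crude text scraping, good enough for a sketch.
failed_task_ids() {
  tr '{' '\n' | grep '"state":"FAILED"' | sed -n 's/.*"id":\([0-9]*\).*/\1/p'
}

# List connectors, check each one's status, restart any failed task
# via the Kafka Connect REST API. Needs a live cluster to actually run.
restart_failed_tasks() {
  for c in $(curl -s "$CONNECT_URL/connectors" | tr -d '[]"' | tr ',' ' '); do
    curl -s "$CONNECT_URL/connectors/$c/status" | failed_task_ids |
      while read -r t; do
        curl -s -X POST "$CONNECT_URL/connectors/$c/tasks/$t/restart"
      done
  done
}
```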

&lt;h2&gt;
  
  
  Features
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Auto Connector Restart&lt;/strong&gt;: Starting from &lt;a href="https://github.com/snapp-incubator/connector-guardian/releases/tag/0.1.0"&gt;V0.1.0&lt;/a&gt;, Connector Guardian monitors the status of connectors and tasks, automatically restarting them if they fail.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Restart Back Off&lt;/strong&gt;: Introduced in &lt;a href="https://github.com/snapp-incubator/connector-guardian/releases/tag/0.3.0"&gt;V0.3.0&lt;/a&gt;, this feature ensures that restarts occur at increasing time intervals. The initial restart happens immediately, and subsequent restarts are delayed exponentially. This approach allows for efficient issue resolution, even in the face of prolonged network outages. After a configurable number of restarts (&lt;code&gt;MAX_RESTART&lt;/code&gt;), the Guardian stops automatic restarting, leaving it to you for manual intervention.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
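&lt;p&gt;The back-off schedule described above (an immediate first restart, then exponentially growing delays, capped at &lt;code&gt;MAX_RESTART&lt;/code&gt; attempts) can be sketched like this; the one-second base delay is my assumption, not a documented project setting:&lt;/p&gt;

```shell
MAX_RESTART=7        # give up after this many restarts
EXPONENTIAL_RATIO=2  # delay multiplier between attempts
BASE_DELAY=1         # seconds; assumed, not a project env var

# Delay in seconds before restart attempt $1 (attempts numbered from 0):
# 0 -> 0, 1 -> 1, 2 -> 2, 3 -> 4, 4 -> 8, ...
backoff_delay() {
  n=$1
  d=0
  if [ "$n" -gt 0 ]; then
    d=$BASE_DELAY
    i=1
    while [ "$i" -lt "$n" ]; do
      d=$((d * EXPONENTIAL_RATIO))
      i=$((i + 1))
    done
  fi
  echo "$d"
}
```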

&lt;h2&gt;
  
  
  How to Add Guardian to Kafka Connect Cluster
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Container Image
&lt;/h3&gt;

&lt;p&gt;You can easily pull the Connector Guardian image from &lt;a href="https://hub.docker.com/r/anvaari/connector-guardian"&gt;Docker Hub&lt;/a&gt; and run it with the &lt;code&gt;docker run&lt;/code&gt; command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker run &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KAFKA_CONNECT_HOST&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;localhost &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KAFKA_CONNECT_PORT&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;8083 &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KAFKA_CONNECT_PROTO&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;http &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KAFKA_CONNECT_USER&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;''&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;KAFKA_CONNECT_PASS&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s1"&gt;''&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;ENABLE_BACKOFF&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1 &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;MAX_RESTART&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;7 &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;EXPONENTIAL_RATIO&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;2 &lt;span class="se"&gt;\&lt;/span&gt;
  anvaari/connector-guardian
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Non-Cloud Environments
&lt;/h3&gt;

&lt;p&gt;For deployment on your server, use the provided &lt;a href="https://github.com/snapp-incubator/connector-guardian/blob/main/deploy/docker-compose.yaml"&gt;docker-compose&lt;/a&gt; file. Before deploying the image, ensure that you set the appropriate environment variables in &lt;a href="https://github.com/snapp-incubator/connector-guardian/blob/main/deploy/docker-compose.yaml"&gt;docker-compose.yaml&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;cd &lt;/span&gt;deploy
docker compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Kubernetes or OpenShift
&lt;/h3&gt;

&lt;p&gt;Utilize the provided Helm chart for deployment. Make sure to set the required environment variables in &lt;a href="https://github.com/snapp-incubator/connector-guardian/blob/main/deploy/chart/values.yaml"&gt;values.yaml&lt;/a&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;helm upgrade connector-guardian &lt;span class="nt"&gt;--install&lt;/span&gt; &lt;span class="nt"&gt;-n&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;your_namespace_name&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="nt"&gt;-f&lt;/span&gt; deploy/chart/values.yaml deploy/chart
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once deployed, Connector Guardian runs as a pod, executing &lt;code&gt;connector_guardian.py&lt;/code&gt; every 5 minutes.&lt;/p&gt;

&lt;h3&gt;
  
  
  Environment Variables
&lt;/h3&gt;

&lt;p&gt;To use the Docker image, &lt;a href="https://github.com/snapp-incubator/connector-guardian/blob/main/deploy/docker-compose.yaml"&gt;docker-compose&lt;/a&gt;, or &lt;a href="https://github.com/snapp-incubator/connector-guardian/blob/main/deploy/chart/"&gt;Helm chart&lt;/a&gt;, set the following environment variables:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;KAFKA_CONNECT_HOST&lt;/code&gt;: Default = &lt;code&gt;localhost&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;KAFKA_CONNECT_PORT&lt;/code&gt;: Default = &lt;code&gt;8083&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;KAFKA_CONNECT_PROTO&lt;/code&gt;: Default = &lt;code&gt;http&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;KAFKA_CONNECT_USER&lt;/code&gt;: Default = &lt;code&gt;''&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;KAFKA_CONNECT_PASS&lt;/code&gt;: Default = &lt;code&gt;''&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;ENABLE_BACKOFF&lt;/code&gt;: Default = &lt;code&gt;1&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;MAX_RESTART&lt;/code&gt;: Default = &lt;code&gt;7&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;EXPONENTIAL_RATIO&lt;/code&gt;: Default = &lt;code&gt;2&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  In The End ...
&lt;/h2&gt;

&lt;p&gt;Connector Guardian is your steadfast companion in the realm of Kafka Connect connectors. Whether you are a seasoned developer or an operations expert, this tool streamlines the management of your connectors, offering an automated approach to restarts and intelligent back-off mechanisms.&lt;/p&gt;

&lt;p&gt;As I continue to evolve Connector Guardian, I invite you to be part of this journey. Your feedback, suggestions, and contributions are not only valued but crucial in shaping the future of this open-source project. Let's work together to make Kafka Connector maintenance a seamless experience for all.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Get Involved!&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Contribute to the project on &lt;a href="https://github.com/snapp-incubator/connector-guardian"&gt;GitHub&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;Share your experiences and ideas in the &lt;a href="https://github.com/snapp-incubator/connector-guardian/issues"&gt;issues&lt;/a&gt; section.&lt;/li&gt;
&lt;li&gt;Spread the word - let others in your network know about Connector Guardian.&lt;/li&gt;
&lt;li&gt;Stay tuned for updates and new features!&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>kafkaconnect</category>
      <category>python</category>
      <category>opensource</category>
      <category>devops</category>
    </item>
    <item>
      <title>Getting Valuable Insights from ClickHouse Error Logs using ClickSight</title>
      <dc:creator>Mohammad Arab Anvari</dc:creator>
      <pubDate>Thu, 03 Aug 2023 10:10:18 +0000</pubDate>
      <link>https://dev.to/anvaari/getting-valuable-insights-from-clickhouse-error-logs-using-clicksight-2kkk</link>
      <guid>https://dev.to/anvaari/getting-valuable-insights-from-clickhouse-error-logs-using-clicksight-2kkk</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;When managing a production ClickHouse cluster, you might face numerous challenges. One of them is finding the root cause of a crash. During such incidents, querying system tables like &lt;a href="https://clickhouse.com/docs/en/operations/system-tables/errors" rel="noopener noreferrer"&gt;system.errors&lt;/a&gt; or &lt;a href="https://clickhouse.com/docs/en/operations/system-tables/text_log" rel="noopener noreferrer"&gt;system.text_log&lt;/a&gt; is not possible, so we need to dig into the &lt;code&gt;clickhouse-server.err.log&lt;/code&gt; file to identify the root cause and address it effectively. Unix tools are the most efficient approach here, but we still want to get to the answer as quickly as possible.&lt;/p&gt;

&lt;p&gt;To gain better and faster insights from ClickHouse error logs, I developed an Ansible Playbook named ClickSight. This playbook performs log aggregation on ClickHouse logs from all specified nodes. ClickSight is available under the Apache license on GitHub as the &lt;a href="https://github.com/anvaari/ClickSight" rel="noopener noreferrer"&gt;ClickSight&lt;/a&gt; project. In the next section, we will see how to use it and customize it to suit your needs.&lt;/p&gt;

&lt;p&gt;It's essential to have a monitoring dashboard, such as &lt;a href="https://grafana.com/grafana/dashboards/14192-clickhouse/" rel="noopener noreferrer"&gt;this one&lt;/a&gt;, which can help us identify issues like a high number of mutations or high RAM usage. However, in complex situations, it might not be sufficient to find the root cause, and that's when error logs become crucial.&lt;/p&gt;

&lt;h2&gt;
  
  
  How ClickSight Works
&lt;/h2&gt;

&lt;p&gt;ClickSight leverages various useful Unix commands, such as &lt;code&gt;grep&lt;/code&gt;, &lt;code&gt;cat&lt;/code&gt;, &lt;code&gt;tail&lt;/code&gt;, &lt;code&gt;cut&lt;/code&gt;, and &lt;code&gt;sed&lt;/code&gt;, for working with text files. Additionally, Unix's ability to pass the output of one command to another using pipes (&lt;code&gt;|&lt;/code&gt;) allows us to chain commands together. For example, we can use &lt;code&gt;grep&lt;/code&gt; to find specific lines in a file and then use &lt;code&gt;cut&lt;/code&gt; to select all text after the &lt;code&gt;&amp;gt;&lt;/code&gt; character with &lt;code&gt;cat myfile.txt | grep "some arbitrary phrase" | cut -d"&amp;gt;" -f2&lt;/code&gt;. ClickSight harnesses the power of these Unix commands in an Ansible playbook, making log aggregation a breeze. With ClickSight, you only need to run the playbook, and the results will be available on your local machine.&lt;/p&gt;
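&lt;p&gt;As a toy example of the kind of chain ClickSight builds (the log lines below are fabricated and simplified, not real ClickHouse output), here is a pipeline that pulls the logger name out of error lines and counts occurrences per logger:&lt;/p&gt;

```shell
# Count errors per logger name from (fabricated, simplified) log lines.
top_errors=$(
  printf '%s\n' \
    '2023.08.01 10:00:01 Error executeQuery: Code: 241. DB::Exception' \
    '2023.08.01 10:00:02 Error MergeTreeData: Code: 252. DB::Exception' \
    '2023.08.01 10:00:03 Error executeQuery: Code: 241. DB::Exception' |
  grep ' Error ' |
  sed 's/.* Error \([^:]*\):.*/\1/' |   # keep only the logger name
  sort | uniq -c | sort -rn             # count and rank
)
printf '%s\n' "$top_errors"   # executeQuery comes first with 2 hits
```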

&lt;h2&gt;
  
  
  Setting Up ClickSight
&lt;/h2&gt;

&lt;p&gt;To use ClickSight, you need at least one active ClickHouse server and Ansible installed on your system. It's recommended to install it on a Unix-based OS. Additionally, ensure that you have access to the ClickHouse server as a sudo user, or at least your user should have access to the &lt;code&gt;clickhouse-server.err.log&lt;/code&gt; file.&lt;/p&gt;

&lt;p&gt;For detailed setup instructions for ClickSight and Ansible, please refer to the &lt;a href="https://github.com/anvaari/ClickSight#perquisites" rel="noopener noreferrer"&gt;Prerequisites&lt;/a&gt; section of the repository.&lt;/p&gt;

&lt;h2&gt;
  
  
  Running ClickSight
&lt;/h2&gt;

&lt;p&gt;A comprehensive guide on how to run ClickSight can be found in the &lt;a href="https://github.com/anvaari/ClickSight#run-clicksight" rel="noopener noreferrer"&gt;Run ClickSight&lt;/a&gt; section of the repository. Feel free to follow that guide, and if you have any questions, don't hesitate to ask here or on GitHub.&lt;/p&gt;

&lt;h2&gt;
  
  
  Analyzing ClickHouse Error Logs
&lt;/h2&gt;

&lt;p&gt;ClickSight offers five modes of operation:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;All fatal errors in &lt;code&gt;clickhouse-server.err.log&lt;/code&gt; with details: In case of a crash, a &lt;code&gt;fatal&lt;/code&gt; error is likely present, explaining the cause of the crash.&lt;/li&gt;
&lt;li&gt;Timeline of errors: On production systems, there might be numerous errors every minute, making it challenging to track them in log files. ClickSight can help by displaying the timeline of errors extracted from ClickHouse log lines. The error names are based on the &lt;a href="https://clickhouse.com/docs/en/operations/system-tables/text_log" rel="noopener noreferrer"&gt;logger_name&lt;/a&gt;, providing valuable information about the error category.

&lt;ul&gt;
&lt;li&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fygm41n1p6flhfdni85fe.png" alt="Error timeline in ClickSight"&gt;&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;Timeline of errors associated with a &lt;code&gt;query_id&lt;/code&gt;: ClickHouse provides &lt;code&gt;query_id&lt;/code&gt; for some logs associated with executed queries, denoted by &lt;code&gt;{}&lt;/code&gt;. ClickSight can display the timeline of errors linked to specific &lt;code&gt;query_id&lt;/code&gt;s.

&lt;ul&gt;
&lt;li&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffn45m5dj4l2gdw14xal4.png" alt="Timeline of query_id associated errors in ClickSight"&gt;&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;Errors for a specific &lt;code&gt;query_id&lt;/code&gt;: ClickSight allows you to view all errors, and their occurrence counts, associated with one particular &lt;code&gt;query_id&lt;/code&gt;.

&lt;ul&gt;
&lt;li&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fd92062a318r4fpor4rkc.png" alt="Timeline of errors for specific query_id in ClickSight"&gt;&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;Timeline of a specific error: Sometimes, we suspect a particular error and want to know when and how often it occurs. ClickSight can provide a detailed timeline for specific errors.

&lt;ul&gt;
&lt;li&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flodui966kv0tmz6hddf6.png" alt="Timeline of specific error with detail in ClickSight"&gt;&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;/ul&gt;

&lt;h2&gt;
  
  
  Contributing to ClickSight
&lt;/h2&gt;

&lt;p&gt;I welcome and appreciate contributions to the ClickSight project. Whether you want to report issues, suggest improvements, or submit new features, your input is valuable in making ClickSight even more useful.&lt;/p&gt;

&lt;h3&gt;
  
  
  Reporting Issues or Feature Requests
&lt;/h3&gt;

&lt;p&gt;If you encounter any problems while using ClickSight or have ideas for new features, please don't hesitate to report them. To do so:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Go to the ClickSight GitHub repository: &lt;a href="https://github.com/anvaari/ClickSight" rel="noopener noreferrer"&gt;ClickSight GitHub repository&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;Click on the "Issues" tab.&lt;/li&gt;
&lt;li&gt;Click on the green "New Issue" button.&lt;/li&gt;
&lt;li&gt;Provide a descriptive title and detailed description of the issue you encountered or the feature you want to suggest.&lt;/li&gt;
&lt;li&gt;If it's a bug, include steps to reproduce the problem.&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Contributing Code
&lt;/h3&gt;

&lt;p&gt;If you're a developer and want to contribute directly to the ClickSight codebase, follow these steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Fork the ClickSight repository to your GitHub account using the "Fork" button in the top-right corner.&lt;/li&gt;
&lt;li&gt;Clone the forked repository to your local development environment.&lt;/li&gt;
&lt;li&gt;Create a new branch for your contribution: &lt;code&gt;git checkout -b my-feature&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Make your changes and improvements.&lt;/li&gt;
&lt;li&gt;Test your changes thoroughly to ensure they work as expected.&lt;/li&gt;
&lt;li&gt;Commit your changes with clear and concise messages: &lt;code&gt;git commit -m "Add my awesome feature"&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Push the changes to your forked repository: &lt;code&gt;git push origin my-feature&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Create a pull request (PR) by navigating to the original ClickSight repository and clicking on "New Pull Request."&lt;/li&gt;
&lt;li&gt;Describe your changes in the PR, including any relevant information or context.&lt;/li&gt;
&lt;li&gt;I will review your PR, provide feedback, and work with you to merge the changes into the main ClickSight project.&lt;/li&gt;
&lt;/ol&gt;
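&lt;p&gt;The branching and committing part of these steps can be sketched end to end. Below is a local simulation in a throwaway repository (in practice you would run the same commands inside your cloned ClickSight fork; the file name and identity are placeholders):&lt;/p&gt;

```shell
# Local simulation of steps 3-7 above in a throwaway git repository.
set -e
repo=$(mktemp -d)
cd "$repo"
git init -q
git config user.email you@example.com   # placeholder identity
git config user.name "Your Name"
git checkout -q -b my-feature           # step 3: create a feature branch
echo "demo change" > feature.txt        # steps 4-5: make and test changes
git add feature.txt
git commit -q -m "Add my awesome feature"   # step 6: commit
git rev-parse --abbrev-ref HEAD
# step 7 would then be: git push origin my-feature
```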

&lt;p&gt;I'm Mohammad Anvaari, a Data Engineer at Snapp! I'm curious about data engineering and often write about my challenges and experiences on &lt;a href="https://dev.to/anvaari"&gt;my blog&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>ansible</category>
      <category>clickhouse</category>
      <category>monitoring</category>
      <category>tutorial</category>
    </item>
    <item>
      <title>Estimate Disk/Service IOPS and Throughput</title>
      <dc:creator>Mohammad Arab Anvari</dc:creator>
      <pubDate>Wed, 12 Jul 2023 15:25:15 +0000</pubDate>
      <link>https://dev.to/anvaari/estimate-diskservice-iops-and-throughput-37a8</link>
      <guid>https://dev.to/anvaari/estimate-diskservice-iops-and-throughput-37a8</guid>
      <description>&lt;h1&gt;
  
  
  Introduction
&lt;/h1&gt;

&lt;p&gt;Sometimes we need to know the current storage usage of a service in order to find any possible bottleneck on the storage side. This tutorial describes how to measure the current &lt;code&gt;IOPS&lt;/code&gt; and &lt;code&gt;Throughput&lt;/code&gt; on your server.&lt;/p&gt;

&lt;h1&gt;
  
  
  Max &lt;code&gt;IOPS&lt;/code&gt; and &lt;code&gt;Throughput&lt;/code&gt; of storage:
&lt;/h1&gt;

&lt;p&gt;In this section, we want to estimate the maximum &lt;code&gt;IOPS&lt;/code&gt; and &lt;code&gt;Throughput&lt;/code&gt; of storage.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;code&gt;dd&lt;/code&gt; command
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;dd &lt;/span&gt;&lt;span class="k"&gt;if&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;/dev/zero &lt;span class="nv"&gt;of&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;/tmp/disk_test_dd.file &lt;span class="nv"&gt;bs&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;100M &lt;span class="nv"&gt;count&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1 &lt;span class="nv"&gt;oflag&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;dsync
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;By running this command, &lt;code&gt;dd&lt;/code&gt; will read from &lt;code&gt;/dev/zero&lt;/code&gt; (a stream of null bytes) and write 100 megabytes of data to the file &lt;code&gt;/tmp/disk_test_dd.file&lt;/code&gt;. The write operation will be synchronized and physically written to the disk before &lt;code&gt;dd&lt;/code&gt; exits, thanks to the &lt;code&gt;oflag=dsync&lt;/code&gt; option. This command is often used to test the disk write performance or to create files filled with null bytes for various purposes.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Result:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;1+0 records &lt;span class="k"&gt;in
&lt;/span&gt;1+0 records out
104857600 bytes &lt;span class="o"&gt;(&lt;/span&gt;105 MB, 100 MiB&lt;span class="o"&gt;)&lt;/span&gt; copied, 0.22367 s, 469 MB/s
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;As you can see, the sequential write &lt;code&gt;Throughput&lt;/code&gt; of the disk was &lt;code&gt;469 MB/s&lt;/code&gt;.&lt;/li&gt;
&lt;/ul&gt;
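&lt;p&gt;The reported number is simply bytes copied divided by elapsed time. As a quick sanity check (a sketch using the figures from the output above), you can recompute it with &lt;code&gt;awk&lt;/code&gt;:&lt;/p&gt;

```shell
# Recompute dd's reported throughput from its summary line:
# 104857600 bytes copied in 0.22367 s (values taken from the output above).
awk 'BEGIN { printf "%.0f MB/s\n", 104857600 / 0.22367 / 1000000 }'
# prints "469 MB/s"
```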

&lt;h2&gt;
  
  
  &lt;code&gt;fio&lt;/code&gt; command (Recommended)
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;fio &lt;span class="nt"&gt;--randrepeat&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1 &lt;span class="nt"&gt;--ioengine&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;libaio &lt;span class="nt"&gt;--direct&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1 &lt;span class="nt"&gt;--gtod_reduce&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1 &lt;span class="nt"&gt;--name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;test_disk_fio &lt;span class="nt"&gt;--filename&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;test_disk_fio &lt;span class="nt"&gt;--bs&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;4k &lt;span class="nt"&gt;--iodepth&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;64 &lt;span class="nt"&gt;--size&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;4G &lt;span class="nt"&gt;--readwrite&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;randrw &lt;span class="nt"&gt;--rwmixread&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;75
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;By running this command, you will initiate a &lt;code&gt;fio&lt;/code&gt; test that performs random read-write I/O operations with a block size of 4 kilobytes, using a 4-gigabyte test file/device. The test will use the &lt;code&gt;libaio&lt;/code&gt; I/O engine with direct I/O enabled. The I/O depth is set to 64, and the reads-to-writes ratio is 75:25. The test results will provide performance metrics and insights into the disk's I/O capabilities under these conditions.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Result:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;test_disk_fio: Laying out IO file &lt;span class="o"&gt;(&lt;/span&gt;1 file / 4096MiB&lt;span class="o"&gt;)&lt;/span&gt;
Jobs: 1 &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;f&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1&lt;span class="o"&gt;)&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;m&lt;span class="o"&gt;(&lt;/span&gt;1&lt;span class="o"&gt;)][&lt;/span&gt;100.0%][r&lt;span class="o"&gt;=&lt;/span&gt;128MiB/s,w&lt;span class="o"&gt;=&lt;/span&gt;41.0MiB/s][r&lt;span class="o"&gt;=&lt;/span&gt;32.8k,w&lt;span class="o"&gt;=&lt;/span&gt;10.7k IOPS][eta 00m:00s]
test_disk_fio: &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;groupid&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0, &lt;span class="nb"&gt;jobs&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1&lt;span class="o"&gt;)&lt;/span&gt;: &lt;span class="nv"&gt;err&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; 0: &lt;span class="nv"&gt;pid&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;68138: Mon May 15 10:52:23 2023
   &lt;span class="nb"&gt;read&lt;/span&gt;: &lt;span class="nv"&gt;IOPS&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;37.1k, &lt;span class="nv"&gt;BW&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;145MiB/s &lt;span class="o"&gt;(&lt;/span&gt;152MB/s&lt;span class="o"&gt;)(&lt;/span&gt;3070MiB/21170msec&lt;span class="o"&gt;)&lt;/span&gt;
   bw &lt;span class="o"&gt;(&lt;/span&gt;  KiB/s&lt;span class="o"&gt;)&lt;/span&gt;: &lt;span class="nv"&gt;min&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;105184, &lt;span class="nv"&gt;max&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;246976, &lt;span class="nv"&gt;per&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;100.00%, &lt;span class="nv"&gt;avg&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;148570.26, &lt;span class="nv"&gt;stdev&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;22072.40, &lt;span class="nv"&gt;samples&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;42
   iops        : &lt;span class="nv"&gt;min&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;26296, &lt;span class="nv"&gt;max&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;61744, &lt;span class="nv"&gt;avg&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;37142.55, &lt;span class="nv"&gt;stdev&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;5518.10, &lt;span class="nv"&gt;samples&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;42
  write: &lt;span class="nv"&gt;IOPS&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;12.4k, &lt;span class="nv"&gt;BW&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;48.5MiB/s &lt;span class="o"&gt;(&lt;/span&gt;50.8MB/s&lt;span class="o"&gt;)(&lt;/span&gt;1026MiB/21170msec&lt;span class="o"&gt;)&lt;/span&gt;
   bw &lt;span class="o"&gt;(&lt;/span&gt;  KiB/s&lt;span class="o"&gt;)&lt;/span&gt;: &lt;span class="nv"&gt;min&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;36384, &lt;span class="nv"&gt;max&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;82448, &lt;span class="nv"&gt;per&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;100.00%, &lt;span class="nv"&gt;avg&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;49650.48, &lt;span class="nv"&gt;stdev&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;7312.82, &lt;span class="nv"&gt;samples&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;42
   iops        : &lt;span class="nv"&gt;min&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; 9096, &lt;span class="nv"&gt;max&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;20612, &lt;span class="nv"&gt;avg&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;12412.60, &lt;span class="nv"&gt;stdev&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1828.21, &lt;span class="nv"&gt;samples&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;42
  cpu          : &lt;span class="nv"&gt;usr&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;4.81%, &lt;span class="nv"&gt;sys&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;32.05%, &lt;span class="nv"&gt;ctx&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;72759, &lt;span class="nv"&gt;majf&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0, &lt;span class="nv"&gt;minf&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;153
  IO depths    : &lt;span class="nv"&gt;1&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.1%, &lt;span class="nv"&gt;2&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.1%, &lt;span class="nv"&gt;4&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.1%, &lt;span class="nv"&gt;8&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.1%, &lt;span class="nv"&gt;16&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.1%, &lt;span class="nv"&gt;32&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.1%, &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt;&lt;span class="nv"&gt;64&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;100.0%
     submit    : &lt;span class="nv"&gt;0&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.0%, &lt;span class="nv"&gt;4&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;100.0%, &lt;span class="nv"&gt;8&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.0%, &lt;span class="nv"&gt;16&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.0%, &lt;span class="nv"&gt;32&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.0%, &lt;span class="nv"&gt;64&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.0%, &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt;&lt;span class="nv"&gt;64&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.0%
     &lt;span class="nb"&gt;complete&lt;/span&gt;  : &lt;span class="nv"&gt;0&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.0%, &lt;span class="nv"&gt;4&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;100.0%, &lt;span class="nv"&gt;8&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.0%, &lt;span class="nv"&gt;16&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.0%, &lt;span class="nv"&gt;32&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.0%, &lt;span class="nv"&gt;64&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.1%, &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt;&lt;span class="nv"&gt;64&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0.0%
     issued rwt: &lt;span class="nv"&gt;total&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;785920,262656,0, &lt;span class="nv"&gt;short&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0,0,0, &lt;span class="nv"&gt;dropped&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0,0,0
     latency   : &lt;span class="nv"&gt;target&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0, &lt;span class="nv"&gt;window&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0, &lt;span class="nv"&gt;percentile&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;100.00%, &lt;span class="nv"&gt;depth&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;64

Run status group 0 &lt;span class="o"&gt;(&lt;/span&gt;all &lt;span class="nb"&gt;jobs&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;:
   READ: &lt;span class="nv"&gt;bw&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;145MiB/s &lt;span class="o"&gt;(&lt;/span&gt;152MB/s&lt;span class="o"&gt;)&lt;/span&gt;, 145MiB/s-145MiB/s &lt;span class="o"&gt;(&lt;/span&gt;152MB/s-152MB/s&lt;span class="o"&gt;)&lt;/span&gt;, &lt;span class="nv"&gt;io&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;3070MiB &lt;span class="o"&gt;(&lt;/span&gt;3219MB&lt;span class="o"&gt;)&lt;/span&gt;, &lt;span class="nv"&gt;run&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;21170-21170msec
  WRITE: &lt;span class="nv"&gt;bw&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;48.5MiB/s &lt;span class="o"&gt;(&lt;/span&gt;50.8MB/s&lt;span class="o"&gt;)&lt;/span&gt;, 48.5MiB/s-48.5MiB/s &lt;span class="o"&gt;(&lt;/span&gt;50.8MB/s-50.8MB/s&lt;span class="o"&gt;)&lt;/span&gt;, &lt;span class="nv"&gt;io&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1026MiB &lt;span class="o"&gt;(&lt;/span&gt;1076MB&lt;span class="o"&gt;)&lt;/span&gt;, &lt;span class="nv"&gt;run&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;21170-21170msec

Disk stats &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;read&lt;/span&gt;/write&lt;span class="o"&gt;)&lt;/span&gt;:
  sda: &lt;span class="nv"&gt;ios&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;785117/262419, &lt;span class="nv"&gt;merge&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;0/226, &lt;span class="nv"&gt;ticks&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1027296/246228, &lt;span class="nv"&gt;in_queue&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;1273668, &lt;span class="nv"&gt;util&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;99.60%
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;&lt;p&gt;As you can see, &lt;code&gt;fio&lt;/code&gt; provides more detailed results&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Mean &lt;code&gt;IOPS&lt;/code&gt; for read was  &lt;code&gt;37.1k&lt;/code&gt; and for write was &lt;code&gt;12.4k&lt;/code&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Mean &lt;code&gt;Throughput&lt;/code&gt; for read was &lt;code&gt;152MB/s&lt;/code&gt; and for write was &lt;code&gt;50.8MB/s&lt;/code&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
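&lt;p&gt;As a quick consistency check (a sketch using the mean &lt;code&gt;IOPS&lt;/code&gt; figures above, 37.1k read and 12.4k write), you can verify that the measured read share matches the requested &lt;code&gt;--rwmixread=75&lt;/code&gt;:&lt;/p&gt;

```shell
# Read share = read IOPS / (read IOPS + write IOPS); the values come
# from the fio output above and should match --rwmixread=75.
awk 'BEGIN { printf "read share: %.0f%%\n", 37100 / (37100 + 12400) * 100 }'
# prints "read share: 75%"
```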

&lt;h1&gt;
  
  
  Current &lt;code&gt;IOPS&lt;/code&gt; and &lt;code&gt;Throughput&lt;/code&gt; of storage:
&lt;/h1&gt;

&lt;p&gt;In this section, we want to measure the current &lt;code&gt;IOPS&lt;/code&gt; and &lt;code&gt;Throughput&lt;/code&gt; of the storage.&lt;/p&gt;

&lt;h2&gt;
  
  
  With &lt;code&gt;iostat&lt;/code&gt;
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;iostat &lt;span class="nt"&gt;-xdmb&lt;/span&gt; 60 1440
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;By running this command, &lt;code&gt;iostat&lt;/code&gt; will start monitoring and displaying disk statistics for all disk partitions every 60 seconds. The statistics will include information about disk utilization, wait time, service time, and the timestamp when the statistics were collected. The monitoring will continue for approximately 1440 minutes (around 24 hours).&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Note:&lt;/strong&gt;&lt;br&gt;
You can run &lt;code&gt;iostat -xdmb 60 1440 &amp;gt;&amp;gt; iostat_res.txt&lt;/code&gt; to redirect results to a file.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Result:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;05/13/2023 08:22:19 PM
Device            r/s     w/s     rkB/s     wkB/s   rrqm/s   wrqm/s  %rrqm  %wrqm r_await w_await aqu-sz rareq-sz wareq-sz  svctm  %util
loop0            0.00    0.00      0.00      0.00     0.00     0.00   0.00   0.00    0.00    0.00   0.00     1.60     0.00   0.00   0.00
sda            444.91   84.11  26980.23   2880.85     3.43    63.74   0.76  43.11    0.31    1.74   0.10    60.64    34.25   0.15   8.08
sda1             0.00    0.00      0.09      0.00     0.00     0.00  11.23   0.00    0.22   12.28   0.00    41.62     0.50   0.13   0.00
sda2           444.91   84.11  26980.14   2880.85     3.43    63.74   0.76  43.11    0.31    1.74   0.10    60.64    34.25   0.15   8.08
sdb            897.80  739.67  96125.22  35384.91    41.59   944.47   4.43  56.08    0.02    0.03   0.03   107.07    47.84   0.03   5.62
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;You can see the details of each column in the &lt;a href="https://linux.die.net/man/1/iostat"&gt;&lt;code&gt;iostat&lt;/code&gt; Doc&lt;/a&gt;.
&lt;/li&gt;
&lt;li&gt;At &lt;code&gt;05/13/2023 08:22:19 PM&lt;/code&gt;, &lt;code&gt;sda&lt;/code&gt; had &lt;code&gt;444.91&lt;/code&gt; read &lt;code&gt;IOPS&lt;/code&gt; and &lt;code&gt;84.11&lt;/code&gt; write &lt;code&gt;IOPS&lt;/code&gt; (from the &lt;code&gt;r/s&lt;/code&gt; and &lt;code&gt;w/s&lt;/code&gt; columns).&lt;/li&gt;
&lt;li&gt;Similarly, &lt;code&gt;sda&lt;/code&gt; had &lt;code&gt;26980.23 kB/s&lt;/code&gt; read &lt;code&gt;Throughput&lt;/code&gt; and &lt;code&gt;2880.85 kB/s&lt;/code&gt; write &lt;code&gt;Throughput&lt;/code&gt; (from the &lt;code&gt;rkB/s&lt;/code&gt; and &lt;code&gt;wkB/s&lt;/code&gt; columns).&lt;/li&gt;
&lt;/ul&gt;
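&lt;p&gt;The total &lt;code&gt;IOPS&lt;/code&gt; of a device is the sum of its &lt;code&gt;r/s&lt;/code&gt; and &lt;code&gt;w/s&lt;/code&gt; columns. A minimal &lt;code&gt;awk&lt;/code&gt; sketch over the &lt;code&gt;sda&lt;/code&gt; row from the sample above (trailing columns trimmed for brevity) illustrates this:&lt;/p&gt;

```shell
# Sum read and write IOPS (2nd and 3rd fields) for a captured iostat row;
# the sample line is taken from the output above.
printf '%s\n' 'sda 444.91 84.11 26980.23 2880.85' |
awk '{ printf "%s total IOPS: %.2f\n", $1, $2 + $3 }'
# prints "sda total IOPS: 529.02"
```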

&lt;p&gt;&lt;strong&gt;Note:&lt;/strong&gt;&lt;br&gt;
This is the result of a single 60-second interval. If you run the command as above, you will see 1440 tables like this after one day.&lt;/p&gt;

&lt;h1&gt;
  
  
  Analyze Output in &lt;code&gt;jupyter notebook&lt;/code&gt; [Optional]
&lt;/h1&gt;

&lt;p&gt;If you have several servers and need to analyze their disk activity, you can use the provided Jupyter Notebook to get more insight into the disks on your servers.&lt;/p&gt;

&lt;p&gt;You can see and download the Jupyter Notebook from &lt;a href="https://nbviewer.org/github/anvaari/notebook-share/blob/main/iostat_analysis.ipynb"&gt;this link&lt;/a&gt;&lt;/p&gt;
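&lt;p&gt;If you prefer to stay in the shell, the redirected &lt;code&gt;iostat&lt;/code&gt; log can also be summarized with &lt;code&gt;awk&lt;/code&gt;. A minimal sketch, with two stand-in sample rows instead of a real log file:&lt;/p&gt;

```shell
# Average r/s and w/s for one device across all samples of an iostat log.
# The two printf lines stand in for the contents of iostat_res.txt.
printf '%s\n' \
  'sda 444.91 84.11 26980.23 2880.85' \
  'sda 500.09 95.89 30000.00 3000.00' |
awk '$1 == "sda" { r += $2; w += $3; n++ }
     END { printf "avg r/s=%.2f avg w/s=%.2f\n", r / n, w / n }'
# prints "avg r/s=472.50 avg w/s=90.00"
```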

&lt;h1&gt;
  
  
  Conclusion
&lt;/h1&gt;

&lt;p&gt;With the commands and results above, you can estimate the I/O needs of the current service and see whether or not there is an I/O bottleneck.&lt;/p&gt;

&lt;h1&gt;
  
  
  References
&lt;/h1&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://www.howtouselinux.com/post/check-disk-iops-in-linux"&gt;Calculate &lt;code&gt;IOPS&lt;/code&gt; in linux&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://scoutapm.com/blog/understanding-disk-i-o-when-should-you-be-worried"&gt;Understanding I/O&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;Also &lt;a href="https://chat.openai.com"&gt;ChatGPT&lt;/a&gt; :)&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>devops</category>
      <category>tutorial</category>
    </item>
    <item>
      <title>Introduction to Clickhouse at scale</title>
      <dc:creator>Mohammad Arab Anvari</dc:creator>
      <pubDate>Wed, 17 May 2023 08:48:08 +0000</pubDate>
      <link>https://dev.to/anvaari/introduction-to-clickhouse-at-scale-3nmm</link>
      <guid>https://dev.to/anvaari/introduction-to-clickhouse-at-scale-3nmm</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;When we need to scale our services, we usually prefer scaling out over scaling up because of the lower cost of scaling out.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Scaling out = adding more components in parallel to spread out a load. Scaling up = making a component bigger or faster so that it can handle more load. &lt;a href="https://packetpushers.net/scale-up-vs-scale-out/" rel="noopener noreferrer"&gt;Source&lt;/a&gt;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;In Clickhouse terminology, scaling out corresponds to sharding, and we use replicas to ensure availability. In this article, we will learn about clusters, replicas, and sharding in Clickhouse.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Replication used for data integrity and automatic failover. Sharding is used for horizontal scaling of the cluster. From Reference 1&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Sharding
&lt;/h2&gt;

&lt;p&gt;Suppose we have a table on a Clickhouse node (the yellow table on &lt;code&gt;host1&lt;/code&gt;). After a while, the data grows and the request rate increases. Now we decide to scale, and &lt;code&gt;host2&lt;/code&gt; comes into the scene. We apply sharding to the table and place a second shard on &lt;code&gt;host2&lt;/code&gt;. &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;How to make a select query to the table?&lt;/strong&gt;

&lt;ol&gt;
&lt;li&gt;We can directly query the table on each host. In this case, we should know what data exists in each shard.&lt;/li&gt;
&lt;li&gt;We can create a &lt;code&gt;distributed table&lt;/code&gt;. It can be created on each node; when we query the &lt;code&gt;distributed table&lt;/code&gt;, it reads data from the proper shards. It doesn't store data itself but holds metadata about the data in each shard.&lt;/li&gt;
&lt;/ol&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fm636jg87a8ujly0a1uy9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fm636jg87a8ujly0a1uy9.png" alt="Clickhouse Sharding overview"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;How to insert data into the table?&lt;/strong&gt;

&lt;ol&gt;
&lt;li&gt;We can directly insert our data into the shards if we have a predefined schema or something similar.&lt;/li&gt;
&lt;li&gt;We can also insert data into the &lt;code&gt;distributed table&lt;/code&gt;, and it distributes the rows across shards according to the defined &lt;code&gt;sharding key&lt;/code&gt;. You can read more about the sharding key in the &lt;a href="https://clickhouse.com/docs/en/engines/table-engines/special/distributed#distributed-writing-data" rel="noopener noreferrer"&gt;Clickhouse Doc&lt;/a&gt;.
&lt;/li&gt;
&lt;/ol&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Replication
&lt;/h2&gt;

&lt;p&gt;Suppose we run an application that depends on Clickhouse, or we need online analytics backed by Clickhouse. What happens if one node goes down or a hardware issue arises? Certainly, we don't want that, so we need replication for each table. &lt;br&gt;
Replication is only available for tables in the &lt;code&gt;*MergeTree&lt;/code&gt; family. &lt;br&gt;
Clickhouse Keeper is the component that manages replication in Clickhouse. It is compatible with Zookeeper and is the alternative to Zookeeper that Clickhouse provides.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr7whbeonxuro6s21j20e.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr7whbeonxuro6s21j20e.png" alt="Clickhouse cluster"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Cluster
&lt;/h2&gt;

&lt;p&gt;A cluster is a way to manage sharding and replication across a set of nodes. We can have many cluster topologies over the same nodes. Every time we add a table to a cluster, it is sharded and replicated in the way defined for that cluster.&lt;/p&gt;

&lt;h2&gt;
  
  
  How to apply Sharding/Replication/Clustering in Clickhouse?
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Set up Clickhouse Cluster
&lt;/h3&gt;

&lt;p&gt;In the below picture, we have 4 nodes. We define a cluster named &lt;code&gt;cluster1&lt;/code&gt; (in the Clickhouse config file). Each table associated with this cluster will have 2 replicas and 2 shards (but the tables must use a &lt;code&gt;Replicated*&lt;/code&gt; engine).&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fi0pyj2z547ifbc9tizc0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fi0pyj2z547ifbc9tizc0.png" alt="Clickhouse cluster config"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Set up Clickhouse Keeper
&lt;/h3&gt;

&lt;p&gt;See &lt;a href="https://youtu.be/vBjCJtw_Ei0?t=600" rel="noopener noreferrer"&gt;This part&lt;/a&gt; of Reference 1&lt;/p&gt;

&lt;h3&gt;
  
  
  Create tables
&lt;/h3&gt;

&lt;p&gt;Every time we want to create a table with a &lt;code&gt;cluster1&lt;/code&gt; topology, we should use the &lt;code&gt;ON CLUSTER&lt;/code&gt; statement. For example, if we want to create the table:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6hkhou04hb2pkkx3ozm2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6hkhou04hb2pkkx3ozm2.png" alt="Clickhouse clustered table"&gt;&lt;/a&gt;&lt;br&gt;
The first parameter of the &lt;code&gt;ReplicatedMergeTree&lt;/code&gt; engine is the Clickhouse Keeper path of the table, and the second one is the replica name. &lt;strong&gt;Tables with the same path and different replica names are replicated&lt;/strong&gt; (Clickhouse Keeper handles this).&lt;br&gt;
We should define &lt;code&gt;{shard}&lt;/code&gt; and &lt;code&gt;{replica}&lt;/code&gt; as &lt;a href="https://clickhouse.com/docs/en/operations/server-configuration-parameters/settings#macros" rel="noopener noreferrer"&gt;&lt;code&gt;macros&lt;/code&gt;&lt;/a&gt; on each node.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fz6vnipwybqxdcb3nt5c7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fz6vnipwybqxdcb3nt5c7.png" alt="Clickhouse macros"&gt;&lt;/a&gt;&lt;br&gt;
Once &lt;code&gt;mydb.my_table&lt;/code&gt; is created on any of &lt;code&gt;host1&lt;/code&gt;, &lt;code&gt;host2&lt;/code&gt;, &lt;code&gt;host3&lt;/code&gt;, or &lt;code&gt;host4&lt;/code&gt;, it will be created on all the other nodes.&lt;br&gt;
And finally, we will create a &lt;code&gt;distributed table&lt;/code&gt; to query &lt;code&gt;my_db.my_table&lt;/code&gt;, which has 2 shards, more conveniently.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fk71pmyzf7wo6pv3y7rmh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fk71pmyzf7wo6pv3y7rmh.png" alt="Clickhouse distributed table"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;Great video by Clickhouse on &lt;a href="https://www.youtube.com/watch?v=vBjCJtw_Ei0&amp;amp;t=1s" rel="noopener noreferrer"&gt;YouTube&lt;/a&gt;
&lt;/li&gt;
&lt;/ol&gt;

</description>
      <category>dataengineering</category>
      <category>clickhouse</category>
      <category>beginners</category>
    </item>
    <item>
      <title>Apply CDC From MySQL To Clickhouse on local environment</title>
      <dc:creator>Mohammad Arab Anvari</dc:creator>
      <pubDate>Mon, 15 May 2023 06:20:03 +0000</pubDate>
      <link>https://dev.to/anvaari/apply-cdc-from-mysql-to-clickhouse-on-local-environment-4e3h</link>
      <guid>https://dev.to/anvaari/apply-cdc-from-mysql-to-clickhouse-on-local-environment-4e3h</guid>
      <description>&lt;p&gt;The aim of this tutorial is to capture every change (delete, insert, and update) from the Mysql table and sync it with Clickhouse.&lt;/p&gt;

&lt;h2&gt;
  
  
  Prerequisites
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Mysql&lt;/li&gt;
&lt;li&gt;Zookeeper&lt;/li&gt;
&lt;li&gt;Kafka&lt;/li&gt;
&lt;li&gt;Kafka-Connect&lt;/li&gt;
&lt;li&gt;Clickhouse&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We can set up all of these services with a simple docker-compose file(&lt;a href="https://github.com/debezium/debezium-examples/blob/main/tutorial/docker-compose-mysql.yaml" rel="noopener noreferrer"&gt;Source&lt;/a&gt;).&lt;/p&gt;
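&lt;p&gt;Note that the compose file interpolates &lt;code&gt;${DEBEZIUM_VERSION}&lt;/code&gt;, so export it before bringing the stack up (the tag below is only illustrative, not prescribed by this tutorial):&lt;/p&gt;

```shell
# The compose file references ${DEBEZIUM_VERSION}; export it first.
# "2.1" is just an example tag; pick the Debezium release you want.
export DEBEZIUM_VERSION=2.1
echo "Debezium images will use tag: $DEBEZIUM_VERSION"
# then: docker compose -f docker-compose-mysql.yaml up -d
```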

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;

&lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;2'&lt;/span&gt;
&lt;span class="na"&gt;services&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="na"&gt;  zookeeper&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="na"&gt;    image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}&lt;/span&gt;
&lt;span class="na"&gt;    ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="s"&gt;     - 2181:2181&lt;/span&gt;
&lt;span class="s"&gt;     - 2888:2888&lt;/span&gt;
&lt;span class="s"&gt;     - 3888:3888&lt;/span&gt;
&lt;span class="na"&gt;  kafka&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="na"&gt;    image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;quay.io/debezium/kafka:${DEBEZIUM_VERSION}&lt;/span&gt;
&lt;span class="na"&gt;    ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="s"&gt;     - 9092:9092&lt;/span&gt;
&lt;span class="na"&gt;    links&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="s"&gt;     - zookeeper&lt;/span&gt;
&lt;span class="na"&gt;    environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="s"&gt;     - ZOOKEEPER_CONNECT=zookeeper:2181&lt;/span&gt;
&lt;span class="na"&gt;  mysql&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="na"&gt;    image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}&lt;/span&gt;
&lt;span class="na"&gt;    ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="s"&gt;     - 3306:3306&lt;/span&gt;
&lt;span class="na"&gt;    environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="s"&gt;     - MYSQL_ROOT_PASSWORD=debezium&lt;/span&gt;
&lt;span class="s"&gt;     - MYSQL_USER=mysqluser&lt;/span&gt;
&lt;span class="s"&gt;     - MYSQL_PASSWORD=mysqlpw&lt;/span&gt;
&lt;span class="na"&gt;  connect&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="na"&gt;    image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;quay.io/debezium/connect:${DEBEZIUM_VERSION}&lt;/span&gt;
&lt;span class="na"&gt;    ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="s"&gt;     - 8083:8083&lt;/span&gt;
&lt;span class="na"&gt;    links&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="s"&gt;     - kafka&lt;/span&gt;
&lt;span class="s"&gt;     - mysql&lt;/span&gt;
&lt;span class="na"&gt;    environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="s"&gt;     - BOOTSTRAP_SERVERS=kafka:9092&lt;/span&gt;
&lt;span class="s"&gt;     - GROUP_ID=1&lt;/span&gt;
&lt;span class="s"&gt;     - CONFIG_STORAGE_TOPIC=my_connect_configs&lt;/span&gt;
&lt;span class="s"&gt;     - OFFSET_STORAGE_TOPIC=my_connect_offsets&lt;/span&gt;
&lt;span class="s"&gt;     - STATUS_STORAGE_TOPIC=my_connect_statuse&lt;/span&gt;
&lt;span class="na"&gt;  clickhouse&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="na"&gt;    image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;clickhouse/clickhouse-server:23.2.4.12&lt;/span&gt;
&lt;span class="na"&gt;    links&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="s"&gt;     - kafka&lt;/span&gt;
&lt;span class="na"&gt;    ulimits&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="na"&gt;      nofile&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="na"&gt;        soft&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;262144&lt;/span&gt;
&lt;span class="na"&gt;        hard&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;262144&lt;/span&gt;
&lt;span class="na"&gt;    ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
&lt;span class="s"&gt;      - 8123:8123&lt;/span&gt;
&lt;span class="s"&gt;      - 9000:9000&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;You can read more about the options for each service in &lt;a href="https://debezium.io/documentation/reference/stable/tutorial.html" rel="noopener noreferrer"&gt;this tutorial&lt;/a&gt;.&lt;br&gt;
After saving the YAML file as &lt;code&gt;docker-compose.yml&lt;/code&gt;, run:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;DEBEZIUM_VERSION&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;2.2
docker compose up


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Now we have a &lt;code&gt;MySQL&lt;/code&gt; container with a simple database named &lt;code&gt;inventory&lt;/code&gt;, a &lt;code&gt;Kafka&lt;/code&gt; container and the &lt;code&gt;Zookeeper&lt;/code&gt; instance that manages the Kafka cluster, a &lt;code&gt;connect&lt;/code&gt; instance that adds Kafka Connect capabilities to Kafka, and a &lt;code&gt;ClickHouse&lt;/code&gt; instance. All prerequisites are now in place.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fb0u5ra85sx9aum970qdd.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fb0u5ra85sx9aum970qdd.jpeg" alt="debezium, clickhouse to mysql architecture"&gt;&lt;/a&gt;&lt;br&gt;
&lt;a href="https://medium.com/@hoptical/apply-cdc-from-mysql-to-clickhouse-d660873311c7" rel="noopener noreferrer"&gt;Image Source&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Deploy Debezium connector
&lt;/h2&gt;

&lt;p&gt;We can interact with &lt;code&gt;Kafka-Connect&lt;/code&gt; through its &lt;a href="https://docs.confluent.io/platform/current/connect/references/restapi.html#connectors" rel="noopener noreferrer"&gt;REST API&lt;/a&gt;.&lt;br&gt;
The base request looks like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

curl &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;Request_Type&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; localhost:8083/connectors/ 


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;List the current connectors:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

curl &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; GET &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; localhost:8083/connectors/


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Delete the &lt;code&gt;{my-conn}&lt;/code&gt; connector:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

curl &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; DELETE &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; localhost:8083/connectors/&lt;span class="o"&gt;{&lt;/span&gt;my-conn&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Add a connector:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

curl &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; POST &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; localhost:8083/connectors/ &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{connector-config-as-json}'&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
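Kafka Connect also exposes a status endpoint, which is handy right after deployment to confirm that the connector and its tasks are in the RUNNING state. A minimal sketch, assuming the connector name is the one registered below (`mysql-connector`):

```shell
# Check the status of a deployed connector and its tasks.
# CONNECTOR is assumed to match the "name" field you registered.
CONNECTOR=mysql-connector
STATUS_URL="localhost:8083/connectors/${CONNECTOR}/status"
echo "GET ${STATUS_URL}"
# Returns JSON with the connector and task states once the stack is up:
curl -s -H "Accept:application/json" "${STATUS_URL}" || echo "Kafka Connect is not reachable"
```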
&lt;h3&gt;
  
  
  Config for MySQL Connector
&lt;/h3&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="w"&gt;

&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"mysql-connector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"config"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"tasks.max"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"connector.class"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.debezium.connector.mysql.MySqlConnector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"database.hostname"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"mysql"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"database.port"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"3306"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"database.user"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"root"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"database.password"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"debezium"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"database.include.list"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"inventory"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"table.include.list"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"inventory.orders"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"database.server.id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"1"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"message.key.columns"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"inventory.orders:order_number"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"schema.history.internal.kafka.bootstrap.servers"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"kafka:9092"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"schema.history.internal.kafka.topic"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"dbz.inventory.history"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"snapshot.mode"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"schema_only"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"topic.prefix"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"dbz.inventory.v2"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"transforms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"unwrap"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"transforms.unwrap.delete.handling.mode"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"rewrite"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"transforms.unwrap.type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.debezium.transforms.ExtractNewRecordState"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt; &lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;


&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;name&lt;/code&gt;: The name of the connector.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;config&lt;/code&gt;: The connector’s configuration.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;tasks.max&lt;/code&gt;: Only one task should operate at any one time. Because the MySQL connector reads the MySQL server’s &lt;code&gt;binlog&lt;/code&gt;, using a single connector task ensures proper order and event handling. The Kafka Connect service uses connectors to start one or more tasks that do the work, and it automatically distributes the running tasks across the cluster of Kafka Connect services. If any of the services stop or crash, those tasks will be redistributed to running services.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;connector.class&lt;/code&gt;: The type of connector, one of &lt;a href="https://debezium.io/documentation/reference/stable/connectors/index.html" rel="noopener noreferrer"&gt;these&lt;/a&gt;.
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;database.hostname&lt;/code&gt;: The database host, which is the name of the Docker container running the MySQL server (&lt;code&gt;mysql&lt;/code&gt;). Docker manipulates the network stack within the containers so that each linked container can be resolved with /etc/hosts using the container name for the hostname. If MySQL were running on a normal network, you would specify the IP address or resolvable hostname for this value.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;database.user&lt;/code&gt; &amp;amp; &lt;code&gt;database.password&lt;/code&gt;: The username and password of a MySQL user with &lt;a href="https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-creating-user" rel="noopener noreferrer"&gt;these&lt;/a&gt; privileges. For this example, I use the root user and password.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;database.include.list&lt;/code&gt;: Only changes in the inventory database will be detected.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;topic.prefix&lt;/code&gt;: A unique topic prefix. This name will be used as the prefix for all Kafka topics.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;schema.history.internal.kafka.bootstrap.servers&lt;/code&gt; &amp;amp; &lt;code&gt;schema.history.internal.kafka.topic&lt;/code&gt;: The connector will store the history of the database schemas in Kafka using this broker (the same broker to which you are sending events) and topic name. Upon restart, the connector will recover the schemas of the database that existed at the point in time in the &lt;code&gt;binlog&lt;/code&gt; when the connector should begin reading.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;transforms*&lt;/code&gt;: These transformations are needed to insert data into ClickHouse. There is a more detailed explanation &lt;a href="https://medium.com/@hoptical/apply-cdc-from-mysql-to-clickhouse-d660873311c7" rel="noopener noreferrer"&gt;here&lt;/a&gt;.
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;A full reference of the MySQL connector configuration properties can be found &lt;a href="https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  Consume Messages From Kafka
&lt;/h3&gt;

&lt;p&gt;Let's list the topics in our Kafka broker. First, open a shell inside the Kafka container:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-it&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;kafka-container-name&lt;span class="o"&gt;}&lt;/span&gt; /bin/bash


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Then:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

/kafka/bin/kafka-topics.sh &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; kafka:9092 &lt;span class="nt"&gt;--list&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Note that the topic corresponding to our &lt;code&gt;orders&lt;/code&gt; table in MySQL follows the format &lt;code&gt;{topic.prefix}.{database_name}.{table_name}&lt;/code&gt;. In this example, that is &lt;code&gt;dbz.inventory.v2.inventory.orders&lt;/code&gt;.&lt;br&gt;
To consume all messages from a topic:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

 /kafka/bin/kafka-console-consumer.sh &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; kafka:9092 &lt;span class="nt"&gt;--topic&lt;/span&gt; dbz.inventory.v2.inventory.orders &lt;span class="nt"&gt;--from-beginning&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
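The topic name above is plain string concatenation of the connector settings; as a sanity check, you can assemble the expected topic name from the values used in this tutorial:

```shell
# {topic.prefix}.{database_name}.{table_name} -> Kafka topic name
TOPIC_PREFIX=dbz.inventory.v2
DATABASE=inventory
TABLE=orders
TOPIC="${TOPIC_PREFIX}.${DATABASE}.${TABLE}"
echo "${TOPIC}"   # dbz.inventory.v2.inventory.orders
```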
&lt;h2&gt;
  
  
  Set Up Clickhouse Tables
&lt;/h2&gt;

&lt;p&gt;As described in &lt;a href="https://clickhouse.com/docs/en/integrations/kafka#using-the-kafka-table-engine" rel="noopener noreferrer"&gt;this article&lt;/a&gt; from the ClickHouse docs, we need three tables:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A table with the Kafka engine&lt;/li&gt;
&lt;li&gt;A Materialized View table&lt;/li&gt;
&lt;li&gt;A MergeTree table&lt;/li&gt;
&lt;/ul&gt;
&lt;h3&gt;
  
  
  Kafka Engine Table
&lt;/h3&gt;

&lt;p&gt;As mentioned in &lt;a href="https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka#table_engine-kafka-creating-a-table" rel="noopener noreferrer"&gt;the doc&lt;/a&gt;, we should specify the format of the messages arriving from the Kafka topic (one of &lt;a href="https://clickhouse.com/docs/en/interfaces/formats" rel="noopener noreferrer"&gt;these&lt;/a&gt;). We could use the Kafka Schema Registry, but here we want to parse the JSON directly. So, with the help of the solution provided in &lt;a href="https://yuhui-lin.github.io/post/2021-06-01_clickhouse-json/" rel="noopener noreferrer"&gt;this post&lt;/a&gt;, we receive each message in the &lt;code&gt;JSONAsString&lt;/code&gt; format and then parse it with a materialized view.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="nv"&gt;`default`&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka_orders&lt;/span&gt;
&lt;span class="p"&gt;(&lt;/span&gt;
&lt;span class="nv"&gt;`msg_json_str`&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Engine&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;Kafka&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'kafka:9092'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'dbz.inventory.v2.inventory.orders'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'clickhouse'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'JSONAsString'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;a href="https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka" rel="noopener noreferrer"&gt;Full doc&lt;/a&gt; of Kafka engine in Clickhouse.&lt;/p&gt;

&lt;h3&gt;
  
  
  MergeTree Table
&lt;/h3&gt;

&lt;p&gt;As mentioned at the beginning of this article, we want to capture deletes and updates, so we use &lt;code&gt;ReplacingMergeTree&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stream_orders&lt;/span&gt;
&lt;span class="p"&gt;(&lt;/span&gt;
&lt;span class="nv"&gt;`order_number`&lt;/span&gt; &lt;span class="n"&gt;Int16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`order_date`&lt;/span&gt; &lt;span class="nb"&gt;DATE&lt;/span&gt; &lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`purchaser`&lt;/span&gt; &lt;span class="n"&gt;Int16&lt;/span&gt; &lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`quantity`&lt;/span&gt; &lt;span class="n"&gt;Int16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`product_id`&lt;/span&gt; &lt;span class="n"&gt;Int16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`__deleted`&lt;/span&gt; &lt;span class="k"&gt;Nullable&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;ENGINE&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ReplacingMergeTree&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order_number&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;SETTINGS&lt;/span&gt; &lt;span class="n"&gt;index_granularity&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;8192&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h3&gt;
  
  
  Mat. View
&lt;/h3&gt;

&lt;p&gt;We parse the JSON using &lt;a href="https://clickhouse.com/docs/en/sql-reference/functions/json-functions" rel="noopener noreferrer"&gt;JSONExtract functions&lt;/a&gt; in ClickHouse.&lt;br&gt;
Keep in mind that Debezium represents the &lt;code&gt;DATE&lt;/code&gt; data type as the number of days since &lt;code&gt;1970-01-01&lt;/code&gt; (&lt;a href="https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types" rel="noopener noreferrer"&gt;Source&lt;/a&gt;). That is why we use &lt;code&gt;toDate&lt;/code&gt; in combination with &lt;code&gt;JSONExtractInt&lt;/code&gt;.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;MATERIALIZED&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer__orders&lt;/span&gt; &lt;span class="k"&gt;TO&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stream_orders&lt;/span&gt;
&lt;span class="p"&gt;(&lt;/span&gt;
&lt;span class="nv"&gt;`order_number`&lt;/span&gt; &lt;span class="n"&gt;Int16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`order_date`&lt;/span&gt; &lt;span class="nb"&gt;DATE&lt;/span&gt; &lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`purchaser`&lt;/span&gt; &lt;span class="n"&gt;Int16&lt;/span&gt; &lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`quantity`&lt;/span&gt; &lt;span class="n"&gt;Int16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`product_id`&lt;/span&gt; &lt;span class="n"&gt;Int16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`__deleted`&lt;/span&gt; &lt;span class="k"&gt;Nullable&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
&lt;span class="n"&gt;JSONExtractInt&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg_json_str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s1"&gt;'payload'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s1"&gt;'order_number'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;order_number&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;toDate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'1970-01-01'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;+&lt;/span&gt;&lt;span class="n"&gt;JSONExtractInt&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg_json_str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s1"&gt;'payload'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s1"&gt;'order_date'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;order_date&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;JSONExtractInt&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg_json_str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s1"&gt;'payload'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s1"&gt;'purchaser'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;purchaser&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;JSONExtractInt&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg_json_str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s1"&gt;'payload'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s1"&gt;'quantity'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;quantity&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;JSONExtractInt&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg_json_str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s1"&gt;'payload'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s1"&gt;'product_id'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;JSONExtractString&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg_json_str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s1"&gt;'payload'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="s1"&gt;'__deleted'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;__deleted&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;kafka_orders&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
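The epoch-days conversion done above with `toDate('1970-01-01') + JSONExtractInt(...)` can be reproduced from the command line; this sketch assumes GNU `date`, as shipped in most Linux images:

```shell
# Debezium encodes a MySQL DATE as the number of days since 1970-01-01.
# Convert such a value back into a calendar date:
days=19856
date -u -d "1970-01-01 +${days} days" +%F   # 2024-05-13
```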
&lt;h3&gt;
  
  
  A View (Optional)
&lt;/h3&gt;

&lt;p&gt;ClickHouse merges the &lt;code&gt;stream_orders&lt;/code&gt; table on an irregular schedule, so we can't always see the latest version of the data. We can use a view to achieve this:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
&lt;span class="nv"&gt;`order_number`&lt;/span&gt; &lt;span class="n"&gt;Int16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`order_date_`&lt;/span&gt; &lt;span class="nb"&gt;DATE&lt;/span&gt; &lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`purchaser`&lt;/span&gt; &lt;span class="n"&gt;Int16&lt;/span&gt; &lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`quantity`&lt;/span&gt; &lt;span class="n"&gt;Int16&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="nv"&gt;`product_id`&lt;/span&gt; &lt;span class="n"&gt;Int16&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
&lt;span class="n"&gt;order_number&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="k"&gt;max&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order_date&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;order_date_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;argMax&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;purchaser&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;order_date&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;purchaser&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;argMax&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;quantity&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;order_date&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;quantity&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;argMax&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;order_date&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;product_id&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stream_orders&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="nv"&gt;`__deleted`&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'false'&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;order_number&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We can also use the &lt;code&gt;FINAL&lt;/code&gt; modifier instead of &lt;code&gt;GROUP BY&lt;/code&gt;, but it's not recommended in a production environment.&lt;/p&gt;
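For comparison, a deduplicated read with `FINAL` would look like the sketch below; it is simpler to write, but it forces merge work at query time, which is why it is discouraged on large production tables:

```sql
-- Same deduplicated read using FINAL instead of GROUP BY/argMax
SELECT *
FROM default.stream_orders FINAL
WHERE `__deleted` = 'false'
```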

&lt;h2&gt;
  
  
  Troubleshooting
&lt;/h2&gt;

&lt;p&gt;In case of any error, or if data is missing from the tables, check the ClickHouse server logs located at &lt;code&gt;/var/log/clickhouse-server/clickhouse-server.err.log&lt;/code&gt; inside the ClickHouse container.&lt;/p&gt;
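From the host, that can be done with `docker exec`; the container name `clickhouse` below is an assumption based on the compose service name, so verify it with `docker ps` first:

```shell
# Hypothetical container name -- verify with `docker ps`.
CLICKHOUSE_CONTAINER=clickhouse
LOG_PATH=/var/log/clickhouse-server/clickhouse-server.err.log
echo "tailing ${LOG_PATH}"
docker exec "${CLICKHOUSE_CONTAINER}" tail -n 50 "${LOG_PATH}" || echo "container not running"
```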

&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://debezium.io/documentation/reference/stable/tutorial.html" rel="noopener noreferrer"&gt;Debezium Official Tutorial&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://medium.com/@hoptical/apply-cdc-from-mysql-to-clickhouse-d660873311c7" rel="noopener noreferrer"&gt;Blog Post By Hamed Karbasi&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>dataengineering</category>
      <category>cdc</category>
      <category>debezium</category>
      <category>tutorial</category>
    </item>
  </channel>
</rss>
