Creating a ClickHouse cluster - Part I: Sharding

ClickHouse is an open-source column-oriented DBMS developed by Yandex, a Russian IT company. It's well suited for Big Data, business analytics and time-series data. In this article I'll show you how to run ClickHouse in cluster mode.

Prerequisites

For this tutorial we'll need the official Docker image for ClickHouse. Of course, Docker and docker-compose must be installed.

Cluster architecture

Any ClickHouse cluster consists of shards. A shard is a group of nodes storing the same data. If you have a table, its data will be distributed across the shards.

Shards consist of replicas. Each replica of a shard stores the same data, and all replicas are synchronized with each other: if one replica receives data, it propagates it to the others.

Basically, we need database sharding for load distribution and database replication for fault tolerance.

To query sharded and replicated tables, we need master nodes, which form the cluster out of the subordinate nodes. Master nodes have a couple of distinctive features:

  • they store cluster configuration;

  • they have tables that use the Distributed engine; these tables don't store any rows themselves and are intended for querying tables on the subordinate nodes and gathering data from across the cluster.

Cluster configuration

Well, now onto node configuration. Each node in the cluster should have these fields initialized:

  • tcp_port - the port clients connect to over the native protocol (default value is 9000);

  • http_port - the port for communicating with clients over HTTP (default value is 8123);

  • listen_host - IP address to listen on (several can be specified in the configuration file);

  • interserver_http_port - port for replicas to communicate with each other in the cluster.
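To make the two client-facing ports from the list above concrete, here's a minimal sketch (assuming a node is reachable on localhost with the default ports, and that the clickhouse-driver and requests Python packages are installed) that talks to the same server over both the native protocol and HTTP:

import requests  # third-party HTTP client, used here only for illustration
from clickhouse_driver import Client

# The native protocol goes through tcp_port (9000 by default).
native = Client("127.0.0.1", port=9000)
print(native.execute("SELECT version()"))

# The HTTP interface listens on http_port (8123 by default).
resp = requests.get("http://127.0.0.1:8123", params={"query": "SELECT version()"})
print(resp.text)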

You can also set parameters such as max_concurrent_queries, max_connections and keep_alive_timeout at your own discretion.

And it's better to leave the default values for these parameters:

  • logger/log - path to the log file, default value is /var/log/clickhouse-server/clickhouse-server.log;

  • logger/errorlog - path to the error log file, default value is /var/log/clickhouse-server/clickhouse-server.err.log;

  • path - path to the data directory, default value is /var/lib/clickhouse/;

  • tmp_path - path to the temporary files directory, default value is /var/lib/clickhouse/tmp/;

  • users_config - path to the file with users, their access rights and settings profiles, default value is users.xml;

  • default_profile - profile with default settings, default value is default;

  • default_database - the database the client is connected to right after connecting, default value is default.

Of course, master nodes are special and have to store the cluster configuration. Here's the configuration for a cluster with 3 shards and only 1 replica for each shard:

<remote_servers>
    <example_cluster>
        <shard>
            <replica>
                <host>ch-sub-1</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <replica>
                <host>ch-sub-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <replica>
                <host>ch-sub-3</host>
                <port>9000</port>
            </replica>
        </shard>
    </example_cluster>
</remote_servers>

The host field contains the node's actual hostname. example_cluster is the name of the cluster. You can specify as many clusters as you want in the remote_servers section.

Cluster deployment

Now we are ready to launch the system. I will do it using docker-compose:

version: "3.7"

services:

  ch-master:
    container_name: ch_master
    image: yandex/clickhouse-server:19.14.13.4
    depends_on:
      - ch-sub-1
      - ch-sub-2
      - ch-sub-3
    ports:
      - 9000:9000
      - 8123:8123
    volumes:
      - type: volume
        source: ch-master-data
        target: /var/lib/clickhouse
      - type: volume
        source: ch-master-logs
        target: /var/log/clickhouse-server
      - ./master-config.xml:/etc/clickhouse-server/config.xml

  ch-sub-1:
    container_name: ch_sub_1
    image: yandex/clickhouse-server:19.14.13.4
    ports:
      - 9001:9000
      - 8124:8123
    volumes:
      - type: volume
        source: ch-sub-1-data
        target: /var/lib/clickhouse
      - type: volume
        source: ch-sub-1-logs
        target: /var/log/clickhouse-server
      - ./sub-config.xml:/etc/clickhouse-server/config.xml

  ch-sub-2:
    container_name: ch_sub_2
    image: yandex/clickhouse-server:19.14.13.4
    ports:
      - 9002:9000
      - 8125:8123
    volumes:
      - type: volume
        source: ch-sub-2-data
        target: /var/lib/clickhouse
      - type: volume
        source: ch-sub-2-logs
        target: /var/log/clickhouse-server
      - ./sub-config.xml:/etc/clickhouse-server/config.xml

  ch-sub-3:
    container_name: ch_sub_3
    image: yandex/clickhouse-server:19.14.13.4
    ports:
      - 9003:9000
      - 8126:8123
    volumes:
      - type: volume
        source: ch-sub-3-data
        target: /var/lib/clickhouse
      - type: volume
        source: ch-sub-3-logs
        target: /var/log/clickhouse-server
      - ./sub-config.xml:/etc/clickhouse-server/config.xml

volumes:
  ch-master-data:
  ch-master-logs:
  ch-sub-1-data:
  ch-sub-1-logs:
  ch-sub-2-data:
  ch-sub-2-logs:
  ch-sub-3-data:
  ch-sub-3-logs:

As you can see, we created volumes for each node to store its data and logs, and we mounted the node configs. In this example the subordinate nodes don't need separate configuration files, so they all share the same one. The whole cluster can be brought up with docker-compose up -d.
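Once the containers are running, you can check that the master actually sees the cluster defined in remote_servers. Here's a minimal sketch using the clickhouse-driver Python library (introduced in the next section), assuming the master's native port is published on localhost:9000:

from clickhouse_driver import Client

# Connect to the master node over the native protocol.
client = Client("127.0.0.1", port=9000)

# system.clusters lists every cluster the node knows about from remote_servers.
rows = client.execute(
    "SELECT cluster, shard_num, replica_num, host_name, port "
    "FROM system.clusters WHERE cluster = 'example_cluster'"
)
for row in rows:
    print(row)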

Cluster tables

After everything is up and running, it's time to create the data tables. For this task I'll use Python and the clickhouse-driver library (installable with pip install clickhouse-driver). Now onto the first script, create-cluster.py:

from clickhouse_driver import Client

# Native-protocol endpoints of the nodes as they are published on the host.
subs = [
    ("127.0.0.1", "9001"),
    ("127.0.0.1", "9002"),
    ("127.0.0.1", "9003")
]
master = ("127.0.0.1", "9000")

if __name__ == "__main__":
    # Create the local storage table on every subordinate node.
    for sub in subs:
        client = Client(sub[0], port=sub[1])

        client.execute("CREATE DATABASE IF NOT EXISTS db")

        client.execute('''CREATE TABLE IF NOT EXISTS db.entries(
                          timestamp DateTime,
                          parameter String,
                          value Float64)
                          ENGINE = MergeTree()
                          PARTITION BY parameter
                          ORDER BY (timestamp, parameter)''')

    # Create the Distributed table on the master node.
    client = Client(master[0], port=master[1])

    client.execute("CREATE DATABASE IF NOT EXISTS db")

    client.execute('''CREATE TABLE IF NOT EXISTS db.entries(
                      timestamp DateTime,
                      parameter String,
                      value Float64)
                      ENGINE = Distributed(example_cluster, db, entries, rand())''')

First, we create a data table on each subordinate node:

CREATE TABLE IF NOT EXISTS db.entries(
timestamp DateTime,
parameter String,
value Float64)
ENGINE = MergeTree()
PARTITION BY parameter
ORDER BY (timestamp, parameter)

Ordering improves the performance of SELECT and INSERT queries, whereas partitioning is intended for data manipulation (for example, ALTER TABLE ... DROP PARTITION part_name). Keep in mind that the number of partitions per insert block is limited by the max_partitions_per_insert_block setting. The recommended total number of partitions per table is about 1000..10000; exceeding this leads to slow server startup and poor SELECT and INSERT performance.
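As a quick illustration of what partitioning buys you, here's a minimal sketch (assuming the cluster from this article is running and the rows inserted later in this article, which include a gravity parameter on ch-sub-2, published on port 9002, are already present) that lists the partitions of the local table and drops one of them wholesale:

from clickhouse_driver import Client

# Connect to one of the subordinate nodes (ch-sub-2 is published on port 9002).
client = Client("127.0.0.1", port="9002")

# system.parts shows which partitions the local table currently has.
partitions = client.execute(
    "SELECT DISTINCT partition FROM system.parts "
    "WHERE database = 'db' AND table = 'entries' AND active"
)
print(partitions)

# The table is partitioned by `parameter`, so each value is its own partition
# and can be removed in one cheap metadata operation.
client.execute("ALTER TABLE db.entries DROP PARTITION 'gravity'")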

Then we create a Distributed table on the master node:

CREATE TABLE IF NOT EXISTS db.entries(
timestamp DateTime,
parameter String,
value Float64)
ENGINE = Distributed(example_cluster, db, entries, rand())

The first engine parameter is the cluster name, followed by the database name, the table name and a sharding key. The sharding key is an expression whose result determines which shard a given row is written to, depending on the values of its columns. If you specify rand(), each row goes to a random shard. The sharding key only matters when you INSERT into the master table (note that the master table itself doesn't store any data; it only aggregates data from the shards during queries). But we can also perform INSERT operations directly on the subordinate nodes:

from clickhouse_driver import Client
from datetime import datetime

client = Client("127.0.0.1", port="9002")

client.execute("INSERT INTO db.entries (timestamp, parameter, value) VALUES", \
    [(datetime.utcnow(), "elasticity", 38.9), (datetime.utcnow(), "gravity", 27.2), \
        (datetime.utcnow(), "density", 19.8)])
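For comparison, here's a minimal sketch (assuming the cluster above is running and the master's native port is published on localhost:9000) that inserts through the master's Distributed table instead, letting the rand() sharding key pick a shard for each row; the parameter names here are just made-up sample data:

from clickhouse_driver import Client
from datetime import datetime

# Connect to the master node and write through its Distributed table.
client = Client("127.0.0.1", port="9000")

# Each row is forwarded to a shard chosen by the rand() sharding key.
client.execute("INSERT INTO db.entries (timestamp, parameter, value) VALUES",
    [(datetime.utcnow(), "pressure", 101.3),
     (datetime.utcnow(), "temperature", 23.5)])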

You can insert data into any node, directly or through the master. Now try connecting to the master node with the ClickHouse client:

docker run --network="clustering_default" -it --rm --link ch_master:clickhouse-server yandex/clickhouse-client:19.14.12.2 --host clickhouse-server

Once you're in, execute the following SQL statements:

USE db
SELECT * FROM entries
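The same check can be done programmatically; here's a small sketch under the same assumptions as above, reading everything back through the Distributed table on the master:

from clickhouse_driver import Client

# The Distributed table fans the SELECT out to every shard and merges the results.
client = Client("127.0.0.1", port="9000")

rows = client.execute("SELECT timestamp, parameter, value FROM db.entries ORDER BY timestamp")
for row in rows:
    print(row)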

If everything has been set up properly, you'll see all the data you sent to each shard. Why not try it yourself? Just take a look at the project source.

You are also welcome to take a look at part 2.

Top comments (6)

Franck Leveneur

Yeah, I've been testing ClickHouse. It's fast, but when you start joining tables with millions of rows, ClickHouse falls apart (and even runs out of memory).
You should check out Singlestore (a full cluster solution supporting both in-memory and column-store engines) with lots of analytical features. Not to mention pipelines, which can be used to ingest from S3 or Kafka with just a few lines of SQL.
I'm planning to post a TPC-H benchmark comparing Singlestore and ClickHouse.

Csaba Kissi

Great! Will you also do part II?

NightGhost

Yes, but a bit later. I have to deal with my job and my studies at the university.

Csaba Kissi

Thank you. Looking forward to the next post.

Pavel Vorozheykin

Thanks for the article!
I'm a little confused about the notion of master nodes. The ClickHouse docs state that its clusters are homogeneous, i.e. each node is equivalent in its schema and settings, and can be written to/queried against.
So, in a sense, any node can be a master. Or rather, there are no master nodes at all.

NightGhost

Yes, you can place a regular table and a master (Distributed) table for it on the same node. This allows you to create a kind of decentralized cluster in which the data from all the nodes can be aggregated through any other node.