ClickHouse is an open-source column-oriented DBMS developed by Yandex, a Russian IT company. It's good for Big Data, business analytics and time series data. In this article I'll show you how to run ClickHouse in cluster mode.
For this tutorial we'll need the official Docker image for ClickHouse. Docker and docker-compose must be installed as well.
Any ClickHouse cluster consists of shards. A shard is a group of nodes storing the same data. If you have a table, its data will be distributed across shards.
Shards consist of replicas. Each replica of the shard stores the same data. All of them are synchronized with each other, so if one replica receives data, it broadcasts the data to the others.
Basically, we need database sharding for load distribution and database replication for fault tolerance.
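To make the difference between the two ideas concrete, here is a toy in-memory model. It is not ClickHouse code: the Shard and Cluster classes and the replica names are made up for illustration. It only shows that a row lands on one shard, and that every replica of that shard ends up with a copy.

```python
import random


class Shard:
    """One shard: a group of replicas that all hold the same rows."""

    def __init__(self, replica_names):
        # Each replica keeps its own full copy of the shard's rows.
        self.replicas = {name: [] for name in replica_names}

    def insert(self, row):
        # Replication: a row received by the shard is copied to every replica.
        for rows in self.replicas.values():
            rows.append(row)


class Cluster:
    """A cluster: a list of shards that split the table's rows between them."""

    def __init__(self, shards):
        self.shards = shards

    def insert(self, row):
        # Sharding: each row goes to exactly one shard (picked at random here).
        random.choice(self.shards).insert(row)

    def total_rows(self):
        # Count every row once by reading a single replica per shard.
        return sum(len(next(iter(s.replicas.values()))) for s in self.shards)


# Hypothetical layout: 2 shards with 2 replicas each.
cluster = Cluster([Shard(["a-1", "a-2"]), Shard(["b-1", "b-2"])])
for i in range(10):
    cluster.insert(("row", i))
print(cluster.total_rows())
```

Losing one replica in this model loses no rows, because its sibling holds the same list. Losing a whole shard would lose that shard's part of the table.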
To query sharded and replicated data tables, we need to use master nodes which form the cluster from subordinate nodes. Master nodes have a couple of distinctive features:
they store cluster configuration;
they have tables on the Distributed engine, which don't store any rows of data and are intended for querying tables on subordinate nodes and gathering data across the cluster.
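The second point can be pictured with a tiny sketch. This is only a model of the idea, not how ClickHouse implements it: DistributedView is a hypothetical class, and each shard table is just a Python list.

```python
class DistributedView:
    """Holds no rows itself; it only knows which shard tables to ask."""

    def __init__(self, shard_tables):
        # One list of rows per shard; the view keeps references, not copies.
        self.shard_tables = shard_tables

    def select(self, predicate=lambda row: True):
        # Fan the query out to every shard and gather the matching rows.
        result = []
        for table in self.shard_tables:
            result.extend(row for row in table if predicate(row))
        return result


# Rows live on the shards only.
shard_1 = [("elasticity", 38.9)]
shard_2 = [("gravity", 27.2), ("density", 19.8)]
shard_3 = []

view = DistributedView([shard_1, shard_2, shard_3])
print(view.select())
print(view.select(lambda row: row[1] > 20))
```

A row appended to one of the shard lists later shows up in the next select() call, since the view never stores anything of its own.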
Well, now onto node configuration. Each node in the cluster should have these fields initialized:
tcp_port - port on which clients connect over the native TCP protocol (default value is 9000);
http_port - port for communicating with the clients using HTTP (default value is 8123);
listen_host - IP address to listen on (several of them can be specified in the configuration file);
interserver_http_port - port for replicas to communicate with each other in the cluster.
You can also assign values to parameters such as keep_alive_timeout at your own discretion.
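As a rough sketch, the four network fields above can be written out with Python's standard library. A few things here are my assumptions rather than part of the original setup: the build_node_config helper is hypothetical, the yandex root tag is the one used by older releases (newer ones use clickhouse), 0.0.0.0 is just an example listen address, and 9009 is the stock value of interserver_http_port. The result is a fragment, not a complete config.xml.

```python
import xml.etree.ElementTree as ET


def build_node_config(tcp_port=9000, http_port=8123,
                      listen_host="0.0.0.0", interserver_http_port=9009):
    """Return a minimal XML fragment holding the four network fields."""
    # Assumption: <yandex> root tag, as in older ClickHouse releases.
    root = ET.Element("yandex")
    fields = {
        "tcp_port": tcp_port,
        "http_port": http_port,
        "listen_host": listen_host,
        "interserver_http_port": interserver_http_port,
    }
    for tag, value in fields.items():
        ET.SubElement(root, tag).text = str(value)
    if hasattr(ET, "indent"):  # pretty-printing exists on Python 3.9+
        ET.indent(root)
    return ET.tostring(root, encoding="unicode")


print(build_node_config())
```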
And it's better to leave the default values for these parameters:
logger/log - path to the log file, default value is
logger/errorlog - path to the error log file, default value is
path - path to the data directory, default value is
tmp_path - path to the temporary files directory, default value is
users_config - path to the file with users, their access rights and settings profiles, default value is
default_profile - profile with default settings, default value is
default_database - the database the client is connected to right after dialing, default value is
Of course, master nodes are special and have to store the cluster configuration. Here's the configuration for a cluster with 3 shards and only 1 replica for each shard:
    <remote_servers>
        <example_cluster>
            <shard>
                <replica>
                    <host>ch-sub-1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>ch-sub-2</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>ch-sub-3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </example_cluster>
    </remote_servers>
In the host field, an actual domain name is specified.
example_cluster is the name of the cluster. You can specify as many clusters as you want in the
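Writing the shard blocks by hand gets tedious as the cluster grows, so here is a small sketch that builds the same structure from a list of host names. The build_remote_servers helper is hypothetical, and it assumes every replica listens on the same port.

```python
import xml.etree.ElementTree as ET


def build_remote_servers(cluster_name, shards, port=9000):
    """Build a <remote_servers> element.

    shards is a list of shards; each shard is a list of replica host names.
    """
    remote = ET.Element("remote_servers")
    cluster = ET.SubElement(remote, cluster_name)
    for replica_hosts in shards:
        shard = ET.SubElement(cluster, "shard")
        for host in replica_hosts:
            replica = ET.SubElement(shard, "replica")
            ET.SubElement(replica, "host").text = host
            ET.SubElement(replica, "port").text = str(port)
    return remote


# The layout from this article: 3 shards, 1 replica each.
element = build_remote_servers(
    "example_cluster", [["ch-sub-1"], ["ch-sub-2"], ["ch-sub-3"]]
)
print(ET.tostring(element, encoding="unicode"))
```

Adding a second replica to a shard is then a matter of putting another host name into that shard's inner list.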
Now we are ready to launch the system. I will do it using docker-compose:
    version: "3.7"

    services:
      ch-master:
        container_name: ch_master
        image: yandex/clickhouse-server:126.96.36.199
        depends_on:
          - ch-sub-1
          - ch-sub-2
          - ch-sub-3
        ports:
          - 9000:9000
          - 8123:8123
        volumes:
          - type: volume
            source: ch-master-data
            target: /var/lib/clickhouse
          - type: volume
            source: ch-master-logs
            target: /var/log/clickhouse-server
          - ./master-config.xml:/etc/clickhouse-server/config.xml

      ch-sub-1:
        container_name: ch_sub_1
        image: yandex/clickhouse-server:188.8.131.52
        ports:
          - 9001:9000
          - 8124:8123
        volumes:
          - type: volume
            source: ch-sub-1-data
            target: /var/lib/clickhouse
          - type: volume
            source: ch-sub-1-logs
            target: /var/log/clickhouse-server
          - ./sub-config.xml:/etc/clickhouse-server/config.xml

      ch-sub-2:
        container_name: ch_sub_2
        image: yandex/clickhouse-server:184.108.40.206
        ports:
          - 9002:9000
          - 8125:8123
        volumes:
          - type: volume
            source: ch-sub-2-data
            target: /var/lib/clickhouse
          - type: volume
            source: ch-sub-2-logs
            target: /var/log/clickhouse-server
          - ./sub-config.xml:/etc/clickhouse-server/config.xml

      ch-sub-3:
        container_name: ch_sub_3
        image: yandex/clickhouse-server:220.127.116.11
        ports:
          - 9003:9000
          - 8126:8123
        volumes:
          - type: volume
            source: ch-sub-3-data
            target: /var/lib/clickhouse
          - type: volume
            source: ch-sub-3-logs
            target: /var/log/clickhouse-server
          - ./sub-config.xml:/etc/clickhouse-server/config.xml

    volumes:
      ch-master-data:
      ch-master-logs:
      ch-sub-1-data:
      ch-sub-1-logs:
      ch-sub-2-data:
      ch-sub-2-logs:
      ch-sub-3-data:
      ch-sub-3-logs:
As you can see, we created volumes for each node to store the node's data and logs. Also we mounted the node configs. In the current example, we don't need a separate configuration file for each subordinate node, so all of them share the same configuration.
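Containers take a moment to start, so before running anything against the nodes it can help to check that the published ports accept connections. Below is a minimal sketch using only the standard library; wait_for_port is a hypothetical helper. Keep in mind that an open TCP port only means something is listening there, not that ClickHouse has finished starting.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Try to open (and immediately close) a connection.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Native TCP ports published by the docker-compose file in this article.
NODE_PORTS = {"ch-master": 9000, "ch-sub-1": 9001, "ch-sub-2": 9002, "ch-sub-3": 9003}
```

A typical use would be to loop over NODE_PORTS and call wait_for_port("127.0.0.1", port) for each node before creating any tables.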
After everything is up and running, it's time to create data tables. For this task I will use the Python programming language and the clickhouse-driver library. Here is the first script:
    from clickhouse_driver import Client

    # Subordinate nodes: (host, published native TCP port) pairs from docker-compose.
    subs = [
        ("127.0.0.1", 9001),
        ("127.0.0.1", 9002),
        ("127.0.0.1", 9003),
    ]
    master = ("127.0.0.1", 9000)

    if __name__ == "__main__":
        # Create the local MergeTree table on every subordinate node.
        for host, port in subs:
            client = Client(host, port=port)
            client.execute("CREATE DATABASE IF NOT EXISTS db")
            client.execute('''CREATE TABLE IF NOT EXISTS db.entries(
                timestamp DateTime,
                parameter String,
                value Float64)
                ENGINE = MergeTree()
                PARTITION BY parameter
                ORDER BY (timestamp, parameter)''')

        # Create the Distributed table on the master node.
        host, port = master
        client = Client(host, port=port)
        client.execute("CREATE DATABASE IF NOT EXISTS db")
        client.execute('''CREATE TABLE IF NOT EXISTS db.entries(
            timestamp DateTime,
            parameter String,
            value Float64)
            ENGINE = Distributed(example_cluster, db, entries, rand())''')
First we create a data table on each subordinate node:

    CREATE TABLE IF NOT EXISTS db.entries(
        timestamp DateTime,
        parameter String,
        value Float64)
    ENGINE = MergeTree()
    PARTITION BY parameter
    ORDER BY (timestamp, parameter)
Ordering increases the performance of SELECT and INSERT queries, whereas partitioning is intended for data manipulation (for example, DROP PARTITION part_name). Keep in mind that the limit on partitions per insert block is controlled by the max_partitions_per_insert_block configuration parameter. The recommended total number of partitions per table is under 1000..10000. Exceeding this threshold leads to slow server startup and poor SELECT and INSERT performance.
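Since the table is partitioned by parameter, every distinct parameter value in an insert becomes its own partition. The sketch below counts them for a batch of rows before sending it. The helper names are hypothetical, and the limit of 100 is, to my knowledge, the stock default of max_partitions_per_insert_block, so check your own settings.

```python
from datetime import datetime


def partitions_in_block(rows, partition_key):
    """Return the set of distinct partition key values in a batch of rows."""
    return {partition_key(row) for row in rows}


def fits_insert_limit(rows, partition_key, max_partitions=100):
    """True if the batch touches no more partitions than the limit allows."""
    return len(partitions_in_block(rows, partition_key)) <= max_partitions


now = datetime(2020, 1, 1)
rows = [
    (now, "elasticity", 38.9),
    (now, "gravity", 27.2),
    (now, "density", 19.8),
    (now, "gravity", 27.4),
]

# PARTITION BY parameter: the key is the second column of each row.
by_parameter = lambda row: row[1]
print(sorted(partitions_in_block(rows, by_parameter)))
print(fits_insert_limit(rows, by_parameter))
```

If parameter can take thousands of different values, a coarser partition key is probably a better fit for this table.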
Then we create a Distributed table on the master node:

    CREATE TABLE IF NOT EXISTS db.entries(
        timestamp DateTime,
        parameter String,
        value Float64)
    ENGINE = Distributed(example_cluster, db, entries, rand())
The first engine parameter is the cluster name, followed by the database name, the table name and a sharding key. The sharding key is an expression whose result, computed from the column values, decides which shard stores the row. If you specify rand(), the row goes to a random shard. The sharding key only applies when you INSERT into the master table (note that the master table itself doesn't store any data, it only aggregates the data from the shards during queries). But we can also perform INSERT operations directly on the subordinate nodes:
    from datetime import datetime

    from clickhouse_driver import Client

    # Connect directly to the second subordinate node (published port 9002).
    client = Client("127.0.0.1", port=9002)
    client.execute(
        "INSERT INTO db.entries (timestamp, parameter, value) VALUES",
        [
            (datetime.utcnow(), "elasticity", 38.9),
            (datetime.utcnow(), "gravity", 27.2),
            (datetime.utcnow(), "density", 19.8),
        ],
    )
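Going back to the sharding key for a moment: when a row is inserted through the Distributed table, the shard is chosen from the remainder of dividing the key value by the total weight of the shards. The function below is my own sketch of that rule as I understand it from the ClickHouse documentation, not code taken from ClickHouse. pick_shard is a hypothetical name, and every shard in this article's cluster has the default weight of 1.

```python
import random


def pick_shard(key_value, weights):
    """Return the index of the shard for an integer sharding key value.

    Each shard owns a half-open range of remainders whose width equals its
    weight; the ranges are laid out in shard order.
    """
    remainder = key_value % sum(weights)
    upper = 0
    for index, weight in enumerate(weights):
        upper += weight
        if remainder < upper:
            return index


# Three shards with weight 1 each, as in example_cluster.
weights = [1, 1, 1]

# rand() yields a random 32-bit integer, so rows spread evenly on average.
key = random.getrandbits(32)
print(pick_shard(key, weights))

# A deterministic key always maps the same value to the same shard.
print(pick_shard(7, weights))
```

With a key such as a hash of the parameter column, all rows for one parameter would land on the same shard, which rand() does not guarantee.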
You can insert any data you want to any node. Now try to connect to the master node via ClickHouse client:
docker run --network="clustering_default" -it --rm --link ch_master:clickhouse-server yandex/clickhouse-client:18.104.22.168 --host clickhouse-server
When you are in, execute the following SQL statements:

    USE db;
    SELECT * FROM entries;
If everything has been set up properly, you'll see all the data you sent to each shard. Why not try everything yourself? Just look at the project source.
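The same check can be done without the client container, through the HTTP port that docker-compose publishes for each node (8123 for the master, 8124 to 8126 for the subordinates). Here is a minimal sketch with the standard library. The helper names are hypothetical, and it assumes the default user with no password, which may not hold for your setup.

```python
from urllib.parse import quote
from urllib.request import urlopen


def build_query_url(host, port, query):
    """Build a URL that passes the query in the 'query' URL parameter."""
    return f"http://{host}:{port}/?query={quote(query, safe='')}"


def run_query(host, port, query, timeout=5.0):
    """Send a read-only query over HTTP and return the raw text response."""
    with urlopen(build_query_url(host, port, query), timeout=timeout) as response:
        return response.read().decode("utf-8")


# Master first, then the three subordinate nodes.
HTTP_PORTS = {"ch-master": 8123, "ch-sub-1": 8124, "ch-sub-2": 8125, "ch-sub-3": 8126}

print(build_query_url("127.0.0.1", 8123, "SELECT count() FROM db.entries"))
```

Calling run_query("127.0.0.1", port, "SELECT count() FROM db.entries") for every port in HTTP_PORTS should give a total on the master equal to the sum of the counts on the subordinates.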
Also you are welcome to take a look at part 2.