Demo code for this post can be found here.
ClickHouse is a column-oriented database with SQL-like syntax. It is designed to provide high performance for online analytical queries that would be very costly for relational databases. ClickHouse can be a great tool for purposes like calculating statistical indicators for your data or computing features for ML models. And since Python is one of the main languages for Data Science and Data Analysis, it is not uncommon to see Python applications with ClickHouse as the analytical database.
Example application
Suppose you have a backend application in Python which uses ClickHouse as an OLAP database for calculating statistics on users. In this application you have a ClickHouse client, and you want to write integration tests for it to make sure that the client works properly with a ClickHouse server.
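Throughout the post the client is a thin wrapper named `ClickHouseClient`, configured via `ClickHouseConfig` and exposing an `execute_query` method (all imported from `src.clickhouse` below). The demo repository contains the real implementation; the following is only a minimal sketch of what it could look like, assuming it is built on the `clickhouse-driver` package (the `host`/`port` fields and the underlying `clickhouse_driver.Client` are assumptions, not the demo code):

```python
# a minimal sketch of src/clickhouse.py, assuming the client wraps the
# clickhouse-driver package (pip install clickhouse-driver); the config
# fields below are assumptions, not the demo repository's actual code
from dataclasses import dataclass
from typing import Any, Optional

from clickhouse_driver import Client


@dataclass
class ClickHouseConfig:
    host: str = "localhost"
    port: int = 9000          # ClickHouse native protocol port
    database: str = "default"


class ClickHouseClient:
    def __init__(self, config: ClickHouseConfig):
        self._client = Client(
            host=config.host,
            port=config.port,
            database=config.database,
        )

    def execute_query(self, query: str, params: Optional[Any] = None):
        # clickhouse-driver accepts either a params dict for placeholder
        # substitution or a list of rows for "INSERT ... VALUES" queries
        return self._client.execute(query, params)
```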
ClickHouse server
First, you need to set up ClickHouse on your local machine. You can use Docker for this. All you need is a simple `docker-compose` file with a single `clickhouse` service inside:
```yaml
version: '3'

services:
  clickhouse:
    platform: linux/x86_64
    image: yandex/clickhouse-server:latest
    container_name: clickhouse
    restart: always
    ports:
      - "8123:8123"
      - "9000:9000"
      - "9009:9009"
    volumes:
      - ./temp-data/clickhouse:/var/lib/clickhouse
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
```
Once you have this file, starting ClickHouse locally is easy:
```bash
docker-compose -f ./docker-compose.yml up
```
If you don't have `docker-compose` installed, you can read how to install it here.
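Before wiring up the tests, it can be handy to check that the container actually came up. This check is not part of the demo code, but ClickHouse answers `Ok.` on the `/ping` endpoint of its HTTP interface (port 8123 in the compose file above):

```python
# quick sanity check (not part of the demo repo): ClickHouse answers "Ok."
# on the /ping endpoint of its HTTP interface, mapped to localhost:8123 above
import urllib.request

with urllib.request.urlopen("http://localhost:8123/ping") as resp:
    print(resp.read().decode().strip())  # expected output: Ok.
```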
pytest
We'll be using `pytest` for running our integration tests. `pytest` is a great tool for running tests in Python applications! If you don't know about `pytest`, you can learn about it in the pytest docs.
Setting up tests
Before running tests, we need to set up the database and create all the tables we need for our tests. `pytest` has special files called `conftest.py` which are used for setting up tests and creating fixtures. First, we create a client which will be used in our tests.
```python
# conftest.py
import pytest

from src.clickhouse import ClickHouseClient, ClickHouseConfig

DEFAULT_DB = "default"
TEST_DB = "test_database"

CREATE_DB_QUERY = f"create database if not exists {TEST_DB};"
DROP_TABLE_QUERIES = [
    f"drop table if exists {TEST_DB}.users;",
]


@pytest.fixture(scope="session")
def clickhouse_client():
    client = ClickHouseClient(ClickHouseConfig(database=DEFAULT_DB))
    client.execute_query(CREATE_DB_QUERY)

    client = ClickHouseClient(ClickHouseConfig(database=TEST_DB))
    for drop_table in DROP_TABLE_QUERIES:
        client.execute_query(drop_table)

    return client
```
In the `clickhouse_client` fixture we first create a client connected to the `default` database. This database is automatically created by the ClickHouse server on startup, so we can be sure that it exists and that we won't get an exception when trying to connect to it.
After initializing the client, we do the following:
- Create a database for our tests (`test_database`) if it does not exist
- Create a new client connected to the test database
- Drop all the test tables in case they already exist in the test database
Then we need to run migrations to create all the tables we will need in the integration tests.
```python
# conftest.py
# ...

@pytest.fixture(scope="session")
def clickhouse_client():
    ...


MIGRATION_FILES = [
    "migrations/0001_create_users_table.sql",
]


@pytest.fixture(scope="session")
def clickhouse_migrations(clickhouse_client):
    migrations = []
    for fn in MIGRATION_FILES:
        with open(fn, "r") as f:
            migrations.append(f.read())

    for m in migrations:
        clickhouse_client.execute_query(m)

    return
```
`0001_create_users_table.sql` can have something like this inside:
```sql
create table if not exists users
(
    `id` Int64,
    `name` String,
    `email` String,
    `phone` FixedString(25),
    `created` DateTime,
    `modified` DateTime
)
engine = ReplacingMergeTree(modified)
order by id;
```
The migrations are stored as SQL scripts in a separate directory called `migrations`. This is useful, since you may need these migrations not only when running tests locally but also to set up ClickHouse tables in other environments, for example in CI/CD pipelines.
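For instance, a small standalone script could apply the same SQL files outside of pytest. The script below is purely a hypothetical sketch (there is no `apply_migrations.py` in the demo repository); it reuses the `ClickHouseClient` from the tests:

```python
# apply_migrations.py -- hypothetical helper (not in the demo repository)
# that applies the same SQL files outside of pytest, e.g. in a CI/CD job
import glob

from src.clickhouse import ClickHouseClient, ClickHouseConfig


def main() -> None:
    client = ClickHouseClient(ClickHouseConfig(database="default"))
    # apply migrations in filename order, same files as used by the tests
    for path in sorted(glob.glob("migrations/*.sql")):
        with open(path, "r") as f:
            client.execute_query(f.read())


if __name__ == "__main__":
    main()
```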
Simple healthcheck test
The simplest integration test we can write just executes a simple SELECT query which doesn't use any tables.
```python
# test_healthcheck.py

HEALTHCHECK_QUERY = "SELECT 1"


def test_healthcheck(clickhouse_client):
    result = clickhouse_client.execute_query(HEALTHCHECK_QUERY)
    assert result
    assert result[0][0] == 1
```
To run the tests with `pytest`, use this command:

```bash
pytest .
```
After this simple healthcheck test passes, we can write more complex tests.
INSERT query test
One way to test if our ClickHouse client correctly issues INSERT queries to ClickHouse is:
- Generate some test data
- Insert this data using our client
- Check that the number of rows in the ClickHouse table matches the number of data entries we generated
```python
# test_insert.py

# these are helper functions to generate test data;
# they are defined in a separate module so they can be re-used in other tests
# (in the demo repository they are implemented using Faker, https://faker.readthedocs.io/)
from tests.utils import (
    fake_int_id,
    fake_name,
    fake_email,
    fake_phone,
    fake_datetime,
)

INSERT_USERS_QUERY = """
insert into users
(
    id,
    name,
    email,
    phone,
    created,
    modified
)
VALUES
"""

BATCH_SIZE = 1010

# "final" makes the ReplacingMergeTree table return deduplicated rows
COUNT_USERS_QUERY = "select count(1) from users final"


# helper function to generate test data for a single user
def get_random_user(user_id: int = None) -> dict:
    if not user_id:
        user_id = fake_int_id()
    name = fake_name()
    return {
        "id": user_id,
        "name": name,
        "email": fake_email(name),
        "phone": fake_phone(),
        "created": fake_datetime(),
        "modified": fake_datetime(),
    }


def test_insert(clickhouse_client, clickhouse_migrations):
    # all users in the batch need to have unique ids;
    # here we just use sequential ids starting from 1
    user_batch = [get_random_user(i + 1) for i in range(BATCH_SIZE)]
    clickhouse_client.execute_query(INSERT_USERS_QUERY, user_batch)

    res = clickhouse_client.execute_query(COUNT_USERS_QUERY)
    # check that the number of rows in the users table matches
    # the number of users in user_batch that we inserted
    assert res[0][0] == BATCH_SIZE
```
Next steps
After you get these simple tests passing, you have a solid setup for expanding your integration test suite:
- Writing tests for SELECT queries (a sketch of such a test follows this list)
- Writing tests for complex analytical SELECT queries (if your application has this type of query)
- Extending the list of tables covered by the integration tests
- Simulating data loss and checking that such cases are handled correctly
- Other things that make sense for your application
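For the first of these, a SELECT test could look like the sketch below. It is only a sketch: it assumes that `INSERT_USERS_QUERY` and `get_random_user` can be imported from `test_insert.py`, and that `execute_query` passes a params dict through to the driver for `%(id)s`-style substitution (which holds, for example, if the client is built on `clickhouse-driver`):

```python
# test_select.py -- a sketch of a SELECT test, assuming the fixtures from
# conftest.py and the helpers defined in test_insert.py can be reused
from tests.test_insert import INSERT_USERS_QUERY, get_random_user

# %(id)s-style substitution is assumed to be supported by execute_query
SELECT_USER_QUERY = "select id, name, email from users final where id = %(id)s"


def test_select_user(clickhouse_client, clickhouse_migrations):
    # insert a single user with an id outside the range used in test_insert
    user = get_random_user(10_000_000)
    clickhouse_client.execute_query(INSERT_USERS_QUERY, [user])

    rows = clickhouse_client.execute_query(SELECT_USER_QUERY, {"id": user["id"]})
    # exactly one row should come back, matching the inserted user
    assert len(rows) == 1
    assert rows[0] == (user["id"], user["name"], user["email"])
```

Note that the query uses `final` for the same reason as the count query in the insert test: the `users` table uses the ReplacingMergeTree engine, so deduplication has to be forced at query time.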