<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Max Gurewitz</title>
    <description>The latest articles on DEV Community by Max Gurewitz (@dittofeed-max).</description>
    <link>https://dev.to/dittofeed-max</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1260353%2Fc4d69291-679a-4a1b-b533-bd5107d8a8b8.jpeg</url>
      <title>DEV Community: Max Gurewitz</title>
      <link>https://dev.to/dittofeed-max</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/dittofeed-max"/>
    <language>en</language>
    <item>
      <title>How We Stopped Our ClickHouse DB From Exploding</title>
      <dc:creator>Max Gurewitz</dc:creator>
      <pubDate>Mon, 29 Jan 2024 17:24:23 +0000</pubDate>
      <link>https://dev.to/dittofeed-max/how-we-stopped-our-clickhouse-db-from-exploding-2969</link>
      <guid>https://dev.to/dittofeed-max/how-we-stopped-our-clickhouse-db-from-exploding-2969</guid>
      <description>&lt;h2&gt;
  
  
  Micro-batching in ClickHouse for User Segmentation
&lt;/h2&gt;

&lt;p&gt;I will share some techniques for calculating live user segments while keeping your ClickHouse instance from exploding! User segmentation is key to understanding your business or application’s users’ behaviors. For example, you might want to keep track of which users have gotten stuck in an onboarding flow or which users haven’t logged in within the last 30 days.&lt;/p&gt;

&lt;p&gt;We’re building an application, Dittofeed, an Open-Source customer engagement platform. It’s used to automate email and SMS messaging (among other channels). We use ClickHouse as our primary store of user events and to calculate live user segments.&lt;/p&gt;

&lt;p&gt;We’d love it if you visited our repo and gave us a star!&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/dittofeed/dittofeed"&gt;https://github.com/dittofeed/dittofeed&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We use the techniques discussed below to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Guarantee idempotency when inserting events.&lt;/li&gt;
&lt;li&gt;Improve scalability with micro-batching.&lt;/li&gt;
&lt;li&gt;Handle late and out-of-order events.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  What Is ClickHouse and Why Should You Use It?
&lt;/h3&gt;

&lt;p&gt;ClickHouse is one of the world’s most powerful OLAP databases. It excels in use cases that require efficiently running (relatively) low throughput queries that access many rows. This contrasts with a standard OLTP database you might be more familiar with, like Postgres. In relative terms, OLTP databases like Postgres excel at making frequent queries that access a relatively small number of rows.&lt;/p&gt;

&lt;p&gt;ClickHouse differs in several respects from data warehouses like Snowflake, Redshift, BigQuery, etc., which are also OLAP stores. ClickHouse provides powerful tools for application engineers to optimize critical queries. On the other hand, data warehouses are specialized for query flexibility and the Business Intelligence use case. Critically, ClickHouse is  open source (Apache License 2.0), which was an important consideration when designing Dittofeed (MIT License).&lt;/p&gt;

&lt;p&gt;ClickHouse also excels at storing and querying semi-structured data, like event logs. Previously, many engineering teams used Elasticsearch in a similar niche to ClickHouse, building applications like Kibana. Increasingly, developers are choosing ClickHouse over Elasticsearch for its unparalleled performance characteristics. For example, our friends at &lt;a href="http://hyperdx.io"&gt;hyperdx.io&lt;/a&gt; are using ClickHouse to build an open-source OpenTelemetry provider!&lt;/p&gt;

&lt;h3&gt;
  
  
  User Segments - How We Use ClickHouse
&lt;/h3&gt;

&lt;p&gt;We use ClickHouse to store and query our semi-structured event logs describing users’ behavior. The results of these queries allow us to decide which users to message, when, and how to message them. Relevantly, we use ClickHouse to calculate &lt;em&gt;user segments&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;User segments, also called audiences in some platforms, are just a list of users calculated based on the user’s traits or past behaviors. Calculating user segments is central to Dittofeed’s platform, and as we began to scale, it’s caused us problems!&lt;/p&gt;

&lt;p&gt;💣💥&lt;/p&gt;

&lt;p&gt;We provide a low-code interface for Dittofeed workspace members (not to be confused with end-users) to define segments. These segment definitions are constructed in Dittofeed’s UI and are stored as JSON. These definitions are later translated into ClickHouse queries. We run the queries on a polling period to keep the segment assignments live and updated as users enter and exit segments. The following typescript pseudocode demonstrates this logic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight tsx"&gt;&lt;code&gt;&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt; &lt;span class="nf"&gt;main&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;while &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;reCalculateSegments&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;POLLING_INTERVAL&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The concern of this article is how to define the tables and queries underlying the &lt;code&gt;reCalculateSegments&lt;/code&gt; function. This turns out to be a fairly general problem that many engineers can benefit from understanding!&lt;/p&gt;

&lt;p&gt;For this problem, I’ve provided a git repo containing a series of test implementations in increasing order of complexity. Check it out to follow along!&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/dittofeed/clickhouse-segments-tutorial"&gt;https://github.com/dittofeed/clickhouse-segments-tutorial&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Sample Segment Definition&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;For this article, we’re going to use a simple illustrative segment. That segment will contain users who have performed at least 2 &lt;code&gt;BUTTON_CLICK&lt;/code&gt; events.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Naive Approach
&lt;/h2&gt;

&lt;p&gt;In this naive approach, we’ll create two tables. One is to store our user events. The other is to store segment assignments.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;user_events_naive&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="c1"&gt;-- we care when this value equals 'BUTTON_CLICK'&lt;/span&gt;
    &lt;span class="n"&gt;event_name&lt;/span&gt; &lt;span class="n"&gt;LowCardinality&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="nb"&gt;timestamp&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;MergeTree&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;segment_assignments_naive&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="nb"&gt;Boolean&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;assigned_at&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ReplacingMergeTree&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We insert assignments by checking which users have at least two &lt;code&gt;BUTTON_CLICK&lt;/code&gt; events.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;segment_assignments_naive&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;count&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;user_events_naive&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;event_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'BUTTON_CLICK'&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;SELECT&lt;/span&gt;
  &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="c1"&gt;-- We have to use an aggregation here to select the latest value because the&lt;/span&gt;
  &lt;span class="c1"&gt;-- ReplacingMergeTree engine replaces values asynchronously, so we need to be&lt;/span&gt;
  &lt;span class="c1"&gt;-- sure to skip over stale values.&lt;/span&gt;
  &lt;span class="n"&gt;argMax&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;assigned_at&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;latest_value&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;segment_assignments_naive&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;
&lt;span class="k"&gt;HAVING&lt;/span&gt; &lt;span class="n"&gt;latest_value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;True&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The above code is implemented here:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/dittofeed/clickhouse-segments-tutorial/blob/5a851fb4af1f4183c90fd47b4cbd5b5f8f205405/src/1-naive.test.ts"&gt;1-naive.test.ts&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We use a &lt;code&gt;ReplacingMergeTree&lt;/code&gt; engine table for our segment assignments so that when we re-calculate our segment values, the new values replace the old ones.&lt;/p&gt;

&lt;p&gt;This approach is relatively simple. However, it has several issues. Let’s start with one, a lack of &lt;em&gt;idempotency&lt;/em&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Idempotency
&lt;/h2&gt;

&lt;p&gt;Our table/query structure is not idempotent! What’s idempotency, and why do we care? An operation is idempotent if performing it multiple times is logically equivalent to performing it a single time. This is one of the most important concepts for understanding distributed systems. &lt;/p&gt;

&lt;p&gt;In distributed systems, even in the best case, messages either get delivered &lt;em&gt;at least once&lt;/em&gt; with the potential for duplicates, or &lt;em&gt;at most once&lt;/em&gt; with a potential for dropped messages. To know which bucket you fall into, a simple rule of thumb is to ask whether clients implement retries when issuing requests to your API. If they do, in the best case you’ve got at least once delivery. Otherwise, you may have at most once delivery.&lt;/p&gt;

&lt;p&gt;Luckily, we can combine at least once delivery, with idempotency, to achieve logically exactly once delivery. In our case, we allow clients to send duplicate events and are smart enough to ignore the duplicates when calculating user segments. We can achieve that by introducing an idempotency key &lt;code&gt;message_id&lt;/code&gt; to our events table.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;user_events_idempotent&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;event_name&lt;/span&gt; &lt;span class="n"&gt;LowCardinality&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;message_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nb"&gt;timestamp&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;MergeTree&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message_id&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;segment_assignments_idempotent&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="nb"&gt;Boolean&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;assigned_at&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ReplacingMergeTree&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, we can count the number of unique message Ids when calculating the number of times a user has performed an action.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;segment_assignments_idempotent&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;uniq&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;user_events_idempotent&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;event_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'BUTTON_CLICK'&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you can engineer your APIs to be idempotent, you’ll already be doing better than the bulk of the field! For example, other popular customer engagement platforms like Braze and OneSignal don’t include idempotency keys in their APIs and are thus susceptible to storing duplicated user events and miscalculating segments. This can result in consumers of these platforms messaging the wrong users at the wrong times!&lt;/p&gt;

&lt;p&gt;The above code is implemented here:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/dittofeed/clickhouse-segments-tutorial/blob/5a851fb4af1f4183c90fd47b4cbd5b5f8f205405/src/2-idempotent.test.ts"&gt;2-idempotent.test.ts&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Still, in the intro, I alluded to our database being at risk of exploding. We haven’t addressed performance at all. Our existing table and query structures have a critical flaw. We’re querying against every event, and re-calculating every user segment with every polling period!&lt;/p&gt;

&lt;p&gt;That means as your events table grows larger, and your application acquires more users, these queries will grow slower and more expensive over time, putting your application at risk! 😱.&lt;/p&gt;

&lt;p&gt;To solve this problem, we’ll need an approach that allows us only to consider new events as they’re inserted, and to recalculate segments incrementally.&lt;/p&gt;

&lt;h2&gt;
  
  
  Micro-Batching
&lt;/h2&gt;

&lt;p&gt;We’ll approach this problem by using a technique called “micro-batching”. Micro-batching can be considered a way to simulate stream processing, by processing incoming events in small batches. This requires introducing several new tables. A lot is happening here, so let’s look at the following SQL statements and accompanying comments.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;user_events_micro_batch&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;event_name&lt;/span&gt; &lt;span class="n"&gt;LowCardinality&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;message_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="nb"&gt;timestamp&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;MergeTree&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message_id&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;-- We make use of the AggregatingMergeTree engine which allows us to to store&lt;/span&gt;
&lt;span class="c1"&gt;-- the intermediate state representing each user's message id count. This allows&lt;/span&gt;
&lt;span class="c1"&gt;-- us to avoid having to re-sum all of a user's BUTTON_CLICK event message id's&lt;/span&gt;
&lt;span class="c1"&gt;-- with each new polling period, and instead only add the new events to an&lt;/span&gt;
&lt;span class="c1"&gt;-- existing count. ClickHouse conveniently merges the aggregation state in the&lt;/span&gt;
&lt;span class="c1"&gt;-- background.&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;user_states_micro_batch&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;event_count&lt;/span&gt; &lt;span class="n"&gt;AggregateFunction&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;uniq&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;computed_at&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;AggregatingMergeTree&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;-- This table allows us to track the last time a user's state was updated,&lt;/span&gt;
&lt;span class="c1"&gt;-- within the last 100 days. This is useful so that we can only update user&lt;/span&gt;
&lt;span class="c1"&gt;-- segment's for users whose state have been updated since the last polling&lt;/span&gt;
&lt;span class="c1"&gt;-- period.&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;updated_user_states_micro_batch&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;computed_at&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;MergeTree&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;PARTITION&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;toYYYYMMDD&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;computed_at&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;computed_at&lt;/span&gt;
&lt;span class="n"&gt;TTL&lt;/span&gt; &lt;span class="n"&gt;toStartOfDay&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;computed_at&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;interval&lt;/span&gt; &lt;span class="mi"&gt;100&lt;/span&gt; &lt;span class="k"&gt;day&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- This materialized view is used to automatically populate the updated states&lt;/span&gt;
&lt;span class="c1"&gt;-- table whenever the states table receives new data.&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;MATERIALIZED&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="n"&gt;updated_user_states_micro_batch_mv&lt;/span&gt;
&lt;span class="k"&gt;TO&lt;/span&gt; &lt;span class="n"&gt;updated_user_states_micro_batch&lt;/span&gt;
&lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;computed_at&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;user_states_micro_batch&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;segment_assignments_micro_batch&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="nb"&gt;Boolean&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;assigned_at&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ReplacingMergeTree&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, we need to populate the assignments table in two steps: inserting into the state table and then inserting into the assignments table.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;user_states_micro_batch&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="c1"&gt;-- The following is the syntax for producing the incremental state used by&lt;/span&gt;
    &lt;span class="c1"&gt;-- AggregatingMergeTree tables.&lt;/span&gt;
    &lt;span class="n"&gt;uniqState&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message_id&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;parseDateTimeBestEffort&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;user_events_micro_batch&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt;
    &lt;span class="n"&gt;event_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'BUTTON_CLICK'&lt;/span&gt;
    &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="n"&gt;parseDateTimeBestEffort&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="n"&gt;lower_bound&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;segment_assignments_micro_batch&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;uniqMerge&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event_count&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;parseDateTimeBestEffort&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;user_states_micro_batch&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt;
    &lt;span class="c1"&gt;-- We use this clause to select only users whose state has been updated in&lt;/span&gt;
    &lt;span class="c1"&gt;-- the latest polling period&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="k"&gt;IN&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;
        &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;updated_user_states_micro_batch&lt;/span&gt;
        &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;computed_at&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="n"&gt;parseDateTimeBestEffort&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Our polling pseudocode also needs to be updated. Now, our &lt;code&gt;reCalculateSegments&lt;/code&gt; function depends on when the last polling period occurred, so it can window its queries.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight tsx"&gt;&lt;code&gt;&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="kd"&gt;function&lt;/span&gt; &lt;span class="nf"&gt;main&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="kd"&gt;let&lt;/span&gt; &lt;span class="nx"&gt;lastPollingPeriod&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;Date&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;getLastPollingPeriod&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

  &lt;span class="k"&gt;while &lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;reCalculateSegments&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;lastPollingPeriod&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;POLLING_INTERVAL&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

    &lt;span class="nx"&gt;lastPollingPeriod&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Date&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;updateLastPollingPeriod&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;lastPollingPeriod&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This code introduces two new function calls, &lt;code&gt;getLastPollingPeriod&lt;/code&gt;, and &lt;code&gt;updateLastPollingPeriod&lt;/code&gt;, which are methods for reading and writing a single timestamp. We store the equivalent of this timestamp in our Postgres database.&lt;/p&gt;

&lt;p&gt;This relatively complex setup gets us segment recalculations whose performance depends on the number of events in a single polling period, but crucially &lt;em&gt;does not directly depend&lt;/em&gt; on the total number of user events.&lt;/p&gt;

&lt;p&gt;The above code is implemented here:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/dittofeed/clickhouse-segments-tutorial/blob/0b7785dddbf0a33fb22a5c7f44ccb124b0f0c2d6/src/3-microBatch.test.ts"&gt;3-microBatch.test.ts&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;However, we’re still not done. Something’s still not quite right.&lt;/p&gt;

&lt;h3&gt;
  
  
  Event Time vs Processing Time
&lt;/h3&gt;

&lt;p&gt;What exactly is the &lt;code&gt;timestamp&lt;/code&gt; on our events table? Is it the time when the user behavior occurred, the “event time”? Or is it the time when the event was recorded, the “processing time”? This is an important distinction because you might receive events that are &lt;em&gt;drastically&lt;/em&gt; out of order in event time.&lt;/p&gt;

&lt;p&gt;Imagine an end-user’s actions are logged as events, but their device loses network connectivity, only to regain it an hour later. The result will be events received an hour late in event time! Because we lack ordering guarantees with respect to event time, it’s important that we do our micro-batch windowing in processing time.&lt;/p&gt;

&lt;p&gt;As an aside, due to imperfect clocks, events can also be recorded out of order with respect to processing time. This issue can be exacerbated when you introduce sharding to your database. However, that’s a discussion for another time…&lt;/p&gt;

&lt;p&gt;Let’s see how we can address this issue of distinguishing event time from processing time with our new table structure.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;user_events_event_time&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;event_name&lt;/span&gt; &lt;span class="n"&gt;LowCardinality&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;message_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="c1"&gt;-- We store event time and processing time separately.&lt;/span&gt;
    &lt;span class="n"&gt;event_time&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;processing_time&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;MergeTree&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="c1"&gt;-- Crucially our sorting key uses processing time, *not* event time.&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;processing_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message_id&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;user_states_event_time&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;event_count&lt;/span&gt; &lt;span class="n"&gt;AggregateFunction&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;uniq&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="c1"&gt;-- We maintain state representing the last event time.&lt;/span&gt;
    &lt;span class="n"&gt;last_event_time&lt;/span&gt; &lt;span class="n"&gt;AggregateFunction&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;max&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;computed_at&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;AggregatingMergeTree&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;segment_assignments_event_time&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="nb"&gt;Boolean&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="c1"&gt;-- We record the last relevant time for the segment.&lt;/span&gt;
    &lt;span class="n"&gt;last_event_time&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;assigned_at&lt;/span&gt; &lt;span class="nb"&gt;DateTime&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;Engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ReplacingMergeTree&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then the accompanying queries.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;segment_assignments_event_time&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;uniqMerge&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event_count&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;maxMerge&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;last_event_time&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;parseDateTimeBestEffort&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;user_states_event_time&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="k"&gt;IN&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;
        &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;updated_user_states_event_time&lt;/span&gt;
        &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;computed_at&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="n"&gt;parseDateTimeBestEffort&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;segment_assignments_event_time&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;uniqMerge&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event_count&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;maxMerge&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;last_event_time&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;parseDateTimeBestEffort&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;user_states_event_time&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt; &lt;span class="k"&gt;IN&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;
        &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;updated_user_states_event_time&lt;/span&gt;
        &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;computed_at&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="n"&gt;parseDateTimeBestEffort&lt;/span&gt;&lt;span class="p"&gt;({&lt;/span&gt;&lt;span class="n"&gt;now&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;SELECT&lt;/span&gt;
    &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;argMax&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;last_event_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;assigned_at&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;last_event_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;argMax&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;assigned_at&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;latest_value&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;segment_assignments_event_time&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;user_id&lt;/span&gt;
&lt;span class="k"&gt;HAVING&lt;/span&gt; &lt;span class="n"&gt;latest_value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;True&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This new table and query structure allows us to record event and processing times separately and surface event times with our segment assignments!&lt;/p&gt;

&lt;p&gt;The above code is implemented here:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/dittofeed/clickhouse-segments-tutorial/blob/0b7785dddbf0a33fb22a5c7f44ccb124b0f0c2d6/src/4-eventTime.test.ts"&gt;4-eventTime.test.ts&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Conclusion
&lt;/h3&gt;

&lt;p&gt;We covered a lot of concepts and code in this article. We’ve covered how to calculate user segments in ClickHouse with the following qualities:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Idempotency&lt;/li&gt;
&lt;li&gt;Micro-batching&lt;/li&gt;
&lt;li&gt;Distinguished event and processing times.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If you felt you got any value from it, we’d appreciate it if you gave Dittofeed’s repo a star. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/dittofeed/dittofeed"&gt;https://github.com/dittofeed/dittofeed&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Thanks!&lt;/p&gt;

</description>
    </item>
  </channel>
</rss>
