When we operate our mech shooter, War Robots, and our other games, we want to know what's going on in them; it's also good to know what's happening on the servers. This means we need to collect events. Obviously, it's impossible to do this manually, so some kind of system is needed. To that end, at our studio, we have AppMetr, which has been helping us out for years.
In this article, we’ll talk about how we collect events from our mobile devices and servers, how we store them, and why we don’t use ready-made analytical databases.
We first thought about the need to collect and analyze data back in 2011. At that time, the Pixonic studio was growing and changing rapidly, so we needed a very flexible analytical system.
Our first team of developers didn't find a suitable solution, so one team member began building our own system. We called it AppMetr and built it in Java, using Cassandra to store events; we chose these technologies for one simple reason: the team had experience working with them. We also used Storm for distributed event processing and Kafka for queues, and all of this ran on Linux servers.
Simultaneously, our main title, the mobile game War Robots, began a period of very active growth. In 6 months, the number of events received by AppMetr increased from 150 to 1 billion events per day. The old architecture couldn’t keep up, so we had to make quick decisions.
AppMetr does three things:
- It collects events
- It stores them
- It allows us to perform analytical queries on them
An event is a simple JSON object with a name and some set of attributes. (However, we don't know in advance which events, names, or attributes will come to us.)
Event sets are combined into batches, which are transmitted to us over the network. We also support all the standard, JSON-permissible data types, and we use a flexible data scheme, which is very convenient — different development teams can come up with new events and make changes to old ones (for example, by changing the type for some attribute).
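To make this concrete, here's a minimal sketch of how an event and a batch could be modeled in Java; the class and field names are illustrative, not AppMetr's actual SDK.

```java
// Illustrative model of an event and a batch; not AppMetr's real classes.
import java.util.List;
import java.util.Map;

public final class Event {
    public final String name;                    // e.g. "levelUp"
    public final Map<String, Object> attributes; // arbitrary JSON-compatible values, no fixed scheme

    public Event(String name, Map<String, Object> attributes) {
        this.name = name;
        this.attributes = attributes;
    }
}

final class Batch {
    public final List<Event> events;             // events accumulated since the last send

    Batch(List<Event> events) {
        this.events = events;
    }
}
```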
That said, this flexible data scheme has its own problems, and there aren't many analytical databases on the market that use one, because such a system is quite difficult to create and maintain. If you try to build it yourself, then, without fail, your code will become more and more complicated over time, it will be hard to optimize, and everything will run slowly. Compressing data without a scheme is also not an easy task. Fortunately, in our case, we managed to partially bypass these problems, but more on that later.
How AppMetr works
We wrote libraries for various programming languages and platforms that accumulate events in memory, form batches, compress them, and send them via the HTTPS protocol to our web servers.
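As an illustration, a client-side sender along these lines might look like the sketch below; the endpoint URL is a placeholder, and a production SDK would also need the accumulation and retry logic that's omitted here.

```java
// A rough sketch of a client-side sender: serialize the batch to JSON,
// gzip it, and POST it over HTTPS. The URL is a placeholder.
import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public final class BatchSender {
    private final HttpClient client = HttpClient.newHttpClient();

    public int send(String batchJson) throws Exception {
        // Compress the JSON payload before sending to save mobile traffic
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            gzip.write(batchJson.getBytes(StandardCharsets.UTF_8));
        }

        HttpRequest request = HttpRequest.newBuilder(URI.create("https://track.example.com/api"))
                .header("Content-Type", "application/json")
                .header("Content-Encoding", "gzip")
                .POST(HttpRequest.BodyPublishers.ofByteArray(buffer.toByteArray()))
                .build();

        // On failure a real SDK retries the same batch later, which is
        // exactly what produces the duplicates discussed below
        return client.send(request, HttpResponse.BodyHandlers.ofString()).statusCode();
    }
}
```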
The first component to accept HTTP requests was NGINX. It simply redirected these requests to our processing services, where we split the batches apart, filtered out duplicates, and enriched the events with additional attributes (for example, we'd add the player's current level to each player event).
The problem is that, due to poor mobile connections, clients often make several attempts to send the same batch of data. As a result, we ended up with duplicates. Originally, these duplicates were filtered by the batch's serial number, which was kept in the client settings, incremented with each dispatch, and stored in so-called "shared preferences". But it turned out that shared preferences can sometimes be rolled back, so their values can no longer be trusted.
So now, we filter duplicates differently: we calculate a checksum from the client's request body and save it in Cassandra for six months; if the client repeats the request within those six months, we ignore it. In any case, we must accept events immediately, even if some of our servers are overloaded or unavailable, and even when clients are sending far more events than usual (this happens, for example, when the game is being featured in the store).
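One way to implement such checksum-based de-duplication is sketched below; the SHA-256 checksum, the keyspace and table names, and the use of a Cassandra lightweight transaction are assumptions for illustration, not necessarily what AppMetr does.

```java
// A hedged sketch of checksum-based de-duplication, assuming a table like
// CREATE TABLE appmetr.request_dedup (checksum text PRIMARY KEY);
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import java.security.MessageDigest;
import java.util.HexFormat;

public final class DuplicateFilter {
    private static final int SIX_MONTHS_SECONDS = 182 * 24 * 3600;

    private final CqlSession session;
    private final PreparedStatement insert;

    public DuplicateFilter(CqlSession session) {
        this.session = session;
        // Lightweight transaction: the insert is applied only if the checksum is new,
        // and the row expires on its own after roughly six months
        this.insert = session.prepare(
                "INSERT INTO appmetr.request_dedup (checksum) VALUES (?) " +
                "IF NOT EXISTS USING TTL " + SIX_MONTHS_SECONDS);
    }

    /** Returns true if this request body has already been seen within the TTL window. */
    public boolean isDuplicate(byte[] requestBody) throws Exception {
        String checksum = HexFormat.of().formatHex(
                MessageDigest.getInstance("SHA-256").digest(requestBody));
        return !session.execute(insert.bind(checksum)).wasApplied();
    }
}
```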
To do all this, we created a simple microservice on Vert.x called API Proxy and put it between NGINX and our processing services. This microservice stores the entire incoming HTTP request in Kafka and responds with "OK" to the client. Thanks to this queue of pending HTTP requests in Kafka, we can now easily endure load peaks and various data center failures, because the second microservice, which forwards HTTP requests from Kafka to our processing logic, is able to repeat a request, unlike the client.
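A simplified sketch of such a proxy is shown below, using Vert.x 4 for the HTTP server and the plain Apache Kafka producer; the port, topic name, and configuration values are placeholders, not the production setup.

```java
// Accept the HTTP request, put its body into Kafka, and answer "OK" right away.
import io.vertx.core.Vertx;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public final class ApiProxy {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

        Vertx.vertx().createHttpServer()
                .requestHandler(request -> request.body().onSuccess(body -> {
                    // Persist the raw request body in the queue; a second service
                    // replays it against the processing logic and can retry on failure
                    producer.send(new ProducerRecord<>("incoming-http-requests", body.getBytes()));
                    request.response().setStatusCode(200).end("OK");
                }))
                .listen(8080);
    }
}
```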
Now, events enriched with additional attributes are sent to the Tracking service, which groups them by event name and date of occurrence, and then into five-minute blocks. The Tracking service writes each of these blocks to Cassandra under a new key.
That’s how it all works now, but we previously did it differently: we formed aggregates from all incoming events per day and per five minutes. The only catch was that, quite often, clients were sending events “from the past”. Therefore, we had to generate these aggregates again and rewrite the corresponding keys in Cassandra. This frequent overwriting of data in Cassandra led to a catastrophic slowdown during subsequent readings. In addition, the re-formation of aggregates heavily loaded the network and our infrastructure, making querying very slow for us.
As a result, we abandoned the creation of aggregates and set off on a path towards happiness. Tracking now saves an immutable data block in Cassandra every five minutes. If an event is rare, we store its blocks on one Cassandra node; if it's frequent, we store them across several nodes. In other words, we measure the speed of the incoming stream of events over a certain window and dynamically calculate how many Cassandra nodes we'll need to store the blocks for this type of event.
It's also nice when the hard drives on our database servers are loaded evenly. To achieve this, we change the partition key for the saved blocks every day; this is easy in our case, since the key contains the current date.
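For illustration, a partition key along these lines could be built as follows; the exact key layout and sharding logic in AppMetr may differ.

```java
// Illustrative partition key for a five-minute block: it contains the current date
// (so the write load rotates across partitions every day) plus a shard number
// derived from how many Cassandra nodes this event type currently needs.
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.concurrent.ThreadLocalRandom;

public final class BlockKeys {
    /** shards = how many partitions the measured event rate currently calls for. */
    public static String partitionKey(String eventName, Instant now, int shards) {
        LocalDate day = now.atZone(ZoneOffset.UTC).toLocalDate();
        long fiveMinuteSlot = now.getEpochSecond() / 300; // 300 s = 5 min
        int shard = ThreadLocalRandom.current().nextInt(shards);
        return eventName + "|" + day + "|" + fiveMinuteSlot + "|" + shard;
    }
}
```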
What is the block of events that we save in Cassandra? It consists of columns for each attribute and meta information. This meta information contains the compression settings for each column (we also store the minimum and maximum values in each column). We can read any column from this block independently of other blocks. For example, if we want to calculate player levels, we’ll read the Level column, and if we want to calculate the levels of players and their UserID, then we will read both columns at the same time, and the same event will correspond to the same ordinal values in both columns.
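A rough model of such a block might look like this; the field names are illustrative, and the real format certainly stores more detail.

```java
// One entry per attribute column plus the meta information needed
// to read any column independently of the others.
import java.util.Map;

final class ColumnMeta {
    String codec;            // which compression algorithm won for this column
    Object minValue;         // minimum value seen in the column
    Object maxValue;         // maximum value seen in the column
    int uncompressedSize;    // needed to decode the column on its own
}

final class EventBlock {
    String eventName;
    long fiveMinuteSlot;
    Map<String, byte[]> columns;  // attribute name -> compressed column bytes
    Map<String, ColumnMeta> meta; // attribute name -> how to decode that column
}
```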
As mentioned above, we don't have a fixed data scheme, and we don't know what type of value we'll receive. It may turn out that a UserID is sent as a number, and two seconds later as a string. We try to determine the most general data type that will be used for a given column in a given block; in another block, the same column may have a different type.
In addition, we compress attribute columns independently of each other: we analyze the stream of values, its cardinality, and other parameters, and choose the best compression algorithm. When a block is formed, we roughly estimate how big each column will be after compression with each of the methods and use the winner. We also try to compress the column with ZSTD, but if its size doesn't decrease, we save it in Cassandra as is. In this form, we store about 2.5 trillion events in Cassandra, which take up about 180 terabytes of data (with a replication factor of 3).
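The "compress, but keep the raw bytes if it doesn't help" step could look roughly like this, assuming the zstd-jni library (com.github.luben:zstd-jni) is on the classpath; this is a sketch, not our production code.

```java
// Try ZSTD on a column and fall back to the original bytes if compression doesn't shrink it.
import com.github.luben.zstd.Zstd;

final class ColumnCompressor {
    /** Returns the compressed column, or the original bytes if ZSTD doesn't help. */
    static byte[] compressOrKeep(byte[] column) {
        byte[] compressed = Zstd.compress(column);
        return compressed.length < column.length ? compressed : column;
    }
}
```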
Why not use a ready-made solution?
There's an opinion that it's better to use ready-made solutions instead of creating your own. For some people, this really works, but not for us. All of the third-party solutions we found were significantly more expensive than the cost of our team and our servers. Thus, we use our own AppMetr.
We could use a ready-made analytical database like ClickHouse, but we'd run into certain problems with it. The main one is that, over the years of AppMetr's operation, tens of thousands of different events with different attributes have accumulated, and migrating them to ClickHouse's fixed data scheme would not be an easy task. We also can't create a new table in ClickHouse for each type of event, because that would be an anti-pattern.
So, we created our own system, and in Java, even though Java is considered slow. It's unlikely, of course, that AppMetr will ever reach ClickHouse speeds, but we don't need that right now. We recently conducted a small experiment purely for ourselves: we ran a query over a year of data, covering hundreds of millions of events, filtered by one attribute and grouped by date. This query was executed on one server at a speed of 72 million events per second, and we were completely satisfied with this result.
Another thing: we also wrote SQL support for our system, but it's very limited and only works with standard, simple queries. That is, there's no JOIN or subquery support, and this is a problem, because our analysts need the ability to perform complex queries.
We decided not to waste time reinventing the wheel and simply installed Trino on our servers. It's a full-featured SQL query engine that works on top of your data. Now our analysts can use it to work with data from AppMetr and execute queries of varying complexity.
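For example, an analyst could query AppMetr data through Trino over JDBC roughly like this; the host, catalog, schema, table, and column names are made up for illustration.

```java
// Querying through Trino's JDBC driver (io.trino:trino-jdbc on the classpath).
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public final class TrinoExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("user", "analyst");
        String url = "jdbc:trino://trino.example.com:8080/appmetr/default";
        try (Connection connection = DriverManager.getConnection(url, props);
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery(
                     "SELECT player_level, count(*) AS players " +
                     "FROM appmetr_events WHERE event_date >= DATE '2023-01-01' " +
                     "GROUP BY player_level ORDER BY player_level")) {
            while (rs.next()) {
                System.out.println(rs.getInt("player_level") + " -> " + rs.getLong("players"));
            }
        }
    }
}
```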
Actually, AppMetr is very popular within our company due to its flexibility: any employee can create a new project, distribute access to it, add dashboards with various charts, share a link to a chart with a colleague, click through various filtering expressions, add groupings, and so on. Developers and analysts can pull data from AppMetr using the SQL API and process it in Jupyter or locally. And game developers can integrate the AppMetr SDK into their product, come up with a name for an event, add some attributes, and see it on the charts within minutes.
Let's move on to the most interesting part and talk about how we perform queries on the collected events. Actually, everything is pretty simple. We have several query servers. The user's initial SQL query arrives at a random server, where we parse it and form a query execution plan, which we then send to the appropriate query server.
It turns out that writing your own SQL dialect is quite simple: we took the ready-made ANTLR4 library, wrote a simple grammar for it, implemented a visitor, and now we parse SQL queries without any problems. Our query servers are partitioned by event name, so the query coordinator always knows which server to send the user's request to. And if that server is unavailable, the request is forwarded to the next one in the ring.
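The ring-based routing can be illustrated with a small sketch like the one below; the server list and the health check are placeholders, not our actual coordinator code.

```java
// Hash the event name to pick a "home" server and, if it's down, walk the ring.
import java.util.List;
import java.util.function.Predicate;

final class QueryRouter {
    static String pickServer(String eventName, List<String> servers, Predicate<String> isAlive) {
        int start = Math.floorMod(eventName.hashCode(), servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String candidate = servers.get((start + i) % servers.size());
            if (isAlive.test(candidate)) {
                return candidate; // first healthy server clockwise from the home one
            }
        }
        throw new IllegalStateException("no query servers available");
    }
}
```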
The query servers cache the event blocks they download from Cassandra on their local hard drives, so they can take full advantage of the operating system’s page cache and work with local data very efficiently. It’s easy for us to cache event blocks on query-servers because they are immutable.
We cache blocks using an LRU algorithm, meaning the least recently used block is deleted first, and we use SSD disks for the disk cache. On each query server we have a simple folder scheme: there's a folder for each day and each event. It's worth noting that the first component in this folder structure is the server number; this is needed so that any query server can take on the load of the previous one in the ring.
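As a simplified illustration of the eviction policy (not our actual cache), an LRU index over cached block files can be built on a LinkedHashMap:

```java
// The least recently used entry is evicted (and its file deleted) when the
// cache outgrows its limit. The real cache tracks sizes in bytes; this
// sketch counts entries for brevity.
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;

final class LruFileCache extends LinkedHashMap<String, File> {
    private final int maxEntries;

    LruFileCache(int maxEntries) {
        super(16, 0.75f, true); // access-order: get() moves an entry to the tail
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, File> eldest) {
        if (size() > maxEntries) {
            eldest.getValue().delete(); // drop the least recently used block from the SSD
            return true;
        }
        return false;
    }
}
```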
And since we use the column-oriented database approach, we have a separate file for each event attribute in this folder. We also have two files with meta information: these indicate which columns and which blocks have already been requested from Cassandra, as well as the compression settings for each column.
We previously stored data in the cache differently. For each type of event from each day, we wrote the data into one large file and read it from top to bottom, skipping the attribute columns that were unnecessary for a given request. Now we store different event columns in different files; the amount of data our application reads from disk hasn't changed, but the reading speed has increased significantly.
We only use sequential reads and sequential writes. If we need to write a new block of data to the cache, we simply append the new block's columns to the end of the corresponding column files on disk; and when we start executing a query, we download from Cassandra only the attribute columns necessary to execute that query. Also, before reading a column, we map it entirely into memory, which further increases speed. (It's worth noting that you can memory-map files even larger than your RAM.)
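A hedged sketch of appending to and memory-mapping a column file is shown below; paths and names are illustrative, and column files over 2 GB would need to be mapped in several chunks rather than with a single buffer.

```java
// Append a block's column bytes sequentially, then map the file before reading.
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ColumnFiles {
    /** Append a new block's column bytes to the end of the column file. */
    static void appendBlock(Path columnFile, byte[] columnBytes) throws Exception {
        try (FileChannel channel = FileChannel.open(columnFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            channel.write(ByteBuffer.wrap(columnBytes));
        }
    }

    /** Map the column file into memory for fast sequential reads. */
    static MappedByteBuffer mapColumn(Path columnFile) throws Exception {
        try (FileChannel channel = FileChannel.open(columnFile, StandardOpenOption.READ)) {
            return channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
        }
    }
}
```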
How AppMetr executes queries
Any analytic query that comes into AppMetr goes through four simple steps:
- We download data blocks from Cassandra to the cache on the query servers
- We filter these data blocks
- We group events from these blocks, taking into account the aggregation functions that are specified in the query
- We perform post-processing of the query results: for example, we get the 10 most popular keys
Let’s take a closer look at some of these steps.
When we download data blocks from Cassandra, the query server knows which blocks it already has in the cache and which ones it doesn't, so it only requests new ones. And if several queries on the same server want to use the same data at the same time, that is, several users execute queries over the same data, the query server downloads it from Cassandra only once.
When we go through the filtering step, we form a criterion tree in memory. Each criterion receives some column of block attributes as input, filters this column by some expression, and returns a bit set with the result. Each ordinal value in the original column corresponds to a bit (0 or 1) in this bit set; a 1 means that the value satisfies the criterion's filtering condition. Bit sets obtained from different criteria are combined using logical expressions, and the final bit set with the filtering result is passed to the next stage of processing.
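Here's a condensed illustration of that idea; the real criteria work on our own column format rather than plain int arrays.

```java
// A criterion turns one column into a BitSet; bit sets from several criteria
// are combined with logical operations.
import java.util.BitSet;
import java.util.function.IntPredicate;

final class Criteria {
    /** One bit per row: 1 if the value at that ordinal satisfies the predicate. */
    static BitSet filterIntColumn(int[] column, IntPredicate predicate) {
        BitSet result = new BitSet(column.length);
        for (int i = 0; i < column.length; i++) {
            if (predicate.test(column[i])) {
                result.set(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[] level = {3, 15, 27, 8};
        int[] wins  = {0, 42, 11, 5};
        // level > 10 AND wins > 10 -> only ordinals 1 and 2 remain set
        BitSet matched = filterIntColumn(level, v -> v > 10);
        matched.and(filterIntColumn(wins, v -> v > 10));
        System.out.println(matched); // prints {1, 2}
    }
}
```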
The next step is to group the data. The input to the grouping is the original event block and the bit set with the filtering result. We iterate over the original event block using the bit set and group the events in memory using a regular HashMap.
It's worth noting that we always perform the grouping in memory, but if the size of the grouping map exceeds its limit, we sort it by key and write it to a file on disk, then create a new grouping map in memory. When that one fills up, we flush it to disk again. Upon completion of the grouping, we have several sorted files on disk and one map in memory, which we also sort. Since there are multiple sorted data sources, we can use a merge sort to combine them: we create a merge iterator over these data sources and pass it to the next stage of processing.
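The merge step can be illustrated with a small sketch like this one; in reality the sorted sources are spill files on disk and the in-memory map, and the result is streamed through a merge iterator rather than collected into a list.

```java
// Merge several sorted (key, count) sources with a priority queue,
// summing the counts of equal keys.
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

final class SortedMerge {
    private static final class Head implements Comparable<Head> {
        final Iterator<Map.Entry<String, Long>> source;
        Map.Entry<String, Long> current;

        Head(Iterator<Map.Entry<String, Long>> source) {
            this.source = source;
            this.current = source.next(); // callers only pass non-empty sources
        }

        @Override public int compareTo(Head other) {
            return current.getKey().compareTo(other.current.getKey());
        }
    }

    static List<Map.Entry<String, Long>> merge(List<Iterator<Map.Entry<String, Long>>> sources) {
        PriorityQueue<Head> queue = new PriorityQueue<>();
        for (Iterator<Map.Entry<String, Long>> source : sources) {
            if (source.hasNext()) queue.add(new Head(source));
        }
        List<Map.Entry<String, Long>> merged = new ArrayList<>();
        while (!queue.isEmpty()) {
            Head head = queue.poll();           // the smallest current key across all sources
            Map.Entry<String, Long> smallest = head.current;
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).getKey().equals(smallest.getKey())) {
                // Same grouping key seen in another source: add up the aggregated counts
                merged.set(last, new SimpleEntry<>(smallest.getKey(),
                        merged.get(last).getValue() + smallest.getValue()));
            } else {
                merged.add(new SimpleEntry<>(smallest.getKey(), smallest.getValue()));
            }
            if (head.source.hasNext()) {
                head.current = head.source.next();
                queue.add(head);
            }
        }
        return merged;
    }
}
```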
This grouping pattern had its issues, namely the accumulation of data in memory followed by its abrupt release and transfer to disk. At the time, we were running Java 8 with the G1 garbage collector and a 30-gigabyte heap on each server, and we had to deal with the garbage collector ceasing to collect the old generation concurrently, which left us with long Full GC pauses. We looked at the GC logs and saw the message: GC concurrent-mark-reset-for-overflow.
It seemed that we had enough heap, but Full GC still ran frequently. It turned out that G1 was running out of the internal stack it uses to hold our temporary object graph during marking. We increased the MarkStackSize parameter, after which G1 went back to collecting the old generation concurrently, and the long Full GC pauses stopped.
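For reference, the relevant JVM options look roughly like this; the specific values here are illustrative, not our production settings.

```
# Illustrative launch flags: heap size, G1, and a larger marking stack
java -Xmx30g -XX:+UseG1GC -XX:MarkStackSize=64M -XX:MarkStackSizeMax=512M -jar query-server.jar
```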
Once we've calculated the results of query execution, we must send them to the client. What could go wrong here? Actually, a lot. The thing is, we use merge iterators, which are essentially deferred computations that we can't materialize all at once; otherwise, we'd get an OutOfMemoryError. So, we stream the query result to the client with back-pressure support, encoding the response in CSV format. If the client starts reading the result more slowly, we likewise iterate over the merge iterator more slowly and read data from the disk more slowly. As a result, streaming helps us return large query results.
To speed up the execution of requests within a single server, we use servers with multiple cores, and AppMetr utilizes them all. All the data in AppMetr is partitioned by day, so different days can be processed in parallel on different cores, and these different threads on different cores don’t share any common state; we can greatly optimize our code, without using concurrent data structures, which helps a lot.
In addition, we try to use only primitive data types; we don't use String when we need to check a string for a substring, and instead perform all such operations on byte arrays. And, of course, we make the minimum number of allocations of new temporary objects.
Finally, let's take a look at how we share the resources of a query server between queries running in parallel. For this, we have a solution that has shown great results in practice. For context, our task was to utilize server resources as efficiently as possible while still allowing queries to execute in parallel. To do this, each query server runs one instance of our application. We allocate a certain amount of RAM and a certain number of processor cores to this application; a separate thread of execution is started for each processor core, and the memory allocated to the query server is divided equally between these threads.
A thread can’t use the memory allocated to another thread. That is, in parallel on one query server, we can execute as many queries as there are cores in the server, and if it’s impossible to allocate a guaranteed core to a new query, then this query is queued.
Let's consider an example: a new user query arrives, we allocate a guaranteed core to it, the query begins executing, and at that moment the rebalancing service is triggered. It considers the number of currently running queries and the number of cores in the server. This service can add an extra thread of execution to any active query or take a thread away and pass it to another query. The rebalancing service takes various nuances into account; for example, additional threads may not be of much use for a query that only covers one day.
It's not difficult to create your own specialized column store, and it will work quickly even in Java. In that case, you don't have to pre-aggregate events or define a data scheme in advance. We created AppMetr for our own tasks; it helps us quickly collect all the necessary information and is equally useful to employees from various departments.