This article was written by Andreas Braun.
If you regularly drive a car, you will no doubt be familiar with fuel prices. They vary wildly from country to country, change frequently depending on global oil prices, and are a constant discussion topic in politics. Have you sometimes felt that prices rise and fall somewhat arbitrarily? Speaking of price changes, how often do prices change where you live? How much could you save if you travelled to the next city, or even country to refuel your car?
While these questions may seem silly to some, to people in Germany, they come up quite often. You'd be surprised to hear that fuel prices in Germany change multiple times per day, and they vary wildly between mornings and evenings, and even between different cities in Germany. In 2012, the German government passed a law that enabled the Federal Cartel Office to require petrol stations to submit their fuel prices to a central database, which can then be consumed by various websites and apps to help people find the cheapest petrol station in their area. While the data isn't considered "open data" and access is only granted to people or companies that maintain a service to inform consumers, there is one such service that provides full access to all data. Since I was always interested in working with big amounts of data to see various trends, I decided to use this dataset to build a small application that shows some trends. This also provides a good opportunity to show how to work with such large amounts of data in MongoDB.
When we’re done, we’ll want to show a map of where a station is, its data, and the price history for the last day for which we have prices (which would be the current day once the application receives live data):

In this first part, we're going to inspect the data we have, how we can import it into MongoDB, and how we can design a schema that allows us to efficiently work with the data. In the second part, we will then build a Symfony application to display the data.
A first look at the data
Unfortunately, there is no open data solution to get access to this, and the data is only provided to registered applications that provide an information service to users. However, CSV files were available through a git repository for a while, which is what I used here. Cloning the repository gives us a first indication of what a massive dataset we're dealing with here:
du -hcs *
78G prices
8.0K README.md
9.8G stations
87G total
78 GB of price data, and almost 10 GB of station data. That's quite the dataset we've got here! Within the two datasets, the files are conveniently organized by year and month, with one file per day:
ls -1 prices/2025/06
2025-06-01-prices.csv
2025-06-02-prices.csv
[...]
ls -1 stations/2025/06
2025-06-01-stations.csv
2025-06-02-stations.csv
[...]
This is good news for us, as it allows us to slowly start working with some data while we build our schema, then import more and more data as we try to stretch the performance limits. But what does 78 GB of price data even mean? First, let's see how many stations we have:
wc -l stations/2025/06/2025-06-24-stations.csv
17592 stations/2025/06/2025-06-24-stations.csv
Ok, this is not too bad—we have just under 18,000 stations in the last file. Let's see how many price records we have for that day:
wc -l prices/2025/06/2025-06-24-prices.csv
453714 prices/2025/06/2025-06-24-prices.csv
Wow, more than 450,000 price records for a single day, which works out to an average of 25.8 records per station per day! This is going to be a challenge, but let's not get ahead ourselves. First, let's take a more detailed look at the data to see what information we have, and then talk about how to efficiently store such data in MongoDB.
Analyzing data
Let's start by looking at the station data. The first line of the CSV file contains the column names, so we can easily see what data is available:
uuid,name,brand,street,house_number,post_code,city,latitude,longitude,first_active,openingtimes_json
0e18d0d3-ed38-4e7f-a18e-507a78ad901d,OIL! Tankstelle München,OIL!,Eversbuschstraße 33,,80999,München,48.1807,11.4609,1970-01-01 01:00:00+01,"{""openingTimes"":[{""applicable_days"":192,""periods"":[{""startp"":""07:00"",""endp"":""20:00""}]},{""applicable_days"":63,""periods"":[{""startp"":""06:00"",""endp"":""22:00""}]}]}"
This looks quite promising, and just from this first look, we can already make some assumptions about the dataset:
- The identifier is a UUID.
- There's a name and a brand for the station, the latter of which will be quite interesting when comparing stations.
- The address data is split into street, house number, post code, and city. However, the first record already shows the house number in the street field, so the data might need some cleaning if we want to use this data.
- There are coordinates to place the petrol station on a map. We can use these coordinates in a geospatial index to find stations near a given location and better compare prices.
- The
first_activeis supposed to indicate when the station was first active, but given that the first record lists the Unix epoch timestamp, this field might not provide data for all stations. We might just skip this as a result. - There's a JSON field containing the opening hours of the station. This data doesn't look like it will be useful for querying for open stations, so we will likely want to ignore this data as well.
Now, let's look at the price data. After all, this is what we came here for. Here's an example record:
date,station_uuid,diesel,e5,e10,dieselchange,e5change,e10change
2025-06-24 00:00:32+02,cbc2ab85-28ad-47e7-a102-3a0990f5d615,1.559,1.689,1.629,1,1,1
From each record, we can obtain the following data:
- The date and time of the report
- The station that reported the price
- Prices for three types of fuel (Germany uses two different types of petrol with 5% and 10% ethanol)
- Indicators whether any price changed
Looking at more data in this file, we can spot a couple of odd rows:
2025-06-24 00:01:33+02,8336b8d7-01fc-4f79-bdd3-6870c8268035,1.669,0.000,0.000,1,0,0
2025-06-24 00:01:33+02,87666c5e-eead-4ead-9b8c-eb8ab410fddb,1.599,1.679,1.619,0,0,1
Both rows indicate that some prices changed and others didn't, but there's a small difference: One row reports a price despite no change being recorded, and the other doesn’t. I'm not sure if this is significant—it could mean that the first station doesn't sell that fuel type. Either way, we'll want to exclude such data from our import. It is also interesting that the schema isn't properly normalized: Typically, the data would be stored in individual rows for each fuel type. This would prevent those odd rows from above:
date,station_uuid,fuel,price
2025-06-24 00:00:32+02,cbc2ab85-28ad-47e7-a102-3a0990f5d615,diesel,1.559
2025-06-24 00:00:32+02,cbc2ab85-28ad-47e7-a102-3a0990f5d615,e5,1.689
2025-06-24 00:00:32+02,cbc2ab85-28ad-47e7-a102-3a0990f5d615,e10,1.629
2025-06-24 00:01:33+02,8336b8d7-01fc-4f79-bdd3-6870c8268035,diesel,1.669
2025-06-24 00:01:33+02,87666c5e-eead-4ead-9b8c-eb8ab410fddb,e10,1.619
This was most likely a deliberate choice to make the schema more efficient: Storing three additional flags (even when used as a byte) and two additional float fields per row is definitely more efficient than storing the date and UUID fields three times. So, while my university professor would probably dock some points in an exam for this, it is a valid choice. To confirm my suspicion, let's just count the number of lines in the entire price dataset:
wc -l prices/**/*.csv
993187497 total
Yes. Almost a billion price records in total! With around 50% of those records changing all three prices, we're probably looking at more than two billion individual prices in the dataset. No wonder they started optimizing the database. Now that we know what we're dealing with, let's try to design a schema for this.
Designing the schema
When we draw the schema up as an entity relationship (ER) diagram, it looks relatively straightforward:

In a relational database, we could start modeling tables based on this diagram, but as we saw with the prices, the sheer amount of data might force us to make some compromises. We're also using MongoDB, so a relational schema isn't going to be the way to go.
Station data is going to be easy, with the station document containing all data found in the stations CSV file. We can store the address as an embedded document, as well as the location as a GeoJSON point:
{
"_id": {
"$binary": {
"base64": "DhjQ0+04Tn+hjlB6eK2QHQ==",
"subType": "04"
}
},
"name": "Oil! Tankstelle München",
"brand": "OIL!",
"address": {
"street": "Eversbuschstraße 33",
"houseNumber": "",
"postCode": "80999",
"city": "München"
},
"location": {
"type": "Point",
"coordinates": [
11.4609,
48.1807
]
}
}
The ID might look a bit strange here, but this is actually the same UUID as in the CSV file. We can store it as a BSON binary with subtype 4, which stores a UUID according to RFC 4122. The $binary notation is the extended JSON representation of the BSON binary type, which is used to represent these BSON types in JSON.
Storing prices requires a little more thinking. If we wanted to store prices in a normalized way, we would store a document for each price report and reference the station and fuel type:
{
"_id": {
"$oid": "673f8a8b3de33b548f9228d0"
},
"fuel": "e5",
"station": {
"$binary": {
"base64": "DhjQ0+04Tn+hjlB6eK2QHQ==",
"subType": "04"
}
},
"date": {
"$date": "2025-06-24T05:08:10.000Z"
},
"price": 1.839
}
While this would certainly work and make my university professor quite happy, it isn't a very efficient way of storing prices for multiple reasons:
- The metadata for each price report is quite large compared to the actual information we want to store.
- With roughly 150 million price reports per year, this metadata would require a lot of storage space, which is not very efficient.
- With the collection growing to billions of documents, indexes will grow quite large as well, further complicating our storage requirements.
- Showing aggregate data requires lots of reads on this large collection, making the process quite slow.
Thanks to MongoDB's document model, we don't have to live with these downsides. Instead, we can leverage schema patterns to store data more efficiently.
The bucket pattern allows us to group data into larger documents, which can be stored and queried more efficiently. MongoDB's time series collections apply this pattern automatically, but we can't use it because we'll be using the $merge aggregation pipeline stage, which doesn't work with time series collections. An important consideration when using the bucket pattern is the granularity of the data we want to store. This is where it's useful to think about how you will be using the data.
Our first use case will be to look for a given fuel station, then seeing their latest prices and maybe the price history for that day. We might also be interested in showing aggregated prices over a longer period of time. With this use case, it makes sense to create buckets for each day, containing all prices for that particular day. Our bucket will thus be identified by a station, a fuel type, and a date. Prices will be stored in a list that we'll keep sorted in ascending order. This is what that document will look like:
{
"_id": {
"$oid": "673f8a8b3de33b548f9228d0"
},
"fuel": "e5",
"station": {
"$binary": {
"base64": "DhjQ0+04Tn+hjlB6eK2QHQ==",
"subType": "04"
}
},
"day": {
"$date": "2025-06-24T00:00:00.000Z"
},
"prices": [
{
"date": {
"$date": "2025-06-24T05:08:10.000Z"
},
"price": 1.839
}
]
}
This document certainly covers the use case above, but there's a small problem. To find a station, we need to either know the station's ID (e.g., by looking it up in the station collection), or we need to join to the station collection to find the station we're looking for. As you can imagine, this will slow down queries quite a bit. Again, we can make use of MongoDB's document model to work around this problem: Instead of storing a reference to the station, we can embed all station data in the bucket document. This way, we can create all necessary indexes and efficiently query the data. This is quite common in MongoDB, especially when working with data that doesn't change very often.
Keep in mind that this is a trade-off: Our documents will occupy more storage space, and if station data changes, we will need to update all its bucket documents. In our case, this is a fair trade-off as the data is not likely to change often, and the performance benefits of more efficient queries outweigh the increased storage requirements. Note that we don't necessarily have to duplicate all data when embedding. The amount of data to be duplicated will heavily depend on the use case, how often the data changes, and how often it will be needed by the application. This is our price document with embedded station data:
{
"_id": {
"$oid": "673f8a8b3de33b548f9228d0"
},
"fuel": "e5",
"station": {
"_id": {
"$binary": {
"base64": "DhjQ0+04Tn+hjlB6eK2QHQ==",
"subType": "04"
}
},
"name": "Oil! Tankstelle München",
"brand": "OIL!",
"address": {
"street": "Eversbuschstraße 33",
"houseNumber": "",
"postCode": "80999",
"city": "München"
},
"location": {
"type": "Point",
"coordinates": [
11.4609,
48.1807
]
}
},
"day": {
"$date": "2025-06-24T00:00:00.000Z"
},
"prices": [
{
"date": {
"$date": "2025-06-24T05:08:10.000Z"
},
"price": 1.839
}
]
}
With this schema, we can now efficiently query prices, but if we want to show prices over a larger period of time, there's still some aggregation to do. Let's say that on the detail page for a station, we want to show the lowest, average, and highest price for a given fuel type over the last 30 days. We could certainly do this by querying the right documents, then aggregating the prices using the aggregation framework. However, once a day has ended, this data is not going to change anymore. It would make sense to pre-aggregate this data and store it in the bucket. So, let's add these aggregated values to our price document:
{
"_id": {
"$oid": "673f8a8b3de33b548f9228d0"
},
"fuel": "e5",
"station": {
"_id": {
"$binary": {
"base64": "DhjQ0+04Tn+hjlB6eK2QHQ==",
"subType": "04"
}
},
"name": "Oil! Tankstelle München",
"brand": "OIL!",
"address": {
"street": "Eversbuschstraße 33",
"houseNumber": "",
"postCode": "80999",
"city": "München"
},
"location": {
"type": "Point",
"coordinates": [
11.4609,
48.1807
]
}
},
"day": {
"$date": "2025-06-24T00:00:00.000Z"
},
"prices": [
{
"date": {
"$date": "2025-06-24T05:08:10.000Z"
},
"price": 1.839
}
],
"lowestPrice": {
"date": {
"$date": "2025-06-24T05:08:10.000Z"
},
"price": 1.839
},
"highestPrice": {
"date": {
"$date": "2025-06-24T05:08:10.000Z"
},
"price": 1.839
},
"averagePrice": 1.839
}
You'll note that we're not just storing the highest and lowest price values, but also when that price was reported. This will allow us to look for patterns in the data and figure out if there's a trend when a station has the lowest price of the day. This is sufficient for now, so let's move on to the next step: importing the data and aggregating it into the schema above.
Importing data
Despite price records storing station data, we will keep all station data in a separate collection. This will serve as a canonical record of all stations, and we can use it to look up station data when we need it. We now have a few options for importing data. If I only wanted to import data for a single day, I would leverage MongoDB Atlas' federated databases to read the CSV files directly from Azure DevOps and import them into MongoDB. However, since I want to import lots of historical data, there are more efficient ways to do this. Again, it's important to know the data that we have: We want to store buckets for each day, and the CSV files contain one file per day. This means that it will be efficient to load a single CSV file, pre-aggregate that data into the format that we need, then continue with the next file. Instead of using mongoimport, I chose to write a small Symfony Console command that reads the CSV file into a collection, separating the fuel types and converting data into the right format. After that, the command aggregates the data into buckets and adds them to the DailyPrice collection. MongoDB's aggregation framework is perfect for this task, so it's about time we look at some code.
From a list of prices, we can group them into buckets and find the highest and lowest prices of the day, but to compute the average price, we would need to know the first price of the day. Since stations don't report prices at midnight, we'll need to know the last price of the previous day. So, we'll start out by storing the last price of the day and will shift that to the next document in a separate step. We also need to denormalize station data, so this is one of the few places where we'll use a join. Except for storing the opening price, we can achieve all of this in a single query and store the result in our DailyPrice collection. Here's the aggregation pipeline I built:
return new Pipeline(
Stage::set(day: Expression::dateTrunc(Expression::dateFieldPath('date'), unit: 'day')),
self::groupPriceReportsByStationDayFuel(),
self::reshapeGroupedPriceReports(),
self::addExtremeValues(),
self::lookupStation(),
);
Aggregation pipeline builder
Ah, yes, simple and elegant. Of course, I'm hiding all the interesting bits so far. This is the aggregation pipeline builder introduced in version 1.21 of the MongoDB driver. While the Doctrine ODM has had one for quite some time, the builder provided by the driver itself allows us to write our code in a functional style. This is also beneficial for testing. Just by looking at this, we can tell that the pipeline does the following in order:
- Groups price reports by station, day, and fuel type
- Reshapes the buckets into the document we want
- Adds the extreme values (lowest and highest prices)
- Looks up station data
Since the first $set stage of the pipeline is the simplest part, let’s take a more detailed look at it:
Stage::set(
// Truncate the date to the specified unit and store it in a new day field
day: Expression::dateTrunc(
// We truncate the date field, which contains the exact time of the price report
Expression::dateFieldPath('date'),
// Truncating to the day effectively sets the day to midnight
unit: 'day',
),
),
Behind the scenes, the driver transforms this code to the following pipeline stage:
{
"$set": {
"day": {
"$dateTrunc": {
"date": "$date",
"unit": "day"
}
}
}
}
The big advantage of using the builder like this is that we can test each part of the aggregation pipeline in isolation in unit tests to make sure it does what we expect, as we do with other pieces of code. It also makes it much more readable, as we don't have to parse through a huge aggregation pipeline to understand what it does. Now, we can start grouping prices. Of course, we could have done so when reading the CSV, but this would've required us to load all prices into memory. Since we're dealing with large amounts of data, this is not a good idea. Using the aggregation framework allows us to push this workload to the database, where we could even use a sharded cluster to distribute the workload across multiple nodes. So, what does the grouping stage look like?
return Stage::group(
_id: object(
station: Expression::fieldPath('station'),
day: Expression::fieldPath('day'),
fuel: Expression::fieldPath('fuel'),
),
prices: Accumulator::push(object(
_id: Expression::fieldPath('_id'),
date: Expression::fieldPath('date'),
price: Expression::fieldPath('price'),
)),
);
The _id field will create a group for each station, day, and fuel. For the prices field, we use the $push accumulator and push all prices into a list. Note that while the _id field is an object, we'll change this further down the pipeline to use a simpler ID that can be indexed more easily. Now, let's build the document that we actually want to store. This can be done with the $replaceWith stage:
return Stage::replaceWith(object(
day: Expression::fieldPath('_id.day'),
station: Expression::fieldPath('_id.station'),
fuel: Expression::fieldPath('_id.fuel'),
prices: Expression::sortArray(
Expression::arrayFieldPath('prices'),
object(date: 1),
),
pricesByPrice: Expression::sortArray(
Expression::arrayFieldPath('prices'),
object(price: 1),
),
));
We extract the day, station, and fuel from the previous _id field, and we create two price lists here: one sorted by date, the other sorted by price. We can use the first list to find the closing price of the day, and the second list to find the lowest and highest prices of the day in the next stage. We can then also remove the price list sorted by price:
return Stage::set(
closingPrice: Expression::getField(
'price',
Expression::last(Expression::arrayFieldPath('prices')),
),
lowestPrice: Expression::first(Expression::arrayFieldPath('pricesByPrice')),
highestPrice: Expression::last(Expression::arrayFieldPath('pricesByPrice')),
pricesByPrice: Expression::variable('REMOVE'),
);
This is where things start getting a bit more complex. The $first and $last operators are pretty self-explanatory, but to only get the closing price of the day, we use the $getField operator to extract the value of the price field from the price document returned by $last. Last but not least, we use the $$REMOVE variable to unset the pricesByPrice field, which saves us from using the $unset pipeline stage later on. Next up, finding the station. This is where aggregation pipelines really shine, as we can use the $lookup stage after grouping and filtering a lot of data to reduce the numbers of documents that need to be processed. In SQL, this would require some nested queries, which can become very difficult to read and understand, but in MongoDB, it is just another stage in the pipeline:
return new Pipeline(
Stage::lookup(
as: 'station',
from: 'Station',
localField: 'station',
foreignField: '_id',
pipeline: new Pipeline(
Stage::project(
_id: true,
name: true,
brand: true,
location: true,
address: true,
),
),
),
Stage::set(
station: Expression::first(Expression::arrayFieldPath('station')),
),
Stage::match(station: Query::ne(null)),
);
But what's this? We're returning an entire pipeline from this factory to be used in a pipeline? Yes! When working with aggregation pipelines in the past, I noticed that there are some common tasks that require more than a single stage. Since the $lookup stage returns an array of documents even if only a single document is matched, we use a $set stage directly afterwards to extract the first (and, in our case, only) document from the array. The aggregation pipeline builder knows how to handle this and contcatenates nested pipelines so you never need to worry about this. For good measure, we also drop all documents where we couldn't find a station. We don't expect any such documents to be there, but it's better to be safe than sorry. Also, note that we're using a $project stage within $lookup. This is to ensure we're only copying the relevant station data over. As you'll see below, we're storing some extra data in the station document, which we don't want to duplicate.
I previously mentioned that writing aggregation pipelines like these not only makes them more readable, but also allows us to test them in isolation. Here's one unit test I wrote to test the addExtremeValues stage:
public function testAddExtremeValues(): void
{
$price1 = ['_id' => 1, 'price' => 2, 'date' => new UTCDateTime(new DateTimeImmutable('2025-06-27T00:00:00+00:00'))];
$price2 = ['_id' => 2, 'price' => 1, 'date' => new UTCDateTime(new DateTimeImmutable('2025-06-27T01:00:00+00:00'))];
$price3 = ['_id' => 3, 'price' => 3, 'date' => new UTCDateTime(new DateTimeImmutable('2025-06-27T02:00:00+00:00'))];
$document = [
'prices' => [
$price1,
$price2,
$price3,
],
'pricesByPrice' => [
$price2,
$price1,
$price3,
],
];
$pipeline = new Pipeline(
Stage::documents([$document]),
PriceReport::addExtremeValues(),
);
$result = iterator_to_array(
$this
->getTestDatabase()
->aggregate($pipeline, ['typeMap' => self::TYPEMAP]),
);
self::assertCount(1, $result);
$result = $result[0];
self::assertEquals(
(object) [
'prices' => [
$price1,
$price2,
$price3,
],
'closingPrice' => 3,
'lowestPrice' => $price2,
'highestPrice' => $price3,
],
$result,
);
}
We can leverage the $documents stage to create a pipeline that starts with a set of documents of our choosing, then insert the pipeline we want to test. This way, we don't have to worry about setting up a collection with test data and removing it again. The downside is that MongoDB won't be able to optimize the pipeline for index use, but since we're only testing small workloads, this is not a problem for us.
Initial import pipeline
Now that we have our pipeline, we can run it against the temporary import collection and store the results in the DailyPrice collection. This is easy enough, as we can use the $merge stage to write the results to that collection:
return new Pipeline(
Stage::set(day: Expression::dateTrunc(Expression::dateFieldPath('date'), unit: 'day')),
self::groupPriceReportsByStationDayFuel(),
self::reshapeGroupedPriceReports(),
self::addExtremeValues(),
self::lookupStation(),
Stage::merge('DailyPrice'),
);
The $merge stage is extremely powerful, as it allows us to specify the behavior when a record with the given identifier already exists. In our case, we're always inserting new data, but we could also handle the case of partial data being there (e.g., when a bucket already exists) and make sure to merge the price lists and run the above aggregation steps on the merged list. With the full import pipeline completed, we can now show why the aggregation builder is so useful. Without it, this is what the entire pipeline looks like:
[
[
'$set' => [
'day' => [
'$dateTrunc' => [
'date' => '$date',
'unit' => 'day',
],
],
],
],
[
'$group' => [
'_id' => [
'station' => '$station',
'day' => '$day',
'fuel' => '$fuel',
],
'prices' => [
'$push' => [
'_id' => '$_id',
'date' => '$date',
'price' => '$price',
],
],
],
],
[
'$replaceWith' => [
'day' => '$_id.day',
'station' => '$_id.station',
'fuel' => '$_id.fuel',
'prices' => [
'$sortArray' => [
'input' => '$prices',
'sortBy' => [
'date' => 1,
],
],
],
'pricesByPrice' => [
'$sortArray' => [
'input' => '$prices',
'sortBy' => [
'price' => 1,
],
],
],
],
],
[
'$set' => [
'closingPrice' => [
'$getField' => [
'field' => 'price',
'input' => [
'$last' => '$prices',
],
],
],
'lowestPrice' => [
'$first' => '$pricesByPrice',
],
'highestPrice' => [
'$last' => '$pricesByPrice',
],
'pricesByPrice' => '$$REMOVE',
],
],
[
'$lookup' => [
'as' => 'station',
'from' => 'Station',
'localField' => 'station',
'foreignField' => '_id',
'pipeline' => [
[
'$project' => [
'_id' => true,
'name' => true,
'brand' => true,
'location' => true,
'address' => true,
],
],
],
],
],
[
'$set' => [
'station' => [
'$first' => '$station',
],
],
],
[
'$match' => [
'station' => [
'$ne' => null,
],
],
],
[
'$merge' => [
'into' => 'DailyPrice',
],
],
]
Sure, we could extract some parts into their own methods like we did before, but this still requires us to know about all stages, their syntax, and the types they accept. With the aggregation builder, we can leverage our IDE to provide auto-completion and type hints, making aggregation pipelines much easier to write and maintain.
Post-import aggregation
Of course, getting the data into MongoDB was just the first step. We're still missing the opening price of the day, which we couldn't compute previously. With that, we can also compute the weighted average price of the day. I will spare you all the dirty details, but the aggregation framework allows us to use the $setWindowFields stage to access a field from a different document in the pipeline:
return Stage::setWindowFields(
partitionBy: object(
station: Expression::fieldPath('station._id'),
fuel: Expression::fieldPath('fuel'),
),
sortBy: object(day: 1),
output: object(
openingPrice: Accumulator::shift(
output: Expression::fieldPath('closingPrice'),
by: -1,
default: Expression::variable('REMOVE'),
),
),
);
In this case, I want to partition by the station and fuel, then sort by the day. I then want to add an openingPrice field using the $shift accumulator with a shift of -1, which means that I want to access the previous document in the partition. If there is no previous document, we are again using the special $$REMOVE variable to completely remove the field from the document. It may only save us 15 bytes, but when we're talking about around 18 million buckets per year, these 15 bytes still add up to around 270 MB of storage space. Speaking of saving storage space, another pattern is shortening field names. For example, instead of using openingPrice, we could use o as a key. I chose not to do so in this tutorial, but this is a valid solution to reduce document size. Using the Doctrine MongoDB ODM, we could still map these fields to more readable properties in our document classes, so we wouldn't even have to worry about this in the application code.
Running the import
With all the aggregation pipeline written and tested on a small scale, it's now time to make our database server do some work. For maximum performance, I decided to run the import on a local MongoDB instance. This allows me to bypass the network latency and leverage the fast SSD storage on my machine. Here's the output of the import command, where I imported data for the years 2022 and 2023:
Import took 178m 51.6s: 582,944,678 documents inserted, 0 updated.
Aggregating imported price reports for further processing...Done in 122m 26.0s.
Adding opening price for each day and merge into prices collection...Done in 169m 49.2s.
Recomputing daily aggregates...Done in 43m 27.9s.
Setting last price report for stations...Done in 185m 42.0s.
Data aggregation completed in 11h 40m 16.8s.
Almost 12 hours. That's quite a long time, but considering that we've imported more than half a billion documents, this is actually not too bad. The first step is impressive: We read just under 583 million price reports from the CSV files and inserted them into the import collection in under three hours. That works out to an average of more than 54,000 documents per second. From previous testing, I know that the bottleneck here is disk I/O when reading the CSV files due to "endpoint protection software" installed on the machine. The actual write command to MongoDB was able to handle around 300,000 documents per second. That's quite the impressive performance, if you ask me!
The next steps then took up the remainder of the time, although I will admit that I didn't optimize the pipeline for performance. This isn't something I'll be doing on the regular, and I'm sure that with some better index use, I would've been able to improve this. Since I simplified the station schema in the beginning and didn't explain the full pipeline for importing the stations, here's what we store in the database for a station:
{
"_id": {
"$binary": {
"base64": "DhjQ0+04Tn+hjlB6eK2QHQ==",
"subType": "04"
}
},
"name": "Oil! Tankstelle München",
"brand": "OIL!",
"address": {
"street": "Eversbuschstraße 33",
"houseNumber": "",
"postCode": "80999",
"city": "München"
},
"location": {
"type": "Point",
"coordinates": [
11.4609,
48.1807
]
},
"latestPrice": {
"diesel": {},
"e10": {},
"e5": {}
},
"latestPrices": {
"diesel": [],
"e10": [],
"e5": []
}
}
The latestPrice field contains the latest DailyPrice document for each fuel type, while the latestPrices field contains the DailyPrice documents for the last 30 days. Why, you might ask? Well, in order to show station details, we now only need to find the one station among 17,000 stations that we're interested in, and we have all the data we need to show the latest price, the price history for the day, and even a chart with the lowest, highest, and average prices for the last 30 days, all without having to query our huge DailyPrice collection. This is a great example of how MongoDB's document model allows us to optimize our data for the queries we want to run. And yes, we can still update this data pretty easily thanks to the power of MongoDB's query language, but let's save that for now.
Each DailyPrice document embedded into a station is exactly the same document we store in the DailyPrice collection. This allows us to reuse our Doctrine document classes when we build our application.
I also decided to store a DailyAggregate document for each day, which contains some aggregates for each fuel type and day. We can use this to compare the price at a particular station against averages across all stations:
{
"_id": {
"$oid": "673fd3903de33b548f44d4a9"
},
"day": {
"$date": "2024-06-29T00:00:00.000Z"
},
"fuel": "e5",
"numChanges": 22.3579504275982,
"lowestPrice": 1.678,
"highestPrice": 3,
"weightedAveragePrice": 1.8388233077257572,
"percentiles": {
"p50": 1.8306483516483516,
"p90": 1.869,
"p95": 1.883,
"p99": 2.312
}
}
Now that we have our data in MongoDB, let's take a look at the size of our collections and how much data we actually stored:
- The Station collection contains around 17,000 stations, totalling just over 1 GB of data.
- The DailyPrice collection contains around 28 million documents (two full years worth of data), with 24 GB of data and a whopping 2.5 GB of indexes.
- These 28 million bucket documents contain around 600 million individual prices, so we've successfully reduced the number of documents by a factor of 20.
- Our DailyAggregates are just over 2,000 documents, coming in at 180 KB.
In the next part, we'll start building our Symfony application to work with this data. We'll use the Doctrine MongoDB ODM to map our documents to PHP classes, and we'll use Symfony UX to build a modern frontend that allows us to interact with the data. The goal is to show how we can efficiently handle this amount of data in a Symfony application, while still providing a good user experience.
Top comments (0)