Thus far in this series, we have loaded the data from a static data source. It's only a few lines of code, but this limits our scaling potential.
with open(DATA_SEED_TWITTER_PATH) as data_seed:
    data = json.load(data_seed)
For storing source events, MongoDB is my preferred storage engine. If you have never used a NoSQL database, it is similar to a traditional SQL database in many respects, but instead of relational links between data elements, everything is stored inside a single document. When storing data from a source event, I find it useful to attach the original data, in its rawest form, to the record. This allows the data to be reprocessed in the future, when new insights arrive, without having to retrieve it again from the source.
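As a rough illustration (the field names here are just for this example, not a final schema), a stored event record could end up looking something like this:

{
    "source": "twitter",
    "data": {
        "id": 1,
        "text": "the raw tweet payload, stored exactly as it came from the source"
    }
}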
Outcomes
Store the learning journal data in a persisted form in MongoDB. This will involve some light modeling and setting up MongoDB locally. We will set up a simple version of a Flyway-style migration pattern to make sure our app server starts with the desired data structures and can evolve them as necessary over time.
Database Setup
Our MongoDB instance will be hosted in Docker. For those not familiar with Docker, it is a container platform that runs applications in lightweight, isolated environments. Follow the directions on the Docker website to get it set up on your machine. Once that is done, I have created a docker-compose.yml file that will create the database instance we will use. This file is pretty basic right now, but Docker Compose can be used to stand up complicated, production-like environments on a developer's machine (or a CI/CD server) with a single command.
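If you want to follow along without pulling the project, a minimal docker-compose.yml for a local MongoDB might look like the sketch below; the image tag, port, and volume name are my assumptions, not necessarily what the project file uses.

version: '3'
services:
  mongodb:
    image: mongo:4.2
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db

volumes:
  mongo_data: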
~ docker-compose up -d
MongoDB Compass is a free database browser tool that we can use to verify our database is up and running.
Data Model
Now that we have a database, let's create a model to store our data. Eventually, we will evolve this model to carry more pertinent information, but for right now it just needs the event source name, the data from the source, and a way to marshal itself into JSON format. This is the first step in an ETL (Extract Transform Load) pipeline, although I prefer an EL-TL (Extract Load Transform Load) pattern, where the extraction is separated from the transformation process.
from typing import Dict


class EventDataModel:
    def __init__(self, source: str, data: Dict):
        self.source = source
        self.data = data

    @property
    def to_json(self) -> Dict:
        return vars(self)
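As a quick sanity check (not part of the project code), the model marshals straight into a dictionary that pymongo can insert:

model = EventDataModel("twitter", {"id": 1, "text": "hello"})
print(model.to_json)  # {'source': 'twitter', 'data': {'id': 1, 'text': 'hello'}}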
DAO (Data Access Object)
Now that we have a shape for our data, we must create the framework that persists it into MongoDB. It will be simple to start; we are just trying to insert records. We start with a test that will verify our results. This will be our first integration test: it will connect to the database, insert a record, and assert that a MongoDB id was returned.
from src.dao.event_data_dao import EventDataDao
from src.models.event_data_model import EventDataModel

source = "twitter"
data = {
    "key": "value"
}
default_event_data_dict = {
    'source': source,
    'data': data
}


def test_event_data_dao_inserts_record(mongo_db):
    event_data_model = _build_event_data_model()
    event_data_dao = EventDataDao(mongo_db)

    event_data_id = event_data_dao.insert(event_data_model)

    assert event_data_id


def _build_event_data_model():
    return EventDataModel(source, data)
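The mongo_db fixture comes from the project's test configuration, which isn't shown here. A minimal sketch of what it could look like in conftest.py, assuming a local MongoDB on the default port and a throwaway test database name, is:

import pytest
from pymongo import MongoClient


@pytest.fixture
def mongo_db():
    # connect to the local Docker MongoDB and hand back a disposable test database
    client = MongoClient('mongodb://localhost:27017/')
    yield client['learning_journal_test']
    # drop the test database so every run starts clean
    client.drop_database('learning_journal_test')
    client.close()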
Then, in the source we implement it as follows:
from datetime import datetime

from pymongo.database import Database

from src.models.event_data_model import EventDataModel


class EventDataDao:
    def __init__(self, db: Database):
        self.event_data_collection = db['event_data']

    def insert(self, event_data_model: EventDataModel) -> str:
        values = {**event_data_model.to_json, 'updated': datetime.utcnow()}
        insert_result = self.event_data_collection.insert_one(values)
        return str(insert_result.inserted_id)
Besides the data itself, I also added an updated value, which is the date-time at which the record was inserted.
Migration
We have the pieces in place to create our first migration script. Before calling the run function for the app server, we will insert the execution of the migration script.
Version Collection
In order to make sure we don't run the same migration multiple times, we are going to set up a simple version collection to track what migrations have already run. Just like our event data, we will need a model and a DAO.
from typing import Dict


class VersionModel:
    def __init__(self, version: str = None):
        self.version = version

    @property
    def to_json(self) -> Dict:
        return vars(self)

    @staticmethod
    def from_json(json_data: Dict):
        version_model = VersionModel()
        version_model.__dict__.update(**json_data)
        return version_model
The DAO lays a bit more foundation as we need a way to query for a record and delete a record (for testing).
from datetime import datetime

from pymongo.database import Database

from src.models.version_model import VersionModel


class VersionDao:
    def __init__(self, db: Database):
        self.version_collection = db['migration_version']

    def insert(self, version_model: VersionModel) -> str:
        values = {**version_model.to_json, 'updated': datetime.utcnow()}
        insert_result = self.version_collection.insert_one(values)
        return str(insert_result.inserted_id)

    def find_one(self, version: str) -> VersionModel:
        query = {'version': version}
        result = self.version_collection.find_one(query)
        if result is None:
            return None
        version_model = VersionModel.from_json(result)
        return version_model

    def delete_one(self, version: str) -> int:
        query = {'version': version}
        return self.version_collection.delete_one(query).deleted_count
Check out the project source for the full implementation and testing code. Running the pytest command returns all green, confirming that we can insert, query, and delete records from the version collection.
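The version DAO test follows the same pattern as the event data test. A rough sketch of the insert/query/delete round trip, reusing the same mongo_db fixture, might look like:

from src.dao.version_dao import VersionDao
from src.models.version_model import VersionModel


def test_version_dao_round_trip(mongo_db):
    version = "V999_Test"
    version_dao = VersionDao(mongo_db)

    # insert a version record and make sure an id comes back
    assert version_dao.insert(VersionModel(version))

    # the record should now be retrievable by its version string
    assert version_dao.find_one(version).version == version

    # clean up and confirm exactly one record was removed
    assert version_dao.delete_one(version) == 1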
Migration Command
I added the following to the main function in the app.py
startup file, which initializes the web server.
if __name__ == '__main__':
    V001LoadData().run()
    manager.run()
The V001LoadData class checks whether the current version already exists and exits if it does. Otherwise, it loads the initial Twitter data set and persists it to the database.
import json
import os

from src.dao.event_data_dao import EventDataDao
from src.dao.mongodb import MongoDb
from src.dao.version_dao import VersionDao
from src.models.event_data_model import EventDataModel
from src.models.version_model import VersionModel

DATA_SEED_TWITTER_PATH = os.environ.get('DATA_SEED_TWITTER_PATH', './data/tweet.json')


class V001LoadData:
    version = "V001_Load_Data"

    def run(self):
        mongo_db = MongoDb.instance()
        version_dao = VersionDao(mongo_db)
        if version_dao.find_one(self.version):
            return

        version_model = VersionModel(self.version)
        version_dao.insert(version_model)

        with open(DATA_SEED_TWITTER_PATH) as data_seed:
            data = json.load(data_seed)

        event_data_dao = EventDataDao(mongo_db)
        for event_data in data:
            event_data_model = EventDataModel("twitter", event_data)
            event_data_dao.insert(event_data_model)
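The MongoDb.instance() helper used above isn't shown in this post; check the project source for the real implementation. A minimal sketch, assuming the connection details come from environment variables, could be:

import os

from pymongo import MongoClient

MONGODB_URI = os.environ.get('MONGODB_URI', 'mongodb://localhost:27017/')
MONGODB_NAME = os.environ.get('MONGODB_NAME', 'learning_journal')


class MongoDb:
    _db = None

    @classmethod
    def instance(cls):
        # lazily create a single shared Database handle for the app
        if cls._db is None:
            cls._db = MongoClient(MONGODB_URI)[MONGODB_NAME]
        return cls._db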
After starting the app server, it can be confirmed that the initial data set of 4556 records is now available in MongoDB.
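Besides browsing with MongoDB Compass, the count can also be checked from a Python shell (assuming the same local connection details and database name as in the sketches above):

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['learning_journal']
# should print the number of seeded event documents
print(db['event_data'].count_documents({'source': 'twitter'}))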
Conclusion
This was a simple way of implementing DAOs and migration scripts on app startup. It lays the foundation for moving away from one-off scripts toward persisted data that the API can serve to a front-end display.