Justin L Beall
Social Learning Journal - Persistence

Thus far in this series, we have loaded the data from a static data source. It's only a few lines of code, but this limits our scaling potential.

import json

with open(DATA_SEED_TWITTER_PATH) as data_seed:
    data = json.load(data_seed)

For storing source events, MongoDB is my preferred storage engine. If you have never used a NoSQL database, it is similar to a traditional SQL database in many respects, except that instead of relational links between data elements, everything is stored inside a single document. When storing data from a source event, I find it useful to attach the original data, in its rawest form, to the record. This allows the data to be reprocessed in the future, when new insights arrive, without having to retrieve it again from the source.
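To make that concrete, here is the shape a stored event ends up with by the end of this post (the values are illustrative; the source, data, and updated fields are built up in the sections below):

from datetime import datetime

# Illustrative only: the raw source payload rides along untouched in 'data'.
stored_event = {
    'source': 'twitter',                        # where the event came from
    'data': {'text': 'the raw tweet, as-is'},   # untouched source payload
    'updated': datetime.utcnow(),               # stamped at insert time
}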

Outcomes

Store the learning journal data in persisted form in MongoDB. This will involve some light modeling and setting up MongoDB locally. We will also set up a simple version of a Flyway-style migration pattern to make sure our app server starts with the desired data structures and can evolve as necessary over time.

Database Setup

Our MongoDB will be hosted in Docker. For those not familiar with Docker, it is a container platform that runs applications in lightweight, isolated environments. Follow the directions on the Docker website to get it set up on your machine. Once that is done, I have created a docker-compose.yml file that will create the database instance we will use. This file is pretty basic right now, but Docker Compose can be used to stand up complicated, production-like environments on a developer's machine (or a CI/CD server) with a single command.
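For reference, a minimal docker-compose.yml along these lines is enough to bring the database up (the image tag, port mapping, and volume name below are my assumptions, not necessarily the project's exact file):

version: '3'
services:
  mongodb:
    image: mongo:4.2
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db

volumes:
  mongo_data: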

~ docker-compose up -d

MongoDB Compass is a free database browser tool that we can use to verify our database is up and running.


Data Model

Now that we have a database, let's create a model to store our data. Eventually, we will evolve this model to hold more pertinent information, but for right now we need the event source name, the data from the source, and a way to marshal the model into JSON format. This is the first step of an ETL (Extract, Transform, Load) pipeline, although I prefer an EL-TL (Extract, Load, Transform, Load) pattern, where the extraction is separated from the transformation process.

from typing import Dict


class EventDataModel:
    def __init__(self, source: str, data: Dict):
        self.source = source
        self.data = data

    @property
    def to_json(self) -> Dict:
        return vars(self)

DAO (Data Access Object)

Now that we have a shape for our data, we must create the framework that persists it into MongoDB. It will be simple to start; we are just trying to insert records. We begin with a test that will verify our results. This will be our first integration test: it connects to the database, inserts a record, and asserts that a MongoDB id was returned.

from src.dao.event_data_dao import EventDataDao
from src.models.event_data_model import EventDataModel

source = "twitter"
data = {
    "key": "value"
}


def test_event_data_dao_inserts_record(mongo_db):
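    # mongo_db is a pytest fixture (wired up in the project's test setup)
    # that provides a connection to a live local database.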
    event_data_model = _build_event_data_model()
    event_data_dao = EventDataDao(mongo_db)

    event_data_id = event_data_dao.insert(event_data_model)

    assert event_data_id


def _build_event_data_model():
    return EventDataModel(source, data)

Then, in the source we implement it as follows:

from datetime import datetime

from pymongo.database import Database

from src.models.event_data_model import EventDataModel


class EventDataDao:
    def __init__(self, db: Database):
        self.event_data_collection = db['event_data']

    def insert(self, event_data_model: EventDataModel) -> str:
        values = {**event_data_model.to_json, 'updated': datetime.utcnow()}
        insert_result = self.event_data_collection.insert_one(values)
        return str(insert_result.inserted_id)

Alongside the data itself, I also added an updated field, which is the UTC date-time at which the record was inserted.
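As a quick sanity check, here is roughly how the DAO can be exercised from a Python shell (the connection string and database name are my assumptions based on the local Docker setup):

from pymongo import MongoClient

from src.dao.event_data_dao import EventDataDao
from src.models.event_data_model import EventDataModel

# Database name is an assumption; match it to whatever your app uses.
db = MongoClient('mongodb://localhost:27017')['social_learning_journal']

dao = EventDataDao(db)
event_data_id = dao.insert(EventDataModel('twitter', {'key': 'value'}))
print(event_data_id)  # the stringified ObjectId of the new record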

Migration

We have the pieces in place to create our first migration script. Before calling the run function for the app server, we will insert a call that executes the migration script.

Version Collection

In order to make sure we don't run the same migration multiple times, we are going to set up a simple version collection to track what migrations have already run. Just like our event data, we will need a model and a DAO.

from typing import Dict


class VersionModel:
    def __init__(self, version: str = None):
        self.version = version

    @property
    def to_json(self) -> Dict:
        return vars(self)

    @staticmethod
    def from_json(json_data: Dict):
        version_model = VersionModel()
        version_model.__dict__.update(**json_data)

        return version_model

The DAO lays a bit more foundation as we need a way to query for a record and delete a record (for testing).

from datetime import datetime
from typing import Optional

from pymongo.database import Database

from src.models.version_model import VersionModel


class VersionDao:
    def __init__(self, db: Database):
        self.version_collection = db['migration_version']

    def insert(self, version_model: VersionModel) -> str:
        values = {**version_model.to_json, 'updated': datetime.utcnow()}
        insert_result = self.version_collection.insert_one(values)
        return str(insert_result.inserted_id)

    def find_one(self, version: str) -> Optional[VersionModel]:
        query = {'version': version}
        result = self.version_collection.find_one(query)
        if result is None:
            return None

        return VersionModel.from_json(result)

    def delete_one(self, version: str) -> int:
        query = {'version': version}
        return self.version_collection.delete_one(query).deleted_count

Check out the project source for the full implementation and testing code. Running the pytest command returns all green, confirming that we can insert, query, and delete records from the version collection.
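For a flavor of what those tests cover, a minimal round-trip might look like this (the real suite lives in the project source; the version string here is purely illustrative):

def test_version_dao_round_trip(mongo_db):
    version_dao = VersionDao(mongo_db)

    # Insert a version record and make sure an id comes back.
    assert version_dao.insert(VersionModel('V999_Test'))

    # The record should now be findable by its version string.
    found = version_dao.find_one('V999_Test')
    assert found.version == 'V999_Test'

    # Clean up after ourselves; exactly one record should be deleted.
    assert version_dao.delete_one('V999_Test') == 1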

Migration Command

I added the following to the main block of the app.py startup file, which initializes the web server.

if __name__ == '__main__':
    V001LoadData().run()

    manager.run()

The V001LoadData class checks whether the current version already exists and exits early if it does. Otherwise, it records the version, loads the initial Twitter data set, and persists it to the database.

import json
import os

from src.dao.event_data_dao import EventDataDao
from src.dao.mongodb import MongoDb
from src.dao.version_dao import VersionDao
from src.models.event_data_model import EventDataModel
from src.models.version_model import VersionModel

DATA_SEED_TWITTER_PATH = os.environ.get('DATA_SEED_TWITTER_PATH', './data/tweet.json')


class V001LoadData:
    version = "V001_Load_Data"

    def run(self):
        mongo_db = MongoDb.instance()
        version_dao = VersionDao(mongo_db)

        # Bail out if this migration has already been recorded.
        if version_dao.find_one(self.version):
            return

        # Record the version so this migration only ever runs once.
        version_model = VersionModel(self.version)
        version_dao.insert(version_model)

        # Load the seed file and persist each tweet as an event record.
        with open(DATA_SEED_TWITTER_PATH) as data_seed:
            data = json.load(data_seed)

        event_data_dao = EventDataDao(mongo_db)

        for event_data in data:
            event_data_model = EventDataModel("twitter", event_data)
            event_data_dao.insert(event_data_model)

After starting the app server, we can confirm that the initial data set of 4556 records is now available in MongoDB.
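If you prefer a shell to a GUI, a quick count from Python confirms the same thing (connection string and database name assumed, as before):

from pymongo import MongoClient

db = MongoClient('mongodb://localhost:27017')['social_learning_journal']
print(db['event_data'].count_documents({}))  # expect 4556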


Conclusion

This was a simple way of implementing DAOs and migration scripts that run on app startup. It lays the foundation for moving away from one-off scripts and toward persisted data that the API can serve to the front end for display.
