In this second part of the tutorial we will create the first microservice of our project. Being the first contact with a Minos microservice, we will delve into the structure of the project and its main features.
Create the Product Microservice
From the project folder, run the following Minos CLI commands:
$:(venv)>cd microservices
$:(venv)>minos new microservice product
The CLI will ask us some questions:
- The name of the microservice ( product )
- What kind of language do we want to use ( python ): for the moment Minos only supports Python, but the future has some great surprises in store ;-)
- The microservice version ( 0.1.0 )
- The name of the RootEntity ( product ). For the moment, keep the default value; later we will explain what a RootEntity is and what it is for.
- The package manager ( poetry ). For the moment we support only Poetry as package manager.
- The deploy that we want to use ( docker-compose ). In our previous post we set Kubernetes as our deployment service, so we maintain the same deployment for the microservices.
- A series of questions about the general information of the project, such as the description, the name of the developer, etc.
The Microservice Folder Structure
- The src folder contains all the Python project files.
- The tests folder contains all the unit test files.
- The root folder contains all the configuration files and the Poetry files.
- The root folder contains the Dockerfile as well.
Let’s take a look at the config.yml file.
service:
  name: product
  aggregate: src.aggregates.Product
  injections:
    lock_pool: minos.common.PostgreSqlLockPool
    postgresql_pool: minos.common.PostgreSqlPool
    broker_publisher: minos.plugins.kafka.InMemoryQueuedKafkaBrokerPublisher
    broker_subscriber_builder: minos.plugins.kafka.InMemoryQueuedKafkaBrokerSubscriberBuilder
    broker_pool: minos.networks.BrokerClientPool
    transaction_repository: minos.aggregate.PostgreSqlTransactionRepository
    event_repository: minos.aggregate.PostgreSqlEventRepository
    snapshot_repository: minos.aggregate.PostgreSqlSnapshotRepository
    saga_manager: minos.saga.SagaManager
    discovery: minos.networks.DiscoveryConnector
    product_repository: src.ProductQueryRepository
  services:
    - minos.networks.BrokerHandlerService
    - minos.networks.RestService
    - minos.networks.PeriodicTaskSchedulerService
middleware:
  - minos.saga.transactional_command
services:
  - minos.aggregate.TransactionService
  - minos.aggregate.SnapshotService
  - minos.saga.SagaService
  - src.queries.ProductQueryService
  - src.commands.ProductCommandService
rest:
  host: 0.0.0.0
  port: 8080
broker:
  host: localhost
  port: 9092
  queue:
    database: product_db
    user: minos
    password: min0s
    host: localhost
    port: 5432
    records: 1000
    retry: 2
repository:
  database: product_db
  user: minos
  password: min0s
  host: localhost
  port: 5432
snapshot:
  database: product_db
  user: minos
  password: min0s
  host: localhost
  port: 5432
saga:
  storage:
    path: ./product.lmdb
discovery:
  client: minos.plugins.minos_discovery.MinosDiscoveryClient
  host: localhost
  port: 5567
This file contains all the configurations of the microservice.
From here you can:
- Modify the configurations of the database
- Select the broker or change the entries of the injections section
- Use the plugins provided by Minos
- Use a custom plugin, for example RabbitMQ.
In this tutorial we will not go into detail on how to reconfigure the microservice; we will use the default values.
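The values in the injections section are dotted import paths. As a rough illustration of how such a path can be turned into a Python object, here is a minimal sketch. The helper `resolve_dotted_path` and the `injections` mapping are hypothetical names written for this example (they are not part of the Minos API), and the paths point at standard-library classes so the snippet runs without Minos installed.

```python
from importlib import import_module


def resolve_dotted_path(path: str):
    """Import and return the object named by a 'package.module.Name' path."""
    module_name, _, attribute = path.rpartition(".")
    if not module_name:
        raise ValueError(f"{path!r} is not a dotted path")
    module = import_module(module_name)
    return getattr(module, attribute)


# Hypothetical 'injections' mapping, shaped like the one in config.yml
# but pointing at standard-library classes instead of Minos ones.
injections = {
    "counter": "collections.Counter",
    "ordered": "collections.OrderedDict",
}

resolved = {name: resolve_dotted_path(path) for name, path in injections.items()}
print(resolved["counter"]("aab").most_common(1))  # [('a', 2)]
```

Swapping one implementation for another then only requires changing the path string in the configuration, which is presumably why the configuration file uses this style.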
Well, now let’s get into the Python code (finally!).
The src folder contains:
- the commands module.
- the queries module.
- the aggregates file.
Let’s see what each of them does.
Aggregates
An Aggregate defines a self-contained grouping of entities, which is treated as a single, atomic unit. A change to any of the entities is considered to be a change to the entire Aggregate.
Ok, let’s try to clarify the concept a little better. In our microservice we know that we will deal with the product catalog, so our main Entity or RootEntity is Product.
A product will have the following attributes:
- id: uuid string
- name: string
- description: text
- picture: string
- price: Entity
  - id: string
  - currencyCode: string
  - units: int
- categories: list
Notice that Price is an Entity and Categories is a list of Value Objects; we will delve into that in a moment.
First, see how we reflect this model in aggregates.py:
# ....
class Price(Entity):
    currency: str
    units: int


class Category(ValueObject):
    name: str


class Product(RootEntity):
    """Product catalog RootEntity class."""

    name: str
    description: str
    picture: str
    price: Price
    categories: ValueObjectSet[Category]
#....
From the code it is clear that:
- An Entity is very similar to the concept of Model that we have encountered in other frameworks such as Django, for example.
- The name, description and picture attributes use the native Python typing and are therefore strings (str).
- The price, on the other hand, is a reference to an Entity class called Price. An Entity class is very similar to a RootEntity, and the reference is very similar to the ForeignKey of a classic ORM, so Product has a OneToOne relationship with Price.
- Categories follows a different concept. At first sight it looks like a “normal” OneToMany relationship (and it is), but this relationship does not take place through an id; instead, the given list of categories is loaded entirely inside the Product object.
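To make the difference between the two concepts more tangible, here is a minimal sketch written with plain dataclasses. It is only an analogy under the assumption that an entity is compared by identity and a value object by its attributes; it is not how minos.aggregate implements Entity or ValueObject, and the classes below are stand-ins that merely reuse the names from the model.

```python
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass(frozen=True)
class Category:
    """Value object stand-in: immutable, compared by its attributes."""

    name: str


@dataclass(eq=False)
class Price:
    """Entity stand-in: compared by its identifier, not by its attributes."""

    currency: str
    units: int
    uuid: UUID = field(default_factory=uuid4)

    def __eq__(self, other):
        return isinstance(other, Price) and self.uuid == other.uuid

    def __hash__(self):
        return hash(self.uuid)


same_value = Category("books") == Category("books")
same_entity = Price("EUR", 10) == Price("EUR", 10)
categories = {Category("books"), Category("books"), Category("toys")}

print(same_value)       # True
print(same_entity)      # False
print(len(categories))  # 2
```

Two categories with the same name are interchangeable, so a set collapses them, whereas two prices with the same amount remain distinct objects because each one has its own identifier.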
Now that our set of Entities is ready, it is time to create a set of methods to manage them, and this is where the aggregate comes into play.
We will create three basic CRUD methods: create, get and delete.
from uuid import (
    UUID,
)

from minos.aggregate import (
    Aggregate,
    Entity,
    RootEntity,
    ValueObject,
    ValueObjectSet,
)


class Price(Entity):
    currency: str
    units: int


class Category(ValueObject):
    name: str


class Product(RootEntity):
    """Product RootEntity class."""

    name: str
    description: str
    picture: str
    price: Price
    categories: ValueObjectSet[Category]


class ProductAggregate(Aggregate[Product]):
    """ProductAggregate class."""

    @staticmethod
    async def getProduct(uid) -> Product:
        """Get an existing instance by its identifier."""
        product = await Product.get(uid)
        return product

    @staticmethod
    async def createProduct(data: dict) -> UUID:
        """Create a new instance."""
        # Build the nested Price entity from its raw dictionary.
        data['price'] = Price(**data['price'])
        if 'categories' in data:
            # Build one Category value object per raw dictionary.
            data['categories'] = {Category(**category) for category in data['categories']}
        root = await Product.create(**data)
        return root.uuid

    @staticmethod
    async def deleteProduct(uid) -> UUID:
        """Delete an existing instance."""
        product = await Product.get(uid)
        await product.delete()  # the deletion must be awaited to take effect
        return product.uuid
These operations will change the state of the Event Storer.
What can we do with the Aggregate class? The aggregate mainly works with the Event Storer and the Snapshot (in other articles we will describe in more detail what those two patterns are and how Minos implements them), so the Aggregate class is extensively used in the Command Service.
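Until those articles arrive, the relationship between the two patterns can be sketched in a few lines: the event store is an append-only log of changes, and the snapshot is the current state obtained by replaying that log. The `Event` and `InMemoryEventStore` classes below are hypothetical and kept in memory for illustration only; they do not reflect the PostgreSQL repositories that Minos configures.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Event:
    """One recorded change to a product."""

    action: str  # "create", "update" or "delete"
    uuid: str
    fields: dict[str, Any]


class InMemoryEventStore:
    """Append-only list of events (hypothetical, for illustration only)."""

    def __init__(self) -> None:
        self._events: list[Event] = []

    def append(self, event: Event) -> None:
        self._events.append(event)

    def snapshot(self) -> dict[str, dict[str, Any]]:
        """Rebuild the current state by replaying every event in order."""
        state: dict[str, dict[str, Any]] = {}
        for event in self._events:
            if event.action == "create":
                state[event.uuid] = dict(event.fields)
            elif event.action == "update":
                state[event.uuid].update(event.fields)
            elif event.action == "delete":
                state.pop(event.uuid, None)
        return state


store = InMemoryEventStore()
store.append(Event("create", "p-1", {"name": "Mug", "units": 5}))
store.append(Event("create", "p-2", {"name": "Pen", "units": 1}))
store.append(Event("update", "p-1", {"units": 7}))
store.append(Event("delete", "p-2", {}))

print(store.snapshot())  # {'p-1': {'name': 'Mug', 'units': 7}}
```

Nothing is ever removed from the log itself: the deleted product disappears from the snapshot, but the four events that describe its history are still stored.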
Command Service
CQRS separates reads and writes into different models, using commands to update data, and queries to read data.
In Minos, the Command Service is in charge of modifying the Event Storer and exposing the commands for that purpose.
In our Command Service we want to define three main commands:
- create_product
- get_product_by_id
- delete_product
from minos.cqrs import (
    CommandService,
)
from minos.networks import (
    Request,
    Response,
    ResponseException,
    enroute,
)

from ..aggregates import (
    ProductAggregate,
)


class ProductCommandService(CommandService):
    """ProductCommandService class."""

    @enroute.broker.command("GetProductById")
    async def get_product_by_id(self, request: Request) -> Response:
        """Get a ``Product`` instance by its identifier.

        :param request: The ``Request`` instance.
        :return: A ``Response`` instance.
        """
        try:
            content = await request.content()  # get the request payload
            product = await ProductAggregate.getProduct(content['uid'])
            return Response(product)
        except Exception as exc:
            raise ResponseException(f"An error occurred during the Query process: {exc}")

    @enroute.rest.command("/product", "POST")
    @enroute.broker.command("CreateProduct")
    async def create_product(self, request: Request) -> Response:
        """Create a new ``Product`` instance.

        :param request: The ``Request`` instance.
        :return: A ``Response`` instance.
        """
        try:
            content = await request.content()  # get the request payload
            uuid = await ProductAggregate.createProduct(content)
            return Response({"uuid": uuid})
        except Exception as exc:
            raise ResponseException(f"An error occurred during the Product creation: {exc}")

    @enroute.rest.command("/product/{uuid}", "DELETE")
    @enroute.broker.command("DeleteProduct")
    async def delete_product(self, request: Request) -> Response:
        """Delete a ``Product``.

        :param request: The ``Request`` instance.
        :return: A ``Response`` instance.
        """
        try:
            params = await request.params()  # get the url params [uuid]
            uuid = params['uuid']
            uuid = await ProductAggregate.deleteProduct(uuid)
            return Response({"uuid": uuid})
        except Exception as exc:
            raise ResponseException(f"An error occurred during Product delete: {exc}")
Let’s take some time to analyze the Command Service.
As you can see, the methods have been decorated with two different decorators:
@enroute.rest.command("…", "POST")
The Command Service receives the calls routed from the REST interface for the following HTTP methods:
- POST
- PUT
- DELETE
The “@enroute.rest.command()” decorator instructs the REST interface to route all the matching calls to a specific method.
@enroute.broker.command(….)
This decorator, as its name implies, routes the broker’s calls, that is, the messages coming from the broker. Although it may seem confusing to talk about events within the Command Service, it is worth clarifying that Commands are Events with a different payload.
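The idea of stacking two routing decorators on one method can be sketched without Minos. In the following simplified example, `rest_command`, `broker_command` and the two routing tables are hypothetical stand-ins written for illustration; they are not the real `enroute` implementation, and the handler receives a plain dictionary instead of a Request.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[dict]]

# Hypothetical routing tables, filled in by the decorators below.
REST_ROUTES: dict[tuple[str, str], Handler] = {}
BROKER_ROUTES: dict[str, Handler] = {}


def rest_command(url: str, method: str) -> Callable[[Handler], Handler]:
    """Register the decorated handler for an HTTP method and URL."""

    def decorator(handler: Handler) -> Handler:
        REST_ROUTES[(method, url)] = handler
        return handler

    return decorator


def broker_command(topic: str) -> Callable[[Handler], Handler]:
    """Register the decorated handler for a broker topic."""

    def decorator(handler: Handler) -> Handler:
        BROKER_ROUTES[topic] = handler
        return handler

    return decorator


@rest_command("/product", "POST")
@broker_command("CreateProduct")
async def create_product(content: dict) -> dict:
    return {"created": content["name"]}


# The same handler is reachable through both entry points.
via_rest = asyncio.run(REST_ROUTES[("POST", "/product")]({"name": "Mug"}))
via_broker = asyncio.run(BROKER_ROUTES["CreateProduct"]({"name": "Pen"}))
print(via_rest, via_broker)  # {'created': 'Mug'} {'created': 'Pen'}
```

Because each decorator returns the handler unchanged, the decorators can be stacked in any order and the business logic is written only once.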
There remains only one method in our Command Service that has a different functionality than the others.
async def get_product_by_id(…)
So far we have said that the Command Service takes care of doing the Write operations on the Event Storer, so why put a retrieve method?
The answer is simple: the Command Service has the duty to carry out the writes but, at the same time, it is the communication interface between the microservices themselves.
Whenever a microservice needs to add data or request data from another microservice, it has to go through the Command Service Interface. Moreover, the queries, passing through the Event Storer, will always maintain data consistency.
Query Service
The Query Service is the service responsible for returning the data. Unlike the Command Service, the Query Service does not read from the Event Storer, but from a database that the programmer has chosen.
To do this we will use two features offered by Minos:
- MinosSetup
- Dependency injection
The Minos CLI provides a default configuration of the Query Service with SQLAlchemy and PostgreSQL. The PostgreSQL host is the same one used for the Event Storer.
Repository.py
If you need to configure something during the startup process of the microservice, extending the MinosSetup class is the solution.
Open the repository.py file:
from minos.common import MinosSetup, MinosConfig
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from .models import Base, Product, Price, Categories


class ProductQueryRepository(MinosSetup):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Build the connection URL from the keyword arguments supplied by _from_config.
        self.engine = create_engine("postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}".format(**kwargs))
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    async def _setup(self) -> None:
        # Recreate the query tables from scratch every time the microservice starts.
        Base.metadata.drop_all(self.engine)
        Base.metadata.create_all(self.engine)

    @classmethod
    def _from_config(cls, *args, config: MinosConfig, **kwargs):
        # Reuse the repository connection settings, but point to a dedicated query database.
        return cls(*args, **(config.repository._asdict() | {"database": "product_query_db"}) | kwargs)
In the repository.py file you already have all the injections configured and the information coming from the MinosConfig instance.
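Two ideas are packed into that class: a setup hook that runs once at startup, and a factory that merges the configured connection settings with an override for the database name. The sketch below isolates both with the standard library only. `SetupStandIn`, `QueryRepositorySketch` and `from_config` are hypothetical names for illustration; they are not the MinosSetup API, and no database connection is opened.

```python
import asyncio


class SetupStandIn:
    """Hypothetical stand-in for a setup-aware base class (not the Minos one)."""

    def __init__(self, **kwargs):
        self.already_setup = False

    async def setup(self) -> None:
        # Run the subclass hook only once, however many times setup() is called.
        if not self.already_setup:
            await self._setup()
            self.already_setup = True

    async def _setup(self) -> None:
        """Hook for subclasses; does nothing by default."""


class QueryRepositorySketch(SetupStandIn):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.url = "postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}".format(**kwargs)
        self.tables_created = 0

    async def _setup(self) -> None:
        self.tables_created += 1  # placeholder for Base.metadata.create_all(...)

    @classmethod
    def from_config(cls, repository_section: dict, **overrides):
        # Later operands win in a dict union, so the query database name
        # replaces the one configured for the event repository.
        return cls(**(repository_section | {"database": "product_query_db"} | overrides))


section = {"database": "product_db", "user": "minos", "password": "min0s",
           "host": "localhost", "port": 5432}
repository = QueryRepositorySketch.from_config(section)
asyncio.run(repository.setup())
asyncio.run(repository.setup())  # the second call is a no-op

print(repository.url)             # postgresql+psycopg2://minos:min0s@localhost:5432/product_query_db
print(repository.tables_created)  # 1
```

The dict union operator `|` requires Python 3.9 or later, which is also true of the `_from_config` method in the original class.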
Models.py
The models.py file provides the declarative base class from SQLAlchemy.
from sqlalchemy.orm import declarative_base
Base = declarative_base()
From that point, we can add all the SQLAlchemy models we need. For our product microservice we replicate the aggregate structure:
from sqlalchemy import Column, String, Text, Float, Integer, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, backref

Base = declarative_base()


class Price(Base):
    __tablename__ = 'price'

    id = Column(Integer, primary_key=True)
    uuid = Column(String(36))  # a UUID in its canonical text form is 36 characters long
    currency = Column(String)
    units = Column(Float)


class Categories(Base):
    __tablename__ = 'categories'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    product_id = Column(Integer, ForeignKey('product.id'))
    product = relationship("Product", backref=backref("categories"))


class Product(Base):
    __tablename__ = 'product'

    id = Column(Integer, primary_key=True)
    uuid = Column(String(36))  # a UUID in its canonical text form is 36 characters long
    name = Column(String(80))
    description = Column(Text, nullable=True)
    picture = Column(String(120), nullable=True)
    price_id = Column(Integer, ForeignKey('price.id'))
    price = relationship("Price", backref=backref("product", uselist=False))
Dependency injection
Once the query service database has been configured, we can start configuring our QueryService.
from dependency_injector.wiring import Provide
from minos.aggregate import (
    Event,
)
from minos.cqrs import (
    QueryService,
)
from minos.networks import (
    Request,
    Response,
    enroute,
)

from src.queries.repository import ProductQueryRepository


class ProductQueryService(QueryService):
    """ProductQueryService class."""

    # Injected by name: the key matches "product_repository" in the injections section of config.yml.
    repository: ProductQueryRepository = Provide["product_repository"]

    @enroute.rest.query("/products", "GET")
    async def get_products(self, request: Request) -> Response:
        """Get all the Product instances.

        :param request: A request instance.
        :return: A response instance.
        """
        return Response(self.repository.get_all())

    @enroute.rest.query("/product/{uuid}", "GET")
    async def get_product(self, request: Request) -> Response:
        """Get a Product instance.

        :param request: A request instance.
        :return: A response instance.
        """
        params = await request.params()
        product = self.repository.get(params['uuid'])
        return Response(product)

    @enroute.broker.event("ProductCreated")
    async def product_created(self, request: Request) -> None:
        """Handle the Product creation events.

        :param request: A request instance containing the aggregate difference.
        :return: This method does not return anything.
        """
        event: Event = await request.content()
        self.repository.add(event)

    @enroute.broker.event("ProductUpdated")
    async def product_updated(self, request: Request) -> None:
        """Handle the Product update events.

        :param request: A request instance containing the aggregate difference.
        :return: This method does not return anything.
        """
        event: Event = await request.content()
        print(event)

    @enroute.broker.event("ProductDeleted")
    async def product_deleted(self, request: Request) -> None:
        """Handle the Product deletion events.

        :param request: A request instance containing the aggregate difference.
        :return: This method does not return anything.
        """
        event: Event = await request.content()
        print(event)
In our QueryService, we handle the HTTP GET queries and the events coming from our microservice and, if needed, events from other microservices.
The treatment of the events in the QueryService is a very important milestone. The events are generated by the Aggregates during the create, update and delete (CUD) processes, so the QueryService can receive all that information and, from that point, build the structure of the data it serves.
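As a rough sketch of that idea, the following example applies a stream of events to a small read model. It assumes a simplified event shape (a dictionary with `name`, `uuid` and `fields` keys, which is not the Minos Event class) and uses an in-memory SQLite table from the standard library in place of SQLAlchemy and PostgreSQL, so every name in it is hypothetical.

```python
import sqlite3

# In-memory SQLite stands in for the PostgreSQL query database.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE product (uuid TEXT PRIMARY KEY, name TEXT, units INTEGER)")


def handle_event(event: dict) -> None:
    """Apply one (hypothetical) event dictionary to the read model."""
    if event["name"] == "ProductCreated":
        connection.execute(
            "INSERT INTO product (uuid, name, units) VALUES (?, ?, ?)",
            (event["uuid"], event["fields"]["name"], event["fields"]["units"]),
        )
    elif event["name"] == "ProductUpdated":
        for column, value in event["fields"].items():
            if column not in {"name", "units"}:  # only update known columns
                continue
            connection.execute(f"UPDATE product SET {column} = ? WHERE uuid = ?", (value, event["uuid"]))
    elif event["name"] == "ProductDeleted":
        connection.execute("DELETE FROM product WHERE uuid = ?", (event["uuid"],))
    connection.commit()


for event in [
    {"name": "ProductCreated", "uuid": "p-1", "fields": {"name": "Mug", "units": 5}},
    {"name": "ProductCreated", "uuid": "p-2", "fields": {"name": "Pen", "units": 1}},
    {"name": "ProductUpdated", "uuid": "p-1", "fields": {"units": 7}},
    {"name": "ProductDeleted", "uuid": "p-2", "fields": {}},
]:
    handle_event(event)

rows = connection.execute("SELECT uuid, name, units FROM product").fetchall()
print(rows)  # [('p-1', 'Mug', 7)]
```

The read model never talks to the write side directly: it only reacts to events, which is what allows the query database to have whatever shape suits the GET endpoints best.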
In the third part of this tutorial, we will deploy the product microservice and all the required services, and we will test it.
If you found this article interesting, then remember to leave a star in the Minos repository.