In this chapter, we will learn about Nameko and its capabilities as a microservice framework.
What is Nameko?
Nameko is a framework for building lightweight, highly scalable and fault-tolerant services in Python following a micro-service architecture design.
It comes with built-in support for:
- RPC over AMQP
- Asynchronous events (pub-sub) over AMQP
Why Nameko?
Nameko enables you to build a service that can respond to RPC messages, dispatch events on certain actions, and listen to events from other services. It could also have HTTP interfaces for clients that can’t speak AMQP.
Let's create a basic Nameko service and experiment with its capabilities.
Setup Basic Environment
First, you will need Docker installed. We will use Python 3, so make sure you have it installed as well.
To run Nameko, we need RabbitMQ. It will be responsible for the communication between our Nameko services.
Install
$ pip install nameko
Start A RabbitMQ Container
$ docker run -p 5672:5672 --hostname nameko-rabbitmq rabbitmq:3
Hello World!!
A Nameko service is just a Python class. The class encapsulates the logic in its methods and declares any dependencies as attributes.
Go ahead and create a file called Service.py with the following content:
from nameko.rpc import rpc


class Service:
    name = "service"

    @rpc
    def receive_event(self, event):
        return f"Event Received: {event}"
Let’s run our example. If you have RabbitMQ running, simply run:
$ nameko run Service
Nameko implements automatic service discovery: when you call an RPC method over AMQP, Nameko locates the target service through RabbitMQ on its own, so you only refer to the service by its name.
To test our service, open a Nameko shell in another terminal with $ nameko shell and run the command below:
>>> n.rpc.service.receive_event(event={'message': 'Hello World!!'})
When an RPC entrypoint is invoked, a Nameko worker is created. A worker is just a stateless instance of the service class, which makes it inherently thread-safe. The maximum number of workers by default for a service is set to 10.
You can read more about workers in the Nameko documentation.
If the maximum number of workers is set to 1, only one Nameko worker executes at a time, i.e. the service processes calls one by one like a regular queue.
Communicate Between 2 Nameko Services
To communicate from one Nameko service to another and vice versa, Nameko provides an RpcProxy construct. Here's how you use it:
from nameko.rpc import rpc, RpcProxy


class SenderService:
    name = "sender_service"
    # Proxy used to call RPC methods on the service named "receiver_service"
    receiver_service_proxy = RpcProxy("receiver_service")

    @rpc
    def send_event(self, event):
        # Forward the incoming event to the receiver and return its reply
        return self.receiver_service_proxy.receive_event(event)


class ReceiverService:
    name = "receiver_service"

    @rpc
    def receive_event(self, event):
        return f"Event Received: {event}"
Communicate Between A Nameko & A Non-Nameko Service
There will be scenarios where we need to call a Nameko service from something that isn’t a Nameko service like an API service or a cron job. Here's how you do it:
from nameko.standalone.rpc import ClusterRpcProxy

# Replace user, password and hostname with your broker's credentials
AMQP_URI = "pyamqp://user:password@hostname"
config = {
    'AMQP_URI': AMQP_URI
}

with ClusterRpcProxy(config) as cluster_rpc:
    cluster_rpc.service.receive_event({'message': 'Hello World!!'})
Concurrency
Nameko is built on top of the eventlet library, which provides concurrency via “greenthreads”.
Greenthreads, unlike OS threads, cooperatively yield to each other instead of being preemptively scheduled by the OS. This behaviour is advantageous when a service is I/O heavy.
A greenthread yields control only when it is waiting on I/O, which gives another greenthread a chance to execute. Because switches happen only at these points, the service can use shared data structures without locks and other synchronisation mechanisms.
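To make cooperative yielding concrete, here is a minimal sketch that uses asyncio from the standard library as a stand-in for eventlet. This is an assumption made purely for illustration: Nameko itself uses eventlet greenthreads, and the names fake_io_call, run_tasks and timed_run are hypothetical. Each task yields only at its await point, so the shared list is updated without a lock, and ten waits overlap instead of adding up.

```python
import asyncio
import time


async def fake_io_call(task_id, shared_log):
    # The await is the only point where this task yields control,
    # much like a greenthread yielding while it waits on I/O.
    await asyncio.sleep(0.1)
    # No lock needed: no other task runs between the await and this append.
    shared_log.append(task_id)


async def run_tasks(num_tasks):
    shared_log = []
    await asyncio.gather(*(fake_io_call(i, shared_log) for i in range(num_tasks)))
    return shared_log


def timed_run(num_tasks=10):
    start = time.perf_counter()
    log = asyncio.run(run_tasks(num_tasks))
    return log, time.perf_counter() - start
```

Calling timed_run(10) should take roughly as long as a single 0.1 second wait rather than ten of them, which is the same effect the Nameko experiment in this section demonstrates with real workers.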
Let's experiment with Nameko concurrency in practice by modifying the Service class from the Hello World example:
from time import sleep
from nameko.rpc import rpc


class Service:
    name = "service"

    @rpc
    def receive_event(self, event):
        sleep(5)
        return f"Event Received: {event}"
We are using sleep from the time module, which is a blocking call. However, when we run our services using nameko run, Nameko automatically monkey-patches blocking calls such as sleep(5) so that they yield to other greenthreads instead of blocking the whole process.
The response time of a single RPC call to our service will be 5 seconds. Now, if we make 10 calls in one go to the same RPC, how long will it take to get the response of all 10 calls?
Let's run the following code in a nameko shell:
import time


def time_concurrent_invocations():
    start_time = time.perf_counter()
    responses = []
    num_concurrent_calls = 10
    for i in range(num_concurrent_calls):
        # call_async returns a reply object immediately, so all calls are in flight at once
        response = n.rpc.service.receive_event.call_async({'message': f'Worker {i+1}'})
        responses.append(response)
    for response in responses:
        # result() blocks until the reply for this call has arrived
        print(response.result())
    end_time = time.perf_counter()
    print(f'Total Time: {round(end_time-start_time, 3)}')


time_concurrent_invocations()
This example runs in about five seconds. Each worker is blocked waiting for its sleep call to finish, but that does not stop another worker from starting: implicit yielding in action.
If you change num_concurrent_calls = 20 in the above snippet, the execution will finish in about 10 seconds, because with the default limit of 10 workers the second batch of 10 calls has to wait for the first batch to finish.
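The timings above follow from simple arithmetic, which can be sketched as a back-of-the-envelope model. The function name estimated_total_seconds is hypothetical, and the model assumes every call takes exactly the same time and ignores broker and network overhead.

```python
import math


def estimated_total_seconds(num_calls, call_seconds=5, max_workers=10):
    # Calls run in "waves": each wave fills every available worker slot,
    # and the next wave starts only when the previous one has finished.
    waves = math.ceil(num_calls / max_workers)
    return waves * call_seconds


print(estimated_total_seconds(10))                 # 5
print(estimated_total_seconds(20))                 # 10
print(estimated_total_seconds(10, max_workers=1))  # 50
```

The last line corresponds to the single-worker case mentioned earlier: with one worker the calls are processed strictly one after another.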
Async Pub-Sub
Suppose we now have to do an asynchronous task, such as sending a notification or uploading a file to the cloud.
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc


class MessageService:
    name = "message_service"
    dispatch = EventDispatcher()

    def time_consuming_function(self, payload):
        # Publish the payload as an event instead of processing it inline
        self.dispatch("heavy_payload_event", payload)

    @rpc
    def receive_message(self, event):
        # .get avoids a KeyError for events that carry no payload
        if event.get('payload'):
            self.time_consuming_function(event['payload'])
        print(f"Message Received: {event['message']}")


class TimeConsumingService:
    name = "time_consuming_service"

    @event_handler("message_service", "heavy_payload_event")
    def time_consuming_event_handler(self, payload):
        # Do the slow work (notification, upload, ...) here
        pass
When receive_message processes an event with a payload, it calls time_consuming_function, which uses the EventDispatcher to hand the payload off asynchronously: the event is published, and time_consuming_event_handler in TimeConsumingService picks it up in a separate worker running in its own greenthread. The caller does not wait for the event handler to return a response, so it can finish its execution faster and accept further requests.
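The fire-and-forget behaviour described above can be illustrated without a broker. The following is a simplified sketch that uses a standard-library queue and thread as stand-ins for RabbitMQ and the subscribing service; it is not how Nameko is implemented, and run_demo, dispatch and handler_loop are hypothetical names.

```python
import queue
import threading


def run_demo():
    events = queue.Queue()   # stand-in for the broker
    handled = []

    def dispatch(event_type, payload):
        # Returns immediately: the event is only queued, not processed here.
        events.put((event_type, payload))

    def handler_loop():
        # Stand-in for the subscribing service; stops at the None sentinel.
        for event_type, payload in iter(events.get, None):
            handled.append((event_type, payload))

    def receive_message(event):
        if event.get("payload"):
            dispatch("heavy_payload_event", event["payload"])
        return f"Message Received: {event['message']}"

    subscriber = threading.Thread(target=handler_loop)
    subscriber.start()
    reply = receive_message({"message": "hi", "payload": "large file"})
    events.put(None)         # sentinel so the sketch terminates
    subscriber.join()
    return reply, handled
```

Here receive_message builds its reply without waiting for the handler; the payload is processed by the subscriber thread whenever it gets to it.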
Scalable
We have been using only one server and running one instance of RabbitMQ. In a production environment, you will want to arbitrarily increase the number of nodes running the service that is getting too many calls.
To simulate service scaling, let's revisit our service from the concurrency section. Open another terminal and run the service as before, using $ nameko run Service. This will start another service instance with the potential to run ten more workers. Now, try running the time_concurrent_invocations snippet again with num_concurrent_calls = 20. It should take about five seconds again. When more than one service instance is running, Nameko will round-robin the RPC requests among the available instances.
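The round-robin distribution described above can be sketched as a simplified model. In practice the dispatching is done through RabbitMQ rather than by code like this, and the function and instance names here are hypothetical.

```python
from collections import Counter
from itertools import cycle


def round_robin_assign(num_calls, instance_names):
    # Hand each call to the next instance in turn, wrapping around at the end.
    assignments = Counter()
    for _, instance in zip(range(num_calls), cycle(instance_names)):
        assignments[instance] += 1
    return dict(assignments)


print(round_robin_assign(20, ["instance_1", "instance_2"]))
# {'instance_1': 10, 'instance_2': 10}
```

With 10 calls per instance and 10 workers per instance, every call gets a worker straight away, which is why the 20-call run drops back to about five seconds.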
In fact, you can run these service instances on completely different machines and scale them independently. All you need to do is point them at the same RabbitMQ broker.
Create a config file with broker URI:
# config.yaml
AMQP_URI: amqp://<rabbitmq-ip>:5672/
Run these services on different machines using:
$ nameko run <nameko_service> --config config.yaml
Fault Tolerant
Nameko is highly robust and fault-tolerant: it continues operating properly when one or more nodes in the service cluster fail, as long as at least one healthy node remains functioning.
Try running 3 instances of the Service and execute the test snippet with num_concurrent_calls = 50. As soon as you execute the test snippet, kill one or two instances of the Service. The missed messages will be re-routed to the healthy node(s), thus avoiding message loss.
This behaviour is due to the fact that messages are ack’d only after worker execution completes successfully; if the connection is lost after delivery but before acknowledgement, RabbitMQ will reclaim and redeliver the message.
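The ack-after-completion behaviour can be simulated in a few lines. This is a toy model of the idea, not RabbitMQ's or Nameko's actual implementation; process_with_redelivery and the crash_on_first_attempt parameter are hypothetical names, and a "crash" simply means the worker dies before acknowledging.

```python
from collections import deque


def process_with_redelivery(messages, crash_on_first_attempt):
    """Simulate ack-after-completion: a message is done only once it is acked."""
    pending = deque(messages)
    attempts = {}
    acked = []
    while pending:
        message = pending.popleft()      # delivered, but not yet acknowledged
        attempts[message] = attempts.get(message, 0) + 1
        if message in crash_on_first_attempt and attempts[message] == 1:
            # Worker died before acking, so the broker puts the message back.
            pending.append(message)
            continue
        acked.append(message)            # worker finished, message is acked
    return acked, attempts


acked, attempts = process_with_redelivery(["m1", "m2", "m3"], crash_on_first_attempt={"m2"})
print(acked)     # ['m1', 'm3', 'm2']
print(attempts)  # {'m1': 1, 'm2': 2, 'm3': 1}
```

Message m2 is delivered twice but acknowledged once, so nothing is lost. The flip side is that a handler may see the same message more than once, so it helps if handlers are safe to re-run.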
What Happens If The RabbitMQ Server Dies And There Are Messages Left In The Queue?
Nameko sets delivery_mode=PERSISTENT by default for the messages it publishes for RPC over AMQP. This tells RabbitMQ to save the messages to disk. However, there is a short window in which RabbitMQ has accepted a message but hasn't saved it yet, so persistence alone is not a strong guarantee. To close this gap, Nameko uses publisher confirms by default. Confirms have a performance penalty but guarantee that messages aren't lost.
Conclusion
Nameko is designed to help you build systems using micro-services and scale from a single instance of a single service, to a cluster with many instances of many different services.
To learn more about Nameko, check out the Nameko documentation and join the Nameko Discourse.