Kontiki is a Python microservices framework built on AMQP (RabbitMQ) via aio-pika and asyncio. For teams familiar with Nameko, the overall philosophy will feel familiar: message-driven services on top of RabbitMQ, with RPC and events as first-class primitives. Kontiki lives in the same “family” of frameworks, while leaning into an asyncio-native implementation, a delegate-oriented service structure, and a configuration-driven runner.
Kontiki focuses on keeping service code minimal and pushing infrastructure concerns into the framework. It is open source (Apache-2.0) and the code is available on GitHub: kontiki-org/kontiki.
1. Writing a service
Kontiki encourages keeping the service class as an entrypoint layer and moving domain logic into delegates (ServiceDelegate) with setup/start/stop lifecycle hooks.
from kontiki.delegate import ServiceDelegate
from kontiki.messaging import Messenger, on_event, rpc


class MyDelegate(ServiceDelegate):
    async def setup(self):
        # initialize from self.container.config
        pass

    async def start(self):
        # optional: start background tasks / open connections
        pass

    async def stop(self):
        # optional: stop background tasks / close connections
        pass

    def do_something(self, x: int) -> int:
        return x * 2


class MyService:
    name = "compute-api"  # optional; defaults to class name
    delegate = MyDelegate()
    messenger = Messenger()  # publish events / call other services

    @rpc
    async def compute(self, x: int):
        return self.delegate.do_something(x)

    @on_event("thing_happened")
    async def on_thing(self, payload):
        await self.messenger.publish("thing_processed", {"payload": payload})
Running a service
To run a service consistently across environments, expose it as a CLI entrypoint via cli.run(...) and register it as a script.
First, define a small run() entrypoint that delegates startup to Kontiki:
# e.g. myapp.main
from kontiki.runner import cli

# MyService is the service class from section 1; import it from wherever it is defined.


def run():
    cli.run(MyService, "Example Kontiki service.", version="1.0.0")
Then, expose that entrypoint as an executable command:
[tool.poetry.scripts]
my_service = "myapp.main:run"
Finally, run the service with one (or more) config files:
my_service --config config.yaml
2. Publishing and consuming events
Events are fire-and-forget: the publisher does not wait for a reply from consumers. Consumers subscribe with @on_event("event_type"), and publishers emit events via Messenger.publish(...).
# Server-side (service)
class UserEventsService:
    @on_event("user.created")
    async def on_user_created(self, payload: dict):
        ...


# Client-side (standalone script)
async def publish_event_from_script():
    async with Messenger(standalone=True) as messenger:
        await messenger.publish("user.created", {"id": "u_123"})


# Client-side (from another service)
class BillingService:
    messenger = Messenger()

    @rpc
    async def create_invoice(self, user_id: str):
        await self.messenger.publish("user.created", {"id": user_id})
        return "ok"
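To make the fire-and-forget semantics concrete, here is a minimal in-process sketch using plain asyncio. It is not Kontiki's implementation and involves no broker: LocalBus, subscribe, publish, and drain are hypothetical names invented for this illustration. The point it shows is that publish() returns as soon as the event is handed off, before any handler has run.

```python
import asyncio
from collections import defaultdict


class LocalBus:
    """Hypothetical in-process stand-in for a broker (illustration only)."""

    def __init__(self):
        self._handlers = defaultdict(list)
        self._pending = set()

    def subscribe(self, event_type, handler):
        self._handlers[event_type].append(handler)

    async def publish(self, event_type, payload):
        # Fire-and-forget: schedule the handlers, do not await their results.
        for handler in self._handlers[event_type]:
            task = asyncio.ensure_future(handler(payload))
            self._pending.add(task)
            task.add_done_callback(self._pending.discard)

    async def drain(self):
        # Demo helper: wait until every scheduled handler has finished.
        while self._pending:
            await asyncio.gather(*list(self._pending))


async def demo():
    received = []

    async def on_user_created(payload):
        received.append(payload["id"])

    bus = LocalBus()
    bus.subscribe("user.created", on_user_created)
    await bus.publish("user.created", {"id": "u_123"})
    seen_right_after_publish = list(received)  # handler has not run yet
    await bus.drain()
    return seen_right_after_publish, received
```

With a real broker the handler runs in another process, so a publisher that needs an answer should use RPC rather than an event.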
3. Calling services with RPC
Kontiki RPC is request/reply over AMQP: the caller awaits the response before continuing. On the server side, handlers are exposed with @rpc (and can return rpc_error(code, message)); on the client side, calls go through an RpcProxy.
# Server-side (service)
class RpcService:
    @rpc
    async def rpc_example(self, feature: str):
        if feature == "bad_input":
            return rpc_error("USER_INPUT_ERROR", "Invalid feature value")
        return "ok"


# Client-side (standalone script)
async def call_rpc_from_script():
    async with Messenger(standalone=True) as messenger:
        rpc_service = RpcProxy(messenger, service_name="RpcService")
        return await rpc_service.rpc_example("bad_input")


# Client-side (from another service)
class ApiGatewayService:
    messenger = Messenger()

    @rpc
    async def compute(self, feature: str):
        rpc_service = RpcProxy(self.messenger, service_name="RpcService")
        return await rpc_service.rpc_example(feature)
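Request/reply over a message broker is commonly built by tagging each request with a correlation id and matching replies against it. The sketch below shows that general pattern with asyncio queues standing in for AMQP queues. It is an assumption about the technique, not a description of Kontiki's internals: server, Client, listen, call, and the reply dict shapes are all hypothetical.

```python
import asyncio
import uuid


async def server(requests, replies):
    """Hypothetical server loop: read a request, send a correlated reply."""
    while True:
        correlation_id, feature = await requests.get()
        if feature == "bad_input":
            # Assumed error shape, for illustration only.
            result = {"error": {"code": "USER_INPUT_ERROR",
                                "message": "Invalid feature value"}}
        else:
            result = {"result": "ok"}
        await replies.put((correlation_id, result))


class Client:
    def __init__(self, requests, replies):
        self._requests = requests
        self._replies = replies
        self._waiting = {}  # correlation id -> future awaiting the reply

    async def listen(self):
        # Resolve the future that matches each reply's correlation id.
        while True:
            correlation_id, result = await self._replies.get()
            future = self._waiting.pop(correlation_id, None)
            if future is not None and not future.done():
                future.set_result(result)

    async def call(self, feature, timeout=1.0):
        correlation_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._waiting[correlation_id] = future
        await self._requests.put((correlation_id, feature))
        return await asyncio.wait_for(future, timeout)


async def demo():
    requests, replies = asyncio.Queue(), asyncio.Queue()
    client = Client(requests, replies)
    background = [asyncio.ensure_future(server(requests, replies)),
                  asyncio.ensure_future(client.listen())]
    try:
        # Two concurrent calls; each one receives its own reply.
        return await asyncio.gather(client.call("good"),
                                    client.call("bad_input"))
    finally:
        for task in background:
            task.cancel()
```

The timeout matters in practice: if no server is consuming the request queue, the caller would otherwise wait forever.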
4. HTTP entrypoints
Kontiki can expose HTTP routes directly in the service using @http(...), optionally validating request bodies with Pydantic models and generating OpenAPI docs/Swagger UI.
class SimpleHttpService:
    http_error_handlers = {HttpExampleError: (400, "Example error occurred")}

    @http("/health", "GET", version="v1", response_model=HelloResponse)
    async def health(self, request):
        return HelloResponse(message="ok").model_dump()
5. Periodic tasks
Use @task(interval=..., immediate=...) to run periodic async work inside the service loop.
class TaskService:
    @task(interval=10, immediate=True)
    async def task_example(self):
        ...
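The article does not spell out what immediate means. A natural reading, assumed here, is that immediate=True runs the task once at startup instead of waiting for the first interval to elapse. The sketch below illustrates that reading with plain asyncio; run_periodically and its max_runs parameter are hypothetical and exist only to keep the demo finite.

```python
import asyncio


async def run_periodically(func, interval, immediate=False, max_runs=None):
    """Hypothetical scheduler loop, not Kontiki's implementation."""
    if not immediate:
        # Without immediate, the first run waits one full interval.
        await asyncio.sleep(interval)
    runs = 0
    while True:
        await func()
        runs += 1
        if max_runs is not None and runs >= max_runs:
            return
        await asyncio.sleep(interval)


async def demo(immediate):
    loop = asyncio.get_running_loop()
    started = loop.time()
    offsets = []  # seconds since start at which each run happened

    async def tick():
        offsets.append(loop.time() - started)

    await run_periodically(tick, interval=0.05, immediate=immediate, max_runs=3)
    return offsets
```

Note that this loop sleeps after each run completes, so a slow task stretches the effective period. Whether Kontiki behaves the same way is not stated in the article.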
6. Configuration: YAML merge + dot-based parameters
Running a service with multiple configuration files
Services can be started with multiple YAML files via repeatable --config (e.g. a shared base.yaml plus an environment-specific env.yaml).
# base.yaml
kontiki:
  amqp:
    url: amqp://guest:guest@localhost/
app:
  cache:
    ttl_seconds: 60

# env.yaml
app:
  cache:
    enabled: true
my_service --config base.yaml --config env.yaml
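The exact merge rules are not described here. The sketch below assumes the common convention: files are merged in order, nested mappings are combined recursively, and a later file wins when both define the same leaf. deep_merge is a hypothetical helper, and the dicts stand in for the parsed contents of base.yaml and env.yaml so that no YAML parser is needed.

```python
def deep_merge(base: dict, override: dict) -> dict:
    """Return a new dict; nested dicts merge, other values are replaced."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


# Dicts as they would look after parsing base.yaml and env.yaml.
base = {
    "kontiki": {"amqp": {"url": "amqp://guest:guest@localhost/"}},
    "app": {"cache": {"ttl_seconds": 60}},
}
env = {"app": {"cache": {"enabled": True}}}

config = deep_merge(base, env)
```

Under these assumptions, app.cache ends up holding both ttl_seconds from the base file and enabled from the environment file, while the kontiki.amqp section is carried over unchanged.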
Reading parameters in delegates
Configuration values are typically loaded in setup() using dot-based paths:
from kontiki.configuration import get_parameter
from kontiki.delegate import ServiceDelegate


class MyDelegate(ServiceDelegate):
    async def setup(self):
        self.cache_ttl_seconds = get_parameter(
            self.container.config,
            "app.cache.ttl_seconds",
            60,
        )
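For readers unfamiliar with dot-based paths, the following sketch shows how such a lookup can be resolved against a nested dict, falling back to a default when any segment is missing. lookup is a hypothetical stand-in written for this illustration; it is not Kontiki's get_parameter and may differ from it in edge cases.

```python
def lookup(config: dict, path: str, default=None):
    """Walk a nested dict along a dotted path, e.g. 'app.cache.ttl_seconds'."""
    node = config
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node


config = {"app": {"cache": {"ttl_seconds": 120}}}

ttl = lookup(config, "app.cache.ttl_seconds", 60)     # value present in config
retries = lookup(config, "app.http.max_retries", 3)   # missing, so default is used
```

Supplying a default at the call site keeps the delegate usable even when an environment file omits the key.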
7. Optional registry: heartbeats + degraded state
When the Kontiki registry service is running, services can register and expose operational state:
- heartbeats at a configurable interval
- degraded mode checks (custom logic)
- optional event/exception tracking for visibility
from pathlib import Path


class MyService:
    @degraded_on
    def is_degraded(self) -> bool:
        required_dir = Path("/var/lib/my-service/uploads")
        return not required_dir.exists()
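One way to picture how a degraded check and a heartbeat fit together is a payload that carries the result of the check. The sketch below is purely illustrative: build_heartbeat and its field names are hypothetical and do not reflect the registry's actual message format. It also shows that the check itself is ordinary Python that can be exercised against a temporary directory.

```python
import tempfile
from pathlib import Path


def is_degraded(required_dir: Path) -> bool:
    # Degraded when the directory the service depends on is missing.
    return not required_dir.is_dir()


def build_heartbeat(service_name: str, required_dir: Path) -> dict:
    """Hypothetical heartbeat payload; field names are illustrative."""
    return {"service": service_name, "degraded": is_degraded(required_dir)}


with tempfile.TemporaryDirectory() as tmp:
    uploads = Path(tmp) / "uploads"
    before = build_heartbeat("my-service", uploads)  # directory missing
    uploads.mkdir()
    after = build_heartbeat("my-service", uploads)   # directory present
```

Keeping the check cheap and side-effect free is a sensible design choice if it is evaluated on every heartbeat.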
Engineering tooling: kontiki-tui
For an operator-friendly view, the kontiki-tui project (sibling repository) provides a terminal UI to inspect/operate the Kontiki ecosystem.
Conclusion
Kontiki is a good fit for asyncio-native, message-driven microservices on RabbitMQ: events and RPC are first-class primitives, and the AMQP wiring, lifecycle, and operational plumbing do not have to be rewritten for every service. This article is only a quick overview. Kontiki also ships advanced options around the core features (sessions, broadcast delivery, message headers, error/retry behavior, and more). The repository includes a set of runnable examples: start RabbitMQ locally, then run most of them from the Makefile targets (see make run-amqp and the various run-* commands).
