DEV Community

Jpkwbr

Kontiki, an async microservices framework in Python

Kontiki is a Python microservices framework built on AMQP (RabbitMQ) via aio-pika and asyncio. Teams that know Nameko will recognize the overall philosophy: message-driven services on top of RabbitMQ, with RPC and events as first-class primitives. Kontiki belongs to the same family of frameworks, but leans into an asyncio-native implementation, a delegate-oriented service structure, and a configuration-driven runner.

Kontiki focuses on keeping service code minimal and pushing infrastructure concerns into the framework. It is open source (Apache-2.0) and the code is available on GitHub: kontiki-org/kontiki.


1. Writing a service

Kontiki encourages keeping the service class as an entrypoint layer and moving domain logic into delegates (ServiceDelegate) with setup/start/stop lifecycle hooks.

from kontiki.delegate import ServiceDelegate
from kontiki.messaging import Messenger, on_event, rpc


class MyDelegate(ServiceDelegate):
    async def setup(self):
        # initialize from self.container.config
        pass

    async def start(self):
        # optional: start background tasks / open connections
        pass

    async def stop(self):
        # optional: stop background tasks / close connections
        pass

    def do_something(self, x: int) -> int:
        return x * 2


class MyService:
    name = "compute-api"  # optional; defaults to class name
    delegate = MyDelegate()
    messenger = Messenger()  # publish events / call other services

    @rpc
    async def compute(self, x: int):
        return self.delegate.do_something(x)

    @on_event("thing_happened")
    async def on_thing(self, payload):
        await self.messenger.publish("thing_processed", {"payload": payload})
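
Because the domain logic lives in the delegate, it can be exercised without a broker. The sketch below is only an illustration of that idea: `FakeContainer` and `DoublingDelegate` are hypothetical stand-ins written with the standard library, not Kontiki classes. They merely mimic the setup/start/stop hooks and the `self.container.config` access shown above, and the `factor` config key is invented for the example.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeContainer:
    """Hypothetical stand-in for the object that exposes `config` to a delegate."""
    config: dict = field(default_factory=dict)


class DoublingDelegate:
    """Plain class mimicking the setup/start/stop hooks (not a Kontiki class)."""

    def __init__(self, container: FakeContainer) -> None:
        self.container = container
        self.factor = 1
        self.running = False

    async def setup(self) -> None:
        # Read settings once, before any message is handled.
        self.factor = self.container.config.get("factor", 2)

    async def start(self) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False

    def do_something(self, x: int) -> int:
        return x * self.factor


async def exercise_delegate() -> int:
    delegate = DoublingDelegate(FakeContainer(config={"factor": 2}))
    await delegate.setup()
    await delegate.start()
    try:
        return delegate.do_something(21)
    finally:
        await delegate.stop()


result = asyncio.run(exercise_delegate())
```

The same shape works in a test suite: build the delegate, drive the hooks by hand, and assert on the return value of the domain method.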

Running a service

To run a service consistently across environments, expose it as a CLI entrypoint via cli.run(...) and register it as a script.

First, define a small run() entrypoint that delegates startup to Kontiki:

# e.g. myapp.main
from kontiki.runner import cli

from myapp.service import MyService  # hypothetical module path


def run():
    cli.run(MyService, "Example Kontiki service.", version="1.0.0")

Then, expose that entrypoint as an executable command:

[tool.poetry.scripts]
my_service = "myapp.main:run"

Finally, run the service with one (or more) config files:

my_service --config config.yaml
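
How cli.run parses its arguments is not shown in this overview. As a rough mental model only, a flag that accepts one or more config files is commonly declared with argparse's `append` action. The parser below is an assumption built with the standard library and is not Kontiki's actual implementation.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog="my_service",
        description="Example Kontiki service.",
    )
    # action="append" collects every occurrence of --config, in order.
    parser.add_argument("--config", action="append", default=[], metavar="FILE")
    parser.add_argument("--version", action="version", version="1.0.0")
    return parser


# Passing an explicit argv list keeps the example independent of sys.argv.
args = build_parser().parse_args(["--config", "base.yaml", "--config", "env.yaml"])
config_files = args.config
```

Keeping the files in command-line order matters when later files are meant to refine earlier ones.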

2. Publishing and consuming events

Events are fire-and-forget. Consumers subscribe with @on_event("event_type"), and publishers emit events via Messenger.publish(...).

from kontiki.messaging import Messenger, on_event, rpc


# Server-side (service)
class UserEventsService:
    @on_event("user.created")
    async def on_user_created(self, payload: dict):
        ...


# Client-side (standalone script)
async def publish_event_from_script():
    async with Messenger(standalone=True) as messenger:
        await messenger.publish("user.created", {"id": "u_123"})


# Client-side (from another service)
class BillingService:
    messenger = Messenger()

    @rpc
    async def create_invoice(self, user_id: str):
        await self.messenger.publish("user.created", {"id": user_id})
        return "ok"
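
To make "fire-and-forget" concrete, here is a toy in-process bus. It is a conceptual model only: `InMemoryBus` is a hypothetical class, there is no broker involved, and nothing here reflects how Kontiki routes events through RabbitMQ. The point is the contract: `publish` returns without a result, and an event nobody subscribed to is simply dropped.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]


class InMemoryBus:
    """Toy in-process bus; a real deployment routes events through RabbitMQ."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)
        self._tasks: set[asyncio.Task] = set()

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: dict) -> None:
        # Schedule handlers and return at once: the publisher gets no result.
        for handler in self._handlers[event_type]:
            task = asyncio.create_task(handler(payload))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def drain(self) -> None:
        # Demo helper: wait until every scheduled handler has finished.
        if self._tasks:
            await asyncio.gather(*list(self._tasks))


async def demo() -> list[str]:
    bus = InMemoryBus()
    seen: list[str] = []

    async def on_user_created(payload: dict) -> None:
        seen.append(payload["id"])

    bus.subscribe("user.created", on_user_created)
    await bus.publish("user.created", {"id": "u_123"})
    await bus.publish("user.deleted", {"id": "u_999"})  # no subscriber: dropped
    await bus.drain()
    return seen


seen_ids = asyncio.run(demo())
```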

3. Calling services with RPC

Kontiki RPC is synchronous request/reply over AMQP, in the sense that the caller awaits the reply before continuing. On the server side, handlers are exposed with @rpc (and can return rpc_error(code, message)); on the client side, calls go through an RpcProxy.

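from kontiki.messaging import Messenger, rpc
# RpcProxy and rpc_error also come from Kontiki (import path not shown here)


# Server-side (service)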
class RpcService:

    @rpc
    async def rpc_example(self, feature: str):
        if feature == "bad_input":
            return rpc_error("USER_INPUT_ERROR", "Invalid feature value")
        return "ok"


# Client-side (standalone script)
async def call_rpc_from_script():
    async with Messenger(standalone=True) as messenger:
        rpc_service = RpcProxy(messenger, service_name="RpcService")
        return await rpc_service.rpc_example("bad_input")


# Client-side (from another service)
class ApiGatewayService:
    messenger = Messenger()

    @rpc
    async def compute(self, feature: str):
        rpc_service = RpcProxy(self.messenger, service_name="RpcService")
        return await rpc_service.rpc_example(feature)
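
Request/reply over a message broker is commonly built from a request queue, a reply queue, and a correlation id that pairs each reply with its pending call. Whether Kontiki works exactly this way internally is an assumption. The sketch below replaces the broker with two `asyncio.Queue` objects, and `ToyRpcClient` and `toy_server` are hypothetical names used only for illustration.

```python
import asyncio
import uuid


class ToyRpcClient:
    """Matches replies to pending calls by correlation id (illustrative only)."""

    def __init__(self, requests: asyncio.Queue, replies: asyncio.Queue) -> None:
        self._requests = requests
        self._replies = replies
        self._pending: dict[str, asyncio.Future] = {}

    async def call(self, method: str, *args):
        correlation_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        await self._requests.put((correlation_id, method, args))
        return await future  # the caller is suspended until the reply arrives

    async def read_replies(self) -> None:
        while True:
            correlation_id, result = await self._replies.get()
            future = self._pending.pop(correlation_id, None)
            if future is not None and not future.done():
                future.set_result(result)


async def toy_server(requests: asyncio.Queue, replies: asyncio.Queue) -> None:
    handlers = {"rpc_example": lambda feature: "ok"}
    while True:
        correlation_id, method, args = await requests.get()
        await replies.put((correlation_id, handlers[method](*args)))


async def demo() -> str:
    requests, replies = asyncio.Queue(), asyncio.Queue()
    client = ToyRpcClient(requests, replies)
    workers = [
        asyncio.create_task(toy_server(requests, replies)),
        asyncio.create_task(client.read_replies()),
    ]
    try:
        # A timeout keeps a lost reply from blocking the caller forever.
        return await asyncio.wait_for(client.call("rpc_example", "feature-a"), timeout=1)
    finally:
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)


reply = asyncio.run(demo())
```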

4. HTTP entrypoints

Kontiki can expose HTTP routes directly in the service using @http(...), optionally validating request bodies with Pydantic models and generating OpenAPI docs/Swagger UI.

class SimpleHttpService:
    http_error_handlers = {HttpExampleError: (400, "Example error occurred")}

    @http("/health", "GET", version="v1", response_model=HelloResponse)
    async def health(self, request):
        return HelloResponse(message="ok").model_dump()
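
The `http_error_handlers` mapping pairs an exception type with a status code and message. How Kontiki resolves that mapping is not described here, so the following is only one plausible reading: look up the raised exception's class, then its parents. `resolve_error`, `QuotaExceededError`, and the 500 fallback are all assumptions introduced for this sketch.

```python
class HttpExampleError(Exception):
    """Example domain error, as in the snippet above."""


class QuotaExceededError(HttpExampleError):
    """Hypothetical subclass, used to show that subclasses can match too."""


http_error_handlers = {HttpExampleError: (400, "Example error occurred")}


def resolve_error(exc: Exception, handlers: dict) -> tuple[int, str]:
    # Walk the exception's class hierarchy so the most specific entry wins.
    for cls in type(exc).__mro__:
        if cls in handlers:
            return handlers[cls]
    return 500, "Internal Server Error"  # assumed fallback, not documented


status, message = resolve_error(QuotaExceededError("over quota"), http_error_handlers)
fallback = resolve_error(ValueError("boom"), http_error_handlers)
```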

5. Periodic tasks

Use @task(interval=..., immediate=...) to run periodic async work inside the service loop.

class TaskService:

    @task(interval=10, immediate=True)
    async def task_example(self):
        ...
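
As a mental model for what a periodic task loop does, here is a standard-library sketch. It assumes that `interval` is expressed in seconds and that `immediate=True` means "run once right away instead of waiting for the first interval"; both are guesses about the decorator's semantics. `run_periodically` and its `max_runs` parameter are hypothetical and exist only so the demo terminates.

```python
import asyncio
from typing import Awaitable, Callable


async def run_periodically(
    job: Callable[[], Awaitable[None]],
    interval: float,
    immediate: bool,
    max_runs: int,
) -> None:
    """Call `job` every `interval` seconds; `max_runs` only ends the demo."""
    if not immediate:
        await asyncio.sleep(interval)  # wait one full interval before the first run
    for run in range(max_runs):
        await job()
        if run < max_runs - 1:
            await asyncio.sleep(interval)


async def demo() -> int:
    calls = 0

    async def job() -> None:
        nonlocal calls
        calls += 1

    await run_periodically(job, interval=0.01, immediate=True, max_runs=3)
    return calls


runs = asyncio.run(demo())
```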

6. Configuration: YAML merge + dot-based parameters

Running a service with multiple configuration files

Services can be started with multiple YAML files via repeatable --config (e.g. a shared base.yaml plus an environment-specific env.yaml).

# base.yaml
kontiki:
  amqp:
    url: amqp://guest:guest@localhost/

app:
  cache:
    ttl_seconds: 60

The environment-specific file adds its own keys:

# env.yaml
app:
  cache:
    enabled: true

Both files are then passed on the command line:

my_service --config base.yaml --config env.yaml
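
The exact merge rules are not spelled out in this overview. A minimal sketch, assuming a recursive merge in which nested mappings are combined and later files win on conflicting keys, could look like the following. `deep_merge` is a hypothetical helper, and plain dicts stand in for the parsed YAML files.

```python
from copy import deepcopy


def deep_merge(base: dict, override: dict) -> dict:
    """Return a new dict where `override` is merged recursively into `base`."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)  # scalars and lists: later file wins
    return merged


# Plain dicts standing in for the parsed base.yaml and env.yaml.
base = {
    "kontiki": {"amqp": {"url": "amqp://guest:guest@localhost/"}},
    "app": {"cache": {"ttl_seconds": 60}},
}
env = {"app": {"cache": {"enabled": True}}}

config = deep_merge(base, env)
```

Under that assumption, `app.cache` ends up with both `ttl_seconds` and `enabled`, and the inputs are left unmodified.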

Reading parameters in delegates

Configuration values are typically loaded in setup() using dot-based paths:

from kontiki.configuration import get_parameter
from kontiki.delegate import ServiceDelegate

class MyDelegate(ServiceDelegate):
    async def setup(self):
        self.cache_ttl_seconds = get_parameter(
            self.container.config,
            "app.cache.ttl_seconds",
            60,
        )
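
For readers curious about what a dot-based lookup involves, here is a small standard-library sketch. `lookup` is a hypothetical helper that mirrors the argument order used above (config, path, default); it is not the source of `get_parameter`, whose edge-case behavior may differ.

```python
from typing import Any

_MISSING = object()  # sentinel, so that a stored None is not mistaken for "absent"


def lookup(config: dict, path: str, default: Any = None) -> Any:
    """Follow a dot-separated path through nested dicts (hypothetical helper)."""
    node: Any = config
    for part in path.split("."):
        if not isinstance(node, dict):
            return default  # the path goes deeper than the data does
        node = node.get(part, _MISSING)
        if node is _MISSING:
            return default
    return node


config = {"app": {"cache": {"ttl_seconds": 30}}}

ttl = lookup(config, "app.cache.ttl_seconds", 60)
max_items = lookup(config, "app.cache.max_items", 100)
too_deep = lookup(config, "app.cache.ttl_seconds.unit", "s")
```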

7. Optional registry: heartbeats + degraded state

When the Kontiki registry service is running, services can register and expose operational state:

  • heartbeats at a configurable interval
  • degraded mode checks (custom logic)
  • optional event/exception tracking for visibility

A degraded-state check is declared with @degraded_on:

from pathlib import Path


class MyService:
    @degraded_on
    def is_degraded(self) -> bool:
        required_dir = Path("/var/lib/my-service/uploads")
        return not required_dir.exists()
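
Keeping the check itself in a plain function makes it easy to test without a running registry. The sketch below is an assumption about how one might structure that: `uploads_dir_missing` is a hypothetical helper, and it uses `is_dir()` rather than `exists()`, which is slightly stricter because a regular file at that path also counts as degraded.

```python
import tempfile
from pathlib import Path


def uploads_dir_missing(required_dir: Path) -> bool:
    """True when the directory the service depends on is absent."""
    return not required_dir.is_dir()


with tempfile.TemporaryDirectory() as tmp:
    uploads = Path(tmp) / "uploads"
    before = uploads_dir_missing(uploads)  # directory not created yet
    uploads.mkdir()
    after = uploads_dir_missing(uploads)   # directory now present
```

A method decorated with @degraded_on could then simply return the result of such a function for the service's real path.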

Engineering tooling: kontiki-tui

For an operator-friendly view, the kontiki-tui project (a sibling repository) provides a terminal UI to inspect and operate the Kontiki ecosystem.


Conclusion

Kontiki is a good fit for asyncio-native, message-driven microservices on RabbitMQ: events and RPC are first-class primitives, so the AMQP wiring, lifecycle, and operational plumbing do not have to be rewritten for every service. This article is only a quick overview; Kontiki also ships advanced options around the core features (sessions, broadcast delivery, message headers, error/retry behavior, and more). The repository includes a solid set of runnable examples: start RabbitMQ locally and run most of them from the Makefile targets (see make run-amqp and the various run-* commands).
