Nikolay Kim

Service pipeline

In the previous post, we described what a Service is and how to define it. Consider a typical request handler:

async fn endpoint(req: HttpRequest) -> Result<HttpResponse, Error> {
    // Load operation from the request
    let oper = load_operation(req).await?;

    // Execute it using our engine
    let result = execute(oper).await?;

    // Convert the result into an HTTP response
    Ok(into_response(result).await?)
}

This is structurally a sequence of fallible async transformations. The natural representation is a composable pipeline using an and_then-style combinator, analogous to Result::and_then():

let pipeline = chain(authn)
    .and_then(load_operation)
    .and_then(authz)
    .and_then(execute)
    .and_then(into_response);

The resulting pipeline is itself a Service<HttpRequest, Response = HttpResponse>. Each stage is independently swappable, and insertion is trivial. For example, introducing throttling becomes a purely local change:

let pipeline = chain(authn)
    .and_then(throttling)
    .and_then(load_operation)
    .and_then(authz)
    .and_then(execute)
    .and_then(into_response);

A throttling service may be as specific or as generic as needed. At one end, it can be Service<HttpRequest, Response = HttpRequest>, which lets it inspect the request. At the other, it can be fully parametric, Service<R, Response = R>, which lets it act as a reusable concurrency boundary independent of domain types. The shape of the pipeline is the same in both cases; only the type signature of the stage changes.
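
To make the fully parametric case concrete, here is a minimal sketch that uses the Service trait as defined so far. The Throttle type, its limit field, and the Throttled error are hypothetical names for illustration, and the limiter is deliberately naive: it rejects excess requests instead of waiting. The block_on helper is a tiny stand-in executor, not part of any crate.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// The Service trait as defined so far in this post (no readiness yet).
trait Service<R> {
    type Response;
    type Error;
    async fn call(&self, req: R) -> Result<Self::Response, Self::Error>;
}

// Hypothetical limiter: passes the first `limit` requests through unchanged
// and rejects the rest. It never inspects the request, so it works for any `R`.
struct Throttle {
    limit: usize,
    seen: Cell<usize>,
}

#[derive(Debug, PartialEq)]
struct Throttled;

impl<R> Service<R> for Throttle {
    type Response = R;
    type Error = Throttled;

    async fn call(&self, req: R) -> Result<R, Throttled> {
        if self.seen.get() >= self.limit {
            return Err(Throttled);
        }
        self.seen.set(self.seen.get() + 1);
        Ok(req)
    }
}

// Stand-in executor: enough for futures that complete without real I/O.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// The same instance throttles requests of unrelated types.
fn throttle_demo() -> (
    Result<&'static str, Throttled>,
    Result<u32, Throttled>,
    Result<u32, Throttled>,
) {
    let t = Throttle { limit: 2, seen: Cell::new(0) };
    let a = block_on(t.call("GET /"));
    let b = block_on(t.call(7u32));
    let c = block_on(t.call(8u32)); // third call exceeds the limit
    (a, b, c)
}
```

Because Throttle implements Service<R> for every R, it can sit at any position in a chain without knowing the types that flow through it.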

Given the Service trait, and_then is straightforward to express generically:

fn and_then<R, A, B>(svc_a: A, svc_b: B) -> impl Service<R, Response = B::Response, Error = A::Error>
where
    A: Service<R>,
    B: Service<A::Response, Error = A::Error>,
{
    AndThen { svc_a, svc_b }
}

The implementation is just sequential composition with short-circuiting on error:

struct AndThen<A, B> {
    svc_a: A,
    svc_b: B,
}

impl<R, A, B> Service<R> for AndThen<A, B> 
where
    A: Service<R>,
    B: Service<A::Response, Error = A::Error>,
{
    type Response = B::Response;
    type Error = A::Error;

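    async fn call(&self, req: R) -> Result<Self::Response, Self::Error> {
        // Run the first service; `?` returns early if it fails
        let intermediate = self.svc_a.call(req).await?;
        // Feed its response into the second service
        self.svc_b.call(intermediate).await
    }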
}

Other familiar combinators (then, map, map_err) fall out in the same way.
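
As an illustration of how map and map_err might fall out, here is a sketch written against the same call-only Service trait. The Map, MapErr, and Parse types are hypothetical names for this example and are not taken from ntex-service; block_on is a tiny stand-in executor.

```rust
use std::future::Future;
use std::num::ParseIntError;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

trait Service<R> {
    type Response;
    type Error;
    async fn call(&self, req: R) -> Result<Self::Response, Self::Error>;
}

// Transforms the successful response; errors pass through untouched.
struct Map<A, F> {
    svc: A,
    f: F,
}

impl<R, A, F, T> Service<R> for Map<A, F>
where
    A: Service<R>,
    F: Fn(A::Response) -> T,
{
    type Response = T;
    type Error = A::Error;

    async fn call(&self, req: R) -> Result<T, A::Error> {
        self.svc.call(req).await.map(&self.f)
    }
}

// Transforms the error; successful responses pass through untouched.
struct MapErr<A, F> {
    svc: A,
    f: F,
}

impl<R, A, F, E> Service<R> for MapErr<A, F>
where
    A: Service<R>,
    F: Fn(A::Error) -> E,
{
    type Response = A::Response;
    type Error = E;

    async fn call(&self, req: R) -> Result<A::Response, E> {
        self.svc.call(req).await.map_err(&self.f)
    }
}

// Hypothetical leaf service used only to exercise the combinators.
struct Parse;

impl Service<&'static str> for Parse {
    type Response = i64;
    type Error = ParseIntError;

    async fn call(&self, req: &'static str) -> Result<i64, ParseIntError> {
        req.trim().parse::<i64>()
    }
}

// Stand-in executor: enough for futures that complete without real I/O.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn map_demo() -> (Result<i64, String>, Result<i64, String>) {
    let svc = MapErr {
        svc: Map { svc: Parse, f: |n: i64| n * 2 },
        f: |e: ParseIntError| format!("bad input: {e}"),
    };
    (block_on(svc.call(" 21 ")), block_on(svc.call("x")))
}
```

Both wrappers follow the AndThen pattern: hold the inner service, delegate call, and adjust one side of the Result.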

In practice, this machinery is already available in the ntex-service crate. Its ServiceChain acts as a typed builder for pipelines, deferring composition until the chain is finalized. The resulting Pipeline is a distinct type because it encodes an additional constraint over the entire service graph: readiness.

Readiness

So far, Service::call models what a service does. It says nothing about when it is able to accept work.

In practice, services are not always ready to process a request. They may be constrained by in-flight work, internal queues, or external resources. Treating call as always-available either forces unbounded buffering or pushes backpressure into ad hoc, non-composable mechanisms.

To make this explicit, we extend the Service trait with a readiness check:

trait Service<R> {
    type Response;
    type Error;

    async fn ready(&self) -> Result<(), Self::Error>;

    async fn call(&self, req: R) -> Result<Self::Response, Self::Error>;
}

Conceptually, ready() is a gate: it resolves when the service can accept one request without violating its internal constraints.
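
A small sketch can show the gate behaviour. The Limited type, its permit counter, and the release method are hypothetical and stand in for whatever internal constraint a real service has. The futures are polled by hand so that the Pending state is visible.

```rust
use std::cell::{Cell, RefCell};
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

trait Service<R> {
    type Response;
    type Error;
    async fn ready(&self) -> Result<(), Self::Error>;
    async fn call(&self, req: R) -> Result<Self::Response, Self::Error>;
}

// Hypothetical service limited by a fixed number of permits.
struct Limited {
    permits: Cell<usize>,
    waiter: RefCell<Option<Waker>>,
}

impl Limited {
    // Hand a permit back and wake whoever is waiting at the gate.
    fn release(&self) {
        self.permits.set(self.permits.get() + 1);
        let waiter = self.waiter.borrow_mut().take();
        if let Some(w) = waiter {
            w.wake();
        }
    }
}

impl Service<u32> for Limited {
    type Response = u32;
    type Error = ();

    // Resolves only while at least one permit is free.
    async fn ready(&self) -> Result<(), ()> {
        poll_fn(|cx| {
            if self.permits.get() > 0 {
                Poll::Ready(Ok(()))
            } else {
                // Remember the caller so `release` can wake it later.
                *self.waiter.borrow_mut() = Some(cx.waker().clone());
                Poll::Pending
            }
        })
        .await
    }

    // Consumes one permit; the caller is expected to await `ready` first.
    async fn call(&self, req: u32) -> Result<u32, ()> {
        self.permits.set(self.permits.get() - 1);
        Ok(req + 1)
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Returns (gate open at start, gate blocked after a call, gate open after release).
fn ready_demo() -> (bool, bool, bool) {
    let svc = Limited { permits: Cell::new(1), waiter: RefCell::new(None) };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut first = pin!(svc.ready());
    let open = first.as_mut().poll(&mut cx).is_ready();

    let mut call = pin!(svc.call(1));
    let _ = call.as_mut().poll(&mut cx); // takes the only permit

    let mut gate = pin!(svc.ready());
    let blocked = gate.as_mut().poll(&mut cx).is_pending();

    svc.release();
    let reopened = gate.as_mut().poll(&mut cx).is_ready();

    (open, blocked, reopened)
}
```

The caller never sees a queue: it waits at ready() until the service can take the request.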

For a composed service like AndThen<A, B>, readiness is no longer local—it depends on both components.

impl<R, A, B> Service<R> for AndThen<A, B>
where
    A: Service<R>,
    B: Service<A::Response, Error = A::Error>,
{
    type Response = B::Response;
    type Error = A::Error;

    async fn ready(&self) -> Result<(), Self::Error> {
        self.svc_a.ready().await?;
        self.svc_b.ready().await?;
        Ok(())
    }

    async fn call(&self, req: R) -> Result<Self::Response, Self::Error> {
        let intermediate = self.svc_a.call(req).await?;
        self.svc_b.call(intermediate).await
    }
}

This is the simplest correct implementation: the composed service is ready only if both svc_a and svc_b are ready.

Pipeline readiness

Unlike call, readiness is not purely compositional—it requires coordination across the entire chain. A pipeline must determine whether it can admit new work based on the state of all inner services.

Execution is asynchronous, so multiple requests may be in flight at any given time. As a result, readiness is a function of global state—it depends on all currently active executions, not just the local state of an individual service. This makes readiness inherently a cross-cutting concern.

The Pipeline is responsible for enforcing this invariant. It owns readiness across the full chain and must suspend a request at any stage whose inner service is not ready.

This introduces the notion of shared readiness. Each Pipeline instance represents a single readiness slot—i.e., it can admit at most one request at a time. A well-behaved client must either:

  • clone the pipeline per request, or
  • use Pipeline::call_static(), which performs the cloning internally.

This constraint ensures that readiness is tracked consistently across concurrent executions without introducing implicit buffering or violating backpressure guarantees.

Because readiness must be enforced at every step in the chain, services cannot directly invoke the next service. Instead, readiness must be re-established before progressing. This requirement is encoded into the Service interface via an explicit execution context.

The ServiceCtx acts as the carrier of pipeline-level state, allowing readiness to be coordinated across service boundaries.

A useful mental model:

  • Service defines what happens at each step
  • Pipeline defines the structure
  • ServiceCtx defines how execution flows through that structure safely

Final Service Definition

trait Service<R> {
    type Response;
    type Error;

    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error>;

    async fn call(
        &self,
        req: R,
        ctx: ServiceCtx<'_, Self>,
    ) -> Result<Self::Response, Self::Error>;
}

Notes

  • Readiness is no longer a purely local property—it is mediated through ServiceCtx<'_, S>.
  • The context provides the necessary linkage between otherwise independent service instances.
  • Backpressure becomes explicit, composable, and enforceable without hidden queues.
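
The following toy model sketches how a context can force a readiness check before every hop. It is an assumption-laden simplification, not the ntex-service implementation: Ctx is a non-generic stand-in for ServiceCtx<'_, S>, its checks counter exists only to make the behaviour observable, and Apply and block_on are hypothetical helpers.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy stand-in for ServiceCtx: counts readiness checks done on the call path.
#[derive(Clone, Copy)]
struct Ctx<'a> {
    checks: &'a Cell<usize>,
}

trait Service<R> {
    type Response;
    type Error;
    async fn ready(&self, ctx: Ctx<'_>) -> Result<(), Self::Error>;
    async fn call(&self, req: R, ctx: Ctx<'_>) -> Result<Self::Response, Self::Error>;
}

impl<'a> Ctx<'a> {
    // The only sanctioned way to reach another service:
    // re-establish readiness first, then invoke it.
    async fn call<S, R>(self, svc: &S, req: R) -> Result<S::Response, S::Error>
    where
        S: Service<R>,
    {
        svc.ready(self).await?;
        self.checks.set(self.checks.get() + 1);
        svc.call(req, self).await
    }
}

// Hypothetical leaf service: always ready, applies a function.
struct Apply<F>(F);

impl<F: Fn(u32) -> u32> Service<u32> for Apply<F> {
    type Response = u32;
    type Error = ();

    async fn ready(&self, _ctx: Ctx<'_>) -> Result<(), ()> {
        Ok(())
    }

    async fn call(&self, req: u32, _ctx: Ctx<'_>) -> Result<u32, ()> {
        Ok((self.0)(req))
    }
}

struct AndThen<A, B> {
    a: A,
    b: B,
}

impl<R, A, B> Service<R> for AndThen<A, B>
where
    A: Service<R>,
    B: Service<A::Response, Error = A::Error>,
{
    type Response = B::Response;
    type Error = A::Error;

    async fn ready(&self, ctx: Ctx<'_>) -> Result<(), Self::Error> {
        self.a.ready(ctx).await?;
        self.b.ready(ctx).await
    }

    async fn call(&self, req: R, ctx: Ctx<'_>) -> Result<Self::Response, Self::Error> {
        // Inner services are reached through the context, never directly.
        let mid = ctx.call(&self.a, req).await?;
        ctx.call(&self.b, mid).await
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn ctx_demo() -> (Result<u32, ()>, usize) {
    let checks = Cell::new(0);
    let chain = AndThen { a: Apply(|n: u32| n + 1), b: Apply(|n: u32| n * 2) };
    let out = block_on(Ctx { checks: &checks }.call(&chain, 5u32));
    (out, checks.get())
}
```

In this model a two-stage chain goes through three readiness checks per request: one for the chain as a whole and one before each inner stage.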

Service Lifecycle Methods

In addition to call and ready, the Service trait defines two lifecycle methods: poll and shutdown. Each serves a distinct purpose.

At its core, a Service is a passive component—it only makes progress when driven by an external caller. However, some services need to maintain internal asynchronous state (e.g., a connection to a backend, background tasks, or resource pools). This is where poll comes in.

poll()

poll() is the mechanism that allows a service to make progress independently of request execution:

fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

It is expected to be called by the executor (or an owning future) as part of the normal Future::poll() cycle.

A service can use poll to:

  • drive internal state machines
  • establish or maintain external resources (e.g., connections)
  • update readiness based on background progress

Crucially, poll is allowed to influence readiness. For example, a service may only become ready once a connection is established, or may transition back to Pending if capacity is exhausted. This makes poll the active side of the service, complementing the otherwise passive call.
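
As a rough sketch of that interaction, the toy below drives a connection state machine from poll and exposes the result as readiness. The Backend type, the Conn states, and the is_ready helper are hypothetical. To keep the example short, poll is written as an inherent method with the signature shown above rather than as part of the full trait, and treating Pending as "still connecting" is an assumption of this example.

```rust
use std::cell::Cell;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Clone, Copy, PartialEq)]
enum Conn {
    Idle,
    Connecting,
    Up,
}

// Hypothetical service that needs a backend connection before it can serve.
struct Backend {
    state: Cell<Conn>,
}

impl Backend {
    // Each poll advances the connection state machine by one step.
    fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), String>> {
        match self.state.get() {
            Conn::Idle => {
                self.state.set(Conn::Connecting);
                // Ask to be polled again; a real service would instead
                // register the waker with the pending I/O operation.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Conn::Connecting => {
                self.state.set(Conn::Up);
                Poll::Ready(Ok(()))
            }
            Conn::Up => Poll::Ready(Ok(())),
        }
    }

    // What a `ready` implementation could consult.
    fn is_ready(&self) -> bool {
        self.state.get() == Conn::Up
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Returns (ready before polling, first poll done, second poll done and ready).
fn poll_demo() -> (bool, bool, bool) {
    let svc = Backend { state: Cell::new(Conn::Idle) };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let before = svc.is_ready();
    let first = svc.poll(&mut cx).is_ready(); // Idle to Connecting
    let second = svc.poll(&mut cx).is_ready(); // Connecting to Up
    (before, first, second && svc.is_ready())
}
```

No request is involved at any point: the service becomes ready purely because its owner kept polling it.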

shutdown()

shutdown() defines the termination phase of the service lifecycle:

async fn shutdown(&self);

It must be called by the owner of the service (e.g., the Pipeline) when the service is being dropped or taken out of operation.

The purpose of shutdown is to:

  • gracefully terminate background tasks
  • release external resources
  • flush or complete in-flight work

Unlike poll, which is part of steady-state operation, shutdown is a one-time transition that finalizes the service.
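
A minimal sketch of a shutdown that flushes buffered work, under stated assumptions: BufferedSink and its fields are hypothetical, call and shutdown are written as inherent methods rather than as part of the full trait, and rejecting calls after shutdown is a design choice of this example, not something the trait requires.

```rust
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical service that buffers lines and writes them out on shutdown.
struct BufferedSink {
    pending: RefCell<Vec<String>>,
    flushed: RefCell<Vec<String>>,
    closed: Cell<bool>,
}

impl BufferedSink {
    async fn call(&self, line: &str) -> Result<(), &'static str> {
        if self.closed.get() {
            return Err("service is shut down");
        }
        self.pending.borrow_mut().push(line.to_owned());
        Ok(())
    }

    // One-time transition: complete in-flight work, then stop accepting more.
    async fn shutdown(&self) {
        let drained: Vec<String> = self.pending.borrow_mut().drain(..).collect();
        self.flushed.borrow_mut().extend(drained);
        self.closed.set(true);
    }
}

// Stand-in executor: enough for futures that complete without real I/O.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Returns (lines flushed, lines still pending, call after shutdown rejected).
fn shutdown_demo() -> (usize, usize, bool) {
    let sink = BufferedSink {
        pending: RefCell::new(Vec::new()),
        flushed: RefCell::new(Vec::new()),
        closed: Cell::new(false),
    };
    block_on(sink.call("a")).unwrap();
    block_on(sink.call("b")).unwrap();
    block_on(sink.shutdown());

    let rejected = block_on(sink.call("c")).is_err();
    let flushed = sink.flushed.borrow().len();
    let pending = sink.pending.borrow().len();
    (flushed, pending, rejected)
}
```

The owner awaits shutdown() before dropping the service, so buffered work is not lost when the pipeline goes away.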
