DEV Community

Robert

Pipeline pattern - implementation

Introduction

Implementing the pipeline pattern is a popular topic, and plenty of articles already cover different aspects of it. But have you ever tried to fit it into a very complicated flow running on Azure Functions or a WebJob while keeping both clean, SOLID, testable code and a pipeline that can be interrupted to perform an external request, then resume at the appropriate step once it is notified?

Let me share my point of view on this topic. In this series of articles you will find all the implementation details, with explanations and examples. Let’s get started!

Implementation

First I will implement the pipeline pattern itself, and after that I will develop it further.

Before writing the code, I would like to define all the assumptions the final solution should meet:

  1. Pipeline pattern implementation is generic.
  2. Implementation follows SOLID principles.
  3. Error handling is in place.

Let’s begin with the first point.

If you are new to this pattern, stop here for a moment before reading on and ask yourself a couple of questions:

  • How should my pipeline pattern implementation work?
  • What is the input and output of my pipeline?
  • How would I define a generic implementation of the pipeline pattern?
  • What would I need to make it generic?
  • What methods should a generic implementation provide?
  • How am I going to implement a pipe step?

Are you ready with your ideas? Below are mine.

A pipeline requires steps. A step is nothing but a separate class with a single responsibility. Each step needs a model to work on when its logic is executed. So here is the starting point:

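using System.Threading.Tasks;

// A single pipeline step; TPipeModel is the model shared by all steps of one pipeline.
public interface IPipeStep<TPipeModel>
{
    // Runs this step's logic against the shared model.
    Task ExecuteAsync(TPipeModel pipeModel);
}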
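To make the idea of a step more concrete, here is a minimal sketch of one. `TextModel` and `TrimStep` are hypothetical names chosen for illustration; they are assumptions, not part of the implementation described in this article.

```csharp
using System;
using System.Threading.Tasks;

// Same contract as in the article.
public interface IPipeStep<TPipeModel>
{
    Task ExecuteAsync(TPipeModel pipeModel);
}

// Hypothetical model shared by all steps of one pipeline.
public class TextModel
{
    public string Text { get; set; } = string.Empty;
}

// Hypothetical step with one responsibility: trim surrounding whitespace.
public class TrimStep : IPipeStep<TextModel>
{
    public Task ExecuteAsync(TextModel pipeModel)
    {
        pipeModel.Text = pipeModel.Text.Trim();
        return Task.CompletedTask; // nothing to await in this step
    }
}

public static class Program
{
    public static async Task Main()
    {
        var model = new TextModel { Text = "  hello pipeline  " };
        await new TrimStep().ExecuteAsync(model);
        Console.WriteLine($"[{model.Text}]"); // prints "[hello pipeline]"
    }
}
```

Because the step depends only on its model, it can be exercised in isolation like this, without building a whole pipeline around it.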

On top of that, a pipe service is needed to manage the pipe steps.

It has two interfaces: IPipeService, which defines the method for setting up the pipeline, and IPipeServiceExecution, which defines the method for executing it:

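using System;
using System.Threading.Tasks;

// Builder side: registers steps and can also execute the pipe.
public interface IPipeService<TPipeModel> : IPipeServiceExecution<TPipeModel>
{
    // Takes a factory delegate, so the implementation can create the step when the pipe runs.
    IPipeService<TPipeModel> Add(Func<IPipeStep<TPipeModel>> pipeStep);
}

// Execution side: the only thing a caller can do is run the pipe.
public interface IPipeServiceExecution<TPipeModel>
{
    Task ExecuteAsync(TPipeModel pipeModel);
}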

And its implementation:

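using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class PipeService<TPipeModel> : IPipeService<TPipeModel>
{
    // Step factories, kept in registration order.
    private readonly IList<Func<IPipeStep<TPipeModel>>> _pipeSteps;

    public PipeService()
    {
        _pipeSteps = new List<Func<IPipeStep<TPipeModel>>>();
    }

    // Registers a step and returns the service so calls can be chained.
    public IPipeService<TPipeModel> Add(Func<IPipeStep<TPipeModel>> pipeStep)
    {
        _pipeSteps.Add(pipeStep);
        return this;
    }

    // Creates and runs each step in turn, awaiting one before starting the next.
    public async Task ExecuteAsync(TPipeModel pipeModel)
    {
        foreach (var pipeStep in _pipeSteps)
        {
            await pipeStep.Invoke().ExecuteAsync(pipeModel);
        }
    }
}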

That’s it! Simple, isn’t it? The generic pipe implementation is ready to be used. Let’s create the first pipeline.

Wait... create a pipeline? What is the best place to do it? Most probably it is going to have multiple steps, right? Each step is going to be represented by a separate class. The steps must be combined and, at the very end, an IPipeServiceExecution should be returned. Isn’t that a great place for a factory? Indeed, it is!

Let’s create yet another generic interface, this time for a pipeline factory, which concrete pipe factories will implement. A factory has a single responsibility: to create a pipeline that is ready to be executed. It always provides a complete pipeline with all its steps defined. What is also really important, the CreatePipe method returns the IPipeServiceExecution interface, so the only action that can be performed on a pipe returned from a factory is its execution.

public interface IPipeFactory<TPipeModel>
{
    IPipeServiceExecution<TPipeModel> CreatePipe();
}
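A concrete factory could look roughly like the sketch below. The article's interfaces and PipeService are repeated in compact form so the example compiles on its own; `TextModel`, `TrimStep`, `UpperCaseStep` and `TextPipeFactory` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Contracts from the article, repeated in compact form.
public interface IPipeStep<TPipeModel> { Task ExecuteAsync(TPipeModel pipeModel); }
public interface IPipeServiceExecution<TPipeModel> { Task ExecuteAsync(TPipeModel pipeModel); }
public interface IPipeService<TPipeModel> : IPipeServiceExecution<TPipeModel>
{
    IPipeService<TPipeModel> Add(Func<IPipeStep<TPipeModel>> pipeStep);
}
public interface IPipeFactory<TPipeModel> { IPipeServiceExecution<TPipeModel> CreatePipe(); }

public class PipeService<TPipeModel> : IPipeService<TPipeModel>
{
    private readonly IList<Func<IPipeStep<TPipeModel>>> _pipeSteps =
        new List<Func<IPipeStep<TPipeModel>>>();

    public IPipeService<TPipeModel> Add(Func<IPipeStep<TPipeModel>> pipeStep)
    {
        _pipeSteps.Add(pipeStep);
        return this;
    }

    public async Task ExecuteAsync(TPipeModel pipeModel)
    {
        foreach (var pipeStep in _pipeSteps)
        {
            await pipeStep.Invoke().ExecuteAsync(pipeModel);
        }
    }
}

// Hypothetical model and steps.
public class TextModel { public string Text { get; set; } = string.Empty; }

public class TrimStep : IPipeStep<TextModel>
{
    public Task ExecuteAsync(TextModel pipeModel)
    {
        pipeModel.Text = pipeModel.Text.Trim();
        return Task.CompletedTask;
    }
}

public class UpperCaseStep : IPipeStep<TextModel>
{
    public Task ExecuteAsync(TextModel pipeModel)
    {
        pipeModel.Text = pipeModel.Text.ToUpperInvariant();
        return Task.CompletedTask;
    }
}

// Hypothetical factory: the single place that knows which steps form the pipe.
public class TextPipeFactory : IPipeFactory<TextModel>
{
    public IPipeServiceExecution<TextModel> CreatePipe()
    {
        // The caller receives only the execution interface, so it cannot add steps.
        return new PipeService<TextModel>()
            .Add(() => new TrimStep())
            .Add(() => new UpperCaseStep());
    }
}

public static class Program
{
    public static async Task Main()
    {
        IPipeServiceExecution<TextModel> pipe = new TextPipeFactory().CreatePipe();
        var model = new TextModel { Text = "  hello pipeline  " };
        await pipe.ExecuteAsync(model);
        Console.WriteLine(model.Text); // prints "HELLO PIPELINE"
    }
}
```

In this sketch the steps are created with `new`; in a real application the delegates passed to Add could just as well resolve steps from a dependency injection container.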

You can find the complete, generic pipeline implementation here.

Using this link, you can navigate to an example of a “Text transformation” pipeline that uses the above implementation. It takes unformatted text as input and, based on the defined steps, transforms it into the desired format.

Notice how easily we can test each step. Additionally, each step always has a single responsibility. We also have the open-closed principle in place: a new step can be added without changing PipeService or the existing steps. All of this means that the 2nd point on our assumptions list is already fulfilled: we do follow SOLID principles.

One additional side note: remember that the order of steps really matters in a pipeline!
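To see why the order matters, consider the small sketch below, where the same two steps are registered in a different order and produce different results. `TextModel`, `TruncateStep` and `AppendStep` are hypothetical names, and the article's types are repeated in compact form so the example compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Contracts and service from the article, repeated in compact form.
public interface IPipeStep<TPipeModel> { Task ExecuteAsync(TPipeModel pipeModel); }
public interface IPipeServiceExecution<TPipeModel> { Task ExecuteAsync(TPipeModel pipeModel); }
public interface IPipeService<TPipeModel> : IPipeServiceExecution<TPipeModel>
{
    IPipeService<TPipeModel> Add(Func<IPipeStep<TPipeModel>> pipeStep);
}

public class PipeService<TPipeModel> : IPipeService<TPipeModel>
{
    private readonly IList<Func<IPipeStep<TPipeModel>>> _pipeSteps =
        new List<Func<IPipeStep<TPipeModel>>>();

    public IPipeService<TPipeModel> Add(Func<IPipeStep<TPipeModel>> pipeStep)
    {
        _pipeSteps.Add(pipeStep);
        return this;
    }

    public async Task ExecuteAsync(TPipeModel pipeModel)
    {
        foreach (var pipeStep in _pipeSteps)
        {
            await pipeStep.Invoke().ExecuteAsync(pipeModel);
        }
    }
}

// Hypothetical model and steps.
public class TextModel { public string Text { get; set; } = string.Empty; }

// Cuts the text down to at most maxLength characters.
public class TruncateStep : IPipeStep<TextModel>
{
    private readonly int _maxLength;
    public TruncateStep(int maxLength) { _maxLength = maxLength; }

    public Task ExecuteAsync(TextModel pipeModel)
    {
        if (pipeModel.Text.Length > _maxLength)
        {
            pipeModel.Text = pipeModel.Text.Substring(0, _maxLength);
        }
        return Task.CompletedTask;
    }
}

// Appends a fixed suffix to the text.
public class AppendStep : IPipeStep<TextModel>
{
    private readonly string _suffix;
    public AppendStep(string suffix) { _suffix = suffix; }

    public Task ExecuteAsync(TextModel pipeModel)
    {
        pipeModel.Text += _suffix;
        return Task.CompletedTask;
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Truncate first, then append: the suffix survives.
        var first = new TextModel { Text = "hello world" };
        await new PipeService<TextModel>()
            .Add(() => new TruncateStep(5))
            .Add(() => new AppendStep("!"))
            .ExecuteAsync(first);
        Console.WriteLine(first.Text); // prints "hello!"

        // Append first, then truncate: the suffix is cut off again.
        var second = new TextModel { Text = "hello world" };
        await new PipeService<TextModel>()
            .Add(() => new AppendStep("!"))
            .Add(() => new TruncateStep(5))
            .ExecuteAsync(second);
        Console.WriteLine(second.Text); // prints "hello"
    }
}
```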

What about error handling?

Of course, you can handle errors in each and every step, or catch them outside of the pipeline execution. But what if you need to define multiple error handling steps that use the pipeline model to handle an issue correctly?

Again, keeping the single responsibility principle and unit testing in mind, let’s extend the IPipeService interface so that error handling steps can be added; they will be executed if the pipe breaks:

public interface IPipeService<TPipeModel> : IPipeServiceExecution<TPipeModel>
{
    IPipeService<TPipeModel> Add(Func<IPipeStep<TPipeModel>> pipeStep);
    IPipeService<TPipeModel> AddErrorStep(Func<IPipeStep<TPipeModel>> errorStep);
}
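One possible way to implement the extended interface is sketched below. It is only an illustration under stated assumptions: when a step throws, the error steps run in registration order against the same model and the original exception is then rethrown. Whether to rethrow or swallow the exception is a design choice the interface does not dictate. `JobModel`, `FailingStep` and `MarkFailedStep` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IPipeStep<TPipeModel> { Task ExecuteAsync(TPipeModel pipeModel); }
public interface IPipeServiceExecution<TPipeModel> { Task ExecuteAsync(TPipeModel pipeModel); }
public interface IPipeService<TPipeModel> : IPipeServiceExecution<TPipeModel>
{
    IPipeService<TPipeModel> Add(Func<IPipeStep<TPipeModel>> pipeStep);
    IPipeService<TPipeModel> AddErrorStep(Func<IPipeStep<TPipeModel>> errorStep);
}

public class PipeService<TPipeModel> : IPipeService<TPipeModel>
{
    private readonly IList<Func<IPipeStep<TPipeModel>>> _pipeSteps =
        new List<Func<IPipeStep<TPipeModel>>>();
    private readonly IList<Func<IPipeStep<TPipeModel>>> _errorSteps =
        new List<Func<IPipeStep<TPipeModel>>>();

    public IPipeService<TPipeModel> Add(Func<IPipeStep<TPipeModel>> pipeStep)
    {
        _pipeSteps.Add(pipeStep);
        return this;
    }

    public IPipeService<TPipeModel> AddErrorStep(Func<IPipeStep<TPipeModel>> errorStep)
    {
        _errorSteps.Add(errorStep);
        return this;
    }

    public async Task ExecuteAsync(TPipeModel pipeModel)
    {
        try
        {
            foreach (var pipeStep in _pipeSteps)
            {
                await pipeStep.Invoke().ExecuteAsync(pipeModel);
            }
        }
        catch (Exception)
        {
            // Assumption: error steps see the same model the failed pipe worked on.
            foreach (var errorStep in _errorSteps)
            {
                await errorStep.Invoke().ExecuteAsync(pipeModel);
            }
            throw; // assumption: the caller should still learn about the failure
        }
    }
}

// Hypothetical model and steps.
public class JobModel { public string Status { get; set; } = "New"; }

public class FailingStep : IPipeStep<JobModel>
{
    public Task ExecuteAsync(JobModel pipeModel)
    {
        throw new InvalidOperationException("step failed");
    }
}

public class MarkFailedStep : IPipeStep<JobModel>
{
    public Task ExecuteAsync(JobModel pipeModel)
    {
        pipeModel.Status = "Failed";
        return Task.CompletedTask;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var model = new JobModel();
        var pipe = new PipeService<JobModel>()
            .Add(() => new FailingStep())
            .AddErrorStep(() => new MarkFailedStep());
        try
        {
            await pipe.ExecuteAsync(model);
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine(model.Status); // prints "Failed"
        }
    }
}
```

Each error step is still a plain IPipeStep, so it keeps a single responsibility and can be unit tested in the same way as a regular step.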

The final implementation of the pipeline pattern with error handling can be found here. A code example is available under this link.

Top comments (2)

atimoshevsky • Edited

Robert, your interface IPipeStep has method Execute
public interface IPipeStep<TPipeModel>
{
    Task Execute(TPipeModel pipeModel);
}

but in the loop of the steps, you are calling ExecuteAsync.

public async Task ExecuteAsync(TPipeModel pipeModel)
{
    foreach (var pipeStep in _pipeSteps)
    {
        await pipeStep.Invoke().ExecuteAsync(pipeModel);
    }
}

You should rename Execute to ExecuteAsync in the IPipeStep

Robert

You're right! Thanks. It's already updated.