<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Robert</title>
    <description>The latest articles on DEV Community by Robert (@rwasik).</description>
    <link>https://dev.to/rwasik</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F772179%2F75419355-a564-44ee-a99c-db2981569a81.png</url>
      <title>DEV Community: Robert</title>
      <link>https://dev.to/rwasik</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/rwasik"/>
    <language>en</language>
    <item>
      <title>Pipeline pattern - intermittent flow code examples</title>
      <dc:creator>Robert</dc:creator>
      <pubDate>Fri, 07 Jan 2022 09:01:30 +0000</pubDate>
      <link>https://dev.to/rwasik/pipeline-pattern-intermittent-flow-code-examples-3fd</link>
      <guid>https://dev.to/rwasik/pipeline-pattern-intermittent-flow-code-examples-3fd</guid>
      <description>&lt;h2&gt;
  
  
  Pipeline example using Azure Functions
&lt;/h2&gt;

&lt;p&gt;Let’s imagine a request to translate header, body and footer of some article. But before marking translation as accepted, it has to be approved by some external unit as well. To get approval, request message for text acceptance must be placed on some external queue. When approval response is received, pipeline can be finalized.&lt;/p&gt;

&lt;p&gt;Working example can be found &lt;a href="https://github.com/rwasik/pipeline-pattern-examples/tree/pipeline-azure-functions"&gt;here&lt;/a&gt;. To run it, all you have to do is to define your ServiceBus connection string and create three queues:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;translate-text&lt;/li&gt;
&lt;li&gt;approve-text-out&lt;/li&gt;
&lt;li&gt;approve-text-in&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Using for instance ServiceBusExplorer tool you can put a messages on the queues.&lt;/p&gt;

&lt;p&gt;Couple words about implementation.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;TranslateTextPipeModel&lt;/code&gt; implements two interfaces:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;IInQueuePipeModel&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;IOutQueuePipeModel&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;There are two ‘In Message’ steps:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;DeserializeTranslateTextRequestStep&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;DeserializeApprovalTextResponseStep&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;and one ‘Out Message’ step:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;PrepareApprovalTextRequestStep&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;I have defined &lt;code&gt;TranslateTextPipeFactory&lt;/code&gt; which is used inside &lt;code&gt;TranslateTextService&lt;/code&gt; to create complete pipeline. Moreover, in service, pipe model with queue name and input message is initiated:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;var pipeModel = new TranslateTextPipeModel
{
    QueueName = queueName,
    InMessage = message
};
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Service also invokes pipeline execution.&lt;/p&gt;

&lt;p&gt;Two Azure Functions are defined:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;TranslateTextFunction&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;TextApprovalFunction&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Both functions make use of &lt;code&gt;TranslateTextService&lt;/code&gt;. First function receives a message from “translate-text” service bus and at the end it puts a request for text approval on “approve-text-out” queue. With Azure Functions that return value, you can bind a function output binding to the return value. So if you want return value to be put on service bus, you have to specify following attribute:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[return: ServiceBus("approve-text-out", Connection = "ServiceBus")]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Second function is invoked when text approval response is delivered to “approve-text-in” queue. This function finalizes the pipeline flow.&lt;/p&gt;

&lt;p&gt;Simple, isn’t it?&lt;/p&gt;

&lt;h2&gt;
  
  
  Pipeline example using Azure WebJob
&lt;/h2&gt;

&lt;p&gt;Example used for WebJob is exactly the same example as for Azure Functions – request for header, body and footer translation of some article.&lt;/p&gt;

&lt;p&gt;Solution for Azure Web Job is a bit different. First of all a  message must be put on external queue by ourselves. Using Azure Functions it was possible to apply the output binding attribute.&lt;/p&gt;

&lt;p&gt;When it comes to receiving messages, it can be done by simply defining functions desired for it or by making use of &lt;code&gt;IHostedService&lt;/code&gt; interface to implement some background queue related tasks.&lt;/p&gt;

&lt;p&gt;I will present you second approach. Using &lt;code&gt;IHostedService&lt;/code&gt; interface, &lt;code&gt;ServiceBusListener&lt;/code&gt; for pipeline will be implemented. For each pipeline queue that delivers messages, message handler is going to be registered automatically. Thanks to that it's possible to define general flow for receiving messages and passing them into pipeline execution.&lt;/p&gt;

&lt;p&gt;If by any mean it is required to extend pipeline and add some additional external call, all what is needed is just to define additional queue name and pipeline steps to handle this specific case.&lt;/p&gt;

&lt;p&gt;As I've already mentioned, sending message to external queue must be handled by ourselves. For that let’s define generic &lt;code&gt;OutServiceBusPipeStep&lt;/code&gt; which will implement &lt;code&gt;IOutQueuePipeStep&lt;/code&gt; interface. Generic step implementation will contain three abstractions required to be overridden by the inheriting class:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;ServiceBusConnectionConfigKey&lt;/code&gt; (property)&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;OutQueueName&lt;/code&gt; (property)&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;CreateOutQueueMessage(TPipeModel pipeModel)&lt;/code&gt; (function)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;code&gt;ExecuteAsync&lt;/code&gt; function from &lt;code&gt;OutServiceBusPipeStep&lt;/code&gt; will prepare message, create &lt;code&gt;QueueClient&lt;/code&gt; and based on the configuration passed through properties it will put a message on the queue.&lt;/p&gt;

&lt;p&gt;From now on this generic implementation can be used for all WebJob pipe steps that are responsible for sending message to external queue.&lt;/p&gt;

&lt;p&gt;Next part that differs from Azure Functions solution is the way of handling incoming messages.&lt;/p&gt;

&lt;p&gt;For that I defined abstract &lt;code&gt;ServiceBusListenerBase&amp;lt;TPipeModel, TInQueueEnum&amp;gt;&lt;/code&gt; class which implements &lt;code&gt;IHostedService&lt;/code&gt; interface. Implementing such interface it is required to implement following  interface members:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;Task StartAsync(CancellationToken cancellationToken)&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;Task StopAsync(CancellationToken cancellationToken)&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;code&gt;StartAsync&lt;/code&gt; method will iterate through all queues that deliver messages for pipeline and for each and every queue, message handler will be registered. Message handler does nothing but receiving message, preparing pipe model, creating and executing pipeline.&lt;/p&gt;

&lt;p&gt;Has the second generic type parameter for &lt;code&gt;ServiceBusListenerBase&lt;/code&gt; caught your attention?&lt;/p&gt;

&lt;p&gt;To have single source of truth for all queues related to receiving or sending messages for specific pipeline I created two enumeration types:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;TranslateTextPipeInQueue&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;TranslateTextPipeOutQueue&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Enumeration type which defines ‘InQueue’ values - &lt;code&gt;TranslateTextPipeInQueue&lt;/code&gt;, will be passed as &lt;code&gt;TInQueueEnum&lt;/code&gt; type into &lt;code&gt;ServiceBusListenerBase&lt;/code&gt;. Thanks to that, I’m able to iterate through its values and register message handler for defined queues.&lt;/p&gt;

&lt;p&gt;Another benefit of introducing an enumeration type is that this is the only place to be modified if new queue will become part of existing process.&lt;/p&gt;

&lt;p&gt;Final solution can be found under &lt;a href="https://github.com/rwasik/pipeline-pattern-examples/tree/pipeline-azure-webjob"&gt;this link&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>csharp</category>
      <category>azure</category>
      <category>codequality</category>
    </item>
    <item>
      <title>Pipeline pattern - intermittent flow </title>
      <dc:creator>Robert</dc:creator>
      <pubDate>Fri, 07 Jan 2022 08:58:19 +0000</pubDate>
      <link>https://dev.to/rwasik/pipeline-pattern-intermittent-flow-21j4</link>
      <guid>https://dev.to/rwasik/pipeline-pattern-intermittent-flow-21j4</guid>
      <description>&lt;p&gt;Having implementation of pipeline pattern with error handling in place, let's start working on more sophisticated pipeline solution.&lt;/p&gt;

&lt;p&gt;Let’s define additional requirements for pipeline implementation:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;it allows to define an intermittent flow,&lt;/li&gt;
&lt;li&gt;it allows to define step to read a message from the queue,&lt;/li&gt;
&lt;li&gt;it allows to define step to send a message to a different queue,&lt;/li&gt;
&lt;li&gt;it is terminated after sending out a message,&lt;/li&gt;
&lt;li&gt;it allows to continue from a certain step after receiving response.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;On the picture below you can find desired flow diagram:&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmi4i03147zq3ol79i2p9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmi4i03147zq3ol79i2p9.png" alt="Image description"&gt;&lt;/a&gt;&lt;br&gt;
The goal of ‘In Message’ step is to read a message from the queue. It means, message has to be delivered into this specific pipe step. In other words, we must be sure that message from the queue is part of defined pipe model. To ensure this, let’s create an interface which will contain such property:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IInQueuePipeModel
{
    string InMessage { get; set; }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As you can see on flow diagram, it is possible to define multiple ‘In Message’ steps for reading messages from different queues. How does pipeline know which step should be invoked for a particular queue?&lt;/p&gt;

&lt;p&gt;To receive a message from the queue or send out a message to some queue, it is required to know queue name. The queue name is key here and this is exactly what is going to be used to distinguish between different ‘In Message’ steps. So there are two things needed:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;queue name present in pipe model to have information from which queue message was received,&lt;/li&gt;
&lt;li&gt;queue name defined for specific 'In Message' step, so pipeline can figure out where is starting point for pipe execution.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;To force queue name definition in pipe model, let’s define &lt;code&gt;QueueName&lt;/code&gt; property in &lt;code&gt;IInQueuePipeModel&lt;/code&gt; interface:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IInQueuePipeModel
{
    string QueueName { get; set; }
    string InMessage { get; set; }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Additionally, separate interface for ‘In Message’ step is needed. It will ensure queue name presence for appropriate step:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IInQueuePipeStep&amp;lt;TPipeModel&amp;gt; : IPipeStep&amp;lt;TPipeModel&amp;gt; where 
TPipeModel : IInQueuePipeModel
{
    string InQueueName { get; }
} 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice I added generic type constraint on &lt;code&gt;TPipeModel&lt;/code&gt;, meaning that the type parameter &lt;code&gt;TPipeModel&lt;/code&gt; implements the &lt;code&gt;IInQueuePipeModel&lt;/code&gt; interface.&lt;/p&gt;

&lt;p&gt;Last part is ‘Out Message’ step. This step is going to be used for preparing request message for some external queue. Additionally, after this step, pipe is going to be terminated until receiving response.&lt;/p&gt;

&lt;p&gt;To achieve that I need specific interface for ‘Out Message’ step which will indicate breaking point in the pipeline:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IOutQueuePipeStep&amp;lt;TPipeModel&amp;gt; : IPipeStep&amp;lt;TPipeModel&amp;gt; where TPipeModel : IInQueuePipeModel
{ 
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Also, I introduced yet another interface, so when needed, pipe model will implement property for outgoing messages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IOutQueuePipeModel
{
    string OutMessage { get; set; }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;IOutQueuePipeModel&lt;/code&gt; interface is not required. Since there are different ways to implement sending out messages, such property is not always needed in pipe model. Add it when you are sure you will make use of it. &lt;/p&gt;

&lt;p&gt;Even though whole pipeline is defined as one piece, in runtime it will be split into multiple executions, as flow diagram shows. In other words there is going to be a mechanism in place that will extract only steps that are required for a particular execution. How to achieve that? Let's make use of interfaces we've already defined: &lt;code&gt;IInQueuePipeStep&lt;/code&gt; and &lt;code&gt;IOutQueuePipeStep&lt;/code&gt;. Additionally, new &lt;code&gt;QueuePipeService&lt;/code&gt; will be introduced and it's going to be responsible for queue related pipe execution.&lt;/p&gt;

&lt;p&gt;Below you can find implementation of &lt;code&gt;QueuePipeService&lt;/code&gt;. The only difference comparing to the previous implementation is inside &lt;code&gt;ExecuteAsync&lt;/code&gt; method:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class QueuePipeService&amp;lt;TPipeModel&amp;gt; : IPipeService&amp;lt;TPipeModel&amp;gt;
{
    protected readonly IList&amp;lt;Func&amp;lt;IPipeStep&amp;lt;TPipeModel&amp;gt;&amp;gt;&amp;gt; _pipeSteps;

    protected QueuePipeService()
    {
        _pipeSteps = new List&amp;lt;Func&amp;lt;IPipeStep&amp;lt;TPipeModel&amp;gt;&amp;gt;&amp;gt;();
    }

    public IPipeService&amp;lt;TPipeModel&amp;gt; Add(Func&amp;lt;IPipeStep&amp;lt;TPipeModel&amp;gt;&amp;gt; pipeStep)
    {
        _pipeSteps.Add(pipeStep);
        return this;
    }

    public async Task ExecuteAsync(TPipeModel pipeModel)
    {
        var steps = GetInitializedSteps().SkipUntil(s =&amp;gt; !(s is IInQueuePipeStep&amp;lt;TPipeModel&amp;gt; &amp;amp;&amp;amp; (s as IInQueuePipeStep&amp;lt;TPipeModel&amp;gt;).InQueueName == pipeModel.QueueName))
                                         .TakeUntil(s =&amp;gt; !(s is IOutQueuePipeStep&amp;lt;TPipeModel&amp;gt;));

        foreach (var pipeStep in steps)
        {
            await pipeStep.ExecuteAsync(pipeModel);
        }
    }

    private IEnumerable&amp;lt;IPipeStep&amp;lt;TPipeModel&amp;gt;&amp;gt; GetInitializedSteps()
    {
        return _pipeSteps.Select(s =&amp;gt; s.Invoke());
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;ExecuteAsync&lt;/code&gt; works in a way that until it finds step which implements &lt;code&gt;IInQueuePipeStep&lt;/code&gt; interface and queue name for such step is equal to queue name passed in pipe model it will be skipping steps. From the matching step it gets all next steps but if one of them implements &lt;code&gt;IOutQueuePipeStep&lt;/code&gt;, it is going to be an indication that after this step some request message is going to be sent out and pipeline will be terminated until receiving response. To achieve this functionality I created two &lt;code&gt;IEnumerable&lt;/code&gt; extension methods: &lt;code&gt;SkipUntil&lt;/code&gt; and &lt;code&gt;TakeUntil&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;As I mentioned earlier, there is a big chunk of code which is common for both: &lt;code&gt;QueuePipeService&lt;/code&gt; and &lt;code&gt;PipeService&lt;/code&gt;. To follow DRY (don’t repeat yourself) principle, under &lt;a href="https://github.com/rwasik/pipeline-pattern" rel="noopener noreferrer"&gt;this link&lt;/a&gt;, you can find final implementation with extracted &lt;code&gt;PipeServiceBase&lt;/code&gt; abstract class which contains common implementation for both services.&lt;/p&gt;

&lt;p&gt;Based on the final implementation, &lt;a href="https://www.nuget.org/packages/PipelinePattern" rel="noopener noreferrer"&gt;PipelinePattern NuGet&lt;/a&gt; package is generated. It is ready to be downloaded and used.&lt;/p&gt;

&lt;h2&gt;
  
  
  Summary
&lt;/h2&gt;

&lt;p&gt;Using proposed pipe implementation, whole business process can be defined in a single place. Moreover, implementation of complex business flow will have clean, SOLID, testable and easy to understand code. Without knowing implementation details, looking just on pipe steps, at the first glance it is quite obvious what is going to happen in specific pipeline. It is just a matter of digging into specific step to find out everything. Additionally, if you integrate logging into your pipeline, issues can be tracked easily.&lt;/p&gt;

&lt;p&gt;I hope you will make use of it in your project.&lt;/p&gt;

&lt;p&gt;You can find it in NuGet package manager by typing ‘PipelinePattern’.&lt;/p&gt;

&lt;h2&gt;
  
  
  Future enhancement
&lt;/h2&gt;

&lt;p&gt;There are more things that could be incorporated into pipeline solution, like for instance:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;parallel execution of a series of steps,&lt;/li&gt;
&lt;li&gt;reporting of pipeline progress with SignalR.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Stay tuned!&lt;/p&gt;

</description>
      <category>csharp</category>
      <category>azure</category>
      <category>codequality</category>
    </item>
    <item>
      <title>Pipeline pattern - implementation</title>
      <dc:creator>Robert</dc:creator>
      <pubDate>Mon, 20 Dec 2021 12:14:15 +0000</pubDate>
      <link>https://dev.to/rwasik/pipeline-pattern-implementation-with-intermittent-flow-31c8</link>
      <guid>https://dev.to/rwasik/pipeline-pattern-implementation-with-intermittent-flow-31c8</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Pipeline pattern implementation – so popular topic. A lot of different articles were already created to cover different aspects of this pattern. But have you ever tried to incorporate it into very complicated flow with usage of Azure Functions or WebJob and still having both: clean, SOLID, testable code and pipeline implementation which can be interrupted to perform some external request and be able to get back to appropriate step when notified and proceed further?&lt;/p&gt;

&lt;p&gt;Let me share with you my point of view on this topic. In this series of articles you will find all implementation details with explanation and examples. Let’s get started!&lt;/p&gt;

&lt;h2&gt;
  
  
  Implementation
&lt;/h2&gt;

&lt;p&gt;First, the pipeline pattern itself will be implemented and after that I will develop it further.&lt;/p&gt;

&lt;p&gt;Before writing the code, I would like to define all the assumptions the final solution should meet:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Pipeline pattern implementation is generic.&lt;/li&gt;
&lt;li&gt;Implementation follows SOLID principles.&lt;/li&gt;
&lt;li&gt;Error handling is in place.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Let’s begin with first point.&lt;/p&gt;

&lt;p&gt;If you are new to this pattern, before continue reading, stop here for a while and ask yourself couple of questions:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;How should my pipeline pattern implementation work?&lt;/li&gt;
&lt;li&gt;What is the input/output for my pipeline?&lt;/li&gt;
&lt;li&gt;How would I define generic implementation of pipeline pattern?&lt;/li&gt;
&lt;li&gt;What would I need to make it generic?&lt;/li&gt;
&lt;li&gt;What methods should be provided by generic implementation?&lt;/li&gt;
&lt;li&gt;How am I going to implement pipe step?&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Are you ready with your ideas? Below are mine.&lt;/p&gt;

&lt;p&gt;Pipeline requires steps. Step is nothing but separate class with single responsibility. Each step requires model to interact with when its logic is executed. So here is starting point:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IPipeStep&amp;lt;TPipeModel&amp;gt;
{
    Task ExecuteAsync(TPipeModel pipeModel);
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;On top of that, pipe service for managing pipe steps is needed.&lt;/p&gt;

&lt;p&gt;Its interfaces: &lt;code&gt;IPipeService&lt;/code&gt; with method definition to set up pipeline and &lt;code&gt;IPipeServiceExecution&lt;/code&gt; to define method for pipe execution:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IPipeService&amp;lt;TPipeModel&amp;gt; : IPipeServiceExecution&amp;lt;TPipeModel&amp;gt;
{
    IPipeService&amp;lt;TPipeModel&amp;gt; Add(Func&amp;lt;IPipeStep&amp;lt;TPipeModel&amp;gt;&amp;gt; pipeStep);
}

public interface IPipeServiceExecution&amp;lt;TPipeModel&amp;gt;
{
    Task ExecuteAsync(TPipeModel pipeModel);
} 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And its implementation:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class PipeService&amp;lt;TPipeModel&amp;gt; : IPipeService&amp;lt;TPipeModel&amp;gt;
{
    private readonly IList&amp;lt;Func&amp;lt;IPipeStep&amp;lt;TPipeModel&amp;gt;&amp;gt;&amp;gt; _pipeSteps;

    public PipeService()
    {
        _pipeSteps = new List&amp;lt;Func&amp;lt;IPipeStep&amp;lt;TPipeModel&amp;gt;&amp;gt;&amp;gt;();          
    }

    public IPipeService&amp;lt;TPipeModel&amp;gt; Add(Func&amp;lt;IPipeStep&amp;lt;TPipeModel&amp;gt;&amp;gt; pipeStep)
    {
        _pipeSteps.Add(pipeStep);
        return this;
    }

    public async Task ExecuteAsync(TPipeModel pipeModel)
    {
        foreach (var pipeStep in _pipeSteps)
        {
            await pipeStep.Invoke().ExecuteAsync(pipeModel);
        }
    }
} 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That’s it! Simple, isn’t it? Generic pipe implementation is ready to be used. Let’s create first pipeline.&lt;/p&gt;

&lt;p&gt;Wait...create pipeline? What is the best place to do it? Most probably it's going to have multiple steps, right? Each step is going to be represented by separate class. Steps must be combined together and at the very end &lt;code&gt;IPipeServiceExecution&lt;/code&gt; should be returned. Isn’t it a great place for factory? Indeed, it is!&lt;/p&gt;

&lt;p&gt;Let’s create yet another generic interface for pipeline factory which will be utilized for concreate pipe factory implementation. Factory is going to have single responsibility – create pipeline ready for being executed. Factory always provides complete pipeline with all defined steps. Also what is really important, &lt;code&gt;CreatePipe&lt;/code&gt; method returns &lt;code&gt;IPipeServiceExecution&lt;/code&gt; interface, so the only action that can be performed when pipe is returned from factory is its execution.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IPipeFactory&amp;lt;TPipeModel&amp;gt;
{
    IPipeServiceExecution&amp;lt;TPipeModel&amp;gt; CreatePipe();
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can find complete, generic pipeline implementation &lt;a href="https://github.com/rwasik/pipeline-pattern/tree/pipeline-no-error-handling"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Using &lt;a href="https://github.com/rwasik/pipeline-pattern-examples/tree/no-error-handling-steps"&gt;this link&lt;/a&gt;, you can navigate to an example of “Text transformation” pipeline which uses above implementation. As an input it takes unformatted text and based on defined steps it transforms it into desired format.&lt;/p&gt;

&lt;p&gt;Notice how easy we can test each step. Additionally, each step has always single responsibility. We also have open-closed principle in place. It all means that 2nd point from our assumptions list is already fulfilled – we do follow SOLID principles.&lt;/p&gt;

&lt;p&gt;Just additional side note: remember, steps order really matters in pipeline!&lt;/p&gt;

&lt;p&gt;What about error handling?&lt;/p&gt;

&lt;p&gt;Of course you can do it in each and every step or catch it outside of pipeline execution. But what if it would be required to define multiple error handling steps with the need of using pipeline model for handling an issue correctly?&lt;/p&gt;

&lt;p&gt;Again, keeping in mind single responsibility principle and unit testing let's extend &lt;code&gt;IPipeService&lt;/code&gt; interface to have a possibility to add error handling steps which will be executed in case pipe breaks:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IPipeService&amp;lt;TPipeModel&amp;gt; : IPipeServiceExecution&amp;lt;TPipeModel&amp;gt;
{
    IPipeService&amp;lt;TPipeModel&amp;gt; Add(Func&amp;lt;IPipeStep&amp;lt;TPipeModel&amp;gt;&amp;gt; pipeStep);
    IPipeService&amp;lt;TPipeModel&amp;gt; AddErrorStep(Func&amp;lt;IPipeStep&amp;lt;TPipeModel&amp;gt;&amp;gt; errorStep);
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Final implementation of pipeline pattern with error handling can be found &lt;a href="https://github.com/rwasik/pipeline-pattern/tree/pipeline-error-handling"&gt;here&lt;/a&gt;. Code example is available under &lt;a href="https://github.com/rwasik/pipeline-pattern-examples/tree/error-handling-steps"&gt;this link&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>csharp</category>
      <category>azure</category>
      <category>codequality</category>
    </item>
  </channel>
</rss>
