We have seen how you design a workflow and how the domain model keeps it strictly type-safe. Today, we are peeling back the most complex layer of Vyshyvanka: The Execution Pipeline. This is where the graph you drew in the browser becomes a series of actual operations executing on the server.
From Graph to Execution
When you hit 'Run', the engine doesn't just start running nodes randomly. It first performs a topological sort on your directed graph using BuildExecutionLevels. This groups nodes into execution levels: nodes at the same level have no dependencies on each other and can run in parallel, while each level depends only on earlier levels.
If Node B needs the output of Node A, the topological sort guarantees Node A finishes first. If there's a circular dependency in your graph, our validator catches it before it ever hits the engine.
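To make the level-building step concrete, here is a minimal sketch of Kahn's algorithm that emits nodes one level at a time. It is not Vyshyvanka's BuildExecutionLevels. The BuildLevels name, the string node IDs and the (From, To) connection tuples are assumptions for illustration. Because the real validator rejects cycles before the engine runs, the exception at the end is only a safety net in this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of level-by-level topological sorting (Kahn's algorithm).
// Node IDs are strings and connections are (From, To) pairs.
List<List<string>> BuildLevels(
    IReadOnlyList<string> nodes,
    IReadOnlyList<(string From, string To)> edges)
{
    // Count incoming connections and collect outgoing neighbours per node.
    var inDegree = nodes.ToDictionary(n => n, _ => 0);
    var outgoing = nodes.ToDictionary(n => n, _ => new List<string>());
    foreach (var (src, dst) in edges)
    {
        outgoing[src].Add(dst);
        inDegree[dst]++;
    }

    var levels = new List<List<string>>();
    // The first level holds every node with no upstream dependency.
    var current = nodes.Where(n => inDegree[n] == 0).ToList();
    var placed = 0;

    while (current.Count > 0)
    {
        levels.Add(current);
        placed += current.Count;
        var next = new List<string>();
        foreach (var node in current)
        {
            foreach (var target in outgoing[node])
            {
                // A node joins the next level once all of its inputs are placed.
                if (--inDegree[target] == 0) next.Add(target);
            }
        }
        current = next;
    }

    // Nodes that were never placed sit on (or behind) a cycle.
    if (placed != nodes.Count)
        throw new InvalidOperationException("Workflow graph contains a cycle.");

    return levels;
}

var result = BuildLevels(
    new[] { "Trigger", "FetchUsers", "FetchOrders", "Merge" },
    new[]
    {
        ("Trigger", "FetchUsers"), ("Trigger", "FetchOrders"),
        ("FetchUsers", "Merge"), ("FetchOrders", "Merge"),
    });

foreach (var level in result)
    Console.WriteLine(string.Join(", ", level));
```

Trigger lands alone on the first level. FetchUsers and FetchOrders share the second level and can run concurrently, and Merge waits on the third level until both have finished.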
The Life Cycle of an Execution
Every run is an immutable record that transitions through a strict state machine:
Pending ──────→ Running ──→ Completed
   │                 │
   │                 ├──→ Failed
   │                 │
   └──→ Cancelled ←──┘
- Pending: The trigger has fired, and we have initialized the execution context.
- Running: We start processing the sorted levels of nodes.
- Terminal States: Once a workflow hits Completed, Failed, or Cancelled, that's it. No further transitions allowed.
A terminal state can never transition back to an active one. This ensures that every execution record is an audit-ready snapshot that you can trust.
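One way to encode the diagram above is a transition table in which terminal states simply have no outgoing edges. This is a hypothetical sketch. The status names are plain strings and the Transition and IsTerminal helpers are illustrative. The real engine presumably models statuses as an enum on its execution record.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical transition table mirroring the state diagram.
var allowed = new Dictionary<string, string[]>
{
    ["Pending"]   = new[] { "Running", "Cancelled" },
    ["Running"]   = new[] { "Completed", "Failed", "Cancelled" },
    // Terminal states: no outgoing transitions at all.
    ["Completed"] = Array.Empty<string>(),
    ["Failed"]    = Array.Empty<string>(),
    ["Cancelled"] = Array.Empty<string>(),
};

bool IsTerminal(string state) => allowed[state].Length == 0;

// Returns the new status, or throws if the move is not in the table.
string Transition(string from, string to) =>
    Array.IndexOf(allowed[from], to) >= 0
        ? to
        : throw new InvalidOperationException($"Illegal transition {from} -> {to}");

var status = "Pending";
status = Transition(status, "Running");
status = Transition(status, "Completed");
Console.WriteLine($"{status}, terminal: {IsTerminal(status)}");
```

Any attempt to leave Completed, Failed or Cancelled throws, which is the property that keeps finished records trustworthy.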
The Pipeline at Work
The pipeline itself is fully asynchronous. We leverage .NET's async/await throughout the entire chain, passing a CancellationToken into every method. If a user cancels a workflow or the server initiates a graceful shutdown, every node in the pipeline receives that signal and stops at its next cancellation check.
Here is a simplified view of the core execution loop:
public async Task<ExecutionResult> ExecuteAsync(
    Workflow workflow,
    IExecutionContext context,
    CancellationToken cancellationToken = default)
{
    var executionLevels = BuildExecutionLevels(workflow);
    var maxParallelism = GetEffectiveMaxParallelism(workflow.Settings.MaxDegreeOfParallelism);
    // Results from all levels accumulate here; thread-safe because nodes in a level run concurrently
    var nodeResults = new ConcurrentBag<NodeExecutionResult>();
    foreach (var level in executionLevels)
    {
        // Cancellation is checked before each level starts
        cancellationToken.ThrowIfCancellationRequested();
        var levelResults = await ExecuteLevelWithThrottlingAsync(
            workflow, level, context, nodeResults, maxParallelism, cancellationToken);
        // Check StopOnFirstError policy
        if (workflow.Settings.ErrorHandling == ErrorHandlingMode.StopOnFirstError)
        {
            var failedResult = levelResults.FirstOrDefault(r => !r.Success);
            if (failedResult is not null) return BuildResult(/* ... */);
        }
    }
    // ...
}
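Cancellation in .NET is cooperative: a token only stops work that checks it or hands it to an API that observes it, such as Task.Delay. The sketch below assumes two separate sources, one for a user pressing Cancel and one for host shutdown, and links them so a node sees whichever fires first. The names userCancel, shutdown and SlowNodeAsync are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sources: one for a user pressing "Cancel", one for host shutdown.
using var userCancel = new CancellationTokenSource();
using var shutdown = new CancellationTokenSource();

// The linked source is cancelled as soon as either parent is cancelled.
using var linked = CancellationTokenSource.CreateLinkedTokenSource(userCancel.Token, shutdown.Token);

// Stand-in for a slow, I/O-bound node. Task.Delay observes the token and throws
// TaskCanceledException (an OperationCanceledException) once it is cancelled.
async Task<string> SlowNodeAsync(CancellationToken ct)
{
    await Task.Delay(TimeSpan.FromSeconds(30), ct);
    return "done";
}

var nodeTask = SlowNodeAsync(linked.Token);
shutdown.CancelAfter(TimeSpan.FromMilliseconds(100)); // simulate a graceful shutdown

string outcome;
try
{
    outcome = await nodeTask;
}
catch (OperationCanceledException)
{
    outcome = "cancelled";
}

Console.WriteLine(outcome);
```

The 30-second delay ends after roughly 100 ms because the shutdown token propagates through the linked source. A node that never checks its token would keep running until it finished on its own.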
Inside the pipeline, each node goes through several stages:
- Input Gathering: We collect outputs from upstream nodes via connection mapping. Multi-input nodes receive a merged JSON object keyed by target port name.
- Expression Evaluation: All {{ nodes.someNode.data }} expressions in the node configuration are resolved in real-time from the execution context before the node runs.
- Node Execution: We invoke ExecuteAsync on the node instance. Since all nodes inherit from our base classes (BaseTriggerNode, BaseActionNode, BaseLogicNode), the engine handles them uniformly regardless of what they actually do.
- Output Storage: Successful node outputs are stored in the context, ready for downstream nodes to reference.
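The first two stages can be sketched with System.Text.Json.Nodes (.NET 6+). Everything here is an assumption made for illustration. The outputs dictionary stands in for the execution context, GatherInputs and Resolve are hypothetical helpers, and the {{ nodes.<id>.<path> }} grammar is simplified. The real expression syntax is the subject of Part 7.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json.Nodes;
using System.Text.RegularExpressions;

// Stand-in for outputs already stored in the execution context, keyed by node ID.
var outputs = new Dictionary<string, JsonNode?>
{
    ["fetchUser"] = JsonNode.Parse("{\"data\":{\"name\":\"Olena\",\"id\":42}}"),
    ["fetchOrders"] = JsonNode.Parse("{\"data\":{\"count\":3}}"),
};

// Input gathering: merge upstream outputs into one object keyed by target port.
JsonObject GatherInputs(IEnumerable<(string SourceNode, string TargetPort)> connections)
{
    var merged = new JsonObject();
    foreach (var (source, port) in connections)
    {
        var output = outputs[source];
        // Copy the node: a JsonNode can belong to only one parent.
        merged[port] = output is null ? null : JsonNode.Parse(output.ToJsonString());
    }
    return merged;
}

// Expression evaluation: replace {{ nodes.<id>.<path> }} with the referenced value.
var placeholder = new Regex(@"\{\{\s*nodes\.(\w+)((?:\.\w+)+)\s*\}\}");

string Resolve(string template) => placeholder.Replace(template, match =>
{
    JsonNode? current = outputs[match.Groups[1].Value];
    foreach (var segment in match.Groups[2].Value.Split('.', StringSplitOptions.RemoveEmptyEntries))
        current = current?[segment];
    // Strings are inserted as-is; numbers, objects and arrays as JSON text.
    return current is JsonValue v && v.TryGetValue<string>(out var s)
        ? s
        : current?.ToJsonString() ?? "";
});

var inputs = GatherInputs(new[] { ("fetchUser", "user"), ("fetchOrders", "orders") });
Console.WriteLine(inputs.ToJsonString());

var message = Resolve("Hi {{ nodes.fetchUser.data.name }}, you have {{ nodes.fetchOrders.data.count }} orders");
Console.WriteLine(message);
```

Resolving expressions against stored outputs before execution means the node itself only ever sees concrete values, which is part of why the engine can treat every node type uniformly.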
Parallel Branch Execution
Nodes at the same execution level run in parallel, throttled by a SemaphoreSlim:
private async Task<NodeExecutionResult[]> ExecuteLevelWithThrottlingAsync(
    Workflow workflow, List<string> level, IExecutionContext context,
    ConcurrentBag<NodeExecutionResult> nodeResults, int maxParallelism,
    CancellationToken cancellationToken)
{
    // At most maxParallelism nodes of this level run at the same time
    using var semaphore = new SemaphoreSlim(maxParallelism, maxParallelism);
    var tasks = level.Select(async nodeId =>
    {
        // Acquire a slot first; the finally block frees it even if the node throws
        await semaphore.WaitAsync(cancellationToken);
        try { return await ExecuteNodeInWorkflowAsync(workflow, nodeId, context, nodeResults, cancellationToken); }
        finally { semaphore.Release(); }
    }).ToList();
    return await Task.WhenAll(tasks);
}
The default max parallelism is Environment.ProcessorCount * 2, but you can override it per workflow through Settings.MaxDegreeOfParallelism.
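The fallback rule and the throttling effect can be checked in isolation. Below, GetEffectiveMaxParallelism is a hypothetical reimplementation that assumes the per-workflow setting is a nullable int, where a missing or non-positive value means "use the default". Ten simulated nodes then share a SemaphoreSlim the same way ExecuteLevelWithThrottlingAsync does, and the sketch records the peak number of nodes running at once.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical fallback rule: a positive per-workflow setting wins,
// otherwise use Environment.ProcessorCount * 2.
int GetEffectiveMaxParallelism(int? configured) =>
    (configured is > 0) ? configured.Value : Environment.ProcessorCount * 2;

var maxParallelism = GetEffectiveMaxParallelism(3);
using var semaphore = new SemaphoreSlim(maxParallelism, maxParallelism);
var gate = new object();
int running = 0, peak = 0;

// Simulated node: waits for a slot, tracks concurrency, then does fake I/O.
async Task<int> RunNodeAsync(int id)
{
    await semaphore.WaitAsync();
    try
    {
        lock (gate) { running++; peak = Math.Max(peak, running); }
        await Task.Delay(50);
        return id;
    }
    finally
    {
        lock (gate) { running--; }
        semaphore.Release();
    }
}

var results = await Task.WhenAll(Enumerable.Range(1, 10).Select(i => RunNodeAsync(i)));
Console.WriteLine($"limit {maxParallelism}, peak {peak}, completed {results.Length}");
```

All ten nodes complete, but the peak never exceeds the limit of 3. Waiting on the semaphore asynchronously means queued nodes do not block thread-pool threads while they wait for a slot.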
Handling Failure
What happens when a node fails? The engine supports two error handling modes:
- StopOnFirstError: Stops the workflow once any node fails. Nodes already running in the current level finish, but no further levels are started.
- ContinueOnError: Marks the failed node and continues executing independent branches. Dependent nodes on the failed branch are skipped.
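To illustrate how ContinueOnError might decide what to skip, here is a small simulation over precomputed levels. It assumes a node is skipped when any upstream node failed or was itself skipped. The Simulate helper, the status strings and the failing set (standing in for nodes that throw) are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical ContinueOnError bookkeeping: a node is skipped when any upstream
// node failed or was skipped; `failing` stands in for nodes that throw.
Dictionary<string, string> Simulate(
    List<List<string>> levels,
    IReadOnlyList<(string From, string To)> edges,
    ISet<string> failing)
{
    var status = new Dictionary<string, string>();
    foreach (var level in levels)
    {
        foreach (var node in level)
        {
            // Earlier levels are already resolved, so every upstream status exists.
            var upstreamBroken = edges
                .Where(e => e.To == node)
                .Any(e => status[e.From] != "Succeeded");
            status[node] = upstreamBroken ? "Skipped"
                : failing.Contains(node) ? "Failed"
                : "Succeeded";
        }
    }
    return status;
}

var plan = new List<List<string>>
{
    new List<string> { "Trigger" },
    new List<string> { "FetchUsers", "FetchOrders" },
    new List<string> { "SendEmail", "Report" },
};
var connections = new[]
{
    ("Trigger", "FetchUsers"), ("Trigger", "FetchOrders"),
    ("FetchUsers", "SendEmail"), ("FetchOrders", "Report"),
};

var outcome = Simulate(plan, connections, new HashSet<string> { "FetchUsers" });
foreach (var (name, state) in outcome)
    Console.WriteLine($"{name}: {state}");
```

FetchUsers fails, so SendEmail is skipped, while the independent FetchOrders → Report branch still completes.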
Because we have that immutable execution record, you can see exactly which node caused the issue, what data it received, and the error message, all structured in the NodeExecutionResult list.
Plugin Node Isolation
Plugin nodes (from external assemblies) are executed through IPluginHost.ExecuteNodeInIsolationAsync, which adds an additional timeout layer:
private async Task<NodeOutput> ExecuteNodeInstanceAsync(
    INode nodeInstance, NodeInput input, IExecutionContext context, TimeSpan timeout)
{
    // Plugin nodes go through the plugin host, which adds the timeout layer
    if (_pluginHost is not null && _pluginHost.IsPluginNode(nodeInstance.Type))
        return await _pluginHost.ExecuteNodeInIsolationAsync(nodeInstance, input, context, timeout);
    // Built-in nodes are invoked directly
    return await nodeInstance.ExecuteAsync(input, context);
}
This ensures that a misbehaving plugin cannot hang the entire engine.
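A timeout layer like this can be sketched with a CancellationTokenSource that cancels after the timeout, plus Task.WaitAsync (.NET 6+), which stops waiting even when a node ignores its token. ExecuteWithTimeoutAsync is a hypothetical helper, not the IPluginHost API, and the sketch says nothing about how Vyshyvanka actually isolates plugin assemblies.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical timeout wrapper: cancels the node's token after `timeout` and
// stops waiting at the same deadline, even if the node ignores its token.
async Task<string> ExecuteWithTimeoutAsync(
    Func<CancellationToken, Task<string>> node, TimeSpan timeout)
{
    using var cts = new CancellationTokenSource(timeout);
    try
    {
        // Task.WaitAsync(TimeSpan) throws TimeoutException when time runs out.
        return await node(cts.Token).WaitAsync(timeout);
    }
    catch (Exception ex) when (ex is TimeoutException or OperationCanceledException)
    {
        return $"timed out after {timeout.TotalMilliseconds} ms";
    }
}

// A well-behaved node finishes in time...
var fast = await ExecuteWithTimeoutAsync(
    async ct => { await Task.Delay(10, ct); return "ok"; },
    TimeSpan.FromSeconds(1));

// ...while a node that ignores its token would otherwise hang forever.
var hung = await ExecuteWithTimeoutAsync(
    async _ => { await Task.Delay(Timeout.Infinite); return "never"; },
    TimeSpan.FromMilliseconds(100));

Console.WriteLine(fast);
Console.WriteLine(hung);
```

WaitAsync only abandons the wait. The hung node's task keeps running in the background, so a timeout protects the engine's control flow but is not, by itself, full isolation.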
Why Asynchronous?
We chose async-first because workflow engines are inherently I/O bound. You are constantly waiting for databases, external APIs, and file systems. By using async/await everywhere, we ensure that while one workflow waits for a response, the thread is returned to the pool to serve other workflows. This is how a single engine instance can manage hundreds of concurrent executions without thread pool starvation.
In Part 7, we will cover the Expression Language Syntax: how dynamic data flows between your nodes. Stay tuned!
Check out the project source code here: https://github.com/homolibere/Vyshyvanka