Part of the redb.Route series — the Apache Camel we built for .NET. If you're just tuning in:
- redb.Route — Apache Camel for .NET: 22 transports, 30+ EIP patterns, compiled DSL — the origin story;
- Enterprise Integration Patterns in .NET — Part 1: the four in-memory channels (and the Exchange they carry);
- redb.Route 3.0.1 — flat DSL navigation, CRTP refactor, and a silent null fix;
- Apache Camel for .NET, dissected: the HTTP connector with no ASP.NET MVC + the Content-Based Router pattern — the previous "EIP + connector" deep-dive.
This time we open with the Kafka connector — taken apart the same way we did HTTP — and then we mount two EIP patterns on top of it: Scatter-Gather and Aggregator. And the part tutorials skip: how the whole thing behaves once you wrap a transaction around it. Also, 3.2.0 shipped.
One spoiler up front
There's no "exactly-once out of the box" in Kafka here — and you'll see exactly why from the code below. What works and what doesn't — stated plainly.
next A real production example: one HTTP request → six parallel aggregations
Table of contents
- Why leave MassTransit at all
- What landed in 3.2.0
- The Kafka connector: URI anatomy and the connection factory
- The Kafka producer: what
.To("kafka:...")actually does - The Kafka consumer: polling, batches, rebalance
- Headers and tracing across the broker
- Straight talk about
transacted=true - EIP #1 — Scatter-Gather: one processor, both fan-out and join
- EIP #2 — Aggregator: assembly over time
- Transactions: two models, and that's the whole point
- The parallel cousins: Splitter and Multicast
- Saga — "a slightly different one"
- Outbox — there isn't one, and that's correct
- Bottom line: a trade, not "better/worse"
1. Why leave MassTransit at all
MassTransit is genuinely good, and I'm not here to bury it. But it's an opinionated framework: the durable state-machine saga, the transactional outbox, the retry policies — they're baked in, and you live by the framework's rules. When your scenario matches its worldview, it's a joy; everything "just works." When it doesn't, you're fighting an abstraction, spelunking through internals and bolting hacks onto someone else's decisions.
Apache Camel is a different school. The bedrock is EIP primitives + connectors + a DSL. Nobody gifts you an outbox as a feature — you get Splitter, Aggregator, Scatter-Gather, transactional transports, and you assemble exactly what your invariants need. Less magic, more explicit composition from bricks.
redb.Route sits in the second school. Hence the title: not "batteries included," but "here are the bricks, and they snap together predictably." The rest of the article is about what that trade costs you, and what you get back.
2. What landed in 3.2.0
Full notes are in the CHANGELOG. Two items matter here:
-
Parallel
Splitter/Multicastbranches now isolate the ambient transaction per branch. Each branch gets its ownDependentTransaction.DependentClone(BlockCommitUntilComplete); the parent commit blocks until every branch signals done. Before this, parallel branches shared a singleTransaction.Current, andSystem.Transactionsforbids using one transaction concurrently across threads. We'll read this fix line by line and see why Scatter-Gather never needed it. -
Throttlegot an RFC 6585 mode —.RejectOnOverflow()(429 +Retry-After). Off-topic today, but handy if you front all of this with an HTTP edge.
All redb.Route.* packages are version-aligned at 3.2.0.
3. The Kafka connector: URI anatomy and the connection factory
We open with Kafka, as promised — and we open with the simplest thing there is: a string endpoint URI. Here's a minimal consumer and a minimal producer:
// Consumer: read topic 'orders', group 'order-workers', from the beginning
From("kafka:orders?brokers=kafka:9092&groupId=order-workers&autoOffsetReset=earliest")
.Log("Got: ${body}")
.To("direct:process");
// Producer: publish to 'notifications' with acks=all
From("direct:notify")
.To("kafka:notifications?brokers=kafka:9092&acks=All");
The URI shape is kafka:<topic>?<query>. The topic comes from the path (KafkaEndpoint):
public KafkaEndpoint(EndpointUri uri, KafkaComponent component, KafkaEndpointOptions options)
: base(uri, component, options)
{
TopicName = uri.Path; // ← topic = URI path
// If the URI says connectionFactory=<name>, resolve it from the registry
if (!string.IsNullOrEmpty(options.ConnectionFactory) && component.Context is not null)
ResolvedFactory = component.Context.GetFromRegistry<KafkaConnectionFactory>(options.ConnectionFactory);
// No brokers in the URI but the factory has them — fill it in
if (string.IsNullOrWhiteSpace(options.Brokers) && ResolvedFactory is not null)
options.Brokers = ResolvedFactory.Brokers;
}
Note the last touch: on a consumer, groupId is mandatory, and forgetting it fails endpoint creation rather than blowing up later:
public override IConsumer CreateConsumer(IProcessor processor)
{
if (string.IsNullOrWhiteSpace(Options.GroupId))
throw new InvalidOperationException(
$"The 'groupId' parameter is required for Kafka consumer on topic '{TopicName}'.");
return new KafkaConsumer(this, processor, Options);
}
The connection factory — so you don't repeat yourself
When you've got twenty Kafka endpoints on one cluster, spelling out brokers=...&securityProtocol=...&saslMechanism=... in every URI is suicide. That's what KafkaConnectionFactory is for: register it in the registry under a name, then reference it via connectionFactory=name.
Its full parameter surface (all from source, nothing invented):
| Group | Properties | Default |
|---|---|---|
| Brokers | Brokers |
localhost:9092 |
| Security |
SecurityProtocol, SaslMechanism, SaslUsername, SaslPassword
|
Plaintext |
| SSL/TLS |
SslCaLocation, SslCertificateLocation, SslKeyLocation, SslKeyPassword, SslEndpointIdentificationAlgorithm
|
— |
| Producer |
Acks, Retries
|
Leader, 3
|
| Consumer |
GroupId, AutoOffsetReset
|
—, Latest
|
| Consumer tuning |
GroupInstanceId, SessionTimeoutMs, HeartbeatIntervalMs, MaxPollIntervalMs, PartitionAssignmentStrategy, IsolationLevel
|
— |
| Producer tuning |
LingerMs, BatchSize, CompressionType, MessageTimeoutMs
|
— |
| Reconnect |
ReconnectBackoffMs, ReconnectBackoffMaxMs
|
— |
| Advanced |
ClientId, RequestTimeoutMs, MetadataMaxAgeMs, SocketTimeoutMs, MaxInFlight, AdditionalProperties
|
redb.Route, 30000, 300000, 60000, 5 |
Register and use:
context.AddToRegistry("prod-cluster", new KafkaConnectionFactory
{
Brokers = "kafka1:9092,kafka2:9092,kafka3:9092",
SecurityProtocol = "SaslSsl",
SaslMechanism = "ScramSha512",
SaslUsername = "svc-orders",
SaslPassword = secret,
CompressionType = "Zstd",
MaxInFlight = 5,
});
// Now the URI only carries what's endpoint-specific:
From("kafka:orders?connectionFactory=prod-cluster&groupId=order-workers");
To("kafka:notifications?connectionFactory=prod-cluster&acks=All");
The factory builds three configs — BuildConsumerConfig(), BuildProducerConfig(), BuildAdminConfig() (the last for topic-metadata queries). Endpoint-level options are applied on top of the factory (if (!string.IsNullOrWhiteSpace(Brokers)) config.BootstrapServers = Brokers; and so on), so the factory sets the baseline and the URI overrides it surgically.
The full endpoint option set
The complete list — KafkaEndpointOptions, grouped, with defaults:
Connection / security: brokers (required, or Validate() throws), securityProtocol (Plaintext/Ssl/SaslPlaintext/SaslSsl), saslMechanism/saslUsername/saslPassword, sslCaLocation/sslCertificateLocation/sslKeyLocation/sslKeyPassword, connectionFactory.
Consumer: groupId, autoOffsetReset (Latest/Earliest/Error, default Latest), enableAutoCommit (default true, framework-level — since 3.2.1), maxPollRecords (0 = single mode, >0 = batch), pollTimeoutMs (1000), breakOnFirstError, seekTo (beginning/end), topicIsPattern, groupInstanceId, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, partitionAssignmentStrategy (Range/RoundRobin/CooperativeSticky), isolationLevel (ReadUncommitted/ReadCommitted).
Producer: acks (None/Leader/All, default Leader), retries (3), recordMetadata, key, partitionNumber, transacted, transactionIdPrefix (redb-kafka).
Producer tuning: lingerMs, batchSize, compressionType (None/Gzip/Snappy/Lz4/Zstd), messageTimeoutMs.
Advanced: additionalProperties — arbitrary librdkafka properties, applied last, overriding everything typed.
Validate() checks exactly: brokers non-empty, maxPollRecords >= 0, pollTimeoutMs >= 0, retries >= 0. Everything else has a default or is optional.
One thing to get precise — about the offset commit, and it matters. At the librdkafka level, EnableAutoCommit is hard-wired to false always — the library never commits on a background timer (which would commit un-processed offsets). From BuildConsumerConfig:
var cfg = new ConsumerConfig
{
BootstrapServers = Brokers,
GroupId = GroupId,
AutoOffsetReset = ParseAutoOffsetReset(),
EnableAutoCommit = false, // librdkafka level: always manual commit
};
But there is a commit — one level up, at the framework level. As of 3.2.1 the consumer has a typed EnableAutoCommit option (default true): after a successful Process it commits the offset inline — exactly like the RabbitMQ consumer acks after processing. Set it from the URI (?enableAutoCommit=false) or fluently (.EnableAutoCommit(false)). A transactional route takes precedence: if a .Transacted() already committed the offset, the inline path is skipped. The mechanics are in §5.
The same thing, fluent
String URIs are great for short snippets and config files. In code you usually build endpoints fluently — Kafka.Topic(...):
From(Kafka.Topic("orders")
.Brokers("kafka1:9092,kafka2:9092")
.GroupId("order-workers")
.AutoOffsetReset("Earliest")
.MaxPollRecords(500)
.PartitionAssignmentStrategy("CooperativeSticky")
.IsolationLevel("ReadCommitted")
.SessionTimeout(30_000)
.MaxPollInterval(300_000));
To(Kafka.Topic("notifications")
.Brokers("kafka1:9092")
.Acks("All")
.Compression("zstd")
.Linger(20)
.BatchSize(64 * 1024)
.Key("${header.orderId}")); // partition key — an expression
The fluent builder just assembles the same query string (Kafka.Topic("orders").Brokers(...)...Build() → "kafka:orders?brokers=...&..."), URL-encoding the values. The setters take an IExpression, but options resolve at different times: the producer key is evaluated as a ${...} expression per message (in DetermineKey at send time — see below), whereas connection-level options (brokers, credentials) are resolved once on connect, where there's no exchange yet, so an expression there is meaningless. The per-message story is specifically the partition key.
4. The Kafka producer: what .To("kafka:...") actually does
Now let's read what KafkaProducer really does when a message hits .To("kafka:..."). It's a ConnectableProducer, so it requires Start() first (EnsureStarted() at the top of Process).
Connect
protected override Task ConnectAsync(CancellationToken ct)
{
var config = _options.BuildProducerConfig(_endpoint.ResolvedFactory);
_producer = new ProducerBuilder<string, byte[]>(config)
.SetValueSerializer(Serializers.ByteArray)
.SetErrorHandler((_, e) =>
Logger?.LogError("Kafka producer error: {Reason} (Code: {Code})", e.Reason, e.Code))
.Build();
if (_options.Transacted)
{
_producer.InitTransactions(TimeSpan.FromSeconds(30));
Logger?.LogInformation("Kafka transactional mode enabled: topic={Topic}", _endpoint.TopicName);
}
return Task.CompletedTask;
}
Key is string, value is byte[]. On disconnect it does Flush(30s) before Dispose so buffered messages aren't lost.
Preparing the message
PrepareMessage turns the exchange body into a Message<string, byte[]>:
byte[] valueBytes = body switch
{
byte[] bytes => bytes, // already bytes — as-is
string str => Encoding.UTF8.GetBytes(str), // string → UTF-8
null => Array.Empty<byte>(), // null → empty
_ => Encoding.UTF8.GetBytes(body.ToString() ?? string.Empty) // else → ToString → UTF-8
};
Then headers. ContentType rides as a content-type header, and the exchange's user headers move into Kafka headers — except the internal redbKafka.* ones, which are filtered out:
foreach (var (key, value) in exchange.In.Headers)
{
if (KafkaHeaders.IsRedbHeader(key)) // redbKafka.* are internal — don't propagate
continue;
msg.Headers.Add(key, Encoding.UTF8.GetBytes(value?.ToString() ?? string.Empty));
}
The partition key
DetermineKey — three branches, in priority order:
private string? DetermineKey(IExchange exchange)
{
// 1. Explicit partition → no key (partitioner is bypassed)
if (_options.PartitionNumber.HasValue)
return null;
// 2. Key from the option — supports ${...} expressions, resolved per message
if (!string.IsNullOrWhiteSpace(_options.Key))
{
var resolved = _options.ResolveOption(_options.Key, exchange);
if (!string.IsNullOrEmpty(resolved)) return resolved;
}
// 3. Otherwise — keyless (round-robin across partitions)
return null;
}
So key=${header.orderId} means "partition by orderId" — all events for one order land in one partition and keep their order.
Send: immediate vs deferred
Here's the fork that matters for transactions:
if (_options.Transacted)
{
// Deferred send — the real publish happens when the route transaction commits
var action = new KafkaSendAction(_producer, _endpoint.TopicName, message,
_options.PartitionNumber, _options.RecordMetadata, exchange, Logger);
RegisterTransactedAction(exchange, $"kafka-send-{Guid.NewGuid():N}", action);
}
else
{
// Immediate send
var result = _options.PartitionNumber.HasValue
? await _producer.ProduceAsync(new TopicPartition(_endpoint.TopicName,
new Partition(_options.PartitionNumber.Value)), message, ct)
: await _producer.ProduceAsync(_endpoint.TopicName, message, ct);
if (_options.RecordMetadata)
AddDeliveryMetadata(exchange, result); // redbKafka.Sent.Topic/Partition/Offset/Timestamp
}
In transactional mode the producer doesn't send right away — it parks a KafkaSendAction in the deferred-action bag under a unique key kafka-send-{guid}. The real ProduceAsync happens later, at the .Transacted() boundary. More on that below.
KafkaSendAction clones the message at construction (a deep header copy via Buffer.BlockCopy) so the deferred send doesn't depend on what happens to the exchange afterwards:
public async Task Commit(CancellationToken ct = default)
{
var result = _partition.HasValue
? await _producer.ProduceAsync(new TopicPartition(_topicName, new Partition(_partition.Value)), _message, ct)
: await _producer.ProduceAsync(_topicName, _message, ct);
if (_recordMetadata) { /* write the Sent.* headers */ }
}
public Task Rollback(CancellationToken ct = default)
{
// The message is simply discarded — nothing was published
return Task.CompletedTask;
}
Tracing across the broker
Before sending, the producer injects W3C trace context (traceparent/tracestate) into Kafka headers via the standard DistributedContextPropagator:
var propagator = DistributedContextPropagator.Current;
propagator.Inject(activity, headers, static (carrier, key, value) =>
{
if (carrier is Headers h && !string.IsNullOrEmpty(value))
{
h.Remove(key);
h.Add(key, Encoding.UTF8.GetBytes(value));
}
});
The consumer on the other end lifts it back out — and your trace shows an unbroken chain across the broker. Zero configuration on your side.
5. The Kafka consumer: polling, batches, rebalance
KafkaConsumer is a DrainableConsumer (it can gracefully drain in-flight messages on stop). Step by step.
Start: subscribe/assign, seek, metadata
protected override Task OnStarting(CancellationToken ct)
{
var config = _options.BuildConsumerConfig(_endpoint.ResolvedFactory);
_consumer = new ConsumerBuilder<string, byte[]>(config)
.SetValueDeserializer(Deserializers.ByteArray)
.SetErrorHandler((_, e) => { /* fatal → LogError, else LogWarning */ })
.SetPartitionsAssignedHandler((c, partitions) => { /* log assigned */ })
.SetPartitionsRevokedHandler((c, partitions) =>
{
// Commit current offsets before partitions are revoked
try { c.Commit(partitions); }
catch (KafkaException) { /* nothing to commit — fine */ }
})
.SetPartitionsLostHandler((_, partitions) => { /* log involuntary loss */ })
.Build();
if (_options.PartitionNumber.HasValue)
_consumer.Assign(new[] { new TopicPartition(_endpoint.TopicName, new Partition(_options.PartitionNumber.Value)) });
else
_consumer.Subscribe(_endpoint.TopicName);
HandleSeekTo(); // seekTo=beginning/end on first start
LogTopicMetadata(); // dump: partitions, leaders, replicas, ISR
return Task.CompletedTask;
}
Two membership modes: Subscribe(topic) (dynamic group assignment + rebalance) or an explicit Assign(partition) when partitionNumber is set (static assignment, no group rebalance).
LogTopicMetadata spins up an AdminClient at start and logs the topic topology — partition count, each partition's leader, replicas, ISR. Handy for "why did I only get assigned 2 of 6 partitions" debugging.
The poll loop
RunAsync kicks off a long-running task with the poll loop:
private async Task PollLoop(CancellationToken pollCt, CancellationToken processingCt)
{
while (!pollCt.IsCancellationRequested)
{
try
{
if (_options.MaxPollRecords > 0)
await ProcessBatch(pollCt, processingCt); // batch mode
else
await ProcessSingleMessage(pollCt, processingCt); // one at a time
}
catch (OperationCanceledException) { break; }
catch (Exception ex)
{
Logger?.LogError(ex, "Error in Kafka poll loop for topic {Topic}", _endpoint.TopicName);
try { await Task.Delay(1000, pollCt); } catch (OperationCanceledException) { break; }
}
}
}
Caught an exception in the loop? Log it, wait a second, carry on (one bad message doesn't kill the consumer).
Single mode
private async Task ProcessSingleMessage(CancellationToken pollCt, CancellationToken processingCt)
{
var result = _consumer!.Consume(pollCt);
if (result?.Message is null) return;
var exchange = CreateExchange(result);
IncrementInflight();
try
{
// Register the offset commit as a deferred action — for the transactional-route case.
var commitAction = new KafkaCommitAction(_consumer, result, Logger);
RegisterTransactedAction(exchange, $"kafka-commit-{result.Offset.Value}", commitAction);
await Processor.Process(exchange, processingCt);
ProcessedCount++;
// 3.2.1: auto-commit after success — unless a transaction already committed it (Committed flag).
if (_options.EnableAutoCommit && !commitAction.Committed)
await commitAction.Commit(processingCt);
}
finally
{
await exchange.DisposeAsync();
DecrementInflight();
}
}
The key detail: the consumer registers a KafkaCommitAction before processing. So the offset isn't committed immediately — it commits when the route transaction commits, alongside any deferred sends. If processing throws, the offset isn't committed and the message comes back.
KafkaCommitAction.Commit is a plain _consumer.Commit(result):
public Task Commit(CancellationToken ct = default)
{
_consumer.Commit(_result); // ← plain offset commit, NOT SendOffsetsToTransaction
return Task.CompletedTask;
}
public Task Rollback(CancellationToken ct = default) => Task.CompletedTask; // no commit → message is re-read
Batch mode
With maxPollRecords > 0, the consumer collects up to maxPollRecords records, or until the pollTimeoutMs deadline, and hands them off as one exchange whose body is a List<IMessage>:
while (batch.Count < _options.MaxPollRecords && DateTime.UtcNow < deadline && !pollCt.IsCancellationRequested)
{
var result = _consumer!.Consume(TimeSpan.FromMilliseconds(100));
if (result?.Message is not null) batch.Add(result);
}
if (batch.Count == 0) return;
var exchange = CreateBatchExchange(batch); // Body = List<IMessage>, redbKafka.BatchSize = N
// Commit only the last offset of the batch
var last = batch[^1];
RegisterTransactedAction(exchange, $"kafka-batch-commit-{last.Offset.Value}", new KafkaCommitAction(_consumer!, last, Logger));
await Processor.Process(exchange, processingCt);
Subtlety: only the last offset is committed (Kafka offsets are monotonic per partition, so committing the last covers the whole batch). Downstream you can .Split(body => body) and handle each message individually — but the commit is still one per batch.
How the exchange gets its metadata
CreateExchange sets the body (byte[]), transfers Kafka headers (UTF-8 decoded), restores ContentType, and stamps metadata:
message.Headers[KafkaHeaders.Topic] = result.Topic;
message.Headers[KafkaHeaders.Partition] = result.Partition.Value;
message.Headers[KafkaHeaders.Offset] = result.Offset.Value;
if (result.Message.Timestamp.Type != TimestampType.NotAvailable)
message.Headers[KafkaHeaders.Timestamp] = result.Message.Timestamp.UtcDateTime;
if (!string.IsNullOrEmpty(result.Message.Key))
message.Headers[KafkaHeaders.Key] = result.Message.Key;
var exchange = Exchange.Create(message, _endpoint.ScopeFactory);
exchange.Pattern = ExchangePattern.InOnly; // a consumer is InOnly
What the consumer does while a message travels the route
Let's walk ProcessSingleMessage step by step, because it also explains the commit story:
-
Consume(pollCt)— pulled one record (in single mode the poll is blocking: the next record isn't pulled until the current one finishes); -
CreateExchange— built the exchange; -
IncrementInflight()— marked the message in-flight (this is whatDrainableConsumerneeds for a graceful stop: on shutdown it drains all in-flight messages); - registered a
KafkaCommitActioninTRANSACT_ACTION— not committed yet; -
await Processor.Process(exchange)— ran the message through the whole route (synchronously awaited); -
ProcessedCount++; - if
EnableAutoCommit(defaulttrue) and the offset wasn't already committed by a transaction — commits it inline right here; -
finally:exchange.DisposeAsync()+DecrementInflight().
The key point: by default (EnableAutoCommit=true, since 3.2.1) the consumer commits the offset inline after a successful Process — at-least-once, just like RabbitMQ. The KafkaCommitAction registered in TRANSACT_ACTION is for the other case — when a transaction should own the commit: then .Transacted() commits it at its boundary, and the action's Committed flag stops the consumer from committing it a second time inline.
Batch mode is the same, except Process gets a single exchange whose body is a List<IMessage>, and the last offset of the batch is committed (inline or by the transaction).
How the offset commits: default vs transactional mode
As of 3.2.1 the default is simple and matches RabbitMQ: process it, commit it.
// ✅ Default (EnableAutoCommit=true): the offset commits inline after a successful Process.
// A plain consumer needs no configuration — at-least-once out of the box.
From("kafka:orders?brokers=kafka:9092&groupId=w")
.To("direct:process");
Transactional mode is for when the offset must commit together with other work (deferred sends to Kafka/Redis, a redb write) — atomically at the route boundary. Turn auto-commit off and wrap it in a transaction; the offset commit becomes part of the TRANSACT_ACTION batch.
// ✅ Transactional: the offset + deferred sends commit together at the .Transacted() boundary.
// enableAutoCommit=false → no inline commit; the transaction owns the offset.
From("kafka:orders?brokers=kafka:9092&groupId=w&enableAutoCommit=false")
.Transacted()
.To("kafka:orders.done?brokers=kafka:9092&transacted=true")
.EndTransaction();
// ✅ Imperative — the same via an explicit commit.
From("kafka:orders?brokers=kafka:9092&groupId=w&enableAutoCommit=false")
.BeginTransaction()
.To("direct:process")
.CommitTransaction();
If you leave enableAutoCommit=true (the default) AND wrap in .Transacted(), the transaction wins: it commits the offset at the boundary, and the Committed flag on KafkaCommitAction stops the consumer from duplicating the commit inline. So inside a transactional route the option is effectively ignored.
By the way, in the redb.Route.Demo/Routes demos Kafka is used only as a producer (
.WireTap(KafkaWireTap)to thedemo-audittopic inMainPipelineRoutes.cs); there's no consumer there — so the production consume-process-produce pattern is in the transactions section.
6. Headers and tracing across the broker
The full header reference — KafkaHeaders. They all share the redbKafka. prefix:
| Header | Set by | Meaning |
|---|---|---|
redbKafka.Topic |
consumer | topic of the consumed record |
redbKafka.Partition |
consumer | partition |
redbKafka.Offset |
consumer | offset in the partition |
redbKafka.Timestamp |
consumer | record timestamp |
redbKafka.Key |
consumer | key (if present) |
redbKafka.BatchSize |
consumer | batch size (batch mode) |
redbKafka.Sent.Topic |
producer | topic sent to (with recordMetadata) |
redbKafka.Sent.Partition |
producer | destination partition |
redbKafka.Sent.Offset |
producer | assigned offset |
redbKafka.Sent.Timestamp |
producer | send time |
In a route you reach them as ${header.redbKafka.Offset}:
From("kafka:orders?brokers=kafka:9092&groupId=w")
.Log("offset=${header.redbKafka.Offset} partition=${header.redbKafka.Partition} key=${header.redbKafka.Key}");
On re-publish these redbKafka.* headers are not propagated into the outgoing Kafka message (the IsRedbHeader filter in the producer) — so one hop's metadata doesn't leak into the next.
7. Straight talk about transacted=true
This is where marketing wants to say "exactly-once." Let's read the code with no spin.
What transacted=true does (from BuildProducerConfig):
if (Transacted)
{
config.EnableIdempotence = true;
config.Acks = Acks.All; // required for the idempotent producer
}
That's it: no transactional.id, and InitTransactions / BeginTransaction / CommitTransaction / SendOffsetsToTransaction appear nowhere. None of librdkafka's transactional API is in play.
What that means in practice:
- "transacted" here = an idempotent producer (
EnableIdempotence=true, dedup of retries within the session) + a deferred send (viaKafkaSendAction, realProduceAsyncat the route's transaction boundary); - this is not Kafka EOS over consume-process-produce: there's no atomic Kafka transaction via
SendOffsetsToTransaction; - a crash between the send and the offset commit yields a duplicate on restart (at-least-once), not exactly-once.
So the honest phrasing is "idempotent + route-level deferred commit," not exactly-once. For most needs — "don't lose the message, don't commit the offset before the work is done" — that's enough; true read-process-write EOS (BeginTransaction / SendOffsetsToTransaction / CommitTransaction) is a separate story, and it isn't here.
Apache Camel, by the way, doesn't give you EOS via transacted() either: it's auto-commit by default + a manual-commit hook (KafkaManualCommit) + an idempotent repository for dedup — the same model. So this isn't "Kafka-lite" — it's the same honest trade a mature integration framework makes.
With Kafka taken apart, we can put EIP patterns on it.
8. EIP #1 — Scatter-Gather: one processor, both fan-out and join
Classic Scatter-Gather from Hohpe/Woolf is "broadcast a request to N recipients and gather their responses into one." In redb.Route it's a single processor, ScatterGatherProcessor, whose aggregator is mandatory and whose parallelism is on by default.
The most stringly-typed shape — Kafka in, fan out to three services, fold, publish the result:
From("kafka:orders.incoming?brokers=kafka:9092&groupId=enricher&autoOffsetReset=earliest")
.RouteId("order-enrich")
.ScatterGather(
// (accumulated, current) → merged. Called pair-wise.
aggregationStrategy: (acc, cur) =>
{
var merged = (acc.In.Headers.TryGetValue("merged", out var m) ? m as string : "") ?? "";
acc.In.Headers["merged"] = merged + cur.In.Body;
return acc;
},
"http://pricing:8080/quote",
"http://inventory:8080/check",
"http://fraud:8080/score")
.To("kafka:orders.enriched?brokers=kafka:9092&acks=All");
ℹ️ No transaction here, and that's fine: by default (3.2.1) the inbound offset commits inline after successful processing. You only need a transaction for an atomic "commit-offset + publish-result" coupling — then
enableAutoCommit=false+.Transacted()(see §5).A realistic note about Kafka. Scatter-Gather gathers responses, so its natural fan-out targets are request/reply endpoints: HTTP, gRPC, SQL SELECT. A Kafka producer is fire-and-forget — there's nothing to "gather" beyond delivery metadata. So Kafka lives at the edges here: source (
From("kafka:...")) and sink (To("kafka:...")), while the fan-out targets services. That's how it shakes out in production.
Under the hood: the parallel path
private async Task ProcessParallel(IExchange exchange, IReadOnlyList<string> recipients, CancellationToken ct, CancellationToken callerCt)
{
var clones = new IExchange?[recipients.Count];
var maxDop = _maxDegreeOfParallelism > 0 ? _maxDegreeOfParallelism : Environment.ProcessorCount;
using var semaphore = new SemaphoreSlim(maxDop);
async Task<IExchange> SendToRecipient(int index)
{
await semaphore.WaitAsync(ct);
try
{
var clone = exchange.Clone(); // ← Clone(): Properties copied shallowly
clones[index] = clone;
var producer = GetOrCreateProducer(recipients[index]); // per-URI producer cache
await producer.Process(clone, ct);
return clone;
}
finally
{
if (clones[index] != null)
await clones[index]!.ReleaseScopes(); // release DI scopes early; body/headers live on for aggregation
semaphore.Release();
}
}
var tasks = Enumerable.Range(0, recipients.Count).Select(SendToRecipient).ToList();
var results = await Task.WhenAll(tasks); // (stopOnException; best-effort wraps each branch in try/catch)
// Aggregation — in deterministic index order, not arrival order
IExchange? aggregated = null;
foreach (var result in results)
{
if (result == null) continue;
aggregated = aggregated is null ? result : _aggregationStrategy(aggregated, result);
}
ApplyAggregation(exchange, aggregated);
}
A few things here are critical:
-
exchange.Clone()— each recipient works on its own clone.Clone()copiesPropertiesshallowly (more on this in the transactions section), which matters for a consistent commit. -
SemaphoreSlim(maxDop)— parallelism is capped.MaxDegreeOfParallelism=0meansEnvironment.ProcessorCount. -
Aggregation in index order. Even if
fraudanswers first, the fold runspricing → inventory → fraud. Predictability you can lean on. -
ReleaseScopes()early — the clone's DI scopes are released right after the send, while body and headers stay alive for aggregation. -
Producer cache —
GetOrCreateProducerholds aConcurrentDictionary<string, Lazy<ToProcessor>>;DisposeAsyncstops every producer it created.
The sequential path
With ParallelProcessing(false) — different code, and a different clone:
foreach (var uri in recipients)
{
var clone = exchange.CloneLinked(); // ← CloneLinked(), not Clone()
var producer = GetOrCreateProducer(uri);
await producer.Process(clone, ct);
aggregated = aggregated is null ? clone : _aggregationStrategy(aggregated, clone);
}
The Clone() vs CloneLinked() difference isn't cosmetic — covered under transactions.
Error handling: best-effort vs stop-on-exception
StopOnException(false) (default, best-effort): a failed branch gets its clone.Exception set and is still handed to the aggregator — your strategy decides what to do with a branch carrying an Exception:
catch (Exception ex)
{
if (_stopOnException) throw;
clone.Exception = ex;
aggregated = aggregated is null ? clone : _aggregationStrategy(aggregated, clone);
}
Timeout in best-effort: the sequential path aggregates the partial set and bails (break); the parallel path drops the timed-out branches (null). StopOnException(true) propagates the first failure, and a timeout is wrapped into a TimeoutException:
catch (OperationCanceledException) when (_timeout > TimeSpan.Zero && !ct.IsCancellationRequested)
{
throw new TimeoutException($"Scatter-gather timed out after {_timeout}.");
}
Every parameter
The full contract — IScatterGatherDefinition:
| Parameter | Default | What it does |
|---|---|---|
Recipients(params string[]) |
— | Static list of recipient URIs. |
Recipients(Func<IExchange, IEnumerable<string>>) |
— | Dynamic — computed from the live message. |
AggregationStrategy(...) |
required | Pair-wise (acc, cur) → merged. Missing it fails the build. |
ParallelProcessing(bool) |
true |
Parallel or sequential. |
MaxDegreeOfParallelism(int) |
0 → ProcessorCount |
Concurrency cap. |
StopOnException(bool) |
false |
best-effort or fail-fast. |
Timeout(TimeSpan) |
Zero (none) |
Whole-operation deadline. |
The fully-knobbed fluent form:
.ScatterGather(sg => sg
.Recipients("http://pricing:8080/quote", "http://inventory:8080/check", "http://fraud:8080/score")
.AggregationStrategy((acc, cur) => { /* ... */ return acc; })
.ParallelProcessing(true)
.MaxDegreeOfParallelism(3)
.Timeout(TimeSpan.FromSeconds(2))
.StopOnException(false))
Dynamic recipients
The recipient list can be computed from the message — say, a fan-out across shards whose numbers arrived in a header:
.ScatterGather(sg => sg
.Recipients(e =>
{
var shards = (e.In.Headers["shards"] as string ?? "").Split(',');
return shards.Select(s => $"http://shard-{s.Trim()}:8080/query");
})
.AggregationStrategy((acc, cur) => MergeJson(acc, cur)))
That's essentially a Dynamic Recipient List inside Scatter-Gather — exactly what EIP primitives exist for.
A real production example: one HTTP request → six parallel aggregations
Enough synthetic snippets — here's a real production route (a logistics monitoring dashboard). The POST /api/tsum/routes endpoint returns a page of routes plus five different aggregate blocks for the dashboard widgets. That used to be six sequential database round-trips; here it's a single Scatter-Gather:
From("http:0.0.0.0:5090/api/tsum/routes?inOut=true&cors=true")
.RouteId("tsum-api-routes")
.Process(Auth.ProcessAsync) // authentication
.ConvertBody<string>()
.Process(ParseAndStashFilter) // parse the filter → into Properties
.ScatterGather(sg => sg
.Recipients(
"direct://routes-page", // the result page (pagination)
"direct://routes-ownership", // aggregate: owned/hired × load status
"direct://routes-route-status", // aggregate: by route status
"direct://routes-point-status", // aggregate: by point status
"direct://routes-territory", // aggregate: vehicles on territory
"direct://routes-departure") // aggregate: departure stats
.AggregationStrategy(MergeRouteFragments)
.ParallelProcessing(true)
.MaxDegreeOfParallelism(4)
.StopOnException(true)
.Timeout(TimeSpan.FromSeconds(30)))
.Process(ComposeRoutesResponse); // assemble the final JSON from the fragments
Each branch is its own direct:// route with its own redb query that drops its slice of the response into a frag:* property (plus its own timing metric):
From("direct://routes-ownership")
.ProcessWithRedb(async (redb, ex, ct) =>
{
var sw = Stopwatch.StartNew();
var query = await BuildFilteredQuery(redb, Filter(ex)); // the same filter every branch uses
var groups = await query
.GroupBy(r => new { r.Own, r.LoadStatus }) // server-side aggregation in redb
.SelectAsync(g => new { g.Key.Own, g.Key.LoadStatus, Count = Agg.Count(g) });
ex.Properties["frag:ownership"] = BuildOwnershipFragment(groups);
ex.Properties["metric:ownershipMs"] = sw.ElapsedMilliseconds;
});
The aggregator just folds each branch clone's frag:* and metric:* into the accumulator:
private static IExchange MergeRouteFragments(IExchange aggregated, IExchange current)
{
foreach (var kv in current.Properties)
if (kv.Key.StartsWith("frag:") || kv.Key.StartsWith("metric:"))
aggregated.Properties[kv.Key] = kv.Value; // branch fragment → into the shared result
return aggregated;
}
and ComposeRoutesResponse builds the final JSON out of the six frag:* pieces.
Why this is "very fast." The six branches run in parallel (cap of 4 at a time). The endpoint's latency is the slowest single branch, not the sum of six. If each aggregation is ~80–150 ms, sequentially you'd be at ~0.6–0.9 s; through Scatter-Gather it's ~150 ms. One request from the frontend → one HTTP hop → six server-side redb aggregations at once → one JSON.
And here everything we covered about transactions lands in practice: each branch's ProcessWithRedb spins up its own per-exchange redb scope → its own connection (see §10). Six parallel queries on six connections, not sharing one transaction — which is exactly why it doesn't fall over. No .Transacted() is needed here: the branches only read, and the join is frag:* in memory. StopOnException(true) means "no half-built widget" — if any branch fails, the whole response is an error, not partial data.
9. EIP #2 — Aggregator: assembly over time
Scatter-Gather joins responses right now (fan-out → join). The Aggregator is a different pattern: it collects independent messages over time by a correlation key and emits the merge once a completion predicate trips.
The AggregatorProcessor contract is four things: correlationKey, aggregationStrategy, completionPredicate, target. The core:
public async Task Process(IExchange exchange, CancellationToken ct = default)
{
var key = _correlationKey(exchange);
IExchange? completed = null;
lock (_lock)
{
if (_aggregated.TryGetValue(key, out var existing))
{
var merged = _aggregationStrategy(existing, exchange);
_aggregated[key] = merged;
if (_completionPredicate(merged)) // is the group complete?
{
completed = merged;
_aggregated.Remove(key);
}
}
else
{
_aggregated[key] = exchange; // first in the group
if (_completionPredicate(exchange)) { completed = exchange; _aggregated.Remove(key); }
}
}
if (completed != null)
await _target.Process(completed, ct); // only completed groups flow downstream
}
A live example from the EipRoutes.cs demo — collect 3 events sharing a batchId:
From("timer://agg-source?period=2000&repeatCount=9")
.RouteId("demo-aggregator")
.SetHeader("batchId", e => $"batch-{DateTime.UtcNow.Second % 3}")
.SetBody(e => $"event-{DateTime.UtcNow:ss.fff}")
.Aggregate(
correlationKey: e => GetHeader(e, "batchId") ?? "default",
aggregationStrategy: (oldEx, newEx) =>
{
oldEx.In.Body = $"{oldEx.In.Body} + {newEx.In.Body}";
var count = oldEx.Properties.TryGetValue("agg.count", out var c) ? (int)c! : 1;
oldEx.Properties["agg.count"] = count + 1;
return oldEx;
},
completionPredicate: e => e.Properties.TryGetValue("agg.count", out var c) && (int)c! >= 3)
.Log("[AGG] done — merged 3 events: ${body}"); // runs on the completed group
.Aggregate(...) opens a scope (AggregateDefinition) — the steps after it (.Log(...)) build the target pipeline that runs on completed groups. Pre-completion messages are consumed silently.
On Kafka it's the classic "collect N by key":
From("kafka:order-lines?brokers=kafka:9092&groupId=order-assembler")
.Aggregate(
correlationKey: e => e.In.Headers["redbKafka.Key"]?.ToString() ?? "x", // key = orderId
aggregationStrategy: (acc, cur) => AppendLine(acc, cur),
completionPredicate: e => IsOrderComplete(e))
.To("kafka:orders.assembled?brokers=kafka:9092&acks=All");
ℹ️ By default the inbound offset commits after processing. For an atomic "offset + publish-assembled-result" coupling, use
enableAutoCommit=false+.Transacted().
The hard limitation, said out loud
The group store is a plain Dictionary<string, IExchange> behind a lock:
private readonly Dictionary<string, IExchange> _aggregated = new(StringComparer.Ordinal);
private readonly object _lock = new();
From that follows:
- it's lost on a process restart — all in-flight groups vanish;
- there is no timeout / eviction — only the completion predicate. A group that never reaches its condition (you wait for 3 events, 2 arrived, the source died) sits in memory forever. A potential leak if you have many keys that don't always complete.
This is not the persistent, recovery-capable aggregator from "big" Camel (completion timeout, persistent repository, recovery). For "collect N by key and emit a batch, a deploy losing in-flight state is acceptable" — great. For "buffer for a day, survive a restart" — you need persistence that isn't here. Honestly. There's a PendingGroupCount for monitoring how many groups are pending — at least you can see the pile growing.
10. Transactions: two models, and that's the whole point
Here's the fork to internalize. redb.Route has two distinct transaction models, and the connectors split into two camps. Getting this fork straight is half of using the framework well.
Camp 1: deferred actions (ITransactedAction) — they do NOT touch System.Transactions
This is every broker plus redb: Kafka, Redis, RabbitMQ, AMQP, IBM MQ, Azure Service Bus, and redb itself. One mechanic for all. The transport doesn't do the "real" work immediately — it parks an ITransactedAction in the shared bag at exchange.Properties["TRANSACT_ACTION"] (a ConcurrentDictionary) under a per-message-unique key:
private static void RegisterTransactedAction(IExchange exchange, string key, ITransactedAction action)
{
if (!exchange.Properties.TryGetValue("TRANSACT_ACTION", out var raw) ||
raw is not ConcurrentDictionary<string, ITransactedAction> dict)
{
dict = new ConcurrentDictionary<string, ITransactedAction>(StringComparer.OrdinalIgnoreCase);
exchange.Properties["TRANSACT_ACTION"] = dict;
}
dict[key] = action;
}
Keys are kafka-send-{guid}, redis-write-{guid}, redb:{name}, an offset, a deliveryTag. Uniqueness is critical: parallel fan-out branches write into one shared bag (Clone() copies Properties shallowly → all clones share the dictionary), and without unique keys they'd clobber each other.
At the .Transacted() boundary, TransactedProcessor takes over:
public async Task Process(IExchange exchange, CancellationToken ct = default)
{
if (!exchange.Properties.ContainsKey(TransactActionPropertyKey))
exchange.Properties[TransactActionPropertyKey] = new ConcurrentDictionary<string, ITransactedAction>(...);
using var scope = _policy.CreateScope(); // System.Transactions.TransactionScope (AsyncFlow enabled)
try
{
await _inner.Process(exchange, ct);
await CommitActions(exchange, ct); // commit ALL deferred actions
scope.Complete();
}
catch (Exception ex)
{
await RollbackActions(exchange, ct); // on error — roll them all back
throw;
}
}
CommitActions walks the bag sequentially and commits each action — which bites us in the trade-off below:
foreach (var kvp in actions)
await kvp.Value.Commit(ct);
These transports never touch Transaction.Current. So parallel fan-out over them is safe: unique keys → no bag collisions → no concurrent enlistment in a single System.Transactions transaction.
redb joins this camp via RedbTransactedAction — it wraps a redb-native IRedbTransaction (its own BEGIN/COMMIT on its own connection) and drops it into the same bag under key redb:{name}:
public async Task Commit(CancellationToken ct = default)
{
if (Interlocked.Exchange(ref _completed, 1) != 0) return; // single-use
try { if (_tx.IsActive) await _tx.CommitAsync(); }
finally { await _tx.DisposeAsync(); }
}
Camp 2: SQL — it enlists in System.Transactions
The SQL connector behaves fundamentally differently (SqlProducer):
var connection = await factory.CreateConnectionAsync(readOnly: ..., ct);
// If an ambient TransactionScope exists (route-level .Transacted()), the connection auto-enlists —
// no local transaction needed. Otherwise always wrap in a local one (like EF SaveChanges).
var hasAmbientTx = Transaction.Current != null;
DbTransaction? tx = null;
if (!hasAmbientTx)
tx = await connection.BeginTransactionAsync(ct);
// ... execute ...
if (tx != null) await tx.CommitAsync(ct);
So:
-
no ambient → each execution is wrapped in a local
DbTransactionon its own connection (atomic, like EF'sSaveChanges, with no?transacted=true); -
an ambient
Transaction.Currentpresent (opened by.Transacted()/.BeginTransaction()) → it skips the local one and the connection auto-enlists in the ambientTransactionScope.
SQL is the one in-box connector that genuinely participates in System.Transactions. That's both a feature (one .Transacted() wrapper around several SQL writes = one atomic transaction, perfect for an outbox write) and a thing whose edges you must understand.
Why parallel Scatter-Gather doesn't blow up under a transaction
The reasonable fear: parallel branches run on different threads, Transaction.Current flows into each via ExecutionContext — won't that be concurrent use of one transaction and a promotion to MSDTC (which on PG/Linux simply throws)?
No. Here's why — from the code:
1. Scopes are created per-exchange. A named redb instance resolves through GetRedbService(name, exchange) (RedbRouteExtensions), which caches a DI scope under exchange.Properties["__redb_scope:{name}"] and pulls the service from that scope's provider:
var cacheKey = ScopeCachePrefix + cleanName; // "__redb_scope:orders-db"
if (exchange.Properties.TryGetValue(cacheKey, out var cached) && cached is IServiceScope cachedScope)
return cachedScope.ServiceProvider.GetRequiredService<IRedbService>();
var scope = factory.CreateScope(); // not cached — create our own
exchange.Properties[cacheKey] = scope;
return scope.ServiceProvider.GetRequiredService<IRedbService>();
2. Clone() deliberately does NOT copy scopes. From Exchange.cs:
foreach (var kvp in _properties)
{
// Named redb scopes are per-exchange; the child creates its own on first access.
if (kvp.Key.StartsWith("__redb_scope:", StringComparison.Ordinal))
continue;
clone._properties[kvp.Key] = kvp.Value;
}
So each parallel branch spins up its own scope → its own IRedbService → its own connection.
3. redb never enlists in System.Transactions — it uses its own IRedbTransaction, parked in TRANSACT_ACTION.
Net result: there is no single System.Transactions transaction being hit concurrently by multiple connections → no MSDTC promotion, no "transaction context in use by another thread," no crash. Different connections, not one transaction — that's the design, not a bug.
Same story for SQL. In a Scatter-Gather without a .Transacted() wrapper, each branch opens its own connection (CreateConnectionAsync per Process) and its own local DbTransaction → independent atomic writes → no crash. The only way to hit the multi-connection-enlistment-to-MSDTC crash is to deliberately wrap a parallel fan-out in .Transacted() (an ambient System.Transactions scope) and fan out to several SQL connections — on PG/Npgsql that escalates to a distributed transaction and throws. A narrow edge nobody walks into, because the deferred model exists for brokers/redb.
So what is the 3.2.0 DependentTransactionBranch for, then?
That 3.2.0 per-branch isolation fix is wired only into Multicast and Splitter (plus the helper itself) — nowhere else. Here's its core (DependentTransactionBranch):
internal static async Task RunAsync(Func<Task> branch)
{
var ambient = Transaction.Current;
if (ambient is null) { await branch(); return; } // no transaction — zero overhead
// Fork a dependent clone: this branch's enlistment is private to its thread.
var dependent = ambient.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
using (var scope = new TransactionScope(dependent, TransactionScopeAsyncFlowOption.Enabled))
{
await branch();
scope.Complete();
}
dependent.Complete(); // release the parent commit (it waited on BlockCommitUntilComplete)
}
It matters only for resources that genuinely enlist in Transaction.Current — i.e. when parallel Multicast/Splitter branches do inline work on an enlisting connection. Scatter-Gather doesn't need it: per-exchange scopes + deferred ITransactedAction already give isolation without sharing a connection. Not a missing-feature asymmetry — two mechanisms for two situations.
The honest trade-off (a design choice, not a bug)
TransactedProcessor.CommitActions commits the deferred actions sequentially in a loop, and Scatter-Gather/Multicast branches are different connections, different transactions. So: if branch A commits and branch B's commit throws, you get a partial commit. No cross-branch DB atomicity here. That's the price of "no crash, no MSDTC promotion." If you need real cross-branch atomicity with enlistment, that's Multicast/Splitter under .Transacted() with the 3.2.0 fix — not Scatter-Gather.
Transaction policies
.Transacted() is backed by TransactionPolicy — four ready-made:
| Policy | ScopeOption |
Behavior |
|---|---|---|
Default |
Required |
Join the ambient or create a new one. 30s timeout, ReadCommitted. |
RequiresNew |
RequiresNew |
Always a new transaction; the ambient is suspended. |
Suppress |
Suppress |
Run with no transaction (ambient suppressed). |
Mandatory |
(marker) | Requires an existing ambient, else InvalidOperationException at scope creation. |
TransactionDefinition also gives Camel-parity hooks: .Retry(attempts, delay) (wraps the body in a RetryProcessor) and .DeadLetterChannel(uri) (on failure after rollback, send the exchange to the DLC). So a transactional block with retries and dead-letter is:
From("kafka:orders?brokers=kafka:9092&groupId=w")
.Transacted()
.Retry(3, TimeSpan.FromMilliseconds(200))
.To("sql:INSERT INTO orders ...")
.To("kafka:orders.done?brokers=kafka:9092&transacted=true")
.EndTransaction();
11. The parallel cousins: Splitter and Multicast
Scatter-Gather isn't the only fan-out in the family. Next to it sit Splitter (split the body into parts, process each) and Multicast (send a copy to N processors). Both, like Scatter-Gather, have an optional aggregation, parallelism, and stopOnException. The difference is where the "branches" come from:
- Scatter-Gather: branches = endpoints (producer URIs), aggregator mandatory.
-
Multicast: branches = processors / sub-pipelines, aggregator optional (
MulticastProcessor). -
Splitter: branches = body parts (
SplitterProcessor), aggregator optional, with some Camel-compat nuances.
Splitter is actually the most feature-rich on aggregation: its strategy (IExchange?, IExchange) → IExchange is called even for the first part with oldExchange == null (Camel's seed/wrap contract), and there are flags parallelAggregate (aggregate inline under a lock from workers instead of a deterministic post-pass) and aggregateOnException (include failed parts in the aggregate). And it's Splitter/Multicast's parallel path that carries the DependentTransactionBranch.RunAsync:
// MulticastProcessor.ProcessParallel / SplitterProcessor.ProcessParallel
await DependentTransactionBranch.RunAsync(() => _targets[idx].Process(clone, ct));
Why they have it and Scatter-Gather doesn't — covered above: Splitter/Multicast run inline processors (which can open an enlisting connection right in the branch), whereas Scatter-Gather runs producers by URI with per-exchange scopes. Different risks, different mechanisms.
A simple Multicast from the demo:
From("direct://demo-multicast")
.Multicast(new[] { "direct://mcast-a", "direct://mcast-b", "direct://mcast-c" }, parallelProcessing: true)
.Log("[MCAST] ◀ All endpoints received the message");
12. Saga — "a slightly different one"
redb.Route has a saga, but it's not MassTransit's durable state machine. It's SagaProcessor — an in-process compensating saga within a single exchange:
public async Task Process(IExchange exchange, CancellationToken ct = default)
{
var completedCount = 0;
try
{
for (var i = 0; i < _steps.Length; i++)
{
ct.ThrowIfCancellationRequested();
await _steps[i].Action(exchange, ct);
completedCount++;
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
// Compensate completed steps in REVERSE order
for (var i = completedCount - 1; i >= 0; i--)
{
if (_steps[i].Compensate is null) continue;
try { await _steps[i].Compensate!(exchange, ct); }
catch (Exception compEx) { _logger?.LogError(compEx, "Saga compensation for step {i} failed", i); }
}
throw;
}
if (_onCompletion is not null) await _onCompletion(exchange, ct);
}
Compensation failures are logged and don't abort the rest of the rollback. The DSL (SagaDefinition) comes in two styles, callback and fluent-scope:
From("direct:checkout")
.Saga(s => s
.Step(reserve, compensate: unreserve)
.Step(charge, compensate: refund)
.Step(ship) // forward-only, no compensation
.OnCompletion(e => Log("order placed")));
Bluntly: no state persistence, no correlation across messages/time, does not survive a process restart. It's Camel-style "routing slip with compensation," not a persisted state machine. Only the name overlaps. There are metrics, by the way: SagaCompleted / SagaCompensated / SagaFailed.
Again — not a shortcoming, a different school. Need a durable saga with persistence over Kafka? Build it from Aggregator + a SQL/Redis store + a correlation key. The bricks are there.
13. Outbox — there isn't one, and that's correct
Grep across src for Outbox — zero. No built-in outbox. That's not a gap, it's a stance: the transactional outbox is a pattern, not a framework button. Baking it into the core is over-engineering; anyone who wants one wires their own in five lines for their own invariants:
// 1. Write: business data + an outbox row under one transaction.
// For SQL this is exactly where the System.Transactions enlistment works FOR you:
// one .Transacted() wrapper → one ambient scope → atomic.
From("direct:place-order")
.Transacted()
.To("sql:INSERT INTO orders(id, payload) VALUES (@id, @payload)")
.To("sql:INSERT INTO outbox(id, topic, payload, sent) VALUES (@id, 'orders', @payload, false)")
.EndTransaction();
// 2. Deliver: a separate poller route reads the outbox and publishes.
From("sql:SELECT * FROM outbox WHERE sent = false ORDER BY id?outputType=SelectList&delay=1000")
.Split(body => (IEnumerable<object?>)body) // one message per row
.To("kafka:orders?brokers=kafka:9092")
.To("sql:UPDATE outbox SET sent = true WHERE id = @id");
Want the inbox pattern, dedup by messageId, a TTL on rows, a claim-check for big payloads? Add it. Nobody forces their outbox schema or retry semantics on you. That is "moving toward Camel": less baked-in magic, more explicit assembly.
14. Bottom line: a trade, not "better/worse"
| MassTransit | redb.Route (Camel school) | |
|---|---|---|
| Saga | durable state machine, persisted, survives restarts | in-process compensation within an exchange |
| Outbox | a framework feature | your own route from SQL/Redis in five lines |
| Transactions | a framework abstraction | two explicit models: deferred ITransactedAction (brokers+redb) / System.Transactions (SQL) |
| Kafka "transactional" | EOS out of the box | idempotent producer + deferred commit (not EOS) |
| Philosophy | batteries included, do it their way | EIP + connectors + DSL, compose it yourself |
This isn't "redb.Route beats MassTransit." It's a different trade: less baked-in magic, more explicit assembly from primitives. If your scenarios don't fit someone else's saga/outbox model, the Camel approach lets you assemble exactly yours. If they do fit, maybe you don't need to leave at all.
Scatter-Gather, for the record, has been rock-solid for us in production — one processor that fans out and joins, in parallel, with a deterministic merge order and transaction semantics you can actually reason about. Exactly the case where an EIP primitive solves the real problem without a single line of infrastructure code.
We run 3.2.0 in production — CHANGELOG here. War stories, things to re-verify (looking at you, transacted Kafka), which EIP to dissect next — drop them in the comments and we'll dig through the code together.

Top comments (0)