DEV Community

Derek Comartin - CodeOpinion
Derek Comartin - CodeOpinion

Posted on • Originally published at codeopinion.com on

4

Event Sourcing with SQL Stream Store

Event Sourcing with SQL Stream Store

I’ve known about SQL Stream Store for a bit (I believe when it was called Cedar) but haven’t really looked into it much. The premise is to provide a stream store over SQL. Currently supporting MS SQL Server, PostgreSQL, and MySQL. Meaning it is simply a .NET Library you can use with an SQL implementation. Let’s take a look at how you can implement Event Sourcing with SQL Stream Store.

SQL Stream Store Demo Application

For this demo/sample, I’m going to create a .NET Core 3 console app. The idea will be to create the stereotypical event-sourcing example of a bank account.

I wanted to explore the primary functions needed of an event stream.

  • Create a Stream
  • Append a new Event to a Stream
  • Read Events from a Stream
  • Subscribe to a Stream to receive appended Events

NuGet Package

As always, the first thing is to get the SqlStreamStore NuGet Package by adding it as a PackageReference in your csproj.

<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="SqlStreamStore" Version="1.1.3" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
</ItemGroup>
</Project>
view raw csproj.xml hosted with ❤ by GitHub

I’ve also included Newtonsoft.Json because I’m going to be (de)serializing events.

Events

I’m going to create two events to represent the bank account transactions. Deposited and Withdrawn. Both of these will extend the abstract AccountEvent that will contain the TransactionId, Dollar Amount, and DateTime of the event.

using System;
namespace SqlStreamStore.Demo
{
public abstract class AccountEvent
{
public Guid TransactionId { get; }
public decimal Amount { get; }
public DateTime DateTime { get; }
public AccountEvent(Guid transactionId, decimal amount, DateTime dateTime)
{
TransactionId = transactionId;
Amount = amount;
DateTime = dateTime;
}
}
public class Deposited : AccountEvent
{
public Deposited(Guid transactionId, decimal amount, DateTime dateTime) : base(transactionId, amount, dateTime)
{
}
}
public class Withdrawn : AccountEvent
{
public Withdrawn(Guid transactionId, decimal amount, DateTime dateTime) : base(transactionId, amount, dateTime)
{
}
}
}
view raw events.cs hosted with ❤ by GitHub

Creating a Stream

There is no way of creating an empty stream. This is actually really nice. Instead when appending an event to a stream, if the stream does not already exist, it’s created. This is the same how EventStore works and should also feel familiar in comparison to the API.

Appending to a Stream

Appending a new event to a stream is pretty straight forward. From the IStreamStore there is an AppendToStream method that takes a few args.

The first is StreamId. This generally would represent your aggregate root ID. In this demo, I’m setting it Account:{GUID}.

The second arg is ExpectedVersion. This is also similar to the EventStore API. This is used for handling concurrency. You can specify an integer that represents the number of events that are persisted in the stream. You can also use the ExpectedVersion enum that can specify Any to not concern yourself with concurrency or NoStream to verify its the first event.

Finally, the 3rd param is an instance of NewStreamMessage. It contains the MessageId (GUID), an event name and the event json body.

An interesting takeaway here is the event name is intentionally made as a string so you are not serializing/deserializing to the CRL type name. This is a great idea since the CLR type name is likely to be changed more than just a plain string which you can keep constant.

using System;
using System.Threading.Tasks;
using Newtonsoft.Json;
using SqlStreamStore.Streams;
namespace SqlStreamStore.Demo
{
public class Account
{
private readonly StreamId _streamId;
private readonly IStreamStore _streamStore;
public Account(IStreamStore streamStore, StreamId streamId)
{
_streamId = streamId;
_streamStore = streamStore;
}
public async Task<Guid> Deposit(decimal amount)
{
var trx = Guid.NewGuid();
var deposit = new Deposited(trx, amount, DateTime.UtcNow);
await _streamStore.AppendToStream(_streamId, ExpectedVersion.Any, new NewStreamMessage(trx, "Deposited", JsonConvert.SerializeObject(deposit)));
return trx;
}
public async Task<Guid> Withdrawal(decimal amount)
{
var trx = Guid.NewGuid();
var deposit = new Withdrawn(trx,amount, DateTime.UtcNow);
await _streamStore.AppendToStream(_streamId, ExpectedVersion.Any, new NewStreamMessage(trx, "Withdrawn", JsonConvert.SerializeObject(deposit)));
return trx;
}
}
}

Reading a Stream

You can read a stream forward or backward. Forward meaning from the very first event until the last, which is what I’ll use in this example.

You simply specify the StreamId , the starting version (0) and how many events to pull from the stream. The result contains an IsEnd to indicate there are no more events left in the stream to read.

using System;
using System.Threading.Tasks;
using Newtonsoft.Json;
using SqlStreamStore.Streams;
namespace SqlStreamStore.Demo
{
public class Account
{
private readonly StreamId _streamId;
private readonly IStreamStore _streamStore;
public Account(IStreamStore streamStore, StreamId streamId)
{
_streamId = streamId;
_streamStore = streamStore;
}
public async Task<Guid> Deposit(decimal amount)
{
var trx = Guid.NewGuid();
var deposit = new Deposited(trx, amount, DateTime.UtcNow);
await _streamStore.AppendToStream(_streamId, ExpectedVersion.Any, new NewStreamMessage(trx, "Deposited", JsonConvert.SerializeObject(deposit)));
return trx;
}
public async Task<Guid> Withdrawal(decimal amount)
{
var trx = Guid.NewGuid();
var deposit = new Withdrawn(trx,amount, DateTime.UtcNow);
await _streamStore.AppendToStream(_streamId, ExpectedVersion.Any, new NewStreamMessage(trx, "Withdrawn", JsonConvert.SerializeObject(deposit)));
return trx;
}
public async Task Transactions()
{
decimal balance = 0;
var endOfStream = false;
var startVersion = 0;
while (endOfStream == false)
{
var stream = await _streamStore.ReadStreamForwards(_streamId, startVersion, 10);
endOfStream = stream.IsEnd;
startVersion = stream.NextStreamVersion;
foreach (var msg in stream.Messages)
{
switch (msg.Type)
{
case "Deposited":
var depositedJson = await msg.GetJsonData();
var deposited = JsonConvert.DeserializeObject<Deposited>(depositedJson);
Console.WriteLine($"Deposited: {deposited.Amount:C} @ {deposited.DateTime} ({deposited.TransactionId})");
balance += deposited.Amount;
break;
case "Withdrawn":
var withdrawnJson = await msg.GetJsonData();
var withdrawn = JsonConvert.DeserializeObject<Withdrawn>(withdrawnJson);
Console.WriteLine($"Withdrawn: {withdrawn.Amount:C} @ {withdrawn.DateTime} ({withdrawn.TransactionId})");
balance -= withdrawn.Amount;
break;
}
}
}
Console.WriteLine($"Balance: {balance:C}");
}
}
}
view raw Account-Read.cs hosted with ❤ by GitHub

Subscribing to a Stream

Subscribing to a stream is pretty straight forward. Specify the StreamId , the starting version/position of where you want to be notified of new events from, and then StreamMessageReceived for handling the newly appended event.

using System;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using SqlStreamStore.Streams;
namespace SqlStreamStore.Demo
{
public class BalanceProjection
{
public Balance Balance { get; private set; } = new Balance(0, DateTime.UtcNow);
public BalanceProjection(IStreamStore streamStore, StreamId streamId)
{
streamStore.SubscribeToStream(streamId, null, StreamMessageReceived);
}
private async Task StreamMessageReceived(IStreamSubscription subscription, StreamMessage streamMessage, CancellationToken cancellationToken)
{
switch (streamMessage.Type)
{
case "Deposited":
var depositedJson = await streamMessage.GetJsonData(cancellationToken);
var deposited = JsonConvert.DeserializeObject<Deposited>(depositedJson);
Balance = Balance.Add(deposited.Amount);
break;
case "Withdrawn":
var withdrawnJson = await streamMessage.GetJsonData(cancellationToken);
var withdrawn = JsonConvert.DeserializeObject<Withdrawn>(withdrawnJson);
Balance = Balance.Subtract(withdrawn.Amount);
break;
}
}
}
public struct Balance
{
public Balance(decimal amount, DateTime asOf)
{
Amount = amount;
AsOf = asOf;
}
public decimal Amount { get; }
public DateTime AsOf { get; }
public Balance Add(decimal value)
{
return new Balance(Amount + value, DateTime.UtcNow);
}
public Balance Subtract(decimal value)
{
return new Balance(Amount - value, DateTime.UtcNow);
}
}
}
view raw projection.cs hosted with ❤ by GitHub

Wrapping it up

Now that I’ve covered all the primary aspects, it’s just a matter of adding some input/output to our console app by allowing the user to deposit and withdrawal from the account.

In the demo, I’m using the InMemoryStreamStore so there is no persisted data upon restarting the app. I’m also using a new GUID to represent the AccountId on each run.

using System;
using System.Threading.Tasks;
using SqlStreamStore.Streams;
namespace SqlStreamStore.Demo
{
static class Program
{
private static InMemoryStreamStore _streamStore;
private static Account _account;
private static BalanceProjection _balanceProjection;
static async Task Main()
{
var streamId = new StreamId($"Account:{Guid.NewGuid()}");
_streamStore = new InMemoryStreamStore();
_account = new Account(_streamStore, streamId);
_balanceProjection = new BalanceProjection(_streamStore, streamId);
var key = string.Empty;
while (key != "X")
{
Console.WriteLine("D: Deposit");
Console.WriteLine("W: Withdrawal");
Console.WriteLine("B: Balance");
Console.WriteLine("T: Transactions");
Console.WriteLine("X: Exit");
Console.Write("> ");
key = Console.ReadLine()?.ToUpperInvariant();
Console.WriteLine();
switch (key)
{
case "D":
var depositAmount = GetAmount();
if (depositAmount.IsValid)
{
var depositTrx = await _account.Deposit(depositAmount.Amount);
Console.WriteLine($"Deposited: {depositAmount.Amount:C} ({depositTrx})");
}
break;
case "W":
var withdrawalAmount = GetAmount();
if (withdrawalAmount.IsValid)
{
var withdrawalTrx = await _account.Withdrawal(withdrawalAmount.Amount);
Console.WriteLine($"Withdrawn: {withdrawalAmount.Amount:C} ({withdrawalTrx})");
}
break;
case "B":
Balance();
break;
case "T":
await _account.Transactions();
break;
}
Console.WriteLine();
}
}
private static (decimal Amount, bool IsValid) GetAmount()
{
Console.Write("Amount: ");
if (decimal.TryParse(Console.ReadLine(), out var amount))
{
return (amount, true);
}
Console.WriteLine("Invalid Amount.");
return (0, false);
}
private static void Balance()
{
Console.WriteLine($"Balance: {_balanceProjection.Balance.Amount:C} as of {_balanceProjection.Balance.AsOf}");
}
}
}
view raw program.cs hosted with ❤ by GitHub

Source Code

All the source code shown in this post is available on GitHub.

This was a quick look at just a few of the APIs in SQL Stream Store but should give you a feel for how it works.

If you’d like a more detailed contented related to various aspects of event sourcing, let me know in the comments or on twitter.

https://github.com/dcomartin/SqlStreamStore.Demo

Related Links:

Follow @codeopinion

The post Event Sourcing with SQL Stream Store appeared first on CodeOpinion.

Sentry image

See why 4M developers consider Sentry, “not bad.”

Fixing code doesn’t have to be the worst part of your day. Learn how Sentry can help.

Learn more

Top comments (0)

Billboard image

Create up to 10 Postgres Databases on Neon's free plan.

If you're starting a new project, Neon has got your databases covered. No credit cards. No trials. No getting in your way.

Try Neon for Free →

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay