DEV Community

Cover image for Redis as a Database — Live Leaderboards With PubSub and GraphQL Subscriptions
Marius Muntean
Marius Muntean

Posted on • Edited on

2

Redis as a Database — Live Leaderboards With PubSub and GraphQL Subscriptions

Say you’re tracking the score of the players in your game. To each player name you associate a score. I’d be nice to always know who the top 50 or so players are and to get updates if the ranking changes.

To implement such a feature, we can make use of Redis’ Sorted Set data structure, Pub/Sub and GraphQL subscriptions.

This post dovetails with a previous one, called **Redis as a Database — Live Data Updates With PubSub and GraphQL Subscriptions**, so I highly recommend you check that one out first.

TL;DR

I’m using Visualizer, **my pet project that I’ve talked about it in my previous posts about Redis, to show how to implement a leaderboard . The gist of it is that **Visualizer ingests tweets from Twitter’s v2 API and stores them locally, in a Redis instance. Using the many capabilities of Redis, Visualizer offers a set of nice features, including a leaderboard (though in Visualizer it call it differently).

Algorithm: continuously track the top of the leaderboard, compare the current version with the previous one and publish an update in case that there are differences.

Find the backend code here and the frontend code here.

Ranked Hashtags

In Visualizer, instead of a game leaderboard, I’m tracking all hashtags that are used in the ingested tweets. Hashtags are ranked according to their frequency, i.e. the more they’re used the higher their rank will be. The frontend can subscribe to the top n hashtags with a GraphQL subscription. When there is a change in the top n hashtags, i.e. the rank of some hashtags changes, the frontend will receive an update with the current top n hashtags.

Backend

Visualizer — Partial architecture

Both Visualizer API and Visualizer Ingestion are involved in offering an up to date list of the top n hashtags.

Whenever the frontend subscribes to the top n hashtags via a GraphQL subscription, Visualizer API keeps track of the subscription amount and the subscription object.

private readonly IDatabase _database;
private readonly ConcurrentDictionary<int, ReplaySubject<RankedHashtag[]>> _amountToRankedHashtagsMap;
public TweetHashtagService(IDatabase database, ISubscriber subscriber)
{
_database = database;
_amountToRankedHashtagsMap = new ConcurrentDictionary<int, ReplaySubject<RankedHashtag[]>>();
var rankedHashtagChannel = new RedisChannel(HashtagConstants.RankedHashtagChannelName, RedisChannel.PatternMode.Literal);
var rankedHashtagChannelMessageQueue = subscriber.Subscribe(rankedHashtagChannel);
rankedHashtagChannelMessageQueue.OnMessage(OnRankedHashtagMessage);
}
public IObservable<RankedHashtag[]> GetTopRankedHashtagsObservable(int amount = 10)
{
var rankedHashtagsObservable = _amountToRankedHashtagsMap.GetOrAdd(amount, a => new ReplaySubject<RankedHashtag[]>(1));
return rankedHashtagsObservable.AsObservable();
}

Meanwhile, Visualizer Ingestion might be ingesting tweets. Every encountered hashtag is first added to the hashtags Sorted Set which automatically ranks it and then the hashtag is published with its new rank in the HashtagRanked Pub/Sub channel.

Back in Visualizer API, every time a ranked hashtag message arrives in the HashtagRanked *channel, every subscriber is updated if necessary. For each subscriber, the key to the Sorted Set containing the previous top *n** hashtags is computed. Then, each PreviousRankedHashtagsForAmount:n Sorted Set is compared with the current top n hashtags from the hashtags Sorted Set. In case a discrepancy between the current top n hashtags and the previous top n hashtags is detected, then the current top n hashtags are published to the GraphQL subscription and the PreviousRankedHashtagsForAmount:n *Sorted Set **is overwritten with the current value.

private async void OnRankedHashtagMessage(ChannelMessage message)
{
// Deserialise the message.
string rankedHashtagStr = message.Message;
var rankedHashtag = JsonConvert.DeserializeObject<RankedHashtag>(rankedHashtagStr);
// Every time a hashtag is ranked, check if any subscribers need to be informed or if housekeeping can be performed.
foreach (var (amount, rankedHashtagStream) in _amountToRankedHashtagsMap)
{
// Housekeeping if necessary - removes the subscription from the dictionary and deletes the temporary sorted set
var previousRankedHashtagsForCurrentAmountKey = new RedisKey($"PreviousRankedHashtagsForAmount:{amount}");
var housekeepingPerformed = await TrimSubscriptionIfNecessary(rankedHashtagStream, amount, previousRankedHashtagsForCurrentAmountKey).ConfigureAwait(false);
if (housekeepingPerformed)
{
continue;
}
// Compare the current top "amount" of ranked hashtags with the previous top "amount".
// Inform subscribers if distinct and then store new top "amount" of ranked hashtags.
var currentTopRankedHashtags = await GetTopHashtags(amount).ConfigureAwait(false);
var previousTopRankedHashtags = await GetRankedHashtags(previousRankedHashtagsForCurrentAmountKey).ConfigureAwait(false);
var distinct = currentTopRankedHashtags.Length != previousTopRankedHashtags.Length || currentTopRankedHashtags.Zip(previousTopRankedHashtags)
.Any(tuple => tuple.First.Name != tuple.Second.Name || Math.Abs(tuple.First.Rank - tuple.Second.Rank) > float.Epsilon);
if (distinct)
{
// Inform subscribers
rankedHashtagStream.OnNext(currentTopRankedHashtags);
// Store current ranked hashtags as previous
await ReplaceCurrentRankedHashtags(previousRankedHashtagsForCurrentAmountKey, currentTopRankedHashtags).ConfigureAwait(false);
}
}
}
private async Task<RankedHashtag[]> GetRankedHashtags(RedisKey key)
{
var sortedSet = await _database.SortedSetRangeByRankWithScoresAsync(key, 0, order: Order.Descending).ConfigureAwait(false);
return ToRankedHashtags(sortedSet);
}
private async Task ReplaceCurrentRankedHashtags(RedisKey previousKey, RankedHashtag[] currentRankedHashtags)
{
await _database.KeyDeleteAsync(previousKey).ConfigureAwait(false);
var sortedSetEntries = currentRankedHashtags.Select(scoredHashtag => new SortedSetEntry(new RedisValue(scoredHashtag.Name), scoredHashtag.Rank)).ToArray();
await _database.SortedSetAddAsync(previousKey, sortedSetEntries).ConfigureAwait(false);
}
private async Task<bool> TrimSubscriptionIfNecessary(ReplaySubject<RankedHashtag[]> rankedHashtagsSubject, int amount, RedisKey previousKey)
{
if (rankedHashtagsSubject.HasObservers == false)
{
if (_amountToRankedHashtagsMap.TryRemove(amount, out var _))
{
Console.WriteLine($"Removed subscription {amount}");
if (await _database.KeyDeleteAsync(previousKey).ConfigureAwait(false))
{
Console.WriteLine($"Removed the sorted set for {amount}");
}
return true;
}
}
return false;
}
private static RankedHashtag[] ToRankedHashtags(SortedSetEntry[] range)
{
return range.Select(entry => new RankedHashtag {Name = entry.Element.ToString(), Rank = entry.Score}).ToArray();
}
private async void OnRankedHashtagMessage(ChannelMessage message)
{
// Deserialise the message.
string rankedHashtagStr = message.Message;
var rankedHashtag = JsonConvert.DeserializeObject<RankedHashtag>(rankedHashtagStr);
// Every time a hashtag is ranked, check if any subscribers need to be informed or if housekeeping can be performed.
foreach (var (amount, rankedHashtagStream) in _amountToRankedHashtagsMap)
{
// Housekeeping if necessary - removes the subscription from the dictionary and deletes the temporary sorted set
var previousRankedHashtagsForCurrentAmountKey = new RedisKey($"PreviousRankedHashtagsForAmount:{amount}");
var housekeepingPerformed = await TrimSubscriptionIfNecessary(rankedHashtagStream, amount, previousRankedHashtagsForCurrentAmountKey).ConfigureAwait(false);
if (housekeepingPerformed)
{
continue;
}
// Compare the current top "amount" of ranked hashtags with the previous top "amount".
// Inform subscribers if distinct and then store new top "amount" of ranked hashtags.
var currentTopRankedHashtags = await GetTopHashtags(amount).ConfigureAwait(false);
var previousTopRankedHashtags = await GetRankedHashtags(previousRankedHashtagsForCurrentAmountKey).ConfigureAwait(false);
var distinct = currentTopRankedHashtags.Length != previousTopRankedHashtags.Length || currentTopRankedHashtags.Zip(previousTopRankedHashtags)
.Any(tuple => tuple.First.Name != tuple.Second.Name || Math.Abs(tuple.First.Rank - tuple.Second.Rank) > float.Epsilon);
if (distinct)
{
// Inform subscribers
rankedHashtagStream.OnNext(currentTopRankedHashtags);
// Store current ranked hashtags as previous
await ReplaceCurrentRankedHashtags(previousRankedHashtagsForCurrentAmountKey, currentTopRankedHashtags).ConfigureAwait(false);
}
}
}
private async Task<RankedHashtag[]> GetRankedHashtags(RedisKey key)
{
var sortedSet = await _database.SortedSetRangeByRankWithScoresAsync(key, 0, order: Order.Descending).ConfigureAwait(false);
return ToRankedHashtags(sortedSet);
}
private async Task ReplaceCurrentRankedHashtags(RedisKey previousKey, RankedHashtag[] currentRankedHashtags)
{
await _database.KeyDeleteAsync(previousKey).ConfigureAwait(false);
var sortedSetEntries = currentRankedHashtags.Select(scoredHashtag => new SortedSetEntry(new RedisValue(scoredHashtag.Name), scoredHashtag.Rank)).ToArray();
await _database.SortedSetAddAsync(previousKey, sortedSetEntries).ConfigureAwait(false);
}
private async Task<bool> TrimSubscriptionIfNecessary(ReplaySubject<RankedHashtag[]> rankedHashtagsSubject, int amount, RedisKey previousKey)
{
if (rankedHashtagsSubject.HasObservers == false)
{
if (_amountToRankedHashtagsMap.TryRemove(amount, out var _))
{
Console.WriteLine($"Removed subscription {amount}");
if (await _database.KeyDeleteAsync(previousKey).ConfigureAwait(false))
{
Console.WriteLine($"Removed the sorted set for {amount}");
}
return true;
}
}
return false;
}
private static RankedHashtag[] ToRankedHashtags(SortedSetEntry[] range)
{
return range.Select(entry => new RankedHashtag {Name = entry.Element.ToString(), Rank = entry.Score}).ToArray();
}

**Note
*: there may be a more efficient way of checking if an update has to be sent out to the subscribers using ZDIFF/ZDIFFSTORE (see all sorted set command), but I haven’t tried. Let me know if you’re interested in that.

To finish the backend part, let’s have a look at the GraphQL subscription definition.

public class VisualizerSubscription : ObjectGraphType
{
private readonly ITweetHashtagService _tweetHashtagService;
public VisualizerSubscription(IServiceProvider provider)
{
_tweetHashtagService = provider.CreateScope().ServiceProvider.GetRequiredService<ITweetHashtagService>();
AddField(new FieldType
{
Name = "topRankedHashtags",
Description = "Top X ranked hashtags are published whenever they change",
Type = typeof(ListGraphType<RankedHashtagTypeQl>),
Arguments = new QueryArguments(new QueryArgument<IntGraphType> {Name = "amount", DefaultValue = 10}),
Resolver = new FuncFieldResolver<RankedHashtag[]>(ResolveRankedHashtags),
StreamResolver = new SourceStreamResolver<RankedHashtag[]>(GetRankedHashtagsObservable)
});
}
private RankedHashtag[] ResolveRankedHashtags(IResolveFieldContext context)
{
var message = context.Source as RankedHashtag[];
return message!;
}
private IObservable<RankedHashtag[]> GetRankedHashtagsObservable(IResolveFieldContext context)
{
var amount = context.GetArgument<int>("amount");
return _tweetHashtagService.GetTopRankedHashtagsObservable(amount);
}
}

Subscribing to topRankedHashtags is handled in GetRankedHashtagsObservable(…) where _t*weetHashtagService.GetTopRankedHashtagsObservable(amount)* is called, which creates or returns an existing subscription for the requested amount (see the first code gist).

Frontend

Visualizer Frontend uses the excellent Apollo client together with the graphql-codegen to make my life easier when working with a graphQL API.

First, the frontend retrieves the current top n hashtags so that it can display something.

import { gql } from '@apollo/client'
export const GetHashtags = gql`
query getHashtags($amount: Int!) {
hashtag {
topRankedHashtags(amount: $amount) {
rank
name
}
}
}
`

But instead of manually calling this query, graphql-codegen generates a nice little hook that I can call
// Get the top hashtags from the database
const { loading: loadingHashtags, data: hashtagsData } = useGetHashtagsQuery({ variables: { amount: topHashtagAmount } })

Next, the frontend subscribes to topRankedHashtags.
import { gql } from '@apollo/client'
export const TopRankedHashtags = gql`
subscription topRankedHashtagsChanged($amount: Int) {
topRankedHashtags(amount: $amount) {
name
rank
}
}
`

Which looks like this with the generated hook
// Subscribe to the top hashtags
const { loading: loadingRankedHashtagsChanged, data: rankedHashtagsChanged } = useTopRankedHashtagsChangedSubscription({
variables: { amount: topHashtagAmount },
fetchPolicy: 'network-only',
})

From here on nothing super interesting happens. Whenever new data is available from the subscription, the useTopRankedHashtagsChangedSubscription hook triggers a re-render and the current data is shown in a word cloud — permalink to the whole component https://github.com/mariusmuntean/VisualizerFrontend/blob/8db3b9f1770b9ea74a60ac88fedc4a89c0301e71/src/components/rankedHashtagsChanged/liveRankedHashtags.tsx#L130

So the essence of a feature like a live updating leaderboard is to leverage Redis’ Sorted Sets to get an up to date view of the data and compare it to the previous state.

Thanks for reading. If you liked this post give it a ❤️ and follow me to stay up to date with Redis.

Image of Timescale

🚀 pgai Vectorizer: SQLAlchemy and LiteLLM Make Vector Search Simple

We built pgai Vectorizer to simplify embedding management for AI applications—without needing a separate database or complex infrastructure. Since launch, developers have created over 3,000 vectorizers on Timescale Cloud, with many more self-hosted.

Read more

Top comments (0)

Image of Docusign

🛠️ Bring your solution into Docusign. Reach over 1.6M customers.

Docusign is now extensible. Overcome challenges with disconnected products and inaccessible data by bringing your solutions into Docusign and publishing to 1.6M customers in the App Center.

Learn more