DEV Community

Cover image for Real-Time JSON Parsing from Semantic Kernel Streams in .NET
Matteo Bortolazzo
Matteo Bortolazzo

Posted on

Real-Time JSON Parsing from Semantic Kernel Streams in .NET

The problem

LLMs like GPT-4o are now
good at generating JSON, which opens up many possibilities.

Most of the time, we can wait for the LLM to complete the generation, parse the answer and return to the UI.

However, given the speed of LLMs, it can be frustrating for users to wait for the generation to be complete.

The best solution would be to display the generated content incrementally as soon as possible. This is relatively easy with
text, but it's a bit more complicated with JSON as we need to ensure the content is valid at each step.
We need to parse the JSON as it's generated, understand its structure, and act accordingly.

We'll build a Life’s to-do list generator. The LLM will generate a list of tasks, and we'll display them as soon as they are generated.

We'll make it more complex with the following schema:

{
  "listName": "Bucket List",
  "items": [
    {
      "recommendedAge": 30,
      "description": "Skydiving"
    },
    {
      "recommendedAge": 50,
      "description": "Visit all seven continents"
    }
  ]
}
Enter fullscreen mode Exit fullscreen mode

There are two ready-to-use tools we can use:

It should be straightforward to combine these two tools to achieve our goal. There's even a section in the
documentation: Read from a stream using Utf8JsonReader!

The actual problem

There are multiple challenges:

  • The reader example uses a MemoryStream while Semantic Kernel uses IAsyncEnumerable<StreamingTextContent>.
  • Utf8JsonReader is a ref struct, so:
    • It doesn't work with streams anyway, only with ReadOnlySpan<byte> in the constructor.
    • It can't be passed as a parameter to an async method.
    • It can't be used across await or yield boundaries.
    • It's a lexer/tokenizer, not a parser, so we need to handle the JSON structure ourselves.

The solution

We need to solve two problems:

  • How to use Utf8JsonReader with IAsyncEnumerable<StreamingTextContent>.
  • How to parse the JSON structure incrementally.

Let's start with the latter, as it's simpler.

The parser

The primary method of Utf8JsonReader is Read(). A simple JSON like { "name": "test" } will generate the
following tokens:

  • StartObject
  • PropertyName
  • String
  • EndObject

Each time we call Read(), the reader moves forward by a token, and we use:

  • TokenType to know the type.
  • ValueSpan and other methods are used to get its value
  • The bool returned to know if more tokens exist to read.

The interface for this is quite simple:

public interface IIncrementalJsonStreamParser
{
    void ContinueParsing(ref Utf8JsonReader reader);
}
Enter fullscreen mode Exit fullscreen mode

Once we have buffered enough data from the response, we try to parse it.

State machine

The easiest way I found to parse the JSON with this setup is a state machine.
We can update the machine's state with each token and act accordingly, for example, by triggering an event.

Here is the state machine for the TODO list:

TODO State Machine

In this case, we trigger two events:

  • onListNameParsed when we find a String token while in the ReadingListName state.
  • onItemParsed when we find an ObjectEnded token while in the ReadingItem state.

Visitor pattern

We can use the visitor pattern to hide the complexity of Uft8JsonReader.

The base abstract base looks something like this:

private T State { get; set; } = initialState;

public void ContinueParsing(ref Utf8JsonReader reader)
{
    while (reader.Read())
    {
        switch (reader.TokenType)
        {
            case JsonTokenType.PropertyName:
                State = VisitProperty(State, reader.GetString()!);
                break;
            case JsonTokenType.String:
                State = VisitStringValue(State, reader.GetString()!);
                break;

            // etc.
        }
    }
}    

protected virtual T VisitProperty(T state, string propertyName) => state;

protected virtual T VisitStringValue(T state, string value) => state;

// etc.
Enter fullscreen mode Exit fullscreen mode

We can then implement only what we need, changing the state and triggering events:

public enum TodoStateMachineState
{
    None,
    ReadingListName
    // etc.
}


public class TodoStateMachineJsonTokenParser(
    Action<string> onListNameParsed) : StateMachineJsonTokenParser<TodoStateMachineState>(TodoStateMachineState.None)
{
    protected override TodoStateMachineState VisitProperty(TodoStateMachineState stateMachineState, string propertyName)
    {
        return propertyName switch
        {
            "listName" => TodoStateMachineState.ReadingListName,
            // etc.
        };
    }

    protected override TodoStateMachineState VisitStringValue(TodoStateMachineState stateMachineState, string value)
    {
        switch (stateMachineState)
        {
            case TodoStateMachineState.ReadingListName:
                onListNameParsed(value); // Trigger event
                return TodoStateMachineState.None;
            // etc.
        }
    }

    // etc.
}
Enter fullscreen mode Exit fullscreen mode

The feeder

Let's now see how we can keep feeding the parser. Below the full implementation:

/// <summary>
/// Provides a way to feed an <see cref="IIncrementalJsonStreamParser"/> with an <see cref="IAsyncEnumerable{T}"/> of <see cref="StreamingTextContent"/>
/// </summary>
/// <param name="incrementalParser">The parser.</param>
/// <param name="chunkBufferSize">The number of chunks to read before feeding the parser.</param>
public class JsonAsyncStreamTokenFeeder(IIncrementalJsonStreamParser incrementalParser, int chunkBufferSize)
{
    /// <summary>
    /// Start feeding the parser with the text content stream
    /// </summary>
    /// <param name="textContentStream">The source.</param>
    public async Task FeedAsync(IAsyncEnumerable<StreamingTextContent> textContentStream)
    {
        // Control the pace of the stream by reading in chunks
        var e = textContentStream.GetAsyncEnumerator();

        var completed = false;
        var buffer = new ArrayBufferWriter<byte>();

        try
        {
            JsonReaderState jsonReaderState = new();
            while (!completed)
            {
                // Load the buffer with the next chunk of text
                for (var i = 0; i < chunkBufferSize; i++)
                {
                    var readSuccess = await e.MoveNextAsync();
                    // Reached the end of the stream
                    if (!readSuccess)
                    {
                        completed = true;
                        break;
                    }

                    if (e.Current.Text == null) continue;
                    var bytes = Encoding.UTF8.GetBytes(e.Current.Text);
                    buffer.Write(bytes);
                }

                // Load the reader with the buffer
                var reader = new Utf8JsonReader(
                    buffer.WrittenSpan,
                    isFinalBlock: false,
                    state: jsonReaderState);

                // Parse as much as possible
                incrementalParser.ContinueParsing(ref reader);

                // Save the parsing state
                jsonReaderState = reader.CurrentState;

                // Create a new buffer with the leftover bytes that were not consumed by the parser
                // This happens when the parser is in the middle of a token
                var remainingBytes = buffer.WrittenSpan[(int)reader.BytesConsumed..];
                buffer = new ArrayBufferWriter<byte>();
                buffer.Write(remainingBytes);
            }
        }
        finally
        {
            await e.DisposeAsync();
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Explanation:

  • We manually load a given number of chunks into a buffer.
  • We create a Utf8JsonReader with the buffer.
    • IsFinalBlock is false as we don't know if we have reached the end of the stream.
    • We pass the JsonReaderState to keep track of the parsing state.
  • Call ContinueParsing on the parser. The parser returns once there are no more tokens to read.
  • We save the state of the reader.
  • We create a new buffer with the remaining bytes not consumed by the parser.
  • We start again until we reach the end of the stream.

Usage

Here's an example of the usage of everything we created:

OpenAIPromptExecutionSettings openAiPromptExecutionSettings = new()
{
    ResponseFormat = "json_object",
};
var textContentStream = textGenerationService
    .GetStreamingTextContentsAsync(prompt, openAiPromptExecutionSettings);

var parser = new TodoStateMachineJsonTokenParser(PrintName, PrintItem); // Callbacks
var feeder = new JsonAsyncStreamTokenFeeder(parser, chunkBufferSize: 32);
await feeder.FeedAsync(textContentStream);
Enter fullscreen mode Exit fullscreen mode

FeedAsync waits till the stream is completed; at that point, all events have been fired.

Repository

matteobortolazzo/semantic-kernel-json-streaming-parser

Sentry image

See why 4M developers consider Sentry, “not bad.”

Fixing code doesn’t have to be the worst part of your day. Learn how Sentry can help.

Learn more

Top comments (0)