The problem
LLMs like GPT-4o are now good at generating JSON, which opens up many possibilities.
Most of the time, we can wait for the LLM to complete the generation, parse the answer, and return the result to the UI.
However, LLMs are not that fast, and it can be frustrating for users to wait for the generation to complete.
The best solution would be to display the generated content incrementally, as soon as possible. This is relatively easy with plain text, but it's a bit more complicated with JSON, as we need to ensure the content is valid at each step.
We need to parse the JSON as it's generated, understand its structure, and act accordingly.
We'll build a life to-do list generator: the LLM will generate a list of tasks, and we'll display them as soon as they are generated.
We'll make it a little more complex with the following schema:
{
  "listName": "Bucket List",
  "items": [
    {
      "recommendedAge": 30,
      "description": "Skydiving"
    },
    {
      "recommendedAge": 50,
      "description": "Visit all seven continents"
    }
  ]
}
There are two ready-to-use tools we can combine:
- Semantic Kernel: an SDK to interact with AI models.
- Utf8JsonReader: a high-performance, low-allocation, forward-only reader for JSON.
It should be straightforward to combine these two tools to achieve our goal. There's even a section in the documentation: Read from a stream using Utf8JsonReader!
The actual problem
There are multiple challenges:
- The reader example uses a MemoryStream, while Semantic Kernel gives us an IAsyncEnumerable<StreamingTextContent>.
- Utf8JsonReader is a ref struct, so (see the short sketch after this list):
  - It doesn't work with streams anyway, only with a ReadOnlySpan<byte> passed to the constructor.
  - It can't be passed as a parameter to an async method.
  - It can't be used across await or yield boundaries.
- It's a lexer/tokenizer, not a parser, so we need to handle the JSON structure ourselves.
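To make those constraints concrete, here is a small sketch (not from the original post) of what the compiler rejects and accepts around Utf8JsonReader:

using System.Text.Json;

internal static class RefStructConstraintsDemo
{
    // Rejected by the compiler: a ref struct can't be a parameter of an async method.
    // internal static async Task ParseAsync(Utf8JsonReader reader) { ... }

    // Also rejected: keeping the reader alive across an await or yield boundary.

    // Accepted: a synchronous method taking the reader by ref, the shape we'll use below.
    internal static void DumpTokens(ref Utf8JsonReader reader)
    {
        while (reader.Read())
        {
            Console.WriteLine(reader.TokenType);
        }
    }
}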
The solution
We need to solve two problems:
- How to use Utf8JsonReader with an IAsyncEnumerable<StreamingTextContent>.
- How to parse the JSON structure incrementally.
Let's start with the latter, as it's simpler.
The parser
The primary method of Utf8JsonReader is Read(). A simple JSON document like { "name": "test" } will generate the following tokens:
StartObject
PropertyName
String
EndObject
Each time we call Read(), the reader moves forward by one token, and we use:
- TokenType to know the token's type.
- ValueSpan and the Get* helper methods (GetString(), GetInt32(), ...) to get its value.
- The returned bool to know whether more tokens are available to read.

A minimal reading loop is sketched below.
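Here it is, reading the sample document above (a standalone sketch, not code from the post):

using System.Text;
using System.Text.Json;

var json = Encoding.UTF8.GetBytes("{ \"name\": \"test\" }");
var reader = new Utf8JsonReader(json);

// Prints: StartObject, PropertyName = name, String = test, EndObject
while (reader.Read())
{
    Console.Write(reader.TokenType);
    if (reader.TokenType is JsonTokenType.PropertyName or JsonTokenType.String)
        Console.Write($" = {reader.GetString()}");
    Console.WriteLine();
}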
The interface for this is quite simple:
public interface IIncrementalJsonStreamParser
{
    void ContinueParsing(ref Utf8JsonReader reader);
}
Once we have buffered enough data from the response, we try to parse it.
State machine
The easiest way I found to parse the JSON with this setup is a state machine.
We can update the machine's state with each token and act accordingly, for example, by triggering an event.
Here is the state machine for the TODO list:
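(A rough sketch of the transitions; ReadingItems is an assumed state name, the other states and events match those described below.)

None --PropertyName "listName"--> ReadingListName --String--> None (triggers onListNameParsed)
None --PropertyName "items"--> ReadingItems --StartObject--> ReadingItem
ReadingItem --EndObject--> ReadingItems (triggers onItemParsed)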
In this case, we trigger two events:
- onListNameParsed when we find a String token while in the ReadingListName state.
- onItemParsed when we find an EndObject token while in the ReadingItem state.
Visitor pattern
We can use the visitor pattern to hide the complexity of Utf8JsonReader.
The abstract base class looks something like this:
public abstract class StateMachineJsonTokenParser<T>(T initialState) : IIncrementalJsonStreamParser
{
    private T State { get; set; } = initialState;

    public void ContinueParsing(ref Utf8JsonReader reader)
    {
        while (reader.Read())
        {
            switch (reader.TokenType)
            {
                case JsonTokenType.PropertyName:
                    State = VisitProperty(State, reader.GetString()!);
                    break;
                case JsonTokenType.String:
                    State = VisitStringValue(State, reader.GetString()!);
                    break;
                // etc.
            }
        }
    }

    protected virtual T VisitProperty(T state, string propertyName) => state;
    protected virtual T VisitStringValue(T state, string value) => state;
    // etc.
}
We can then implement only what we need, changing the state and triggering events:
public enum TodoStateMachineState
{
    None,
    ReadingListName
    // etc.
}

public class TodoStateMachineJsonTokenParser(
    Action<string> onListNameParsed) : StateMachineJsonTokenParser<TodoStateMachineState>(TodoStateMachineState.None)
{
    protected override TodoStateMachineState VisitProperty(TodoStateMachineState stateMachineState, string propertyName)
    {
        return propertyName switch
        {
            "listName" => TodoStateMachineState.ReadingListName,
            // etc.
            _ => stateMachineState
        };
    }

    protected override TodoStateMachineState VisitStringValue(TodoStateMachineState stateMachineState, string value)
    {
        switch (stateMachineState)
        {
            case TodoStateMachineState.ReadingListName:
                onListNameParsed(value); // Trigger event
                return TodoStateMachineState.None;
            // etc.
            default:
                return stateMachineState;
        }
    }

    // etc.
}
The feeder
Let's now see how we can keep feeding the parser. Below is the full implementation:
/// <summary>
/// Provides a way to feed an <see cref="IIncrementalJsonStreamParser"/> with an <see cref="IAsyncEnumerable{T}"/> of <see cref="StreamingTextContent"/>
/// </summary>
/// <param name="incrementalParser">The parser.</param>
/// <param name="chunkBufferSize">The number of chunks to read before feeding the parser.</param>
public class JsonAsyncStreamTokenFeeder(IIncrementalJsonStreamParser incrementalParser, int chunkBufferSize)
{
    /// <summary>
    /// Start feeding the parser with the text content stream
    /// </summary>
    /// <param name="textContentStream">The source.</param>
    public async Task FeedAsync(IAsyncEnumerable<StreamingTextContent> textContentStream)
    {
        // Control the pace of the stream by reading in chunks
        var e = textContentStream.GetAsyncEnumerator();
        var completed = false;
        var buffer = new ArrayBufferWriter<byte>();
        try
        {
            JsonReaderState jsonReaderState = new();
            while (!completed)
            {
                // Load the buffer with the next chunk of text
                for (var i = 0; i < chunkBufferSize; i++)
                {
                    var readSuccess = await e.MoveNextAsync();

                    // Reached the end of the stream
                    if (!readSuccess)
                    {
                        completed = true;
                        break;
                    }

                    if (e.Current.Text == null) continue;

                    var bytes = Encoding.UTF8.GetBytes(e.Current.Text);
                    buffer.Write(bytes);
                }

                // Load the reader with the buffer
                var reader = new Utf8JsonReader(
                    buffer.WrittenSpan,
                    isFinalBlock: false,
                    state: jsonReaderState);

                // Parse as much as possible
                incrementalParser.ContinueParsing(ref reader);

                // Save the parsing state
                jsonReaderState = reader.CurrentState;

                // Create a new buffer with the leftover bytes that were not consumed by the parser
                // This happens when the parser is in the middle of a token
                var remainingBytes = buffer.WrittenSpan[(int)reader.BytesConsumed..];
                buffer = new ArrayBufferWriter<byte>();
                buffer.Write(remainingBytes);
            }
        }
        finally
        {
            await e.DisposeAsync();
        }
    }
}
Explanation:
- We manually load a given number of chunks into a buffer.
- We create a Utf8JsonReader with the buffer.
  - isFinalBlock is false, as we don't know whether we have reached the end of the stream.
  - We pass the JsonReaderState to keep track of the parsing state.
- We call ContinueParsing on the parser. The parser returns once there are no more tokens to read.
- We save the state of the reader.
- We create a new buffer with the remaining bytes not consumed by the parser (a standalone sketch of this carry-over follows the list).
- We start again until we reach the end of the stream.
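Here is what that carry-over looks like in isolation when a token is split across two chunks (a minimal sketch, independent of the feeder class):

using System.Text;
using System.Text.Json;

// The string value is split across two chunks.
var chunk1 = Encoding.UTF8.GetBytes("{ \"listName\": \"Bucket");
var chunk2 = Encoding.UTF8.GetBytes(" List\" }");

// First pass: StartObject and PropertyName are read; the incomplete string is not.
var reader = new Utf8JsonReader(chunk1, isFinalBlock: false, state: new JsonReaderState());
while (reader.Read()) { }

// Carry over the unconsumed bytes and append the next chunk.
var leftover = chunk1.AsSpan((int)reader.BytesConsumed);
var combined = new byte[leftover.Length + chunk2.Length];
leftover.CopyTo(combined);
chunk2.AsSpan().CopyTo(combined.AsSpan(leftover.Length));

// Second pass, resuming from the saved state: the String and EndObject tokens are now read.
reader = new Utf8JsonReader(combined, isFinalBlock: true, state: reader.CurrentState);
while (reader.Read())
{
    Console.WriteLine(reader.TokenType);
}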
Usage
Here's an example that puts together everything we created:
OpenAIPromptExecutionSettings openAiPromptExecutionSettings = new()
{
    ResponseFormat = "json_object",
};

var textContentStream = textGenerationService
    .GetStreamingTextContentsAsync(prompt, openAiPromptExecutionSettings);

var parser = new TodoStateMachineJsonTokenParser(PrintName, PrintItem); // Callbacks
var feeder = new JsonAsyncStreamTokenFeeder(parser, chunkBufferSize: 32);

await feeder.FeedAsync(textContentStream);
FeedAsync waits until the stream is completed; at that point, all events have been fired.
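The PrintName and PrintItem callbacks are not shown here; hypothetically, they could be as simple as this (the TodoItem type and the exact callback signatures are assumptions):

// Hypothetical callbacks and item type, not part of the original implementation.
void PrintName(string name) => Console.WriteLine($"List: {name}");
void PrintItem(TodoItem item) => Console.WriteLine($"- {item.Description} (recommended age {item.RecommendedAge})");

record TodoItem(int RecommendedAge, string Description);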