This post will look at working with the JavaScript Streams API which allows making a fetch HTTP call and receiving a streaming response in chunks, which allows a client to start responding to a server response more quickly and build UIs like ChatGPT.
As a motivating example, we’ll implement a function to handle the streaming LLM response from OpenAI (or any server using the same http streaming API), using no npm dependencies—just the built-in fetch
. The full code is here including retries with exponential backoff, embeddings, non-streaming chat, and a simpler APIs for interacting with chat completions and embeddings.
If you’re interested in seeing how to also return an HTTP stream to clients, check out this post.
Full example code
Here’s the full example. We’ll look at each piece below:
async function createChatCompletion(body: ChatCompletionCreateParams) {
// Making the request
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
const response = await fetch(baseUrl + "/v1/chat/completions", {
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": "Bearer " + process.env.LLM_API_KEY,
},
body: JSON.stringify(body),
});
// Handling errors
if (!response.ok) {
const error = await response.text();
throw new Error(`Failed (${response.status}): ${error}`,
}
if (!body.stream) { // the non-streaming case
return response.json();
}
const stream = response.body;
if (!stream) throw new Error("No body in response");
// Returning an async iterator
return {
[Symbol.asyncIterator]: async function* () {
for await (const data of splitStream(stream)) {
// Handling the OpenAI HTTP streaming protocol
if (data.startsWith("data:")) {
const json = data.substring("data:".length).trimStart();
if (json.startsWith("[DONE]")) {
return;
}
yield JSON.parse(json);
}
}
},
};
}
// Reading the stream
async function* splitStream(stream: ReadableStream<Uint8Array>) {
const reader = stream.getReader();
let lastFragment = "";
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
// Flush the last fragment now that we're done
if (lastFragment !== "") {
yield lastFragment;
}
break;
}
const data = new TextDecoder().decode(value);
lastFragment += data;
const parts = lastFragment.split("\n\n");
// Yield all except for the last part
for (let i = 0; i < parts.length - 1; i += 1) {
yield parts[i];
}
// Save the last part as the new last fragment
lastFragment = parts[parts.length - 1];
}
} finally {
reader.releaseLock();
}
}
See the code here for a version that has nice typed overloads for streaming & non-streaming parameter variants, along with retries and other improvements.
The rest of the post is about understanding what this code does.
Making the request
This part is actually very easy. A streaming HTTP response comes from a normal HTTP request:
const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
const response = await fetch(baseUrl + "/v1/chat/completions", {
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": "Bearer " + process.env.LLM_API_KEY,
},
body: JSON.stringify(body),
});
The HTTP headers are sent up per usual, and don’t have to set anything in particular to enable streaming. And you can still leverage regular caching headers for HTTP streaming.
Handling errors
The story around errors on the client side is a little unfortunate for HTTP streaming. The upside is that for HTTP streaming, the client gets status codes right away in the initial response and can detect failure there. The downside to the http protocol is that if the server returns success but then breaks mid-stream, there isn’t anything at the protocol level that will tell the client that the stream was interrupted. We’ll see below how OpenAI encodes an “all done” sentinel at the end to work around this.
if (!response.ok) {
const error = await response.text();
throw new Error(`Failed (${response.status}): ${error}`,
}
Reading the stream
In order to read an HTTP streaming response, the client can use the response.body
property which is a ReadableStream allowing you to iterate over the chunks as they come in from the server using the .getReader(
) method.1
const reader = request.body.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) break;
const text = TextDecoder().decode(value);
//... do something with the chunk
}
} finally {
reader.releaseLock();
}
This handles every bit of data that we get back, but for the OpenAI HTTP protocol we are expecting the data to be JSON separated by newlines, so instead we will split up the response body and “yield” each line as they’re completed. We buffer the in-progress line into lastFragment
and only return full lines that have been separated by two newlines:
// stream here is request.body
async function* splitStream(stream: ReadableStream<Uint8Array>) {
const reader = stream.getReader();
let lastFragment = "";
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
// Flush the last fragment now that we're done
if (lastFragment !== "") {
yield lastFragment;
}
break;
}
const data = new TextDecoder().decode(value);
lastFragment += data;
const parts = lastFragment.split("\n\n");
// Yield all except for the last part
for (let i = 0; i < parts.length - 1; i += 1) {
yield parts[i];
}
// Save the last part as the new last fragment
lastFragment = parts[parts.length - 1];
}
} finally {
reader.releaseLock();
}
}
If this function*
and yield
syntax is unfamiliar to you, just treat function*
as a function that can return multiple things in a loop, and yield
as the way of returning something multiple times from a function.
You can then loop over this splitStream
function like:
for await (const data of splitStream(response.body)) {
// data here is a full line of text. For OpenAI, it might look like
// "data: {...some json object...}" or "data: [DONE]" at the end
}
If this "for await" syntax throws you off, it's using what’s called an “async iterator” - like a regular iterator you’d use with a for loop, but every time it gets the next value, it’s awaited.
For our example, when we’ve gotten some text from OpenAI and we’re waiting for more, the for loop will wait until splitStream
yields another value, which will happen when await reader.read()
returns a value that finishes one or more lines of text.
Next up we’ll look at another way of returning an async iterator that isn’t a function like splitStream
, so a caller can use a “for await” loop to iterate over this data.
Returning an async iterator
Now that we have an async iterator returning full lines of text, we could just return splitStream(response.body)
, but we want to intercept each of the lines and transform them, while still letting the caller of our function to iterate.
The approach is similar to to the async function*
syntax above. Here we’ll return an async iterator directly, instead of an async function that returns one when it’s called. The difference is the type is AsyncIterator
instead of AsyncGenerator
which needs to be called first. An AsyncIterator
can be defined by having a certain named function: Symbol.asyncIterator
.2
return {
[Symbol.asyncIterator]: async function* () {
for await (const data of splitStream(stream)) {
//handle the data
yield data;
}
},
};
This is useful when you want to return something different from the data coming from splitStream
. Every time a new line comes in from the streaming HTTP request, splitStream
will yield
it, this function will receive it in data
and can do something before yielding it to its caller.
Next we’ll look at how to interpret this data specifically in the case of OpenAI’s streaming chat completion API.
Handling the OpenAI HTTP streaming protocol
The OpenAI response protocol is a series of lines that start with data:
or event:
, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE]
if the stream is done, otherwise it’s just JSON.
for await (const data of splitStream(stream)) {
if (data.startsWith("data:")) {
const json = data.substring("data:".length).trimStart();
if (json.startsWith("[DONE]")) {
return;
}
yield JSON.parse(json);
} else {
console.debug("Unexpected data:", data);
}
}
Bringing it all together
Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on sdks or libraries. This allows you to hide latency, as your UI can immediately start updating, without consuming more bandwidth with multiple requests. You can use the above function like you would with the official openai
npm package:
const response = await createChatCompletion({
model: "llama3",
messages: [...your messages...],
stream: true,
});
for await (const chunk of response) {
if (chunk.choices[0].delta?.content) {
console.log(chunk.choices[0].delta.content);
}
}
See the code here that also lets you make some utility functions to make this even easier by pre-configuring the model and extracting the .choices[0].delta.content
:
const response = await chatStream(messages);
for await (const content of response) {
console.log(content);
}
Before you copy the code, try to implement it yourself as an exercise in async functions.
More resources
- For information about returning HTTP streaming data from your own server endpoint, check out this post on AI Chat with HTTP Streaming that both streams data from OpenAI (or similar) to your server and simultaneously streams it down to a client, while doing custom logic as it goes (such as saving chunks to a database).
- The MDN docs, as always, are great. Beyond the links above, here’s a guide on the readable streams API that shows how to connect a readable stream to an
<img>
tag to stream in an image request. Note: this guide usesresponse.body
as an async iterator, but currently that is not widely implemented and not in the TypeScript types.
-
Note: you can only have one reader of the stream at a time, so you generally don’t call
.getReader()
multiple times - you probabaly want.tee()
in that case, and if you want to use.getReader()
multiple times for some reason, make sure to have the first.releaseLock()
first. ↩ -
Or alternatively you can If you aren’t familiar with
Symbol
, it’s used in a way to have keys in an object that aren’t strings or numbers. That way they don’t conflict if you added a key namedasyncIterator
. You could access the function withmyIterator[Symbol.asyncIterator]()
. ↩
Top comments (0)