DEV Community

Ian Macartney
Ian Macartney

Posted on • Originally published at stack.convex.dev

Streaming HTTP Responses using fetch

This post will look at working with the JavaScript Streams API which allows making a fetch HTTP call and receiving a streaming response in chunks, which allows a client to start responding to a server response more quickly and build UIs like ChatGPT.

As a motivating example, we’ll implement a function to handle the streaming LLM response from OpenAI (or any server using the same http streaming API), using no npm dependencies—just the built-in fetch. The full code is here including retries with exponential backoff, embeddings, non-streaming chat, and a simpler APIs for interacting with chat completions and embeddings.

If you’re interested in seeing how to also return an HTTP stream to clients, check out this post.

Full example code

Here’s the full example. We’ll look at each piece below:

async function createChatCompletion(body: ChatCompletionCreateParams) {
  // Making the request
  const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
  const response = await fetch(baseUrl + "/v1/chat/completions", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Authorization": "Bearer " + process.env.LLM_API_KEY,
    },
    body: JSON.stringify(body),
  });
  // Handling errors
  if (!response.ok) {
    const error = await response.text();
    throw new Error(`Failed (${response.status}): ${error}`,
  }
  if (!body.stream) { // the non-streaming case
    return response.json();
  }
  const stream = response.body;
  if (!stream) throw new Error("No body in response");
  // Returning an async iterator
  return {
    [Symbol.asyncIterator]: async function* () {
      for await (const data of splitStream(stream)) {
        // Handling the OpenAI HTTP streaming protocol
        if (data.startsWith("data:")) {
          const json = data.substring("data:".length).trimStart();
          if (json.startsWith("[DONE]")) {
            return;
          }
          yield JSON.parse(json);
        }
      }
    },
  };
}

// Reading the stream  
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment += data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i < parts.length - 1; i += 1) {
        yield parts[i];
      }
      // Save the last part as the new last fragment
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}
Enter fullscreen mode Exit fullscreen mode

See the code here for a version that has nice typed overloads for streaming & non-streaming parameter variants, along with retries and other improvements.

The rest of the post is about understanding what this code does.

Making the request

This part is actually very easy. A streaming HTTP response comes from a normal HTTP request:

const baseUrl = process.env.LLM_BASE_URL || "https://api.openai.com";
const response = await fetch(baseUrl + "/v1/chat/completions", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "Authorization": "Bearer " + process.env.LLM_API_KEY,
  },
  body: JSON.stringify(body),
});
Enter fullscreen mode Exit fullscreen mode

The HTTP headers are sent up per usual, and don’t have to set anything in particular to enable streaming. And you can still leverage regular caching headers for HTTP streaming.

Handling errors

The story around errors on the client side is a little unfortunate for HTTP streaming. The upside is that for HTTP streaming, the client gets status codes right away in the initial response and can detect failure there. The downside to the http protocol is that if the server returns success but then breaks mid-stream, there isn’t anything at the protocol level that will tell the client that the stream was interrupted. We’ll see below how OpenAI encodes an “all done” sentinel at the end to work around this.

if (!response.ok) {
  const error = await response.text();
  throw new Error(`Failed (${response.status}): ${error}`,
}
Enter fullscreen mode Exit fullscreen mode

Reading the stream

In order to read an HTTP streaming response, the client can use the response.body property which is a ReadableStream allowing you to iterate over the chunks as they come in from the server using the .getReader() method.1

const reader = request.body.getReader();
try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      const text = TextDecoder().decode(value);
      //... do something with the chunk
    }
} finally {
  reader.releaseLock();
}
Enter fullscreen mode Exit fullscreen mode

This handles every bit of data that we get back, but for the OpenAI HTTP protocol we are expecting the data to be JSON separated by newlines, so instead we will split up the response body and “yield” each line as they’re completed. We buffer the in-progress line into lastFragment and only return full lines that have been separated by two newlines:

// stream here is request.body
async function* splitStream(stream: ReadableStream<Uint8Array>) {
  const reader = stream.getReader();
  let lastFragment = "";
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        // Flush the last fragment now that we're done
        if (lastFragment !== "") {
          yield lastFragment;
        }
        break;
      }
      const data = new TextDecoder().decode(value);
      lastFragment += data;
      const parts = lastFragment.split("\n\n");
      // Yield all except for the last part
      for (let i = 0; i < parts.length - 1; i += 1) {
        yield parts[i];
      }
      // Save the last part as the new last fragment
      lastFragment = parts[parts.length - 1];
    }
  } finally {
    reader.releaseLock();
  }
}
Enter fullscreen mode Exit fullscreen mode

If this function* and yield syntax is unfamiliar to you, just treat function* as a function that can return multiple things in a loop, and yield as the way of returning something multiple times from a function.

You can then loop over this splitStream function like:

for await (const data of splitStream(response.body)) {
  // data here is a full line of text. For OpenAI, it might look like
  // "data: {...some json object...}" or "data: [DONE]" at the end
}
Enter fullscreen mode Exit fullscreen mode

If this "for await" syntax throws you off, it's using what’s called an “async iterator” - like a regular iterator you’d use with a for loop, but every time it gets the next value, it’s awaited.

For our example, when we’ve gotten some text from OpenAI and we’re waiting for more, the for loop will wait until splitStream yields another value, which will happen when await reader.read() returns a value that finishes one or more lines of text.

Next up we’ll look at another way of returning an async iterator that isn’t a function like splitStream, so a caller can use a “for await” loop to iterate over this data.

Returning an async iterator

Now that we have an async iterator returning full lines of text, we could just return splitStream(response.body), but we want to intercept each of the lines and transform them, while still letting the caller of our function to iterate.

The approach is similar to to the async function* syntax above. Here we’ll return an async iterator directly, instead of an async function that returns one when it’s called. The difference is the type is AsyncIterator instead of AsyncGenerator which needs to be called first. An AsyncIterator can be defined by having a certain named function: Symbol.asyncIterator.2

      return {
        [Symbol.asyncIterator]: async function* () {
          for await (const data of splitStream(stream)) {
            //handle the data
            yield data;
          }
        },
      };
Enter fullscreen mode Exit fullscreen mode

This is useful when you want to return something different from the data coming from splitStream. Every time a new line comes in from the streaming HTTP request, splitStream will yield it, this function will receive it in data and can do something before yielding it to its caller.

Next we’ll look at how to interpret this data specifically in the case of OpenAI’s streaming chat completion API.

Handling the OpenAI HTTP streaming protocol

The OpenAI response protocol is a series of lines that start with data: or event:, but we’ll just handle the data responses, since that’s the useful part for chat completions. There’s a sentinel of [DONE] if the stream is done, otherwise it’s just JSON.

for await (const data of splitStream(stream)) {
  if (data.startsWith("data:")) {
    const json = data.substring("data:".length).trimStart();
    if (json.startsWith("[DONE]")) {
      return;
    }
    yield JSON.parse(json);
  } else {
    console.debug("Unexpected data:", data);
  }
}
Enter fullscreen mode Exit fullscreen mode

Bringing it all together

Now that you understand HTTP streaming, you can feel confident working directly with streaming APIs without relying on sdks or libraries. This allows you to hide latency, as your UI can immediately start updating, without consuming more bandwidth with multiple requests. You can use the above function like you would with the official openai npm package:

  const response = await createChatCompletion({
    model: "llama3",
    messages: [...your messages...],
    stream: true,
  });
  for await (const chunk of response) {
    if (chunk.choices[0].delta?.content) {
      console.log(chunk.choices[0].delta.content);
    }
  }
Enter fullscreen mode Exit fullscreen mode

See the code here that also lets you make some utility functions to make this even easier by pre-configuring the model and extracting the .choices[0].delta.content:

const response = await chatStream(messages);
for await (const content of response) {
  console.log(content);
}
Enter fullscreen mode Exit fullscreen mode

Before you copy the code, try to implement it yourself as an exercise in async functions.

More resources

  • For information about returning HTTP streaming data from your own server endpoint, check out this post on AI Chat with HTTP Streaming that both streams data from OpenAI (or similar) to your server and simultaneously streams it down to a client, while doing custom logic as it goes (such as saving chunks to a database).
  • The MDN docs, as always, are great. Beyond the links above, here’s a guide on the readable streams API that shows how to connect a readable stream to an <img> tag to stream in an image request. Note: this guide uses response.body as an async iterator, but currently that is not widely implemented and not in the TypeScript types.

  1. Note: you can only have one reader of the stream at a time, so you generally don’t call .getReader() multiple times - you probabaly want .tee() in that case, and if you want to use .getReader() multiple times for some reason, make sure to have the first .releaseLock() first. 

  2. Or alternatively you can If you aren’t familiar with Symbol, it’s used in a way to have keys in an object that aren’t strings or numbers. That way they don’t conflict if you added a key named asyncIterator. You could access the function with myIterator[Symbol.asyncIterator]()

Top comments (0)