Ian Macartney

Originally published at stack.convex.dev

GPT Streaming With Persistent Reactivity

ChatGPT-powered experiences feel snappier when the responses show up incrementally. Instead of waiting for the full response before showing the user anything, streaming the text in lets them start reading immediately.

OpenAI exposes a streaming API for chat completions. But how do you manage a streaming request when you have a server between the client and OpenAI? You might be tempted to use HTTP streaming end to end - both from the client to the server and from the server to OpenAI. However, there’s another way that comes with some big benefits. Spoiler: it’s possible to use a database as a layer of reactivity that separates client request lifecycles from server requests. Don’t worry if that doesn’t make sense yet - we’ll take it one step at a time.

This post will look at working with streams using OpenAI’s beta v4.0.0 Node SDK. Beyond just getting streaming for a single user, we’ll look at an approach that enables:

  • Persisting the response even if the user closes their browser.
  • Multiplayer chat, including streaming multiple ChatGPT messages at once.
  • Resuming a stream when a user refreshes their browser mid-stream.
  • Streaming to multiple users at once.
  • Implementing custom stream granularity, such as only updating on full words or sentences rather than on each token.

To do this, we’ll use Convex to store the messages and make the request to OpenAI. This code is on GitHub for you to clone and play with.

Diagram of browsers talking to Convex, which talks to OpenAI

Persisting messages

Let’s say we have a chat app, like the one pictured in the gif above. We want to store the messages from each user, as well as messages populated by responses from OpenAI. First let’s look at how data is stored (2), assuming a client sends a message (1).
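
For reference, each message only needs an author and a body. Convex schemas are optional, but a minimal convex/schema.ts for this app could look like the following sketch (illustrative, not code from the demo):

// convex/schema.ts - a minimal sketch of the messages table.
import { defineSchema, defineTable } from "convex/server";
import { v } from "convex/values";

export default defineSchema({
  messages: defineTable({
    author: v.string(),
    body: v.string(),
  }),
});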

When a user sends a message, we immediately commit it to the database, so they’re correctly ordered by creation time. This code is executed on the server:

export const send = mutation({
  args: { body: v.string(), author: v.string() },
  handler: async ({ db, scheduler }, { body, author }) => {
    // Save our message to the DB.
    await db.insert("messages", { body, author });

    if (body.indexOf("@gpt") !== -1) {
      // ...see below
    }
  }
});

This mutation saves the message to the database. When the user wants a response from the GPT model (by adding “@gpt” to the message), we will:

  1. Store a placeholder message to update later.
  2. Make a streaming request to OpenAI in an asynchronous background function.
  3. Progressively update the message as the response streams in.

By running the streaming request asynchronously (versus blocking in a user request), we can interact with ChatGPT and save the data to the database even if the client has closed their browser. It also allows us to run many requests in parallel, from the same or multiple users.

We also run it asynchronously because, in Convex, mutations are pure transactions and as such can’t do non-deterministic things like making API requests. To talk to third-party services, we use an action: a non-transactional serverless function. We trigger the background job to call ChatGPT and update the message body by scheduling the action like so:

// ...when the user wants to send a message to OpenAI's GPT model
const messages = // fetch recent messages to send as context
// Insert a message with a placeholder body.
const messageId = await db.insert("messages", {
  author: "ChatGPT",
  body: "...",
});
// Schedule an action that calls ChatGPT and updates the message.
await scheduler.runAfter(0, internal.openai.chat, { messages, messageId });

We schedule it for zero milliseconds later, similar to doing setTimeout(fn, 0). The message writing and action scheduling happens transactionally in a mutation, so we will only run the action if the messages are successfully committed to the database.

When the action wants to update the body of a message as the streaming results come in, it can invoke an update mutation with the messageId from above:

export const update = internalMutation({
  args: { messageId: v.id("messages"), body: v.string() },
  handler: async ({ db }, { messageId, body }) => {
    await db.patch(messageId, { body });
  },
});

Note: An internalMutation is just a mutation that isn’t exposed as part of the public API. Next we’ll look at the code that calls this update function.

Convex has end-to-end reactivity, so when we update the messages in the database, the UI automatically updates. See below what it looks like to reactively query data.

Streaming with the OpenAI node SDK

Streaming is currently available in the beta version of OpenAI’s node SDK. To install it:

npm install openai@4.0.0-beta.2

This is very similar to previous releases of the openai package, with a few nuances you can read about here.

The internal.openai.chat action we referenced above will live in convex/openai.ts - see the full code here. One important detail is that it needs to run in the node runtime to support some dependencies in the openai package, which means it needs to have "use node"; as the first line in the file.

"use node";
import { OpenAI } from "openai";
import { internalAction } from "./_generated/server";
//...
type ChatParams = {
  messages: Doc<"messages">[];
  messageId: Id<"messages">;
};
export const chat = internalAction({
  handler: async ({ runMutation }, { messages, messageId }: ChatParams) => {
    //...Create and handle a stream request

Creating a stream request

// inside the chat function in convex/openai.ts
const apiKey = process.env.OPENAI_API_KEY!;
const openai = new OpenAI({ apiKey });

const stream = await openai.chat.completions.create({
  model: "gpt-3.5-turbo", // "gpt-4" also works, but is so slow!
  stream: true,
  messages: [
    {
      role: "system",
      content: "You are a terse bot in a group chat responding to q's.",
    },
    ...messages.map(({ body, author }) => ({
      role: author === "ChatGPT" ? "assistant" : "user",
      content: body,
    })),
  ],
});
//...handling the stream

The main difference here from previous openai versions (aside from their new simplified client configuration) is passing stream: true. This changes the return format, which unfortunately does not currently include token usage the way the non-streaming version does. I hope this is fixed in a future release, since tracking token usage is useful for understanding how different users or features affect your costs.
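
In the meantime, one rough workaround (a sketch, not part of the demo) is to estimate usage yourself while consuming the stream, for example by counting streamed chunks, which for chat models is roughly one token each:

// Hypothetical usage tracking inside the chat action (not in the demo).
// Streaming responses don't include `usage`, so approximate it: each streamed
// delta is roughly one completion token for chat models.
let completionTokenEstimate = 0;
for await (const part of stream) {
  const content = part.choices[0].delta?.content;
  if (content) {
    completionTokenEstimate += 1;
    // ...append to `body` and update the message, as shown in the next section
  }
}
// Log or persist the estimate for cost reporting.
console.log("~completion tokens:", completionTokenEstimate);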

Handling the stream

The API exposed by the openai SDK makes handling the stream very easy. We use an async iterator to handle each chunk, appending it to the body and updating the message body with everything we’ve received so far:

let body = "";
for await (const part of stream) {
  if (part.choices[0].delta?.content) {
    body += part.choices[0].delta.content;
    await runMutation(internal.messages.update, {
      messageId,
      body,
    });
  }
}

Note that here we’re updating the message every time the body updates, but we could implement custom granularity by deciding when to call runMutation, such as on word breaks or at the end of full sentences.
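
For example, here is one way coarser updates could look - a sketch of the same loop that only persists on word boundaries (an illustration, not the demo code):

// A sketch of coarser updates: only write to the database when the incoming
// chunk contains whitespace (a word boundary), and once more at the end so
// nothing is dropped.
let body = "";
let flushed = "";
for await (const part of stream) {
  const content = part.choices[0].delta?.content;
  if (!content) continue;
  body += content;
  if (/\s/.test(content)) {
    flushed = body;
    await runMutation(internal.messages.update, { messageId, body });
  }
}
if (body !== flushed) {
  // Final flush for any trailing partial word.
  await runMutation(internal.messages.update, { messageId, body });
}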

This action allows us to stream messages from OpenAI to our server function and into the database. But how does this translate to clients updating in real time? Next, let’s see how the client reactively updates as messages are created and updated.

Client “streaming” via subscriptions

After the previous sections, you might be surprised how little is required to get the client to show live updating messages. I put streaming in quotes since we aren’t using HTTP streaming here - instead, we’re just using the reactivity provided out-of-the-box by Convex.

On the client, we use the useQuery hook to call the api.messages.list server function in the messages module, which we’ll see in a second. This hook gives us an updated list of messages every time a message is added or modified. This is a special property of a Convex query: it tracks the database reads it makes, and when any of that data changes, it:

  1. Invalidates the query cache (which is managed transparently by Convex).
  2. Recomputes the result.
  3. Pushes the new data over a WebSocket to all subscribed clients.

export default function App() {
  const messages = useQuery(api.messages.list);
  ...
  return (
    ...
    {messages?.map((message) => (
      <article key={message._id}>
        <div>{message.author}</div>
        <p>{message.body}</p>
      </article>
    ))}

Because this query is decoupled from the HTTP streaming response from OpenAI, multiple browsers can be subscribed to updates as messages change. And if a user refreshes or restarts their browser, it will just pick up the latest results of the query.

On the server, this is the query that grabs the most recent 100 messages:

export const list = query({
  handler: async ({ db }): Promise<Doc<"messages">[]> => {
    // Grab the most recent messages.
    const messages = await db.query("messages").order("desc").take(100);
    // Reverse the list so that it's in chronological order.
    // Alternatively, return it reversed and flip the order via flex-direction.
    return messages.reverse();
  },
});

Convex is doing some magic under the hood. If any message is inserted into or updated in the database in a way that would affect this query’s results - for instance, if a new message is added or one of the 100 most recent messages is edited - Convex automatically re-executes the query (provided clients are subscribed to it via useQuery). If the results differ, it pushes the new results over a WebSocket to those clients, which triggers an update to the components using useQuery for that query.

To give you a sense of performance, list takes ~17ms and update takes ~7ms for me on the server, so the added latency between a new token arriving from OpenAI and an updated set of messages being pushed to the client is small. The gifs in this article are real recordings, not sped up.

GPT response streaming in quickly

Summary

We looked at how to stream ChatGPT responses into Convex, allowing clients to watch the responses without the flakiness of browser-based HTTP streaming requests. The full code is available here. Let us know in Discord what you think!

Extra Credit 🤓

Beyond what’s covered here, it would be easy to extend this demo to:

  • Store whether a message has finished streaming by setting a boolean on the message at the end of the stream (see the sketch after this list).
  • Add error handling, to mark a message as failed if the stream fails. See this post for an example of updating a message in the case of failure.
  • Schedule a watchdog function that marks a message as timed out if it hasn’t finished within a certain timeframe, in case the action failed. See this post for more details, as well as other patterns for background jobs.
  • Organize the messages by thread or user, using indexes.
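
For instance, the first item could look something like the following sketch, which assumes a hypothetical isComplete field and a finish mutation, neither of which exists in the demo:

// convex/messages.ts - hypothetical mutation to mark a message as finished.
export const finish = internalMutation({
  args: { messageId: v.id("messages") },
  handler: async ({ db }, { messageId }) => {
    await db.patch(messageId, { isComplete: true });
  },
});

// convex/openai.ts - after the streaming loop in the chat action:
// await runMutation(internal.messages.finish, { messageId });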
