
Async Pipeline Haystack Streaming over FastAPI Endpoint

If you are a LangChain lover and/or don't like to use Haystack's experimental features, this blog is not for you!


Need for this blog

Love the Haystack team, and I hope they roll out an RC on Hayhooks soon for a better, more intuitive downstream experience. Until then, you can use this workaround(-ish), where we create a pipeline task and set a "sync" streaming callback on the running event loop to collect and yield the chunks.

FAT shoutout to vblagoje for the gist 🙌🏽

Hand Holding

You can just copy the code and it should do the magic, i.e. streaming responses as Server-Sent Events (SSE).

Packages

Make sure to have these packages installed. Give uv or poetry a shot.

python = ">=3.10,<3.13"

fastapi = "^0.111.0"
uvicorn = "^0.30.1"

haystack-ai = "^2.8.0"
haystack-experimental = "^0.4.0"

pydantic = "^2.7.2"

FastAPI

This tutorial is not a FastAPI guide; the snippet below is just a skeleton for building a generator that streams responses.

import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from pydantic import BaseModel
from typing import Dict, Any, AsyncGenerator

app = FastAPI()

class ModalPipeline:
   def __init__(self, api_key: str):
      # something
      ...

   async def process_user_input(self, query: str) -> AsyncGenerator[str, None]:
      # something
      ...

class ChatQuery(BaseModel):
   """Chat query request model with user's input message."""
   query: str
   api_key: str

@app.post("/chat")
async def chat(chat_query: ChatQuery):
   pipeline = ModalPipeline(chat_query.api_key)

   return StreamingResponse(
       pipeline.process_user_input(chat_query.query),
       media_type="text/event-stream",
       headers={
           "Cache-Control": "no-cache",
           "Connection": "keep-alive",
           "Content-Type": "text/event-stream",
           "X-Accel-Buffering": "no",
       }
   )

if __name__ == "__main__":
   import uvicorn
   uvicorn.run(app, host="0.0.0.0", port=8000)
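To sanity-check the endpoint once it's running, here is a minimal client sketch (assuming httpx; any HTTP client that can stream a response body works, and the payload values are placeholders):

import asyncio
import httpx

async def main():
    # placeholder values; use your real OpenAI key
    payload = {"query": "What is Haystack?", "api_key": "sk-..."}
    async with httpx.AsyncClient(timeout=None) as client:
        # stream the SSE body line by line instead of buffering the whole response
        async with client.stream("POST", "http://localhost:8000/chat", json=payload) as response:
            async for line in response.aiter_lines():
                print(line)

asyncio.run(main())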

Pipeline

Next, we define the asynchronous pipeline. Since I am passing the API key through the request params, we add components to the pipeline dynamically, per request.

from typing import Dict, Any, AsyncGenerator

from haystack_experimental.core import AsyncPipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.utils import Secret

template = """
Answer the question.
Question: {{question}}
Answer:
"""

class ModalPipeline:
   def __init__(self, api_key: str):
      self.api_key = api_key       
      self.generator = self.create_generator()

   def create_generator(self):
      return OpenAIGenerator(api_key=Secret.from_token(self.api_key), model="gpt-4o-mini")

   async def run_pipeline(self, pipeline: AsyncPipeline, input_data: Dict[str, Any]) -> AsyncGenerator[str, None]:
      # implemented in "The Hard Part" below
      ...

   async def process_user_input(self, query: str) -> AsyncGenerator[str, None]:
      rag_pipeline = AsyncPipeline()

      rag_pipeline.add_component("prompt_builder", PromptBuilder(template=template))
      rag_pipeline.add_component("generator", self.generator)

      rag_pipeline.connect("prompt_builder.prompt", "generator.prompt")

      input_data = {
         "prompt_builder": {
             "question": query,
         },
         "generator": {}
      }

      async for chunk in self.run_pipeline(rag_pipeline, input_data):
         yield chunk
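For contrast, here is roughly what the same two-component pipeline looks like with the regular synchronous Pipeline (a minimal sketch; it blocks until the full reply is generated, which is exactly what the AsyncPipeline setup avoids):

from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.utils import Secret

template = """
Answer the question.
Question: {{question}}
Answer:
"""

sync_pipeline = Pipeline()
sync_pipeline.add_component("prompt_builder", PromptBuilder(template=template))
sync_pipeline.add_component("generator", OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"), model="gpt-4o-mini"))
sync_pipeline.connect("prompt_builder.prompt", "generator.prompt")

# returns only after the entire answer has been generated
result = sync_pipeline.run({"prompt_builder": {"question": "What is Haystack?"}})
print(result["generator"]["replies"][0])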

The Hard Part

If you are working with plain, standalone functions, you can follow the gist directly.

If not, follow the code snippet below, and I'll try to explain why it is implemented the way it is.

   async def run_pipeline(self, pipeline: AsyncPipeline, input_data: Dict[str, Any]) -> AsyncGenerator[str, None]:
      request_collector = ChunkCollector() # code snippet is right below
      loop = asyncio.get_running_loop()

      # Create sync wrapper for async callback
      async def async_callback(chunk):
          await collect_chunk(request_collector.queue, chunk) # code snippet is below

      def sync_callback(chunk):
          # Use run_coroutine_threadsafe instead of create_task
          future = asyncio.run_coroutine_threadsafe(async_callback(chunk), loop)
          try:
              # Wait for the coroutine to complete
              future.result()
          except Exception as e:
              print(f"Error in sync_callback: {str(e)}")

      # Set callbacks using sync wrapper
      input_data["generator"]["streaming_callback"] = sync_callback

      async def pipeline_runner():
          try:
              async for _ in pipeline.run(input_data):
                  pass
          finally:
              await request_collector.queue.put(None)

      # Create task for pipeline
      pipeline_task = asyncio.create_task(pipeline_runner())

      try:
          # Start yielding chunks
          async for chunk in request_collector.generator():
              yield chunk
      finally:
          # Ensure pipeline task is cleaned up
          if not pipeline_task.done():
              pipeline_task.cancel()
              try:
                  await pipeline_task
              except asyncio.CancelledError:
                  pass

Some Q/A's for you

* Why don't we do a direct async callback without wrapping?

    async def callback(chunk):
        await collect_chunk(request_collector.queue, chunk)

    input_data["generator"]["streaming_callback"] = callback

The generator calls the callback synchronously, but we're passing an async function, so the coroutine would be created and never awaited. That's why we need a sync wrapper around our async callback, instead of handing the generator a coroutine it can't await.
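Here is a tiny standalone repro of that failure mode: calling an async function from sync code only creates a coroutine object, which is then dropped without ever running.

async def callback(chunk: str) -> None:
    print(f"got: {chunk}")

def generator_calls_me(cb):
    # a sync caller can't await; the coroutine object is created and discarded
    cb("hello")  # RuntimeWarning: coroutine 'callback' was never awaited

generator_calls_me(callback)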

* Why don't we just create a task?

    def sync_callback(chunk):
        asyncio.create_task(async_callback(chunk))

The callback is invoked from a different thread, where there is no running event loop, so asyncio.create_task would fail. We need a thread-safe way to schedule the coroutine on the main loop, which is what asyncio.run_coroutine_threadsafe gives us.
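Here is a standalone sketch of that thread-safe pattern, with a plain worker thread standing in for wherever the generator fires its callback:

import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def worker():
        # no event loop runs in this thread, so asyncio.create_task() would
        # raise "no running event loop"; hand the coroutine to the main loop instead
        for i in range(3):
            asyncio.run_coroutine_threadsafe(queue.put(f"chunk-{i}"), loop).result()
        asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()  # sentinel

    threading.Thread(target=worker).start()

    # consume on the event-loop side, just like ChunkCollector.generator() below
    while (item := await queue.get()) is not None:
        print(item)

asyncio.run(main())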

Chunking in SSE Format

We need to define the request_collector, which manages the queue, stores the chunks, and yields the chunks from the queue in SSE format.

from typing import AsyncGenerator

import uuid
import json

from asyncio import Queue

from haystack.dataclasses import StreamingChunk

class ChunkCollector:
    """Collects and queues streaming chunks."""
    def __init__(self):
        self.queue = Queue()

    async def generator(self) -> AsyncGenerator[str, None]:
        """Yields chunks from the queue."""
        # Send initial metadata event
        yield 'event: metadata\n' + f'data: {{"run_id": "{uuid.uuid4()}"}}\n\n'

        while True:
            chunk = await self.queue.get()
            if chunk is None:
                # Send end event
                yield 'event: end\n\n'
                break
            # Send data event
            yield f'event: data\ndata: {json.dumps(chunk)}\n\n'

async def collect_chunk(queue: Queue, chunk: StreamingChunk):
    """
    Collect chunks and store them in the queue.

    :param queue: Queue to store the chunks
    :param chunk: StreamingChunk to be collected
    """
    if chunk and chunk.content:
        await queue.put(chunk.content)
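With this collector in place, the raw stream for a successful request looks roughly like this (the run_id is just an example UUID):

event: metadata
data: {"run_id": "4f3c2a1e-9b7d-4c6e-8a2f-1d5e6b7c8d9e"}

event: data
data: "Hello"

event: data
data: " world!"

event: end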

Frontend

You can directly use EventSource or fetch. For this tutorial, let's use fetch-event-source:

import { fetchEventSource } from "@microsoft/fetch-event-source";

const abortController = new AbortController();

// point this at your streaming endpoint (the /chat route above);
// payload matches the ChatQuery model: { query, api_key }
await fetchEventSource("/api/v1/multi-modal/stream", {
  signal: abortController.signal,
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify(payload),
  onmessage(msg) {
    if (msg.event === "data") {
      const parsedData = JSON.parse(msg.data);
      console.log(parsedData);
    }
  },
  openWhenHidden: true,
  onclose() {
    // something
  },
  onerror(error) {
    console.error(error);
    throw error;
  },
});

Final Notes

I completely agree with vblagoje here, as sockets will be just 👌🏽. For more, do follow Support both callback and generator-based streaming in all Chat Generators.

vblagoje: "what if there was generator/iterator output socket on all ChatGenerators"


If you found this blog helpful, just send a good vibe my way, for my (research || side project || gigs) ✌🏽!
