James Lee
Engineering GraphRAG for Production: API Design, Query Optimization, and Service Reliability

1. The Gap Between Open-Source Scripts and Enterprise Services

In the previous two articles, we built a complete data pipeline featuring MinerU multimodal parsing and a dynamic-aware chunking algorithm. However, the official GraphRAG package ships only CLI scripts and low-level Python function calls via graphrag.api, which leaves three critical pain points unresolved for production deployment:

  1. No API Layer: There is no RESTful interface to connect with the customer service system or support automated operations. After encapsulation, the LangGraph Agent can call GraphRAG through a standard API with zero awareness of the underlying implementation.
  2. No Streaming Support: The official package only provides synchronous query functions with no HTTP-level streaming response, resulting in a poor real-time conversation experience. After encapsulation, SSE-based streaming delivers a frontend-friendly real-time output.
  3. Scattered Scheduling: Full/incremental indexing and four query modes require callers to handle all low-level logic themselves, with no unified service entry point and poor engineering reusability. After encapsulation, a single unified entry point lets callers focus purely on business parameters.

This article covers the engineering transformation of the official graphrag.api module (prompt_tune.py, index.py, query.py) into four core API capabilities: dynamic prompt generation, index building, incremental index updates, and query service — ultimately delivering a production-grade GraphRAG service with high availability, high performance, and high extensibility, laying the foundation for Part 4's multi-Agent architecture.


2. System Architecture: Defining the Encapsulation Layer

  ┌──────────────────┐    ┌──────────────────────────┐
  │  CSV Order Data  │    │  PDF Product Manuals     │
  └────────┬─────────┘    │  MinerU + LitServe       │
           │              └──────────┬───────────────┘
           │                         │
           └────────────┬────────────┘
                        ▼
      ┌─────────────────────────────────────────────┐
      │    GraphRAG Service Layer (This Article)    │
      │                                             │
      │  FastAPI Router                             │
      │  ├── POST /api/graphrag/prompt              │
      │  ├── POST /api/graphrag/index               │
      │  ├── POST /api/query                        │
      │  └── POST /api/query_stream                 │
      │                │                            │
      │  graphrag.api Call Layer                    │
      │  ├── generate_indexing_prompts()            │
      │  ├── build_index()  full / incremental      │
      │  └── basic/local/global/drift_search()      │
      │                │                            │
      │  Storage: LanceDB + Parquet                 │
      │           + FilePipelineStorage             │
      └────────────────┬────────────────────────────┘
                       │ RESTful API
           ┌───────────┴───────────┐
           ▼                       ▼
     Customer Service Agent    Back-end Admin System
     (LangGraph Agent)         (Incremental Data Push)
     → See Part 4: Multi-Agent Architecture

3. Four Core API Implementations

3.1 Prompt Tuning API

Endpoint: POST /api/graphrag/prompt

Wraps the official generate_indexing_prompts() as an async endpoint with dynamic parameter support and Chinese-language optimization. Core design decisions:

  1. Full parameter alignment: Preserves all official API parameters including logger and selection_method (DocSelectionType enum);
  2. Chinese language optimization: Explicitly passes language="Chinese" to avoid auto-detection errors;
  3. Observable progress: Integrates RichProgressLogger for real-time progress feedback.
import graphrag.api as api
from graphrag.prompt_tune.types import DocSelectionType
from graphrag.logger.rich_progress import RichProgressLogger

@router.post("/prompt")
async def run_prompt_tune(req: PromptTuneRequest):
    # Abstract: config loading and logger initialization
    config = load_graphrag_config(req)
    progress_logger = RichProgressLogger(prefix="graphrag-prompt-tune")

    # Core: enum mapping for document selection method
    selection_map = {
        "auto": DocSelectionType.AUTO,
        "all":  DocSelectionType.ALL,
        "top":  DocSelectionType.TOP,
    }
    doc_selection = selection_map.get(req.selection_method.lower(), DocSelectionType.RANDOM)

    # Core: call official API to generate prompts
    entity_prompt, community_prompt, summarize_prompt = await api.generate_indexing_prompts(
        config=config,
        logger=progress_logger,
        root=req.root,
        chunk_size=req.chunk_size,
        overlap=req.overlap,
        limit=req.limit,
        selection_method=doc_selection,
        domain=req.domain,
        language="Chinese",  # ⚠️ Always explicit — auto-detection can misidentify Chinese as English
        max_tokens=req.max_tokens,
        discover_entity_types=req.discover_entity_types,
        min_examples_required=req.min_examples_required,
        n_subset_max=req.n_subset_max,
        k=req.k,
    )

    # Abstract: prompt persistence logic
    save_prompts(req.output_dir, entity_prompt, community_prompt, summarize_prompt)
    return {"status": "ok", "output_dir": req.output_dir}

Production Pitfall: When language is omitted, auto-detection occasionally misidentifies Chinese corpora as English, causing the generated prompt templates to be in the wrong language. Always pass language="Chinese" explicitly.
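The request model itself is left abstract in the handler above. As a reference, here is a plain-dataclass sketch of the fields the handler reads — the field names mirror the parameters used above, but the defaults are illustrative, and a production service would more likely use a Pydantic model so FastAPI can validate the request body:

```python
from dataclasses import dataclass

@dataclass
class PromptTuneRequest:
    """Fields consumed by run_prompt_tune above (defaults are illustrative)."""
    root: str = "./graphrag"            # project root containing settings.yaml
    output_dir: str = "./prompts"       # where generated prompts are written
    domain: str = "e-commerce customer service"
    selection_method: str = "random"    # auto | all | top | random
    chunk_size: int = 512
    overlap: int = 64
    limit: int = 15                     # number of documents to sample
    max_tokens: int = 2048
    discover_entity_types: bool = True
    min_examples_required: int = 2
    n_subset_max: int = 300
    k: int = 15
```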


3.2 Index Building & Incremental Update API

Endpoint: POST /api/graphrag/index

A unified entry point for both full builds and incremental updates, controlled by the is_update flag — directly mapped to the official build_index parameter is_update_run. Core design decisions:

  1. Unified entry point: Eliminates the need for separate full/incremental endpoints, reducing caller complexity;
  2. Configurable indexing strategy: Supports both Standard and Fast modes to balance accuracy and speed;
  3. Structured result response: Returns categorized workflow execution status for operational diagnostics.
import graphrag.api as api
from graphrag.config.enums import IndexingMethod
# Note: the import path for PipelineRunResult varies across graphrag versions
from graphrag.index.typing.pipeline_run_result import PipelineRunResult

@router.post("/index")
async def run_index(req: IndexRequest):
    # Abstract: config loading and logger initialization
    config = load_graphrag_config(req)
    progress_logger = RichProgressLogger(prefix="graphrag-index")

    # Core: indexing strategy selection (Standard vs Fast)
    index_strategy = IndexingMethod.Fast if req.index_method == "fast" else IndexingMethod.Standard

    # Core: unified full/incremental build via build_index
    index_result: list[PipelineRunResult] = await api.build_index(
        config=config,
        method=index_strategy,
        is_update_run=req.is_update,  # incremental update flag, natively supported
        memory_profile=req.memory_profile,
        progress_logger=progress_logger
    )

    # Abstract: result parsing and status response
    error_results = [res for res in index_result if res.errors]
    return {
        "status": "ok" if not error_results else "partial_error",
        "workflow_results": parse_workflow_results(index_result)
    }

Multi-index isolation for incremental updates: In enterprise scenarios, CSV and PDF data require different chunking strategies. Isolation is achieved by specifying separate root directories per data source:

/graphrag/
├── data_source_a/     ← CSV order data,  file_pattern: .*\.csv$
│   └── settings.yaml
└── data_source_b/     ← PDF manuals (MinerU → Markdown),  file_pattern: .*\.md$
    └── settings.yaml

Production Recommendation: Beyond directory isolation, each data source should have its own settings.yaml with independent chunking strategies and entity extraction rules to prevent cross-contamination.
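For example, the two settings.yaml files might diverge only in their input and chunking sections — the values below are illustrative, not the project's actual configuration:

```yaml
# data_source_b/settings.yaml — PDF manuals parsed to Markdown by MinerU
input:
  file_type: text
  file_pattern: ".*\\.md$"
chunks:
  size: 1000          # larger chunks preserve section context in manuals
  overlap: 100
  group_by_columns: [id]
```

data_source_a would instead match `.*\.csv$` with a smaller chunk size tuned to row-level order records.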


3.3 Synchronous Query API

Endpoint: POST /api/query

Supports all four official query modes with full parameter alignment. Core design decisions:

  1. Unified multi-mode entry point: Routes to the appropriate query function via query_type, reducing caller complexity;
  2. Traceable context: Custom callbacks capture query context to support result debugging and optimization;
  3. Layered exception handling: Separates parameter errors, business exceptions, and system errors per RESTful conventions.

Four Query Modes (fully aligned with the official API):

| Mode | Use Case | Data Dependencies | Speed |
|------|----------|-------------------|-------|
| basic | Simple keyword matching | text_units | ⚡ Fastest |
| local | Precise entity queries (e.g. "Order #123 shipping") | entities, relationships, covariates | ⚡ Fast |
| global | Cross-section semantic understanding (e.g. "all return policies") | entities, communities, reports | 🐢 Slower |
| drift | Exploratory reasoning, multi-hop association | entities, communities, reports | 🐢 Slowest |

Implementation (core logic abstracted):

import graphrag.api as api
from fastapi import HTTPException
# Note: the import path for NoopQueryCallbacks varies across graphrag versions
from graphrag.callbacks.noop_query_callbacks import NoopQueryCallbacks

@router.post("/query")
async def query(request: QueryRequest):
    try:
        # Abstract: config and index data loading
        config = load_graphrag_config(request)
        index_data = await load_index_data(request)

        # Core: custom callback to capture query context
        context_container = {}
        callbacks = NoopQueryCallbacks()
        callbacks.on_context = lambda ctx: context_container.update({"ctx": ctx})

        # Core: dictionary-based routing for all four query modes
        query_type = request.query_type.lower()
        query_map = {
            "basic":  api.basic_search,
            "local":  api.local_search,
            "global": api.global_search,
            "drift":  api.drift_search,
        }
        if query_type not in query_map:
            raise ValueError(f"Unsupported query type: '{query_type}'. Valid options: {list(query_map.keys())}")

        response, _ = await query_map[query_type](
            config=config,
            data=index_data,
            query=request.query,
            callbacks=[callbacks]
        )

        # Abstract: context serialization (resolves DataFrame JSON serialization issue)
        return {
            "query": request.query,
            "response": response,
            "query_type": query_type,
            "context": format_context(context_container["ctx"])
        }
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error("Query error", exc_info=True)
        raise HTTPException(status_code=500, detail="Query processing failed")

3.4 Streaming Query API

Endpoint: POST /api/query_stream

The production implementation executes the full query first, then streams the response in segments — a pseudo-streaming approach that reuses the synchronous query path while still giving the frontend SSE rendering. Core design decisions:

  1. Reuses core query logic: Guarantees consistency between synchronous and streaming results;
  2. SSE protocol compliance: Standard SSE format output, compatible with mainstream frontend frameworks;
  3. Exception safety: Errors during streaming do not drop the connection — they are returned as SSE error events.
import asyncio

from fastapi.responses import StreamingResponse

@router.post("/query_stream")
async def query_stream(request: QueryRequest):
    async def event_stream():
        try:
            yield "data: Processing query...\n\n"

            # Reuse core query logic
            response = await core_query_logic(request)

            # Core: segment and stream the response
            for segment in split_response(response, batch_size=25):
                yield f"data: {segment}\n\n"
                await asyncio.sleep(0.1)

            yield "data: --- Stream complete ---\n\n"
            yield "data: [DONE]\n\n"
        except Exception as e:
            logger.error("Streaming query error", exc_info=True)
            yield f"data: Error: {str(e)}\n\n"
            yield "data: [DONE]\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
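The split_response helper is left abstract above. A minimal character-batching sketch is below — hypothetical, since the production version may well segment on sentence or token boundaries instead of fixed character counts:

```python
def split_response(text: str, batch_size: int = 25):
    """Yield fixed-size character segments of a completed response,
    so the frontend can render it incrementally over SSE."""
    for i in range(0, len(text), batch_size):
        yield text[i:i + batch_size]
```

The 0.1 s sleep between segments in the handler above then paces delivery so the frontend perceives a steady token stream.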

4. Production Pitfalls & Lessons Learned

4.1 DataFrame Serialization Error

Symptom: After local_search loads Parquet files, passing covariates raises TypeError: Object of type DataFrame is not JSON serializable.

Solution: Implement a format_context function at the data loading layer to uniformly convert DataFrames and custom objects into serializable strings/dicts before they reach the response layer.
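A minimal sketch of such a format_context function, using duck typing so it handles pandas DataFrames (via their to_dict method) without importing pandas directly — the exact shape of GraphRAG's context objects may differ, so treat this as an assumption:

```python
def format_context(ctx):
    """Recursively convert context objects into JSON-serializable values."""
    if hasattr(ctx, "columns") and hasattr(ctx, "to_dict"):
        return ctx.to_dict(orient="records")   # pandas DataFrame -> list of row dicts
    if hasattr(ctx, "to_dict"):
        return ctx.to_dict()                   # pandas Series or similar
    if isinstance(ctx, dict):
        return {k: format_context(v) for k, v in ctx.items()}
    if isinstance(ctx, (list, tuple)):
        return [format_context(v) for v in ctx]
    if isinstance(ctx, (str, int, float, bool)) or ctx is None:
        return ctx
    return str(ctx)                            # fallback for custom objects
```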

4.2 SSE Stream Disconnection (Nginx Timeout)

Symptom: Global Search queries exceeding 30 seconds hit Nginx's default timeout, causing SSE connections to drop and the frontend to receive incomplete results.

Solution: Set proxy_read_timeout 120s in Nginx config. Additionally, emit intermediate status messages at the start and during processing to prevent the frontend from closing the connection due to inactivity.
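A sketch of the relevant Nginx location block (the upstream name is hypothetical; proxy_buffering off is also worth setting so Nginx flushes SSE chunks immediately instead of buffering them):

```nginx
location /api/query_stream {
    proxy_pass http://graphrag_backend;
    proxy_read_timeout 120s;        # cover long Global Search queries
    proxy_buffering off;            # flush SSE chunks to the client immediately
    proxy_http_version 1.1;
    proxy_set_header Connection "";
}
```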

4.3 Data Inconsistency After Incremental Updates

Symptom: After adding new files and running an incremental update, relationships between new and existing entities are not correctly rebuilt, leading to incomplete query results.

Solution: Before incremental updates, compare file MD5 hashes to identify added/modified/deleted files and process only the changed ones. After the update completes, re-run community detection to ensure entity relationship integrity.
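The hash-based change detection can be sketched as follows — a hypothetical helper (function names and the JSON manifest format are assumptions, not part of the GraphRAG API) that compares current file hashes against the last recorded manifest:

```python
import hashlib
import json
from pathlib import Path

def file_md5(path: Path) -> str:
    """MD5 of a file's contents, read in chunks."""
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()

def diff_input_files(input_dir: str, manifest_path: str) -> dict:
    """Classify input files as added/modified/deleted vs. the saved manifest,
    then persist the new manifest for the next run."""
    manifest = Path(manifest_path)
    old = json.loads(manifest.read_text()) if manifest.exists() else {}
    new = {str(p): file_md5(p) for p in Path(input_dir).rglob("*") if p.is_file()}
    result = {
        "added":    [p for p in new if p not in old],
        "modified": [p for p in new if p in old and new[p] != old[p]],
        "deleted":  [p for p in old if p not in new],
    }
    manifest.write_text(json.dumps(new, indent=2))
    return result
```

Only the files in `added` and `modified` then need to be fed into the incremental build, followed by the community detection re-run described above.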


5. Measurable Results

| Metric | Before (Raw CLI) | After (Production API) |
|--------|------------------|------------------------|
| Average response latency | ~3.0s | ~1.2s (with data preloading) |
| Index update method | Full rebuild (~30 min) | Incremental update (~5 min) |
| Streaming output | — | ✅ SSE real-time push |
| Multi-index isolation | — | ✅ Isolated by root directory |
| Automated operations support | — | ✅ Full RESTful API coverage |

6. Summary & What's Next

This article completed the core transformation of GraphRAG from an open-source CLI tool into a production-grade service:

  • Four core APIs: Fully aligned with the designed capabilities — prompt generation, index building, incremental updates, and query service — enabling standard RESTful integration with the customer service system.
  • Streaming experience: SSE-based streaming delivers a frontend-friendly real-time response, significantly improving the conversational UX.
  • Incremental stability: Multi-index isolation combined with incremental update optimization ensures data consistency in production environments.

In Part 4, we'll build on this service layer to design the multi-Agent orchestration architecture — covering how LangGraph coordinates the retrieval, reasoning, and response generation agents to handle complex customer service scenarios end to end.


GitHub Repository: Link TBD

Series Navigation: This is Part 3 of 8 Weeks from Zero to One: Full-Stack Engineering Practice for a Production-Grade LLM Customer Service System. It builds on the data pipeline from Part 2 and lays the foundation for the multi-Agent architecture in Part 4.
