1. The Gap Between Open-Source Scripts and Enterprise Services
In the previous two articles, we built a complete data pipeline featuring MinerU multimodal parsing and a dynamic-aware chunking algorithm. However, the official GraphRAG package only ships CLI scripts and low-level Python function calls via `graphrag.api`, which leaves three critical pain points unresolved before the system can go to production:
- No API Layer: There is no RESTful interface to connect with the customer service system or support automated operations. After encapsulation, the LangGraph Agent can call GraphRAG through a standard API with zero awareness of the underlying implementation.
- No Streaming Support: The official package only provides synchronous query functions with no HTTP-level streaming response, resulting in a poor real-time conversation experience. After encapsulation, SSE-based streaming delivers a frontend-friendly real-time output.
- Scattered Scheduling: Full/incremental indexing and four query modes require callers to handle all low-level logic themselves, with no unified service entry point and poor engineering reusability. After encapsulation, a single unified entry point lets callers focus purely on business parameters.
This article covers the engineering transformation of the official graphrag.api module (prompt_tune.py, index.py, query.py) into four core API capabilities: dynamic prompt generation, index building, incremental index updates, and query service — ultimately delivering a production-grade GraphRAG service with high availability, high performance, and high extensibility, laying the foundation for Part 4's multi-Agent architecture.
2. System Architecture: Defining the Encapsulation Layer
```
┌──────────────────┐      ┌──────────────────────────┐
│  CSV Order Data  │      │   PDF Product Manuals    │
└────────┬─────────┘      │    MinerU + LitServe     │
         │                └──────────┬───────────────┘
         │                           │
         └────────────┬──────────────┘
                      ▼
┌───────────────────────────────────────────────────┐
│       GraphRAG Service Layer (This Article)       │
│                                                   │
│  FastAPI Router                                   │
│  ├── POST /api/graphrag/prompt                    │
│  ├── POST /api/graphrag/index                     │
│  ├── POST /api/query                              │
│  └── POST /api/query_stream                       │
│          │                                        │
│  graphrag.api Call Layer                          │
│  ├── generate_indexing_prompts()                  │
│  ├── build_index()  full / incremental            │
│  └── basic/local/global/drift_search()            │
│          │                                        │
│  Storage: LanceDB + Parquet + FilePipelineStorage │
└────────────────────────┬──────────────────────────┘
                         │ RESTful API
             ┌───────────┴───────────┐
             ▼                       ▼
  Customer Service Agent      Back-end Admin System
    (LangGraph Agent)       (Incremental Data Push)
  → See Part 4: Multi-Agent Architecture
```
3. Four Core API Implementations
3.1 Prompt Tuning API
Endpoint: POST /api/graphrag/prompt
Wraps the official generate_indexing_prompts() as an async endpoint with dynamic parameter support and Chinese-language optimization. Core design decisions:
- Full parameter alignment: Preserves all official API parameters, including `logger` and `selection_method` (a `DocSelectionType` enum);
- Chinese language optimization: Explicitly passes `language="Chinese"` to avoid auto-detection errors;
- Observable progress: Integrates `RichProgressLogger` for real-time progress feedback.
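Before the handler can read fields like `req.chunk_size` and `req.selection_method`, the request body needs a schema. Below is a minimal sketch of a hypothetical `PromptTuneRequest` Pydantic model, inferred from the fields the handler in this section uses; the names and defaults are assumptions, not the official schema:

```python
from pydantic import BaseModel


class PromptTuneRequest(BaseModel):
    """Hypothetical request model; fields mirror what the handler reads."""

    root: str = "./graphrag"              # GraphRAG project root directory
    domain: str = "customer service"      # business domain hint for prompt tuning
    selection_method: str = "random"      # auto | all | top | random
    chunk_size: int = 1200
    overlap: int = 100
    limit: int = 15                       # max number of documents to sample
    max_tokens: int = 2000
    discover_entity_types: bool = True
    min_examples_required: int = 2
    n_subset_max: int = 300
    k: int = 15
    output_dir: str = "./graphrag/prompts"
```

Because every field has a default, callers only need to override the parameters they care about, and FastAPI validates the rest automatically.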
```python
import graphrag.api as api
from fastapi import APIRouter
from graphrag.prompt_tune.types import DocSelectionType
from graphrag.logger.rich_progress import RichProgressLogger

router = APIRouter(prefix="/api/graphrag")

@router.post("/prompt")
async def run_prompt_tune(req: PromptTuneRequest):
    # Abstract: config loading and logger initialization
    config = load_graphrag_config(req)
    progress_logger = RichProgressLogger(prefix="graphrag-prompt-tune")

    # Core: enum mapping for the document selection method
    selection_map = {
        "auto": DocSelectionType.AUTO,
        "all": DocSelectionType.ALL,
        "top": DocSelectionType.TOP,
    }
    doc_selection = selection_map.get(req.selection_method.lower(), DocSelectionType.RANDOM)

    # Core: call the official API to generate prompts
    entity_prompt, community_prompt, summarize_prompt = await api.generate_indexing_prompts(
        config=config,
        logger=progress_logger,
        root=req.root,
        chunk_size=req.chunk_size,
        overlap=req.overlap,
        limit=req.limit,
        selection_method=doc_selection,
        domain=req.domain,
        language="Chinese",  # ⚠️ Always explicit; auto-detection can misidentify Chinese as English
        max_tokens=req.max_tokens,
        discover_entity_types=req.discover_entity_types,
        min_examples_required=req.min_examples_required,
        n_subset_max=req.n_subset_max,
        k=req.k,
    )

    # Abstract: prompt persistence logic
    save_prompts(req.output_dir, entity_prompt, community_prompt, summarize_prompt)
    return {"status": "ok", "output_dir": req.output_dir}
```
Production Pitfall: When `language` is omitted, auto-detection occasionally misidentifies Chinese corpora as English, causing the generated prompt templates to come out in the wrong language. Always pass `language="Chinese"` explicitly.
3.2 Index Building & Incremental Update API
Endpoint: POST /api/graphrag/index
A unified entry point for both full builds and incremental updates, controlled by the is_update flag — directly mapped to the official build_index parameter is_update_run. Core design decisions:
- Unified entry point: Eliminates the need for separate full/incremental endpoints, reducing caller complexity;
- Configurable indexing strategy: Supports both `Standard` and `Fast` modes to balance accuracy and speed;
- Structured result response: Returns categorized workflow execution status for operational diagnostics.
```python
import graphrag.api as api
from graphrag.config.enums import IndexingMethod

@router.post("/index")
async def run_index(req: IndexRequest):
    # Abstract: config loading and logger initialization
    config = load_graphrag_config(req)
    progress_logger = RichProgressLogger(prefix="graphrag-index")

    # Core: indexing strategy selection (Standard vs Fast)
    index_strategy = IndexingMethod.Fast if req.index_method == "fast" else IndexingMethod.Standard

    # Core: unified full/incremental build via build_index
    index_result: list[PipelineRunResult] = await api.build_index(
        config=config,
        method=index_strategy,
        is_update_run=req.is_update,  # incremental update flag, natively supported
        memory_profile=req.memory_profile,
        progress_logger=progress_logger,
    )

    # Abstract: result parsing and status response
    error_results = [res for res in index_result if res.errors]
    return {
        "status": "ok" if not error_results else "partial_error",
        "workflow_results": parse_workflow_results(index_result),
    }
```
Multi-index isolation for incremental updates: In enterprise scenarios, CSV and PDF data require different chunking strategies. Isolation is achieved by specifying separate root directories per data source:
```
/graphrag/
├── data_source_a/        ← CSV order data,   file_pattern: .*\.csv$
│   └── settings.yaml
└── data_source_b/        ← PDF product docs, file_pattern: .*\.md$
    └── settings.yaml
```
Production Recommendation: Beyond directory isolation, each data source should have its own `settings.yaml` with independent chunking strategies and entity extraction rules to prevent cross-contamination.
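To make that isolation concrete, a caller can route each logical data source to its own root directory when invoking the unified index endpoint. The helper below is a hypothetical sketch (the source-name-to-root mapping and payload field names are assumptions based on the `IndexRequest` fields used above):

```python
# Hypothetical mapping: each logical data source gets its own isolated
# GraphRAG root directory, so each keeps its own settings.yaml.
DATA_SOURCE_ROOTS = {
    "orders_csv": "/graphrag/data_source_a",    # CSV order data
    "product_docs": "/graphrag/data_source_b",  # MinerU-converted PDF manuals
}


def build_index_payload(source: str, is_update: bool = True) -> dict:
    """Build a request body for POST /api/graphrag/index for one data source."""
    if source not in DATA_SOURCE_ROOTS:
        raise ValueError(f"Unknown data source: {source!r}")
    return {
        "root": DATA_SOURCE_ROOTS[source],  # per-source root enforces isolation
        "is_update": is_update,             # incremental by default
        "index_method": "standard",
    }
```

With this in place, an incremental update for the order data is just `build_index_payload("orders_csv")` posted to the endpoint; the service layer never needs to know how many sources exist.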
3.3 Synchronous Query API
Endpoint: POST /api/query
Supports all four official query modes with full parameter alignment. Core design decisions:
- Unified multi-mode entry point: Routes to the appropriate query function via `query_type`, reducing caller complexity;
- Traceable context: Custom callbacks capture query context to support result debugging and optimization;
- Layered exception handling: Separates parameter errors, business exceptions, and system errors per RESTful conventions.
Four Query Modes (fully aligned with the official API):

| Mode | Use Case | Data Dependencies | Speed |
|---|---|---|---|
| `basic` | Simple keyword matching | text_units | ⚡ Fastest |
| `local` | Precise entity queries (e.g. "Order #123 shipping") | entities, relationships, covariates | ⚡ Fast |
| `global` | Cross-section semantic understanding (e.g. "all return policies") | entities, communities, reports | 🐢 Slower |
| `drift` | Exploratory reasoning, multi-hop association | entities, communities, reports | 🐢 Slowest |
Implementation (core logic abstracted):
```python
import logging

import graphrag.api as api
from fastapi import HTTPException
from graphrag.callbacks.noop_query_callbacks import NoopQueryCallbacks

logger = logging.getLogger(__name__)

@router.post("/query")
async def query(request: QueryRequest):
    try:
        # Abstract: config and index data loading
        config = load_graphrag_config(request)
        index_data = await load_index_data(request)

        # Core: custom callback to capture query context
        context_container = {}
        callbacks = NoopQueryCallbacks()
        callbacks.on_context = lambda ctx: context_container.update({"ctx": ctx})

        # Core: dictionary-based routing for all four query modes
        query_type = request.query_type.lower()
        query_map = {
            "basic": api.basic_search,
            "local": api.local_search,
            "global": api.global_search,
            "drift": api.drift_search,
        }
        if query_type not in query_map:
            raise ValueError(f"Unsupported query type: '{query_type}'. Valid options: {list(query_map)}")

        response, _ = await query_map[query_type](
            config=config,
            data=index_data,
            query=request.query,
            callbacks=[callbacks],
        )

        # Abstract: context serialization (resolves the DataFrame JSON serialization issue)
        return {
            "query": request.query,
            "response": response,
            "query_type": query_type,
            "context": format_context(context_container["ctx"]),
        }
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception:
        logger.error("Query error", exc_info=True)
        raise HTTPException(status_code=500, detail="Query processing failed")
```
3.4 Streaming Query API
Endpoint: POST /api/query_stream
The production implementation executes the full query first, then streams the response in segments to support frontend SSE rendering. Core design decisions:
- Reuses core query logic: Guarantees consistency between synchronous and streaming results;
- SSE protocol compliance: Standard SSE format output, compatible with mainstream frontend frameworks;
- Exception safety: Errors during streaming do not drop the connection — they are returned as SSE error events.
```python
import asyncio

from fastapi.responses import StreamingResponse

@router.post("/query_stream")
async def query_stream(request: QueryRequest):
    async def event_stream():
        try:
            yield "data: Processing query...\n\n"

            # Reuse core query logic to keep sync and streaming results consistent
            response = await core_query_logic(request)

            # Core: segment and stream the response
            for segment in split_response(response, batch_size=25):
                yield f"data: {segment}\n\n"
                await asyncio.sleep(0.1)

            yield "data: --- Stream complete ---\n\n"
            yield "data: [DONE]\n\n"
        except Exception as e:
            logger.error("Streaming query error", exc_info=True)
            yield f"data: Error: {str(e)}\n\n"
            yield "data: [DONE]\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
```
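The `split_response` helper is abstracted away above. One possible implementation, assuming simple fixed-size character segmentation (the real helper may segment differently, e.g. by sentence), is:

```python
def split_response(text: str, batch_size: int = 25):
    """Yield fixed-size character segments of the full response.

    SSE `data:` lines must not contain raw newlines, so this sketch
    flattens them to spaces before segmenting (a simplification;
    multi-line payloads could instead be split into multiple data lines).
    """
    flat = text.replace("\n", " ")
    for i in range(0, len(flat), batch_size):
        yield flat[i:i + batch_size]
```

Keeping the segment size small (25 characters here) combined with the `asyncio.sleep(0.1)` pacing gives the frontend a typewriter-style rendering effect even though the full answer was computed up front.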
4. Production Pitfalls & Lessons Learned
4.1 DataFrame Serialization Error
Symptom: After `local_search` loads Parquet files, passing covariates into the JSON response raises `TypeError: Object of type DataFrame is not JSON serializable`.
Solution: Implement a format_context function at the data loading layer to uniformly convert DataFrames and custom objects into serializable strings/dicts before they reach the response layer.
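A minimal sketch of such a converter is shown below. It is a hypothetical implementation, not the exact production code: DataFrames are detected by duck typing on `to_dict` so the function does not hard-depend on pandas, and unknown objects fall back to their string form:

```python
def format_context(ctx):
    """Recursively convert query context into JSON-serializable values."""
    to_dict = getattr(ctx, "to_dict", None)
    if callable(to_dict):                      # duck-typed pandas.DataFrame
        return to_dict(orient="records")       # one dict per row
    if isinstance(ctx, dict):
        return {k: format_context(v) for k, v in ctx.items()}
    if isinstance(ctx, (list, tuple)):
        return [format_context(v) for v in ctx]
    if isinstance(ctx, (str, int, float, bool)) or ctx is None:
        return ctx                             # already serializable
    return str(ctx)                            # fallback for custom objects
```

Running every context payload through this one choke point before it reaches the response layer means no endpoint has to special-case DataFrames individually.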
4.2 SSE Stream Disconnection (Nginx Timeout)
Symptom: Global Search queries exceeding 30 seconds hit Nginx's default timeout, causing SSE connections to drop and the frontend to receive incomplete results.
Solution: Set `proxy_read_timeout 120s;` in the Nginx config. Additionally, emit intermediate status messages at the start of and during processing to prevent the frontend from closing the connection due to inactivity.
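A minimal location block illustrating the fix might look like the following; the upstream name `graphrag_backend` and the route are assumptions to match this article's endpoints, and `proxy_buffering off` is additionally worth setting so Nginx forwards SSE chunks immediately instead of buffering them:

```nginx
location /api/query_stream {
    proxy_pass http://graphrag_backend;  # hypothetical upstream name
    proxy_http_version 1.1;
    proxy_set_header Connection "";      # keep the upstream connection open
    proxy_buffering off;                 # flush SSE chunks immediately
    proxy_cache off;
    proxy_read_timeout 120s;             # outlive slow Global Search queries
}
```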
4.3 Data Inconsistency After Incremental Updates
Symptom: After adding new files and running an incremental update, relationships between new and existing entities are not correctly rebuilt, leading to incomplete query results.
Solution: Before incremental updates, compare file MD5 hashes to identify added/modified/deleted files and process only the changed ones. After the update completes, re-run community detection to ensure entity relationship integrity.
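The change-detection step can be sketched as a manifest diff: hash every file, compare against the manifest saved after the last index run, and only feed the changed set into the incremental update. Function names here are illustrative, not from the production code:

```python
import hashlib
from pathlib import Path


def md5_of(path: Path) -> str:
    """MD5 of a file's contents, read in chunks to bound memory use."""
    h = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()


def diff_manifests(old: dict, new: dict) -> dict:
    """Compare {filename: md5} manifests from before/after a data drop."""
    return {
        "added": sorted(new.keys() - old.keys()),
        "deleted": sorted(old.keys() - new.keys()),
        "modified": sorted(k for k in old.keys() & new.keys() if old[k] != new[k]),
    }
```

Only the `added` and `modified` files need to enter the incremental build, and a non-empty `deleted` list signals that a community-detection re-run is required to keep entity relationships consistent.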
5. Measurable Results
| Metric | Before (Raw CLI) | After (Production API) |
|---|---|---|
| Average response latency | ~3.0s | ~1.2s (with data preloading) |
| Index update method | Full rebuild (~30 min) | Incremental update (~5 min) |
| Streaming output | ❌ | ✅ SSE real-time push |
| Multi-index isolation | ❌ | ✅ Isolated by root directory |
| Automated operations support | ❌ | ✅ Full RESTful API coverage |
6. Summary & What's Next
This article completed the core transformation of GraphRAG from an open-source CLI tool into a production-grade service:
- Four core APIs: Fully aligned with the designed capabilities — prompt generation, index building, incremental updates, and query service — enabling standard RESTful integration with the customer service system.
- Streaming experience: SSE-based streaming delivers a frontend-friendly real-time response, significantly improving the conversational UX.
- Incremental stability: Multi-index isolation combined with incremental update optimization ensures data consistency in production environments.
In Part 4, we'll build on this service layer to design the multi-Agent orchestration architecture — covering how LangGraph coordinates the retrieval, reasoning, and response generation agents to handle complex customer service scenarios end to end.
GitHub Repository: Link TBD
Series Navigation: This is Part 3 of 8 Weeks from Zero to One: Full-Stack Engineering Practice for a Production-Grade LLM Customer Service System. It builds on the data pipeline from Part 2 and lays the foundation for the multi-Agent architecture in Part 4.