GraphBit's Internal Execution Flow Architecture: Overview and Key Components
Overview: Internal Execution Flow Architecture
GraphBit’s execution flow is implemented in Rust (graphbit_core) and, when used from Python, orchestrated through the Python bindings. The core engine manages:
Workflow validation and preparation
Dependency-aware batching and concurrent execution
Per-node execution with retries and circuit breakers
Context updates and state transitions
Performance modes mapping to concurrency configurations
Python’s Executor wraps the core and adds execution modes, global timeout handling, and post-processing for LLM tool-calls (Python tool-registry execution followed by a final LLM call).
Key Components and Responsibilities
Rust core (graphbit_core)
Workflow and builder: building graphs, edges, validation
WorkflowExecutor: execution lifecycle and concurrency
ConcurrencyManager: per-node-type concurrency with atomics
Node execution: agent, transform, condition, delay, document loader
RetryConfig and CircuitBreaker integration
WorkflowContext: node outputs, variables, state, stats
Python bindings
Workflow/Node wrappers: convenience APIs for building workflows
Executor: maps to core executor modes; wraps execution in timeouts; handles tool-calls post-processing
Tool system: registry, bridge to call Python functions, and execution results handling
Concrete Code Examples (verified from repository)
Core: validation and initial state
// Set initial workflow state
context.state = WorkflowState::Running { current_node: NodeId::new() };
// Validate workflow before execution
workflow.validate()?;
Core: auto-register agents and resolve config
if !agent_exists {
// Resolve node-level/executor-level LLM config and system prompt
resolved_llm_config = self.resolve_llm_config_for_node(&node.config);
let mut default_config = AgentConfig::new(..., resolved_llm_config).with_id(agent_id.clone());
if !system_prompt.is_empty() { default_config = default_config.with_system_prompt(system_prompt); }
match Agent::new(default_config).await { Ok(agent) => { ... } Err(e) => { return Err(...); } }
}
Core: dependency metadata precomputation
context.set_metadata("node_dependencies".to_string(), serde_json::to_value(deps_map).unwrap_or(json!({})));
context.set_metadata("node_id_to_name".to_string(), serde_json::to_value(id_name_map).unwrap_or(json!({})));
Core: dependency-aware batches
while !remaining.is_empty() {
let mut ready_ids = Vec::new();
for nid in remaining.iter() {
let deps = graph_clone.get_dependencies(nid);
if deps.iter().all(|d| completed.contains(d)) { ready_ids.push(nid.clone()); }
}
if ready_ids.is_empty() { return Err(GraphBitError::workflow_execution("No dependency-ready nodes found; graph may be cyclic or invalid".to_string())); }
// build batch from ready_ids ...
// mark completed/remove from remaining ...
}
Core: concurrent execution of a batch
let shared_context = Arc::new(Mutex::new(context));
let mut tasks = Vec::with_capacity(batch_size);
for node in batch {
let task = tokio::spawn(async move {
let task_info = TaskInfo::from_node_type(&node.node_type, &node.id);
let _permits = if matches!(node.node_type, NodeType::Agent { .. }) {
Some(concurrency_manager.acquire_permits(&task_info).await?)
} else { None };
Self::execute_node_with_retry(node, context_clone, agents_clone, circuit_breakers_clone, circuit_breaker_config, retry_config).await
});
tasks.push(task);
}
Core: per-node retries and circuit breaker
if let Some(ref mut breaker) = circuit_breaker { if !breaker.should_allow_request() { return Ok(NodeExecutionResult::failure(...)); } }
let result = match &node.node_type { NodeType::Agent {..} => Self::execute_agent_node_static(...).await, /* others */ };
match result {
Ok(output) => { /* store outputs; breaker.record_success() */ }
Err(error) => {
if let Some(ref config) = retry_config { if config.should_retry(&error, attempt) { attempt += 1; tokio::time::sleep(Duration::from_millis(config.calculate_delay(attempt))).await; continue; } }
return Ok(NodeExecutionResult::failure(error.to_string(), node.id.clone()).with_retry_count(attempt));
}
}
Core: storing outputs and variables into context
let mut ctx = context.lock().await;
ctx.set_node_output(&node.id, output.clone());
ctx.set_node_output_by_name(&node.name, output.clone());
if let Ok(output_str) = serde_json::to_string(&output) {
ctx.set_variable(node.name.clone(), serde_json::Value::String(output_str.clone()));
ctx.set_variable(node.id.to_string(), serde_json::Value::String(output_str));
}
Core: state transitions, stats and completion
if should_fail_fast {
    let mut ctx = shared_context.lock().await;
    ctx.fail(failure_message);
    drop(ctx); // release the guard before unwrapping the Arc
    return Ok(Arc::try_unwrap(shared_context).unwrap().into_inner());
}
// ...
context.set_stats(stats);
context.complete();
Core: implicit preamble and template resolution for agent prompts
let combined = format!("{implicit_preamble}[Task]\n{prompt_template}");
let resolved = Self::resolve_template_variables(&combined, &ctx);
for cap in NODE_REF_PATTERN.captures_iter(template) {
if let Some(reference) = cap.get(1) {
if let Some(value) = context.get_nested_output(reference.as_str()) {
result = result.replace(&cap[0], &value_str);
}
}
}
Core: concurrency manager’s atomic acquisition loop
loop {
let current = current_count.load(Ordering::Acquire);
if current < max_concurrent {
match current_count.compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire) {
Ok(_) => break, Err(_) => continue
}
}
wait_queue.notified().await;
}
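The loop above only covers the acquire path; a sketch of the matching release, assuming the same atomic counter and a tokio Notify-style wait queue, would decrement the count and wake one waiter:
// Sketch of the release side, assuming `current_count: AtomicUsize` and a
// tokio::sync::Notify-based `wait_queue` as in the acquisition loop above.
current_count.fetch_sub(1, Ordering::AcqRel);
wait_queue.notify_one(); // wakes one task parked in `wait_queue.notified().await`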
Core: graph caches to reduce recomputation
dependencies_cache: HashMap<NodeId, Vec<NodeId>>,
dependents_cache: HashMap<NodeId, Vec<NodeId>>,
Python: executor validation, timeout, and dispatch to core
if let Err(e) = workflow.inner.validate() { return Err(validation_error("workflow", None, &format!("Invalid workflow: {}", e))); }
let result = get_runtime().block_on(async move {
tokio::time::timeout(timeout_duration, async move {
Self::execute_workflow_internal(llm_config, workflow_clone, config).await
}).await
});
Python: mapping modes to core executor strategies
match config.mode {
HighThroughput => CoreWorkflowExecutor::new_high_throughput().with_default_llm_config(llm_config.clone()),
LowLatency => CoreWorkflowExecutor::new_low_latency().with_default_llm_config(llm_config.clone()).without_retries().with_fail_fast(true),
MemoryOptimized | Balanced => CoreWorkflowExecutor::new_high_throughput().with_default_llm_config(llm_config.clone()),
}
Python: tool-calls post-processing and final LLM call
if response_type == "tool_calls_required" {
let tool_results = Python::with_gil(|py| execute_production_tool_calls(py, tool_calls_json, node_tools))?;
let final_request = LlmRequest::new(final_prompt);
match llm_provider.complete(final_request).await { /* update context with final response or tool results */ }
}
Python: ToolRegistry history trimming (memory safety)
history.push(result);
// Keep only last 1000 executions to prevent memory bloat
if history.len() > 1000 { let len = history.len(); history.drain(0..len - 1000); }
Step-by-Step Execution Process and Data Flow
- Build and validate workflow
Workflow.add_node/graph.add_edge define topology
Workflow.validate validates graph (cycle checks etc.)
Python Executor also calls validation
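A minimal sketch of this step, assuming the builder-style API described above (Node::agent and the exact add_node/add_edge signatures are illustrative, not verified):
// Sketch only: a two-node workflow; names and signatures are assumptions.
let mut workflow = Workflow::new("extract-and-review");
let extract = workflow.add_node(Node::agent("extract", "Extract key facts from the input"))?;
let review = workflow.add_node(Node::agent("review", "Review: {{node.extract.output}}"))?;
workflow.add_edge(extract, review)?; // review depends on extract
workflow.validate()?;                // cycle and reference checks before execution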
- Executor initialization and mode selection
Python Executor selects an ExecutionMode and maps to a core WorkflowExecutor with:
HighThroughput: more concurrency, retries, non-fail-fast
LowLatency: retries disabled, fail-fast enabled, shorter timeouts
MemoryOptimized/Balanced: currently mapped to the high-throughput core configuration (see the mode-mapping snippet above), with memory/metrics tuning applied at the Python layer
- Preparation in core
Set WorkflowState::Running
Validate again (defensive)
Auto-register any missing agents from NodeType::Agent nodes
Resolve LLM config priority: node-level > executor-level (see the sketch after this list)
Apply system_prompt from node config
Create Agent; errors (e.g., invalid API key) fail early
Precompute dependency metadata (node_dependencies, id->name) and store in context.metadata
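A sketch of that config resolution priority (the helper name comes from the auto-registration snippet earlier; the parameter and field names here are assumptions):
// Sketch: a node-level LLM config takes precedence over the executor-level default.
fn resolve_llm_config_for_node(&self, node_config: &NodeConfig) -> LlmConfig {
    node_config
        .llm_config
        .clone()
        .unwrap_or_else(|| self.default_llm_config.clone())
}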
- Plan execution order
- create_dependency_batches produces layers of “ready” nodes with all parents completed; batches execute sequentially, nodes within a batch execute concurrently
- Execute a batch
Spawn a tokio task per node
Acquire concurrency permits only for agent nodes (skips overhead for non-agent nodes)
execute_node_with_retry handles:
Circuit breaker gating
Node execution by type
Retry policy (exponential backoff/jitter as configured)
On success: store JSON output in context by node id and name; also as string variables for compatibility; record breaker success
On failure: record breaker failure; check retry; propagate final failure result
Fail-fast logic:
If enabled (low-latency mode) or certain auth/config errors are detected, executor immediately fails workflow and returns
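A sketch of the auth/config error detection behind that second trigger (the helper name and matched strings are assumptions):
// Sketch (assumed helper): obvious authentication/configuration failures are not
// retryable, so the executor fails the whole workflow even when fail_fast is off.
fn is_auth_or_config_error(error: &GraphBitError) -> bool {
    let msg = error.to_string().to_lowercase();
    msg.contains("api key") || msg.contains("unauthorized") || msg.contains("invalid configuration")
}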
- Context updates and data flow
For each node, outputs are stored as JSON by id and by name in WorkflowContext.node_outputs
Variables are also set (stringified JSON) for compatibility and template resolution
Agent prompt resolution uses:
Implicit preamble built from direct parents’ outputs (included both as titled sections and as a “Context JSON” block)
resolve_template_variables for explicit {{node...}} placeholders and legacy {var} placeholders
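For illustration, a sketch of what resolution does to a prompt template (the node name, variable, and values are invented; only the placeholder syntax and the call form come from the code above):
// Illustrative only: "extract", "audience", and the resolved values are invented.
let template = "Summarize {{node.extract.output}} for the {audience} audience";
let resolved = Self::resolve_template_variables(template, &ctx);
// resolved (illustrative): "Summarize {\"facts\": [...]} for the executive audience"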
- Tool-calls orchestration (if present)
Agent execution with tools returns a structured “tool_calls_required” response
Python Executor reads node output, executes tools against Python registry (execute_production_tool_calls), composes final prompt, and performs a final LLM completion
Final completion is written back into WorkflowContext for that node (overwriting the intermediate tool-calls marker)
- Completion and stats
After all batches complete, WorkflowExecutionStats are computed (node counts, average and total execution time, concurrency) and stored in the context
WorkflowContext.complete() sets final state; on early fail, context.fail(message) sets failed state
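A sketch of the kind of fields such stats would carry (field names are assumptions; the text above only states that counts, average/total time, and concurrency are tracked):
// Sketch (assumed shape) of the stats stored via context.set_stats(stats).
struct WorkflowExecutionStats {
    total_nodes: usize,
    successful_nodes: usize,
    failed_nodes: usize,
    avg_node_duration_ms: f64,
    total_duration_ms: u64,
    max_concurrency: usize,
}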
Detailed Implementation Insights
Workflow validation and preparation
Defensive validation at both Python and Rust layers
Auto-registration of agents from graph ensures missing registrations do not break execution
LLM config resolution hierarchy avoids hidden defaults; explicit configuration is encouraged
Node dependency resolution and ordering
Strict layer-by-layer readiness ensures parents complete before children
Cycles or unresolved dependencies lead to an explicit error
Concurrency management and resource allocation
ConcurrencyManager tracks per-node-type limits using atomic counters (no global semaphore bottleneck)
Only agent nodes acquire permits (reduces overhead for lightweight nodes like conditions/transforms/delays)
Detailed stats tracking inside ConcurrencyManager (available via get_concurrency_stats)
State transitions and context updates
Running → Completed or Failed
Node outputs stored in JSON, by id and name, plus variables; enables flexible data access and template substitution
Dependency metadata cached in context for efficient prompt preambles and resolution
Error handling and retry mechanisms
Circuit breakers per agent (open/closed) to avoid cascading failures
RetryConfig supports retries with backoff; disabled in low-latency mode
“Auth/config” error detection triggers fail-fast even when fail_fast is false
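A sketch of the backoff calculation behind config.calculate_delay (field names are assumptions; the exponential-with-cap shape follows the retry description above, jitter omitted):
// Sketch: exponential backoff capped at max_delay_ms; optional jitter not shown.
fn calculate_delay(&self, attempt: u32) -> u64 {
    self.initial_delay_ms
        .saturating_mul(2u64.saturating_pow(attempt))
        .min(self.max_delay_ms)
}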
Performance optimizations and execution modes
HighThroughput: greater concurrency, retries on, fail_fast off
LowLatency: retries disabled, fail_fast on, shorter overall timeouts
MemoryOptimized: fewer resources allocated (e.g., smaller agent maps), metrics toggles via Python Executor
Pre-allocations: tasks Vec with capacity; results Vec; lock scope minimization via drop()
Skipping permit acquisition for non-agent nodes
Memory management and cleanup
Rust shares state behind Arc (e.g., the Arc<Mutex<WorkflowContext>> shared context) to avoid copies; lock scopes are limited deliberately (drop(agents_guard))
Pre-allocations for vectors reduce re-allocations under load
WorkflowGraph caches dependencies/dependents to avoid recomputation
Python ToolRegistry trims execution history to last 1000 to cap memory growth
Stats include placeholders for memory tracking; safe-by-default design leaves destructive operations out of execution flow
Interaction Between Rust Core and Python Bindings
Python Executor:
Validates workflow
Applies a global timeout around the core execution future
Selects the core execution mode and provides default LLM config
Stores the LLM config in context metadata for later tool orchestration
Scans node outputs for “tool_calls_required,” executes Python tools, and then makes a final LLM completion using graphbit_core LLM provider
Async:
Core uses tokio tasks per node for concurrency; batches run sequentially
Python wraps the async future, using tokio timeout to enforce end-to-end time limits
Additional Useful Snippets
- Collect executable nodes helper:
fn collect_executable_nodes(graph: &WorkflowGraph) -> GraphBitResult<Vec<WorkflowNode>> {
    let nodes: Vec<WorkflowNode> = graph.get_nodes().values().cloned().collect();
    Ok(nodes)
}
- Agent execution mode switch (with tools):
if has_tools {
Self::execute_agent_with_tools(agent_id, &resolved_prompt, node_config, agent).await
} else {
let message = AgentMessage::new(agent_id.clone(), None, MessageContent::Text(resolved_prompt));
agent.execute(message).await
}
Summary
The execution flow in GraphBit is a robust, dependency-aware, concurrent pipeline:
Validate and prepare (agents, config, dependency maps)
Batch nodes respecting dependencies; execute each batch concurrently
Agent nodes are concurrency-limited; non-agent nodes avoid overhead
Per-node retries with backoff and circuit breakers; fail-fast when appropriate
Strong context updates for both JSON outputs and legacy variables
Python layer orchestrates LLM tool-calls and final responses, under a global timeout and mode-based configuration