GraphBit's Internal Execution Flow Architecture: Overview and Key Components

Yeahia Sarker

Overview: Internal Execution Flow Architecture 

GraphBit’s execution flow is implemented in Rust (graphbit_core) and, when used from Python, orchestrated through the Python bindings. The core engine manages: 

  • Workflow validation and preparation 

  • Dependency-aware batching and concurrent execution 

  • Per-node execution with retries and circuit breakers 

  • Context updates and state transitions 

  • Performance modes mapping to concurrency configurations 

The Python Executor wraps the core, adds execution modes and global timeout handling, and performs post-processing for LLM tool-calls (Python tool registry execution followed by a final LLM call). 

Key Components and Responsibilities 

  • Core (Rust) 

      • Workflow and builder: building graphs, edges, validation 

      • WorkflowExecutor: execution lifecycle and concurrency 

      • ConcurrencyManager: per-node-type concurrency with atomics 

      • Node execution: agent, transform, condition, delay, document loader 

      • RetryConfig and CircuitBreaker integration 

      • WorkflowContext: node outputs, variables, state, stats 

  • Python bindings 

      • Workflow/Node wrappers: convenience APIs for building workflows 

      • Executor: maps to core executor modes; wraps execution in timeouts; handles tool-calls post-processing 

      • Tool system: registry, bridge to call Python functions, and execution results handling 

Concrete Code Examples (verified from repository) 

Core: validation and initial state 

// Set initial workflow state 

context.state = WorkflowState::Running { current_node: NodeId::new() }; 

// Validate workflow before execution 

workflow.validate()?; 

Core: auto-register agents and resolve config 

if !agent_exists { 

    // Resolve node-level/executor-level LLM config and system prompt 

    resolved_llm_config = self.resolve_llm_config_for_node(&node.config); 

    let mut default_config = AgentConfig::new(..., resolved_llm_config).with_id(agent_id.clone()); 

    if !system_prompt.is_empty() { default_config = default_config.with_system_prompt(system_prompt); } 

    match Agent::new(default_config).await { Ok(agent) => { ... } Err(e) => { return Err(...); } } 

}

Core: dependency metadata precomputation 

context.set_metadata("node_dependencies".to_string(), serde_json::to_value(deps_map).unwrap_or(json!({}))); 

context.set_metadata("node_id_to_name".to_string(), serde_json::to_value(id_name_map).unwrap_or(json!({}))); 

Core: dependency-aware batches 

while !remaining.is_empty() { 

    let mut ready_ids = Vec::new(); 

    for nid in remaining.iter() { 

        let deps = graph_clone.get_dependencies(nid); 

        if deps.iter().all(|d| completed.contains(d)) { ready_ids.push(nid.clone()); } 

    } 

    if ready_ids.is_empty() { return Err(GraphBitError::workflow_execution("No dependency-ready nodes found; graph may be cyclic or invalid".to_string())); } 

    // build batch from ready_ids ... 

    // mark completed/remove from remaining ... 

}

Core: concurrent execution of a batch 

let shared_context = Arc::new(Mutex::new(context)); 

let mut tasks = Vec::with_capacity(batch_size); 

for node in batch { 

    let task = tokio::spawn(async move { 

        let task_info = TaskInfo::from_node_type(&node.node_type, &node.id); 

        let _permits = if matches!(node.node_type, NodeType::Agent { .. }) { 

            Some(concurrency_manager.acquire_permits(&task_info).await?) 

        } else { None }; 

        Self::execute_node_with_retry(node, context_clone, agents_clone, circuit_breakers_clone, circuit_breaker_config, retry_config).await 

    }); 

    tasks.push(task); 

}

Core: per-node retries and circuit breaker 

if let Some(ref mut breaker) = circuit_breaker { if !breaker.should_allow_request() { return Ok(NodeExecutionResult::failure(...)); } } 

let result = match &node.node_type { NodeType::Agent {..} => Self::execute_agent_node_static(...).await, /* others */ }; 

match result { 

  Ok(output) => { /* store outputs; breaker.record_success() */ } 

  Err(error) => { 

    if let Some(ref config) = retry_config { if config.should_retry(&error, attempt) { attempt += 1; tokio::time::sleep(Duration::from_millis(config.calculate_delay(attempt))).await; continue; } } 

    return Ok(NodeExecutionResult::failure(error.to_string(), node.id.clone()).with_retry_count(attempt)); 

  } 

}

Core: storing outputs and variables into context 

let mut ctx = context.lock().await; 

ctx.set_node_output(&node.id, output.clone()); 

ctx.set_node_output_by_name(&node.name, output.clone()); 

if let Ok(output_str) = serde_json::to_string(&output) { 

    ctx.set_variable(node.name.clone(), serde_json::Value::String(output_str.clone())); 

    ctx.set_variable(node.id.to_string(), serde_json::Value::String(output_str)); 

}

Core: state transitions, stats and completion 

if should_fail_fast { let mut ctx = shared_context.lock().await; ctx.fail(failure_message); return Ok(Arc::try_unwrap(shared_context).unwrap().into_inner()); } 

// ... 

context.set_stats(stats); 

context.complete(); 

Core: implicit preamble and template resolution for agent prompts 

let combined = format!("{implicit_preamble}[Task]\n{prompt_template}"); 

let resolved = Self::resolve_template_variables(&combined, &ctx); 

for cap in NODE_REF_PATTERN.captures_iter(template) { 

    if let Some(reference) = cap.get(1) { 

        if let Some(value) = context.get_nested_output(reference.as_str()) { 

            result = result.replace(&cap[0], &value_str); 

        } 

    } 

}

Core: concurrency manager’s atomic acquisition loop 

loop { 

    let current = current_count.load(Ordering::Acquire); 

    if current < max_concurrent { 

        match current_count.compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire) { 

            Ok(_) => break, Err(_) => continue 

        } 

    } 

    wait_queue.notified().await; 

}

Core: graph caches to reduce recomputation 

dependencies_cache: HashMap<NodeId, Vec<NodeId>>, 

dependents_cache: HashMap<NodeId, Vec<NodeId>>, 

Python: executor validation, timeout, and dispatch to core 

if let Err(e) = workflow.inner.validate() { return Err(validation_error("workflow", None, &format!("Invalid workflow: {}", e))); } 

let result = get_runtime().block_on(async move { 

    tokio::time::timeout(timeout_duration, async move { 

        Self::execute_workflow_internal(llm_config, workflow_clone, config).await 

    }).await 

}); 

Python: mapping modes to core executor strategies 

match config.mode { 

  HighThroughput => CoreWorkflowExecutor::new_high_throughput().with_default_llm_config(llm_config.clone()), 

  LowLatency => CoreWorkflowExecutor::new_low_latency().with_default_llm_config(llm_config.clone()).without_retries().with_fail_fast(true), 

  MemoryOptimized | Balanced => CoreWorkflowExecutor::new_high_throughput().with_default_llm_config(llm_config.clone()), 

}

Python: tool-calls post-processing and final LLM call 

if response_type == "tool_calls_required" { 

  let tool_results = Python::with_gil(|py| execute_production_tool_calls(py, tool_calls_json, node_tools))?; 

  let final_request = LlmRequest::new(final_prompt); 

  match llm_provider.complete(final_request).await { /* update context with final response or tool results */ } 

}

Python: ToolRegistry history trimming (memory safety) 

history.push(result); 

// Keep only last 1000 executions to prevent memory bloat 

if history.len() > 1000 { let len = history.len(); history.drain(0..len - 1000); } 

Step-by-Step Execution Process and Data Flow 

  1. Build and validate workflow 
  • Workflow.add_node/graph.add_edge define topology 

  • Workflow.validate validates graph (cycle checks etc.) 

  • Python Executor also calls validation 

  2. Executor initialization and mode selection 
  • Python Executor selects an ExecutionMode and maps to a core WorkflowExecutor with: 

  • HighThroughput: more concurrency, retries, non-fail-fast 

  • LowLatency: no retries, fail-fast enabled, shorter timeouts 

  • MemoryOptimized/Balanced: tailored configs 

  3. Preparation in core 
  • Set WorkflowState::Running 

  • Validate again (defensive) 

  • Auto-register any missing agents from NodeType::Agent nodes 

  • Resolve LLM config priority: node-level > executor-level 

  • Apply system_prompt from node config 

  • Create Agent; errors (e.g., invalid API key) fail early 

  • Precompute dependency metadata (node_dependencies, id->name) and store in context.metadata 

  4. Plan execution order 
  • create_dependency_batches produces layers of “ready” nodes with all parents completed; batches execute sequentially, nodes within a batch execute concurrently 
  5. Execute a batch 
  • Spawn a tokio task per node 

  • Acquire concurrency permits only for agent nodes (skips overhead for non-agent nodes) 

  • execute_node_with_retry handles: 

  • Circuit breaker gating 

  • Node execution by type 

  • Retry policy (exponential backoff/jitter as configured) 

  • On success: store JSON output in context by node id and name; also as string variables for compatibility; record breaker success 

  • On failure: record breaker failure; check retry; propagate final failure result 

  • Fail-fast logic: 

  • If enabled (low-latency mode), or when certain auth/config errors are detected, the executor immediately fails the workflow and returns 

  6. Context updates and data flow 
  • For each node, outputs are stored as JSON by id and by name in WorkflowContext.node_outputs 

  • Variables are also set (stringified JSON) for compatibility and template resolution 

  • Agent prompt resolution uses: 

  • Implicit preamble built from the direct parents’ outputs (both a titled text section and a “Context JSON” block) 

  • resolve_template_variables for explicit {{node...}} placeholders and legacy {var} placeholders (a self-contained sketch of this substitution follows this list) 

  7. Tool-calls orchestration (if present) 
  • Agent execution with tools returns a structured “tool_calls_required” response 

  • Python Executor reads node output, executes tools against Python registry (execute_production_tool_calls), composes final prompt, and performs a final LLM completion 

  • Final completion is written back into WorkflowContext for that node (overwriting the intermediate tool-calls marker) 

  8. Completion and stats 
  • After all batches complete, WorkflowExecutionStats are computed (counts, average and total execution time, concurrency) and stored in the context 

  • WorkflowContext.complete() sets final state; on early fail, context.fail(message) sets failed state 
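
To make step 6's placeholder substitution concrete (the sketch referenced there), here is a small, self-contained example. It is not the repository code: the regex, the output map, and the function name are assumptions chosen to mirror the {{node...}} pattern described above.

use regex::Regex;
use std::collections::HashMap;

// Hypothetical stand-in for resolve_template_variables: replaces
// `{{node.<name>}}` placeholders with previously stored node outputs.
fn resolve_placeholders(template: &str, outputs: &HashMap<String, String>) -> String {
    // The pattern is an assumption; the real NODE_REF_PATTERN may differ.
    let node_ref = Regex::new(r"\{\{node\.([A-Za-z0-9_]+)\}\}").unwrap();
    let mut result = template.to_string();
    for cap in node_ref.captures_iter(template) {
        if let (Some(whole), Some(name)) = (cap.get(0), cap.get(1)) {
            if let Some(value) = outputs.get(name.as_str()) {
                result = result.replace(whole.as_str(), value);
            }
        }
    }
    result
}

fn main() {
    let mut outputs = HashMap::new();
    outputs.insert("summarizer".to_string(), "3 key findings".to_string());
    let prompt = resolve_placeholders("Review: {{node.summarizer}}", &outputs);
    assert_eq!(prompt, "Review: 3 key findings");
}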

Detailed Implementation Insights 

Workflow validation and preparation 

  • Defensive validation at both Python and Rust layers 

  • Auto-registration of agents from graph ensures missing registrations do not break execution 

  • LLM config resolution hierarchy avoids hidden defaults; explicit configuration is encouraged 
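
The node-over-executor precedence mentioned in the last bullet boils down to an Option chain. Below is a minimal sketch with assumed type and field names; the real resolve_llm_config_for_node may carry more fields and defaults.

// Hypothetical, simplified types for illustration only.
#[derive(Clone, Debug, PartialEq)]
struct LlmConfig {
    provider: String,
    model: String,
}

struct NodeConfig {
    llm_config: Option<LlmConfig>, // node-level override, if any
}

// Node-level config wins; otherwise fall back to the executor-level default.
fn resolve_llm_config(node: &NodeConfig, executor_default: &Option<LlmConfig>) -> Option<LlmConfig> {
    node.llm_config.clone().or_else(|| executor_default.clone())
}

fn main() {
    let default = Some(LlmConfig { provider: "openai".into(), model: "gpt-4o-mini".into() });
    let node = NodeConfig { llm_config: None };
    // No node-level override, so the executor-level default is used.
    assert_eq!(resolve_llm_config(&node, &default), default);
}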

Node dependency resolution and ordering 

  • Strict layer-by-layer readiness ensures parents complete before children 

  • Cycles or unresolved dependencies lead to an explicit error 
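
The layer-by-layer readiness idea is easy to see in standalone form. The following sketch mirrors the loop excerpted earlier but with plain strings and a simplified error type; it is not the core's create_dependency_batches.

use std::collections::{HashMap, HashSet};

// Group node ids into layers: every node in a layer has all of its dependencies
// in earlier layers. Errors out when no progress is possible (cycle or missing node).
fn dependency_batches(deps: &HashMap<&str, Vec<&str>>) -> Result<Vec<Vec<String>>, String> {
    let mut remaining: HashSet<&str> = deps.keys().copied().collect();
    let mut completed: HashSet<&str> = HashSet::new();
    let mut batches = Vec::new();

    while !remaining.is_empty() {
        let ready: Vec<&str> = remaining
            .iter()
            .copied()
            .filter(|n| deps[n].iter().all(|d| completed.contains(d)))
            .collect();
        if ready.is_empty() {
            return Err("no dependency-ready nodes; graph may be cyclic or invalid".into());
        }
        for &n in &ready {
            remaining.remove(n);
            completed.insert(n);
        }
        batches.push(ready.iter().map(|n| n.to_string()).collect());
    }
    Ok(batches)
}

fn main() {
    let deps = HashMap::from([
        ("load", vec![]),
        ("summarize", vec!["load"]),
        ("review", vec!["summarize"]),
    ]);
    // Expected layers: [["load"], ["summarize"], ["review"]]
    println!("{:?}", dependency_batches(&deps).unwrap());
}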

Concurrency management and resource allocation 

  • ConcurrencyManager tracks per-node-type limits using atomic counters (no global semaphore bottleneck); a self-contained sketch of the acquisition pattern follows this list 

  • Only agent nodes acquire permits (reduces overhead for lightweight nodes like conditions/transforms/delays) 

  • Detailed stats tracking inside ConcurrencyManager (available via get_concurrency_stats) 
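
For reference, the permit-acquisition pattern excerpted earlier can be written as a self-contained example. This sketch assumes a single shared limit rather than per-node-type limits and uses simplified names; it only demonstrates the compare-exchange loop plus tokio::sync::Notify parking, not the real ConcurrencyManager.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

// Simplified single-limit counter; the real manager keeps one per node type.
struct PermitCounter {
    current: AtomicUsize,
    max: usize,
    wait_queue: Notify,
}

impl PermitCounter {
    async fn acquire(&self) {
        loop {
            // Register interest in a wakeup before checking capacity, so a
            // concurrent release() cannot slip between the check and the wait.
            let notified = self.wait_queue.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();

            let current = self.current.load(Ordering::Acquire);
            if current < self.max
                && self
                    .current
                    .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
            {
                return; // slot claimed without taking any lock
            }
            if current >= self.max {
                notified.await; // at capacity: park until a release notifies waiters
            }
            // Otherwise the CAS lost a race with another acquirer: retry immediately.
        }
    }

    fn release(&self) {
        self.current.fetch_sub(1, Ordering::AcqRel);
        // Wake parked waiters; each re-checks capacity and retries the CAS.
        self.wait_queue.notify_waiters();
    }
}

#[tokio::main]
async fn main() {
    let permits = Arc::new(PermitCounter {
        current: AtomicUsize::new(0),
        max: 2,
        wait_queue: Notify::new(),
    });
    permits.acquire().await;
    permits.release();
}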

State transitions and context updates 

  • Running → Completed or Failed 

  • Node outputs stored in JSON, by id and name, plus variables; enables flexible data access and template substitution 

  • Dependency metadata cached in context for efficient prompt preambles and resolution 

Error handling and retry mechanisms 

  • Circuit breakers per agent (open/closed) to avoid cascading failures 

  • RetryConfig supports retries with backoff; disabled in low-latency mode (a backoff sketch follows this list) 

  • “Auth/config” error detection triggers fail-fast even when fail_fast is false 
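
Here is the backoff sketch referenced above. The struct, its fields, and the jitter formula are illustrative assumptions rather than graphbit_core's actual RetryConfig; the example uses the rand crate for jitter.

// Illustrative retry delay: exponential backoff capped at a maximum, with
// roughly +/-10% jitter so concurrent nodes do not retry in lockstep.
struct RetryPolicy {
    base_delay_ms: u64,
    max_delay_ms: u64,
}

impl RetryPolicy {
    fn calculate_delay(&self, attempt: u32) -> u64 {
        // Double the delay per attempt, saturating and capping at max_delay_ms.
        let exp = self.base_delay_ms.saturating_mul(1u64 << attempt.min(16));
        let capped = exp.min(self.max_delay_ms);
        let jitter = (capped as f64 * 0.2 * (rand::random::<f64>() - 0.5)) as i64;
        (capped as i64 + jitter).max(0) as u64
    }
}

fn main() {
    let policy = RetryPolicy { base_delay_ms: 100, max_delay_ms: 5_000 };
    for attempt in 0..5 {
        println!("attempt {attempt}: wait ~{} ms", policy.calculate_delay(attempt));
    }
}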

Performance optimizations and execution modes 

  • HighThroughput: greater concurrency, retries on, fail_fast off 

  • LowLatency: no retries, fail_fast on, shorter overall timeouts 

  • MemoryOptimized: fewer resources allocated (e.g., smaller agent maps), metrics toggles via Python Executor 

  • Pre-allocations: tasks Vec with capacity; results Vec; lock scope minimization via drop() (see the sketch after this list) 

  • Skipping permit acquisition for non-agent nodes 
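
Two of these micro-optimizations, pre-allocating to a known size and dropping lock guards before slow work, look like this in isolation. This is a generic sketch, not the executor's code; the types and names are placeholders.

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

async fn run_batch(agents: Arc<RwLock<HashMap<String, String>>>, batch: &[String]) -> Vec<String> {
    // Pre-allocate to the known batch size to avoid re-allocations under load.
    let mut results = Vec::with_capacity(batch.len());

    // Hold the read lock only long enough to copy out what the batch needs...
    let agents_guard = agents.read().await;
    let resolved: Vec<Option<String>> = batch.iter().map(|id| agents_guard.get(id).cloned()).collect();
    drop(agents_guard); // ...then release it before any slow per-node work.

    for agent in resolved.into_iter().flatten() {
        results.push(format!("executed {agent}"));
    }
    results
}

#[tokio::main]
async fn main() {
    let agents = Arc::new(RwLock::new(HashMap::from([
        ("a".to_string(), "agent-a".to_string()),
    ])));
    println!("{:?}", run_batch(agents, &["a".to_string()]).await);
}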

Memory management and cleanup 

  • Rust shares state through Arc-wrapped structures (e.g., Arc<Mutex<WorkflowContext>>) to avoid copies; lock scopes are deliberately limited (drop(agents_guard)) 

  • Pre-allocations for vectors reduce re-allocations under load 

  • WorkflowGraph caches dependencies/dependents to avoid recomputation 

  • Python ToolRegistry trims execution history to last 1000 to cap memory growth 

  • Stats include placeholders for memory tracking; safe-by-default design leaves destructive operations out of execution flow 

Interaction Between Rust Core and Python Bindings 

  • Python Executor: 

      • Validates the workflow 

      • Applies a global timeout around the core execution future 

      • Selects the core execution mode and provides the default LLM config 

      • Stores the LLM config in context metadata for later tool orchestration 

      • Scans node outputs for “tool_calls_required,” executes Python tools, and then makes a final LLM completion using the graphbit_core LLM provider 

  • Async: 

      • Core uses tokio tasks per node for concurrency; batches run sequentially 

      • Python wraps the async future, using tokio timeout to enforce end-to-end time limits 
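
The end-to-end timeout described in the last bullet reduces to the pattern below. This is a sketch: the runtime construction, the placeholder work, and the error string stand in for get_runtime(), execute_workflow_internal, and the bindings' actual error type.

use std::time::Duration;
use tokio::runtime::Runtime;

fn execute_with_global_timeout(timeout_secs: u64) -> Result<String, String> {
    // Stand-in for the shared runtime the bindings keep behind get_runtime().
    let runtime = Runtime::new().map_err(|e| e.to_string())?;

    runtime.block_on(async move {
        let work = async {
            // Stand-in for Self::execute_workflow_internal(...).await
            tokio::time::sleep(Duration::from_millis(50)).await;
            "workflow completed".to_string()
        };
        match tokio::time::timeout(Duration::from_secs(timeout_secs), work).await {
            Ok(output) => Ok(output),
            Err(_) => Err(format!("workflow timed out after {timeout_secs}s")),
        }
    })
}

fn main() {
    println!("{:?}", execute_with_global_timeout(5));
}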

Additional Useful Snippets 

  • Collect executable nodes helper: 

fn collect_executable_nodes(graph: &WorkflowGraph) -> GraphBitResult<Vec<WorkflowNode>> { 

    let nodes: Vec<WorkflowNode> = graph.get_nodes().values().cloned().collect(); 

    Ok(nodes) 

}

  • Agent execution mode switch (with tools): 

if has_tools { 

    Self::execute_agent_with_tools(agent_id, &resolved_prompt, node_config, agent).await 

} else { 

    let message = AgentMessage::new(agent_id.clone(), None, MessageContent::Text(resolved_prompt)); 

    agent.execute(message).await 

}

Summary 

The execution flow in GraphBit is a robust, dependency-aware, concurrent pipeline: 

  • Validate and prepare (agents, config, dependency maps) 

  • Batch nodes respecting dependencies; execute each batch concurrently 

  • Agent nodes are concurrency-limited; non-agent nodes avoid overhead 

  • Per-node retries with backoff and circuit breakers; fail-fast when appropriate 

  • Strong context updates for both JSON outputs and legacy variables 

  • Python layer orchestrates LLM tool-calls and final responses, under a global timeout and mode-based configuration

GitHub: https://github.com/InfinitiBit/graphbit
