Yeahia Sarker

Visual Representation Concepts for Workflow Execution and Agent Coordination

Executive summary

  • Workflows are the top-level orchestration: they contain a WorkflowGraph of nodes and edges, metadata, and validation.
  • Nodes are the atomic execution units in a workflow: a WorkflowNode holds a NodeType (Agent, Condition, Transform, DocumentLoader, etc.), its config and policies (retry, timeout).
  • Agents implement behavior for Agent nodes: an Agent (via AgentTrait) consumes an AgentMessage and returns structured JSON; it has an AgentConfig and an LLM provider.

Hierarchy and relationships:

  • A Workflow contains a WorkflowGraph.
  • The WorkflowGraph contains WorkflowNodes (and WorkflowEdges).
  • An Agent node is a WorkflowNode whose NodeType is Agent, which references an Agent by AgentId.
  • During execution, the WorkflowExecutor schedules nodes, and Agent nodes resolve the Agent and call AgentTrait::execute; outputs are written into WorkflowContext for downstream nodes.
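
The relationships above can be sketched with std-only stand-ins. This is a simplified illustration rather than GraphBit's actual types: the `String` ids, the trimmed-down `NodeType`, the `agent_for` helper, and the sample values are all assumptions made for the example.

```rust
use std::collections::HashMap;

// Simplified stand-ins for illustration; the real types carry more fields.
type AgentId = String;
type NodeId = String;

#[derive(Debug, Clone)]
enum NodeType {
    Agent { agent_id: AgentId, prompt_template: String },
    Transform { transformation: String },
}

#[derive(Debug, Clone)]
struct WorkflowNode {
    id: NodeId,
    name: String,
    node_type: NodeType,
}

#[derive(Debug, Default)]
struct WorkflowGraph {
    nodes: HashMap<NodeId, WorkflowNode>,
    edges: Vec<(NodeId, NodeId)>,
}

#[derive(Debug)]
struct Workflow {
    name: String,
    graph: WorkflowGraph,
}

/// Hypothetical helper: the AgentId a node references, if it is an Agent node.
fn agent_for(node: &WorkflowNode) -> Option<&AgentId> {
    match &node.node_type {
        NodeType::Agent { agent_id, .. } => Some(agent_id),
        _ => None,
    }
}

/// Builds a two-node workflow: an Agent node feeding a Transform node.
fn sample_workflow() -> Workflow {
    let summarize = WorkflowNode {
        id: "n1".to_string(),
        name: "summarize".to_string(),
        node_type: NodeType::Agent {
            agent_id: "agent-1".to_string(),
            prompt_template: "Summarize the input".to_string(),
        },
    };
    let upper = WorkflowNode {
        id: "n2".to_string(),
        name: "upper".to_string(),
        node_type: NodeType::Transform { transformation: "uppercase".to_string() },
    };
    let mut graph = WorkflowGraph::default();
    graph.edges.push((summarize.id.clone(), upper.id.clone()));
    graph.nodes.insert(summarize.id.clone(), summarize);
    graph.nodes.insert(upper.id.clone(), upper);
    Workflow { name: "demo".to_string(), graph }
}
```

The point of the sketch is that the node stores only an id: the agent itself lives outside the graph and is resolved later.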

A compact visual:

  • Workflows contain Nodes, and Agent Nodes reference Agents by AgentId.

[Diagram: GraphBit core concepts: Workflows, Nodes, Agents]

Core definitions from the codebase

Workflow: structure and orchestration

  • Types and builder:

    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct Workflow {
        pub id: WorkflowId,
        pub name: String,
        pub description: String,
        pub graph: WorkflowGraph,
        pub metadata: HashMap<String, serde_json::Value>,
    }

    impl Workflow {
        pub fn new(name: impl Into<String>, description: impl Into<String>) -> Self { /*...*/ }
        pub fn add_node(&mut self, node: WorkflowNode) -> GraphBitResult<NodeId> { /*...*/ }
        pub fn connect_nodes(&mut self, from: NodeId, to: NodeId, edge: WorkflowEdge) -> GraphBitResult<()> { /*...*/ }
        pub fn validate(&self) -> GraphBitResult<()> { self.graph.validate() }
    }

    pub struct WorkflowBuilder { workflow: Workflow }

    impl WorkflowBuilder {
        pub fn new(name: impl Into<String>) -> Self { /*...*/ }
        pub fn add_node(mut self, node: WorkflowNode) -> GraphBitResult<(Self, NodeId)> { /*...*/ }
        pub fn connect(mut self, from: NodeId, to: NodeId, edge: WorkflowEdge) -> GraphBitResult<Self> { /*...*/ }
        pub fn build(self) -> GraphBitResult<Workflow> { self.workflow.validate()?; Ok(self.workflow) }
    }

  • Graph and validation:

    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct WorkflowGraph {
        #[serde(skip)]
        graph: DiGraph<WorkflowNode, WorkflowEdge>,
        #[serde(skip)]
        node_map: HashMap<NodeId, NodeIndex>,
        nodes: HashMap<NodeId, WorkflowNode>,
        edges: Vec<(NodeId, NodeId, WorkflowEdge)>,
        /* caches... */
    }

    impl WorkflowGraph {
        pub fn add_node(&mut self, node: WorkflowNode) -> GraphBitResult<()> { /* add; index; cache */ }
        pub fn add_edge(&mut self, from: NodeId, to: NodeId, edge: WorkflowEdge) -> GraphBitResult<()> { /* ... */ }

        pub fn validate(&self) -> GraphBitResult<()> {
            if self.has_cycles() {
                return Err(GraphBitError::graph("Workflow graph contains cycles"));
            }
            for node in self.nodes.values() {
                node.validate()?;
            }
            // Enforce unique agent IDs across all agent nodes
            /* ... */
            Ok(())
        }
    }
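
To make the two validation rules concrete, here is a minimal std-only sketch of a cycle check plus a unique-agent-id check. It is not GraphBit's implementation: the excerpt above stores a `DiGraph`, while this example uses plain string ids and Kahn's algorithm, and the duplicate-id error message is invented for illustration.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Returns true if the directed graph has a cycle (Kahn's algorithm:
/// if a topological order cannot cover every node, a cycle exists).
fn has_cycles(nodes: &[&str], edges: &[(&str, &str)]) -> bool {
    let mut indegree: HashMap<&str, usize> = nodes.iter().map(|n| (*n, 0)).collect();
    let mut children: HashMap<&str, Vec<&str>> = HashMap::new();
    for &(from, to) in edges {
        *indegree.entry(to).or_insert(0) += 1;
        indegree.entry(from).or_insert(0);
        children.entry(from).or_default().push(to);
    }
    // Start from nodes that have no incoming edges.
    let mut ready: VecDeque<&str> = indegree
        .iter()
        .filter(|(_, d)| **d == 0)
        .map(|(n, _)| *n)
        .collect();
    let mut visited = 0;
    while let Some(n) = ready.pop_front() {
        visited += 1;
        for &child in children.get(n).into_iter().flatten() {
            let d = indegree.get_mut(child).expect("child was registered above");
            *d -= 1;
            if *d == 0 {
                ready.push_back(child);
            }
        }
    }
    visited != indegree.len()
}

/// Returns the first agent id that appears on more than one agent node, if any.
fn duplicate_agent_id<'a>(agent_ids: &[&'a str]) -> Option<&'a str> {
    let mut seen = HashSet::new();
    agent_ids.iter().copied().find(|id| !seen.insert(*id))
}

/// Combines both checks in the same order as the excerpt: cycles first, then ids.
fn validate(nodes: &[&str], edges: &[(&str, &str)], agent_ids: &[&str]) -> Result<(), String> {
    if has_cycles(nodes, edges) {
        return Err("Workflow graph contains cycles".to_string());
    }
    if let Some(id) = duplicate_agent_id(agent_ids) {
        // Hypothetical message; the real error text is elided in the source.
        return Err(format!("Duplicate agent id: {id}"));
    }
    Ok(())
}
```

For example, `validate(&["a", "b"], &[("a", "b"), ("b", "a")], &[])` returns an error because the two nodes depend on each other.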

Node: types and role

  • WorkflowNode structure and builder helpers:

    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct WorkflowNode {
        pub id: NodeId,
        pub name: String,
        pub description: String,
        pub node_type: NodeType,
        pub config: HashMap<String, serde_json::Value>,
        pub input_schema: Option<serde_json::Value>,
        pub output_schema: Option<serde_json::Value>,
        pub retry_config: RetryConfig,
        pub timeout_seconds: Option<u64>,
        pub tags: Vec<String>,
    }

    impl WorkflowNode {
        pub fn new(name: impl Into<String>, description: impl Into<String>, node_type: NodeType) -> Self { /*...*/ }
        pub fn with_retry_config(mut self, retry_config: RetryConfig) -> Self { self.retry_config = retry_config; self }

        pub fn validate(&self) -> GraphBitResult<()> {
            match &self.node_type {
                NodeType::Agent { agent_id, .. } => {
                    if agent_id.to_string().is_empty() {
                        return Err(GraphBitError::graph("Agent node must have a valid agent_id"));
                    }
                }
                /* other type checks */
                _ => {}
            }
            Ok(())
        }
    }

  • NodeType variants:

    #[derive(Debug, Clone, Serialize, Deserialize)]
    #[serde(tag = "type")]
    pub enum NodeType {
        Agent { agent_id: crate::types::AgentId, prompt_template: String },
        Condition { expression: String },
        Transform { transformation: String },
        Split,
        Join,
        Delay { duration_seconds: u64 },
        HttpRequest { url: String, method: String, headers: HashMap<String, String> },
        Custom { function_name: String },
        DocumentLoader { document_type: String, source_path: String, encoding: Option<String> },
    }

Agent: definition and lifecycle

  • AgentTrait and Agent struct:

    #[async_trait]
    pub trait AgentTrait: Send + Sync {
        fn id(&self) -> &AgentId;
        fn config(&self) -> &AgentConfig;
        async fn process_message(&self, message: AgentMessage, context: &mut WorkflowContext) -> GraphBitResult<AgentMessage>;
        async fn execute(&self, message: AgentMessage) -> GraphBitResult<serde_json::Value>;
        async fn validate_output(&self, output: &str, schema: &serde_json::Value) -> ValidationResult;
        fn llm_provider(&self) -> &LlmProvider;
    }

    pub struct Agent {
        config: AgentConfig,
        llm_provider: LlmProvider,
        validator: TypeValidator,
    }

  • AgentConfig (identity, system prompt, LLM config):

    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct AgentConfig {
        pub id: AgentId,
        pub name: String,
        pub description: String,
        pub capabilities: Vec<AgentCapability>,
        pub system_prompt: String,
        pub llm_config: crate::llm::LlmConfig,
        pub max_tokens: Option<u32>,
        pub temperature: Option<f32>,
        pub custom_config: HashMap<String, serde_json::Value>,
    }

  • Agent creation validates LLM configuration:

    impl Agent {
        pub async fn new(config: AgentConfig) -> GraphBitResult<Self> {
            let provider = crate::llm::LlmProviderFactory::create_provider(config.llm_config.clone())?;
            let llm_provider = LlmProvider::new(provider, config.llm_config.clone());
            let test_request = LlmRequest::new("test").with_max_tokens(1);
            if let Err(e) = llm_provider.complete(test_request).await {
                return Err(crate::errors::GraphBitError::config(format!("LLM configuration validation failed: {e}")));
            }
            Ok(Self { config, llm_provider, validator: TypeValidator::new() })
        }
    }
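
The "probe before you construct" pattern in `Agent::new` can be illustrated without any LLM dependency. The sketch below is an assumption-laden simplification: the synchronous `Provider` trait, the two fake providers, and the trimmed `Agent` struct are hypothetical stand-ins, and only the error prefix is taken from the excerpt.

```rust
/// Hypothetical, synchronous stand-in for an LLM provider.
trait Provider {
    fn complete(&self, prompt: &str, max_tokens: u32) -> Result<String, String>;
}

/// A provider whose configuration is valid: every request succeeds.
struct WorkingProvider;

impl Provider for WorkingProvider {
    fn complete(&self, _prompt: &str, _max_tokens: u32) -> Result<String, String> {
        Ok("ok".to_string())
    }
}

/// A provider with a bad configuration: every request fails.
struct MisconfiguredProvider;

impl Provider for MisconfiguredProvider {
    fn complete(&self, _prompt: &str, _max_tokens: u32) -> Result<String, String> {
        Err("invalid API key".to_string())
    }
}

/// Trimmed-down agent for illustration; the real struct also holds a validator.
struct Agent<P: Provider> {
    name: String,
    provider: P,
}

impl<P: Provider> Agent<P> {
    /// Fails fast: a one-token probe request must succeed before the agent exists.
    fn new(name: &str, provider: P) -> Result<Self, String> {
        provider
            .complete("test", 1)
            .map_err(|e| format!("LLM configuration validation failed: {e}"))?;
        Ok(Self { name: name.to_string(), provider })
    }
}
```

The trade-off is that construction is no longer free: in the excerpt above, creating an agent issues a real completion request, so a misconfigured provider is caught at creation time rather than mid-workflow.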

How they interact at runtime

  • Workflows schedule nodes; Agent nodes call the corresponding Agent’s execute:

    match &node.node_type {
        NodeType::Agent { agent_id, prompt_template } => {
            Self::execute_agent_node_static(
                &node.id,
                agent_id,
                prompt_template,
                &node.config,
                context.clone(),
                agents.clone(),
            )
            .await
        }
        /* other node kinds */
    }

  • Agent execution within a node:

    async fn execute_agent_node_static(
        current_node_id: &NodeId,
        agent_id: &crate::types::AgentId,
        prompt_template: &str,
        node_config: &HashMap<String, serde_json::Value>,
        context: Arc<Mutex<WorkflowContext>>,
        agents: Arc<RwLock<HashMap<AgentId, Arc<dyn AgentTrait>>>>,
    ) -> GraphBitResult<serde_json::Value> {
        let agents_guard = agents.read().await;
        let agent = agents_guard
            .get(agent_id)
            .ok_or_else(|| GraphBitError::agent_not_found(agent_id.to_string()))?
            .clone();
        /* build prompt incl. parent outputs; then: */
        let message = AgentMessage::new(agent_id.clone(), None, MessageContent::Text(resolved_prompt));
        agent.execute(message).await
    }
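
The lookup-then-execute step can be tried in isolation with the standard library. This is a sketch under stated assumptions: `SimpleAgent` is a hypothetical synchronous trait standing in for the async `AgentTrait`, `std::sync::RwLock` replaces the async lock, and the error strings are invented. Dropping the read guard before executing is a choice made in this sketch, not a claim about the real executor.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical synchronous agent trait standing in for the async AgentTrait.
trait SimpleAgent: Send + Sync {
    fn execute(&self, prompt: &str) -> Result<String, String>;
}

/// Toy agent that echoes its prompt instead of calling an LLM.
struct EchoAgent;

impl SimpleAgent for EchoAgent {
    fn execute(&self, prompt: &str) -> Result<String, String> {
        Ok(format!("echo: {prompt}"))
    }
}

type Registry = Arc<RwLock<HashMap<String, Arc<dyn SimpleAgent>>>>;

/// Looks up the agent, clones its Arc, releases the read lock, then executes.
fn execute_agent_node(registry: &Registry, agent_id: &str, prompt: &str) -> Result<String, String> {
    let guard = registry
        .read()
        .map_err(|_| "registry lock poisoned".to_string())?;
    let agent = guard
        .get(agent_id)
        .cloned()
        .ok_or_else(|| format!("agent not found: {agent_id}"))?;
    // Release the lock so registering agents is not blocked during execution.
    drop(guard);
    agent.execute(prompt)
}

/// Builds a registry containing a single agent registered as "echo".
fn sample_registry() -> Registry {
    let mut agents: HashMap<String, Arc<dyn SimpleAgent>> = HashMap::new();
    agents.insert("echo".to_string(), Arc::new(EchoAgent));
    Arc::new(RwLock::new(agents))
}
```

Because the registry maps ids to trait objects, swapping the agent behind an id does not require touching the graph.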

  • Writing outputs into WorkflowContext:

    let mut ctx = shared_context.lock().await;
    if let Some(node) = workflow.graph.get_node(&node_result.node_id) {
        ctx.set_node_output(&node.id, node_result.output.clone());
        ctx.set_node_output_by_name(&node.name, node_result.output.clone());
        if let Ok(output_str) = serde_json::to_string(&node_result.output) {
            ctx.set_variable(format!("{}_output_str", node.name), serde_json::Value::String(output_str));
        }
    }
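
A reduced model of these three writes may help when reasoning about what downstream nodes can read. The sketch assumes a simplified context in which JSON values are plain strings and both the id and the name share one map; `record_output` is a hypothetical helper that bundles the three calls shown above.

```rust
use std::collections::HashMap;

/// Simplified context: JSON values are modeled as plain strings here.
#[derive(Debug, Default)]
struct WorkflowContext {
    node_outputs: HashMap<String, String>,
    variables: HashMap<String, String>,
}

impl WorkflowContext {
    /// Records one node's output under its id, under its name, and as a
    /// `<name>_output_str` variable, mirroring the three writes in the excerpt.
    fn record_output(&mut self, node_id: &str, node_name: &str, output: &str) {
        self.node_outputs.insert(node_id.to_string(), output.to_string());
        self.node_outputs.insert(node_name.to_string(), output.to_string());
        self.variables
            .insert(format!("{node_name}_output_str"), output.to_string());
    }
}
```

After `record_output("n1", "summarize", ...)`, a downstream node can find the same value under `n1`, under `summarize`, or in the `summarize_output_str` variable.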

Hierarchical relationships and data flow

  • Containment
    • Workflow owns WorkflowGraph; WorkflowGraph owns WorkflowNodes and WorkflowEdges.
    • Agent nodes embed an AgentId which the executor resolves to a registered Agent (AgentTrait).
  • Execution
    • WorkflowExecutor batches nodes by dependency; for Agent nodes it:
      • Resolves the agent (registered or auto-created depending on executor config)
      • Builds the AgentMessage (prompt template + implicit preamble from parent outputs)
      • Calls AgentTrait::execute and returns JSON output
    • Outputs are committed to WorkflowContext.node_outputs keyed by node id and name.
  • Data flow
    • Node outputs (JSON) flow via context to downstream nodes; prompt templates can reference upstream outputs and the executor auto-injects a preamble from direct parents to Agent prompts.
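
The prompt-building step described above (template plus a preamble from direct parents) might look roughly like the following. The post does not show the real template syntax or preamble wording, so the `{{name}}` placeholder form, the preamble text, and the `build_prompt` function are all assumptions for illustration.

```rust
/// Builds an agent prompt: a preamble listing direct-parent outputs, followed by
/// the template with `{{name}}` placeholders replaced by the matching output.
/// Placeholder syntax and preamble wording are hypothetical.
fn build_prompt(template: &str, parents: &[(&str, &str)]) -> String {
    let mut prompt = String::new();
    if !parents.is_empty() {
        prompt.push_str("Context from upstream nodes:\n");
        for &(name, output) in parents {
            prompt.push_str(&format!("- {name}: {output}\n"));
        }
        prompt.push('\n');
    }
    let mut body = template.to_string();
    for &(name, output) in parents {
        let placeholder = ["{{", name, "}}"].concat();
        body = body.replace(&placeholder, output);
    }
    prompt.push_str(&body);
    prompt
}
```

With a single parent named `loader`, the template `Summarize: {{loader}}` becomes a prompt that begins with the preamble and ends with `Summarize:` followed by the loader's output. A node with no parents gets the template unchanged.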

Step-by-step:

  1. Build Workflow via WorkflowBuilder; add WorkflowNodes; connect with WorkflowEdges.
  2. Executor::execute validates the workflow and plans batches based on graph dependencies.
  3. For each node in a batch:
    • If Agent node: resolve Agent by AgentId, build prompt (template + preamble), call Agent::execute
    • Otherwise, run the node-specific handler (Transform, Condition, DocumentLoader, etc.)
  4. Store node outputs in WorkflowContext by id and by name; each output is also saved as a stringified variable (<node name>_output_str) for compatibility.
  5. Downstream nodes read from context; Agent nodes can read parent outputs via template/preamble.
  6. The workflow completes when all nodes have executed, or stops early if fail-fast triggers.
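
Step 2's batch planning can be sketched as a level-by-level grouping: a node joins the first batch in which all of its parents have already run. This is one plausible reading of "batches based on graph dependencies", not the executor's actual planner; `plan_batches` and its string ids are hypothetical.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Groups nodes into batches: every node lands in the first batch after all of
/// its parents. Returns None if a cycle prevents scheduling every node.
fn plan_batches(nodes: &[&str], edges: &[(&str, &str)]) -> Option<Vec<Vec<String>>> {
    // parents[n] = set of nodes that must finish before n can run
    let mut parents: BTreeMap<&str, BTreeSet<&str>> =
        nodes.iter().map(|n| (*n, BTreeSet::new())).collect();
    for &(from, to) in edges {
        parents.entry(from).or_default();
        parents.entry(to).or_default().insert(from);
    }
    let mut done: BTreeSet<&str> = BTreeSet::new();
    let mut batches: Vec<Vec<String>> = Vec::new();
    while done.len() < parents.len() {
        // A node is ready when it has not run yet and all its parents have.
        let batch: Vec<&str> = parents
            .iter()
            .filter(|(n, deps)| !done.contains(*n) && deps.is_subset(&done))
            .map(|(n, _)| *n)
            .collect();
        if batch.is_empty() {
            return None; // remaining nodes wait on each other: a cycle
        }
        done.extend(batch.iter().copied());
        batches.push(batch.into_iter().map(String::from).collect());
    }
    Some(batches)
}
```

For a diamond-shaped graph (a feeds b and c, which both feed d), the plan has three batches: a alone, then b and c together, then d. Nodes within one batch have no dependencies on each other, which is what makes running them concurrently safe.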

Structural differences distilled

  • Agent
    • Behavior provider implementing AgentTrait
    • Has identity/config (AgentConfig) and an LLM provider
    • Unit of execution logic, not part of the graph structure directly (referenced by AgentId)
  • Node
    • Structural element in the graph; wraps behavior via NodeType
    • Agent node variant bridges the graph to an Agent
    • Holds policies (retry, timeout) and schemas for validation
  • Workflow
    • Container and orchestrator; owns the graph and metadata
    • Built with a builder; validated before execution
    • Executed by WorkflowExecutor which coordinates nodes and agents

Implementation insights

  • Agents are attached to nodes indirectly: NodeType::Agent holds only the AgentId, and at runtime the executor looks up the Agent and invokes it. This keeps the graph serializable and makes agents swappable.
  • Node types isolate concerns: Agent (LLM call), Condition (branching), Transform (data ops), DocumentLoader (I/O), etc. The executor dispatches based on NodeType.
  • WorkflowGraph maintains caches to accelerate dependency queries; validation enforces no cycles and unique Agent IDs across nodes to avoid ambiguous routing.
  • WorkflowExecutor centralizes execution policies like retries, circuit breakers, and fail-fast. It writes results into WorkflowContext for data passing.
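
As an illustration of one executor-level policy, the following is a minimal retry wrapper. The fields of the real `RetryConfig` are not shown in this post, so `RetryPolicy` and its single `max_attempts` field are hypothetical, and backoff delays and circuit breaking are left out to keep the sketch short.

```rust
/// Hypothetical retry policy; the real RetryConfig fields are not shown here.
struct RetryPolicy {
    max_attempts: u32,
}

/// Runs `op` until it succeeds or `max_attempts` is exhausted, returning the
/// last error on failure. `op` receives the 1-based attempt number.
fn run_with_retry<T, E>(
    policy: &RetryPolicy,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(err) if attempt >= policy.max_attempts => return Err(err),
            Err(_) => attempt += 1,
        }
    }
}
```

Keeping the policy outside the node handler means the same wrapper can be applied to any node type, which matches the idea of the executor centralizing execution policies.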

Visual representation concepts

  • Nested boxes/tree:
    • Workflow (name, id)
      • WorkflowGraph
        • Node[A] (NodeType::Agent → AgentId)
        • Node[B] (NodeType::Transform)
        • Edge A → B (data flow)
    • A separate Agents registry:
      • AgentId → Agent (AgentConfig, LLM)
    • Execution path:
      • Executor coordinates nodes in dependency-respecting batches; for Agent nodes, it resolves AgentId → Agent and calls execute()
  • Sequence diagram idea:
    • Executor → WorkflowGraph: get next executable nodes
    • Executor → Node(Agent): resolve AgentId; fetch Agent
    • Executor → Agent.execute(message)
    • Agent → LLMProvider.complete(request)
    • Executor → WorkflowContext.set_node_output(node_id, output)
    • Downstream Node reads from context

[Diagram: GraphBit high-level architecture: workflows, nodes, agents, orchestration, and data flow]

[Diagram: GraphBit structural hierarchy: Workflow, Node, Agent]

[Diagram: GraphBit execution path flowchart: Workflow, Node, Agent (orchestration and data flow)]

[Diagram: GraphBit Agent node runtime sequence]
