Introduction
The Microsoft Agent Framework (MAF) empowers developers to build automated workflows that blend AI agents with business processes. It allows us to orchestrate complex tasks by connecting intelligent components that can act, reason, and collaborate.
How Are Workflows Different from AI Agents?
Although both AI agents and workflows can include multiple steps to achieve a goal, they serve different purposes and operate at different levels of abstraction.
🤖 AI Agent
Think of every human as a kind of AI agent: our brain acts like a large language model (LLM), while our eyes and ears serve as tools that bring in external context. Similarly, AI agents use tools or MCP servers to gather external information and maintain short-term and long-term memory, just like humans remember past conversations or experiences.
🔄 Workflow
Workflows, on the other hand, are like the human body's digestive system: a predefined, structured sequence of steps. Each step performs a specific function and passes its result to the next.
For example, when you receive sensory input, your brain transforms it into signals, leading to actions. This resembles how business workflows process data through a series of interconnected executors.
In short:
Agents make dynamic decisions in real-time.
Workflows follow explicitly defined paths for predictable and controlled execution.
⚙️ Microsoft Agent Framework – Core Components
MAF workflows are made up of four main building blocks:
- Executors
- Edges
- Workflows
- Events
🧠 Executors — The Brains of the Workflow
Executors are the fundamental processing units in a workflow.
They:
- Receive typed messages
- Perform operations
- Produce outputs as new messages or events
Every executor inherits from the base Executor class and can process specific message types using methods decorated with the @handler decorator.
💻 Example: Building a Sentiment Analyzer Executor
```python
from agent_framework import Executor, WorkflowContext, handler
from textblob import TextBlob

class SentimentAnalyzer(Executor):
    @handler
    async def analyze_sentiment(self, text: str, ctx: WorkflowContext[dict]) -> None:
        """Analyze sentiment of the input text and forward the result as a dict."""
        analysis = TextBlob(text)
        result = {
            "text": text,
            "polarity": analysis.sentiment.polarity,
            "subjectivity": analysis.sentiment.subjectivity,
        }
        await ctx.send_message(result)
```
Creating an Executor Using a Function
You can also create an executor directly from a function using the @executor decorator:
```python
from agent_framework import WorkflowContext, executor
from textblob import TextBlob

@executor(id="sentiment_executor")
async def analyze_sentiment(text: str, ctx: WorkflowContext[dict]) -> None:
    """Analyze the sentiment of the input text and forward a result dictionary."""
    analysis = TextBlob(text)
    result = {
        "text": text,
        "polarity": analysis.sentiment.polarity,
        "subjectivity": analysis.sentiment.subjectivity,
    }
    await ctx.send_message(result)
```
🧠 Handling Multiple Input Types
Executors can have multiple handlers to process different types of messages:
```python
from agent_framework import Executor, WorkflowContext, handler

class RealWorldExecutor(Executor):
    @handler
    async def analyze_sentiment(self, text: str, ctx: WorkflowContext[dict]) -> None:
        """Analyze sentiment of input text."""

    @handler
    async def translate_to_french(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Translate text to French."""

    @handler
    async def summarize_text(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Summarize text."""

    @handler
    async def extract_entities(self, text: str, ctx: WorkflowContext[dict]) -> None:
        """Extract named entities."""

    @handler
    async def detect_spam(self, message: str, ctx: WorkflowContext[bool]) -> None:
        """Detect spam messages."""
```
The WorkflowContext Object
WorkflowContext allows handlers to interact with the workflow during execution. It defines what type of messages a handler emits and what outputs it can produce.
send_message – sends data to downstream executors
yield_output – sends output back to the workflow caller
Example:
```python
from agent_framework import Executor, WorkflowContext, handler

class SomeHandler(Executor):
    @handler
    async def some_handler(self, message: str, ctx: WorkflowContext[str]) -> None:
        await ctx.send_message("Updated all the records")
```
Or, to yield an output directly:
```python
from agent_framework import Executor, WorkflowContext, handler

class SomeHandler(Executor):
    @handler
    async def some_handler(self, message: str, ctx: WorkflowContext[None, str]) -> None:
        await ctx.yield_output("I am done ...")
```
If a handler doesn’t send or yield messages, no type parameter is required:
```python
from agent_framework import Executor, WorkflowContext, handler

class SomeHandler(Executor):
    @handler
    async def some_handler(self, message: str, ctx: WorkflowContext) -> None:
        print("doing or processing something...")
```
🔗 Edges — Connecting Executors Together
Edges define how executors are connected within a workflow.
They determine how messages flow from one executor to another, optionally under specific conditions. In other words, edges define the data-flow or process-flow path between the building blocks of your workflow.
The Microsoft Agent Framework supports five main types of edges:
Direct Edges – Simple one-to-one connections
Conditional Edges – Connections that activate based on a condition
Switch-Case Edges – Routing across multiple conditions with a default path
Fan-out Edges – One executor sending messages to multiple targets
Fan-in Edges – Multiple executors sending messages to a single target
1️⃣ Direct Edges
The simplest connection between two executors — like connecting an email parser to a classifier.
```python
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.add_edge(email_parser, spam_classifier)
builder.set_start_executor(email_parser)
workflow = builder.build()
```
2️⃣ Conditional Edges
Edges that activate only when certain conditions are met; for example, sending flagged transactions to a fraud detector.
```python
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.add_edge(
    transaction_checker, fraud_detector,
    condition=lambda result: result["amount"] > 5000,
)
builder.add_edge(
    transaction_checker, normal_flow_handler,
    condition=lambda result: result["amount"] <= 5000,
)
builder.set_start_executor(transaction_checker)
workflow = builder.build()
```
3️⃣ Switch-Case Edges
Route messages to different executors based on matching conditions, similar to switch or if-else logic.
```python
from agent_framework import Case, Default, WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(priority_router)
builder.add_switch_case_edge_group(
    priority_router,
    [
        Case(condition=lambda msg: msg.level == "LOW", target=low_priority_executor),
        Case(condition=lambda msg: msg.level == "MEDIUM", target=medium_priority_executor),
        Default(target=high_priority_executor),
    ],
)
workflow = builder.build()
```
4️⃣ Fan-out Edges
Send messages from one executor to multiple downstream executors; for example, broadcasting notifications.
```python
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(notification_sender)
builder.add_fan_out_edges(notification_sender, [sms_executor, email_executor, push_executor])
workflow = builder.build()
```
You can even add logic to selectively send messages:
```python
builder.add_fan_out_edges(
    notification_sender,
    [sms_executor, email_executor, push_executor],
    selection_func=lambda msg, _: (
        [0] if msg.channel == "SMS"
        else [1, 2] if msg.channel == "EMAIL"
        else list(range(3))
    ),
)
```
5️⃣ Fan-in Edges
Collect messages from multiple executors into a single one — for example, merging outputs from multiple scrapers into one aggregator.
```python
builder.add_fan_in_edge([scraper_a, scraper_b, scraper_c], aggregator_executor)
```
Workflows
A workflow orchestrates a series of tasks, handling execution, message routing, and event streaming between different executors. Think of it as a project manager ensuring each step runs smoothly and in the right order.
Building Workflows
Here’s an example of a workflow for processing job applications:
```python
from agent_framework import WorkflowBuilder

# Define executors for a resume screening workflow
resume_uploader = ResumeUploader()    # Uploads resumes to the system
keyword_analyzer = KeywordAnalyzer()  # Extracts and scores keywords
candidate_ranker = CandidateRanker()  # Ranks candidates based on score

# Build the workflow
builder = WorkflowBuilder()
builder.set_start_executor(resume_uploader)           # Start with uploading resumes
builder.add_edge(resume_uploader, keyword_analyzer)   # Pass resumes for keyword analysis
builder.add_edge(keyword_analyzer, candidate_ranker)  # Pass analyzed data for ranking
workflow = builder.build()
```
Workflow Execution
Workflows can run in streaming or non-streaming modes:
Streaming Mode – Receive updates as each step completes:
```python
from agent_framework import WorkflowCompletedEvent

async for event in workflow.run_stream(new_resume_file):
    if isinstance(event, WorkflowCompletedEvent):
        print(f"Workflow completed. Final candidate ranking: {event.data}")
```
Non-Streaming Mode
Wait until the workflow finishes and get the final result:
```python
events = await workflow.run(new_resume_file)
print(f"Final ranking: {events.get_completed_event()}")
```
Workflow Validation
The framework ensures your workflow is correct before execution:
- Type Compatibility: Resumes, scores, and rankings must match expected formats
- Graph Connectivity: Each executor must be reachable from the starting point
- Executor Binding: All executors must be instantiated properly
- Edge Validation: Prevents duplicate or invalid connections
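As a rough sketch of how this surfaces in practice, the example below deliberately wires the resume workflow badly by adding the same edge twice. The exact exception types raised are a framework detail I am not assuming here, so the sketch only catches a generic Exception.

```python
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(resume_uploader)
builder.add_edge(resume_uploader, keyword_analyzer)
builder.add_edge(resume_uploader, keyword_analyzer)  # duplicate edge, expected to be rejected

try:
    workflow = builder.build()   # validation should flag the bad wiring
except Exception as exc:         # exact exception type depends on the framework version
    print(f"Workflow validation failed: {exc}")
```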
Execution Model
The workflow uses a Pregel-inspired execution model, processing messages in supersteps for parallel and efficient execution.
What is Pregel?
Pregel is a distributed computing model developed by Google, designed to efficiently process large-scale graphs. It divides computation into supersteps: each node (or executor) in the graph performs its computation in parallel, exchanges messages with other nodes, and waits for the next superstep. This ensures a clear, predictable flow of data while taking advantage of parallelism.
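To make the idea of supersteps concrete, here is a tiny framework-independent sketch in plain Python (an illustration of the Pregel pattern, not MAF's internals): within a superstep every node with pending messages computes in parallel, and outgoing messages are only delivered at the superstep boundary.

```python
import asyncio

async def run_supersteps(nodes, initial, max_supersteps=5):
    """Toy Pregel-style loop: each superstep processes all pending messages in parallel."""
    inboxes = {name: list(initial.get(name, [])) for name in nodes}
    for _ in range(max_supersteps):
        active = {name: msgs for name, msgs in inboxes.items() if msgs}
        if not active:
            break  # no pending messages: the computation has converged
        # All active nodes compute concurrently within the superstep.
        results = await asyncio.gather(*(nodes[name](msgs) for name, msgs in active.items()))
        # Messages are delivered only at the superstep boundary (the barrier).
        inboxes = {name: [] for name in nodes}
        for outgoing in results:
            for target, message in outgoing:
                inboxes[target].append(message)

# Two toy "executors": a doubler that forwards to a printer, and a printer that ends the chain.
async def doubler(msgs):
    return [("printer", m * 2) for m in msgs]

async def printer(msgs):
    print("printer received:", msgs)
    return []

asyncio.run(run_supersteps({"doubler": doubler, "printer": printer}, {"doubler": [1, 2, 3]}))
```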
Workflow Events
Workflows not only manage task execution but also emit events at key points, giving you visibility into what’s happening in real time. These events help monitor, debug, and understand the workflow’s progress.
Built-in Event Types
The framework provides several built-in events for different aspects of workflow execution:
Workflow Lifecycle Events
WorkflowStartedEvent – Triggered when a workflow begins execution
WorkflowOutputEvent – Triggered when the workflow produces a result
WorkflowErrorEvent – Triggered if the workflow encounters an error
Executor Events
ExecutorInvokeEvent – Triggered when an executor starts processing
ExecutorCompleteEvent – Triggered when an executor finishes processing
Request Events
RequestInfoEvent – Triggered when a request is issued
Observing Events
You can consume these events in streaming mode to see progress as it happens. For example, in a resume screening workflow:
```python
from agent_framework import (
    ExecutorInvokeEvent,
    ExecutorCompleteEvent,
    WorkflowOutputEvent,
    WorkflowErrorEvent,
)

async for event in workflow.run_stream(resume_file):
    match event:
        case ExecutorInvokeEvent() as invoke:
            print(f"Executor started: {invoke.executor_id}")
        case ExecutorCompleteEvent() as complete:
            print(f"Executor finished: {complete.executor_id} -> {complete.data}")
        case WorkflowOutputEvent() as output:
            print(f"Workflow output: {output.data}")
            break
        case WorkflowErrorEvent() as error:
            print(f"Workflow error: {error.exception}")
            break
```
Here, you can see exactly which executor is running, when it finishes, and what output or errors occur during execution.
Custom Events
You can also define your own events for more granular observability. For instance, logging every resume processed:
```python
from agent_framework import Executor, WorkflowContext, WorkflowEvent, handler

# Define a custom event
class ResumeProcessedEvent(WorkflowEvent):
    def __init__(self, candidate_name: str, score: float):
        super().__init__(f"{candidate_name} scored {score}")
        self.candidate_name = candidate_name
        self.score = score

# Custom executor that emits this event
class RankingExecutor(Executor):
    @handler
    async def handle(self, candidate: dict, ctx: WorkflowContext[dict]) -> None:
        score = candidate["score"]
        # Emit custom event for observability
        await ctx.add_event(ResumeProcessedEvent(candidate["name"], score))
        # Continue processing...
```
With custom events, you can track domain-specific milestones, like candidate scoring, document validation, or any other step meaningful to your workflow.
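Assuming the RankingExecutor above is wired into a workflow, a minimal way to watch for the custom event alongside the built-in ones in streaming mode might look like this:

```python
async for event in workflow.run_stream(resume_file):
    if isinstance(event, ResumeProcessedEvent):
        print(f"Ranked {event.candidate_name}: {event.score}")
```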
Conclusion
The Microsoft Agent Framework provides a powerful, flexible way to build intelligent, observable workflows. By combining executors to perform discrete tasks, edges to control message flow, and workflows to orchestrate execution, you can design systems that are both modular and scalable.
Events, both built-in and custom, give you real-time visibility into your workflow, making it easier to monitor progress, debug issues, and gain insights into how data flows through your system. Whether you are building AI agents for resume screening, customer support, or other enterprise applications, this framework helps you structure your logic cleanly while keeping execution transparent and maintainable.
In essence, the Microsoft Agent Framework empowers developers to move beyond simple linear processes, enabling dynamic, parallel, and intelligent workflows that can adapt to complex, real-world scenarios.
In my next blog (Part II), I'll show you how to build an AI agent workflow using the Microsoft Agent Framework to automate real business processes.
Thanks
Sreeni Ramadorai