Seenivasa Ramadurai

Building Concurrent User Search Workflows with Microsoft Agent Framework and AI Agents, Part III

Introduction

Harnessing Fan-In and Fan-Out Patterns to Build Agentic AI Workflows

In this third part of our Microsoft Agent Framework series, we'll build a practical workflow-based AI Agent system that performs parallel web searches across platforms like LinkedIn, Facebook, and Twitter, and then aggregates the results into a JSON summary.

This hands-on example demonstrates how to combine AI Agents and Workflows using Microsoft's Agent Framework, showcasing how you can design concurrent, intelligent, and structured Agentic AI applications.

What We’ll Build

We'll develop a small but powerful workflow where multiple AI agents work together, concurrently fetching and formatting information.

Step-by-step Overview:

Input Phase:

The workflow begins by taking a user or person's name as input; this name is passed along the workflow for multi-source searching.

Parallel Execution (Fan-Out Pattern):

The workflow triggers two parallel executors (AI agents): one searches LinkedIn; the other searches social platforms like Facebook and Twitter. This Fan-Out pattern lets both agents run simultaneously, speeding up processing and improving efficiency.

Aggregation and Summarization (Fan-In Pattern):

Once both searches complete, the workflow aggregates their results and summarizes them into a unified JSON output. To achieve this, we'll use the OpenAI API (or any LLM endpoint) to format and structure the results, turning unstructured search data into a clean, usable response.
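
Before wiring anything into the framework, the Fan-Out/Fan-In idea can be sketched with plain asyncio: gather() launches both searches concurrently (fan-out) and collects both results (fan-in). The two coroutines below are illustrative stand-ins for the real search agents:

```python
import asyncio

async def search_linkedin(name: str) -> dict:
    # Stand-in for the LinkedIn agent; the real version calls the Tavily API.
    await asyncio.sleep(0.01)
    return {"platform": "LinkedIn", "user_name": name}

async def search_social(name: str) -> dict:
    # Stand-in for the social-platform agent.
    await asyncio.sleep(0.01)
    return {"platform": "Social Media", "user_name": name}

async def fan_out_fan_in(name: str) -> list:
    # Fan-out: both coroutines start together; fan-in: gather() waits for both.
    return await asyncio.gather(search_linkedin(name), search_social(name))

results = asyncio.run(fan_out_fan_in("Jane Doe"))
print([r["platform"] for r in results])  # ['LinkedIn', 'Social Media']
```

Note that gather() preserves argument order, so downstream code can rely on result positions even though the searches finish in any order.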

Why This Matters

By combining Fan-In/Fan-Out patterns with AI Agent workflows, the Microsoft Agent Framework enables developers to:

  1. Orchestrate parallel AI processes seamlessly.
  2. Manage complex multi-agent interactions efficiently.
  3. Integrate LLMs and external APIs for intelligent data enrichment.
This approach reflects how real-world Agentic AI systems operate: delegating, reasoning, and collaborating to deliver accurate, structured insights faster than ever.

Workflow Design Patterns

Our workflow implements two key Microsoft Agent Framework patterns:

Fan-Out Pattern: Single input distributed to multiple executors
Fan-In Pattern: Results collected from multiple sources and aggregated

Technology Stack

Microsoft Agent Framework: Concurrent workflow orchestration
Tavily API: Multi-platform web search
OpenAI GPT-4: Intelligent result analysis and formatting
Python: Implementation language with async/await support

Implementation

Step 1: Dispatcher Executor (Fan-Out Source)

class UserSearchDispatcher(Executor):
    """
    Dispatcher executor that initiates the concurrent user search workflow.
    Takes a user name and distributes it to multiple search agents.
    """

    @handler
    async def handle(self, user_name: str, ctx: WorkflowContext[str]):
        if not user_name or not user_name.strip():
            raise RuntimeError("User name must be a valid non-empty string.")

        print(f"🚀 Dispatcher: Starting search for user: {user_name}")
        await ctx.send_message(user_name.strip())

Responsibilities:

  1. Validates input user name
  2. Distributes input to multiple search agents
  3. Initiates the fan-out pattern
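
The dispatcher's validation can be exercised on its own; validate_user_name below is an illustrative helper that mirrors the handler's guard, not part of the framework:

```python
def validate_user_name(user_name: str) -> str:
    # Mirror of the dispatcher's guard: reject empty or whitespace-only names.
    # (validate_user_name is an illustrative helper, not a framework API.)
    if not user_name or not user_name.strip():
        raise RuntimeError("User name must be a valid non-empty string.")
    return user_name.strip()

print(validate_user_name("  Jane Doe  "))  # Jane Doe
```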

Step 2: Concurrent Search Agents (Fan-Out Targets)

LinkedIn Search Agent

class LinkedInSearchAgent(Executor):
    """
    LinkedIn search agent that searches for professional information about the user.
    """

    @handler
    async def handle(self, user_name: str, ctx: WorkflowContext[dict]):
        print(f"🔍 LinkedIn Agent: Searching for {user_name} on LinkedIn...")

        # Initialize Tavily client and search for LinkedIn profiles
        client = TavilyClient(api_key=api_key)
        search_query = f"{user_name} LinkedIn profile professional"
        response = client.search(query=search_query, search_depth="basic", max_results=5)

        # Extract and format LinkedIn-specific results
        linkedin_results = response.get("results", [])
        result = {
            'platform': 'LinkedIn',
            'user_name': user_name,
            'results': linkedin_results,
            'total_found': len(linkedin_results)
        }

        await ctx.send_message(result)

Social Platform Search Agent


class SocialPlatformSearchAgent(Executor):
    """
    Social platform search agent that searches for social media presence.
    """

    @handler
    async def handle(self, user_name: str, ctx: WorkflowContext[dict]):
        print(f"📱 Social Agent: Searching for {user_name} on social platforms...")

        # Search for social media presence across multiple platforms
        client = TavilyClient(api_key=api_key)
        search_query = f"{user_name} social media Twitter Instagram Facebook"
        response = client.search(query=search_query, search_depth="basic", max_results=5)

        # Extract and format social media results
        social_results = response.get("results", [])
        result = {
            'platform': 'Social Media',
            'user_name': user_name,
            'results': social_results,
            'total_found': len(social_results)
        }

        await ctx.send_message(result)

Responsibilities:

  1. Search specific platforms concurrently
  2. Format platform-specific results
  3. Handle errors gracefully
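
The third responsibility, graceful error handling, can be sketched with a small wrapper. safe_search and FlakyClient below are hypothetical names, and the response.get("results", []) access assumes Tavily's dict-shaped response:

```python
def safe_search(client, query: str, max_results: int = 5) -> list:
    # Hypothetical wrapper: a failing platform search degrades to an empty
    # result list instead of crashing the whole workflow.
    try:
        response = client.search(query=query, search_depth="basic",
                                 max_results=max_results)
        return response.get("results", [])
    except Exception as exc:
        print(f"Search failed for {query!r}: {exc}")
        return []

class FlakyClient:
    # Simulates an upstream outage for demonstration purposes.
    def search(self, **kwargs):
        raise TimeoutError("upstream timeout")

print(safe_search(FlakyClient(), "Jane Doe LinkedIn"))  # []
```

With this pattern, one platform going down still lets the other agent's results flow through to the aggregator.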

Step 3: AI Formatter Agent (Fan-In Target)

class AIFormatterAgent(Executor):
    """
    AI formatter agent that uses OpenAI to format and enhance the search results.
    """

    @handler
    async def handle(self, search_results: list[dict], ctx: WorkflowContext[dict]):
        print(f"🤖 AI Formatter: Processing {len(search_results)} search results...")

        # Initialize OpenAI client
        client = AsyncOpenAI(api_key=openai_api_key)

        # Create AI prompt for analysis
        prompt = f"""
        Please analyze and format the following user search results into a comprehensive JSON report.

        Search Results: {json.dumps(search_results, indent=2)}

        Please provide a formatted response that includes:
        1. A professional summary of the person
        2. Key findings from each platform
        3. Overall assessment of their online presence
        4. Recommendations for further research
        """

        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an expert researcher..."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3
        )

        # Wrap the AI analysis so the downstream aggregator can recognize it
        result = {'ai_analysis': response.choices[0].message.content}
        await ctx.send_message(result)

Responsibilities:

  1. Receives results from multiple search agents (Fan-In)
  2. Uses OpenAI for intelligent analysis
  3. Formats results into structured JSON
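
LLM replies often arrive wrapped in Markdown code fences rather than as clean JSON, so the formatting step benefits from a tolerant parser. A minimal sketch (extract_json is an illustrative helper, not a framework API):

```python
import json

def extract_json(text: str) -> dict:
    """Parse JSON out of an LLM reply, tolerating Markdown code fences."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Keep only the lines between the opening and closing fences.
        cleaned = "\n".join(
            line for line in cleaned.splitlines() if not line.startswith("```")
        )
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        # Fall back to wrapping the raw text so downstream code still works.
        return {"ai_analysis": text}

sample = "```json\n{\"user_summary\": \"ok\"}\n```"
print(extract_json(sample))  # {'user_summary': 'ok'}
```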

Step 4: Aggregator Executor (Final Output)

class SearchResultsAggregator(Executor):
    """
    Aggregator executor that collects results from the search agents and AI formatter.
    """

    @handler
    async def handle(self, results: list[dict], ctx: WorkflowContext[Never, dict]):
        """
        Receive the results from the source executors and yield the final output.
        """
        print(f"📊 Aggregator: Collecting {len(results)} results...")

        # Separate results by type
        search_results = []
        ai_formatted_result = None

        for result in results:
            if 'platform' in result and result['platform'] in ['LinkedIn', 'Social Media']:
                search_results.append(result)
            elif 'user_summary' in result or 'ai_analysis' in result:
                ai_formatted_result = result

        # Create final aggregated result
        final_result = {
            'search_metadata': {
                'total_search_results': len(search_results),
                'ai_analysis_available': ai_formatted_result is not None,
                'timestamp': asyncio.get_event_loop().time()
            },
            'search_results': search_results,
            'ai_analysis': ai_formatted_result
        }

        await ctx.yield_output(final_result)

Responsibilities:

  1. Collects all results from upstream executors
  2. Organizes results by type and source
  3. Yields final structured output
Step 5: Building the Concurrent Workflow

# Create the executors
dispatcher = UserSearchDispatcher(id="dispatcher")
linkedin_agent = LinkedInSearchAgent(id="linkedin_agent")
social_agent = SocialPlatformSearchAgent(id="social_agent")
ai_formatter = AIFormatterAgent(id="ai_formatter")
aggregator = SearchResultsAggregator(id="aggregator")

# Build the concurrent workflow with fan-out and fan-in patterns
workflow = (
    WorkflowBuilder()
    .set_start_executor(dispatcher)
    .add_fan_out_edges(dispatcher, [linkedin_agent, social_agent])
    .add_fan_in_edges([linkedin_agent, social_agent], ai_formatter)
    .add_edge(ai_formatter, aggregator)
    .build()
)

Key Workflow Patterns

Fan-Out Edge: add_fan_out_edges(dispatcher, [linkedin_agent, social_agent])

  1. Distributes a single input to multiple executors
  2. Enables parallel processing

Fan-In Edge: add_fan_in_edges([linkedin_agent, social_agent], ai_formatter)

  1. Collects results from multiple source executors
  2. Aggregates different result types

Sequential Edge: add_edge(ai_formatter, aggregator)

  1. Connects the AI formatter to the final aggregator
  2. Ensures proper data flow

Data Flow

The workflow demonstrates sophisticated data flow:

User Input β†’ Dispatcher β†’ [LinkedIn Agent, Social Agent] β†’ AI Formatter β†’ Aggregator β†’ Final Output

Execution Steps

1. Input Distribution: Dispatcher sends user name to both search agents
2. Concurrent Search: LinkedIn and Social agents search simultaneously
3. Result Collection: AI Formatter receives results from both agents
4. AI Analysis: OpenAI processes and enhances the results
5. Final Aggregation: Aggregator combines all results into structured output

Key Features

1. Concurrent Processing

The workflow demonstrates true concurrent processing:

  1. Multiple search agents work simultaneously
  2. Reduced overall execution time
  3. Scalable design for adding more agents
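
The time saved by fanning out is easy to measure in a framework-free sketch: two simulated 0.1-second searches finish in roughly 0.1 seconds total under asyncio.gather, not the 0.2 seconds a sequential run would take:

```python
import asyncio
import time

async def slow_search(platform: str) -> str:
    await asyncio.sleep(0.1)  # simulate network latency for one platform
    return platform

async def timed_run() -> float:
    # Run both simulated searches concurrently and measure wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(slow_search("LinkedIn"), slow_search("Social Media"))
    return time.perf_counter() - start

elapsed = asyncio.run(timed_run())
print(f"two 0.1s searches took {elapsed:.2f}s")  # ~0.10s, not 0.20s
```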

2. AI-Powered Analysis

OpenAI integration provides:

  1. Intelligent result analysis
  2. Professional summary generation
  3. Structured JSON formatting
  4. Key insights and recommendations

3. Multi-Platform Search

Comprehensive search across:

LinkedIn: Professional profiles and career information
Social Media: Twitter, Instagram, Facebook, GitHub presence
Extensible: Easy to add more platforms

4. Structured Output

Final results include:

  1. Search metadata and timestamps
  2. Platform-specific results
  3. AI analysis and insights
  4. Professional recommendations


Conclusion

The Concurrent User Search Workflow built with the Microsoft Agent Framework is a powerful example of how AI agents and workflow orchestration can come together to solve real-world problems efficiently.

By implementing fan-out and fan-in patterns, we've shown how to design a workflow that performs parallel searches across multiple platforms, gathers the results, and then uses AI-powered analysis to produce a clean, structured JSON output. This approach not only improves performance but also makes the workflow highly scalable and maintainable.

What makes this system special is its balance of engineering discipline and intelligence:

  1. It distributes workloads intelligently using concurrent agents.
  2. It leverages OpenAI to transform raw search data into meaningful insights.
  3. It maintains clarity and modularity, making it easy to extend with more agents or new APIs.
In short, this workflow is a practical blueprint for Agentic AI applications: systems where AI agents think, collaborate, and execute tasks in parallel under a well-defined workflow.

The combination of Microsoft Agent Framework, Tavily API, and OpenAI GPT models creates an end-to-end intelligent search engine capable of synthesizing data from multiple sources into professional, actionable summaries.

This foundation can easily evolve into more complex applications like recruitment intelligence tools, brand monitoring systems, or academic research assistants. In enterprise environments, similar workflows can power supply chain visibility platforms, order management assistants, customer support automation, inventory tracking, and vendor risk analysis, all benefiting from concurrent data collection and AI-driven summarization.

As organizations continue to adopt AI workflows, this pattern (fan-out for parallel intelligence, fan-in for unified insight) will become a cornerstone for building scalable, agent-based enterprise systems that think, collaborate, and act intelligently across business domains.

Thanks
Sreeni Ramadorai
