Seenivasa Ramadurai

Microsoft Agent Framework: Building a Human-in-the-Loop User Search Workflow (Part V)

In this post, we’ll explore how to build a Human-in-the-Loop (HITL) workflow using the Microsoft Agent Framework. The example is a user search workflow in which AI and humans collaborate interactively: the AI performs the initial search, while a human reviews, selects, and guides the process.

What is Human-in-the-Loop?

Human-in-the-Loop (HITL) is a powerful design approach where AI systems work hand-in-hand with humans to make critical decisions, verify results, and guide workflows.

Instead of relying solely on automated decisions, HITL ensures that human judgment and AI intelligence complement each other.

Here’s how it works in this example:

  1. AI performs an initial search to find potential users.
  2. Human selects the most relevant result.
  3. AI performs a detailed search on the selected user.
  4. Human reviews the final, formatted output.

This interaction loop combines human oversight with AI efficiency, which makes it a good fit for use cases like recruitment, research, or business intelligence.

Key Components

Component - Description
RequestInfoMessage - Defines typed request structures for human input.
RequestInfoExecutor - Coordinates between the workflow and human input requests.
RequestInfoEvent - Handles asynchronous, event-driven responses from humans.
WorkflowBuilder - Constructs the workflow graph and connects executors.
Executor - Core processing unit that handles messages and executes business logic.

By combining these, the workflow achieves type safety, pause-resume functionality, and full event traceability.
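
To make the "typed request" idea concrete, here is a minimal, framework-free sketch. The names SelectionRequest, Response, and handle_selection are hypothetical stand-ins, not Agent Framework classes. They only illustrate why it helps to keep each human answer paired with its original typed request: the handler can read the candidate list straight from the request, much as the full example later reads users from original_request.

```python
from dataclasses import dataclass, field
from typing import Generic, TypeVar
from uuid import uuid4

TRequest = TypeVar("TRequest")


@dataclass
class SelectionRequest:
    """Hypothetical typed request: a prompt plus the candidates to choose from."""
    prompt: str
    users: list[dict]
    request_id: str = field(default_factory=lambda: str(uuid4()))


@dataclass
class Response(Generic[TRequest]):
    """Hypothetical response wrapper that keeps the original request attached."""
    data: str
    original_request: TRequest


def handle_selection(response: Response[SelectionRequest]) -> dict:
    """Resolve the human's 1-based choice against the request's own user list."""
    users = response.original_request.users
    index = int(response.data) - 1
    if not 0 <= index < len(users):
        raise ValueError(f"selection out of range: {response.data}")
    return users[index]


request = SelectionRequest(
    prompt="Pick a user (1-2):",
    users=[{"name": "Ada"}, {"name": "Grace"}],
)
chosen = handle_selection(Response(data="2", original_request=request))
print(chosen["name"])
```

Because the response carries the request, the handler needs no extra instance state to remember which users were offered.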

Request-Response Flow Explained

Let’s walk through how a Human-in-the-Loop flow operates in the Microsoft Agent Framework.

1. Workflow Initialization: The system starts by requesting a user’s name.
2. Request Generation: A RequestInfoExecutor emits a RequestInfoEvent asking for human input.
3. Workflow Pause: The process waits indefinitely until the human responds.
4. Human Response: The host application collects the user's input and passes it back via send_responses_streaming(), keyed by request ID.
5. Workflow Resumption: The system continues from the paused state using the response.
6. Detailed Search: AI executes a focused search for the selected profile.
7. Output Generation: The workflow yields formatted, detailed results.

This model allows interactive, event-driven collaboration between humans and agents.
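
The pause-and-resume cycle in the steps above can be illustrated without the framework at all. The sketch below uses a plain Python generator as an analogy: each yield is a pause that emits a request, and send() resumes the workflow with the human's answer. This is an assumption-laden illustration, not how Agent Framework implements it; search_workflow, run, and the request IDs are hypothetical names.

```python
from typing import Generator

# A request is (request_id, prompt); the generator receives the answer via send().
Request = tuple[str, str]


def search_workflow() -> Generator[Request, str, str]:
    """Hypothetical two-turn workflow: each `yield` is a pause awaiting human input."""
    name = yield ("req-1", "Enter the name of the person to search for:")
    # Stand-in for the AI search step; a real workflow would call a search API here.
    candidates = [f"{name} (Engineer)", f"{name} (Researcher)"]
    choice = yield ("req-2", f"Pick a result (1-{len(candidates)}):")
    return f"Detailed search for: {candidates[int(choice) - 1]}"


def run(answers: dict[str, str]) -> str:
    """Drive the workflow, answering each request by ID until it finishes."""
    workflow = search_workflow()
    request_id, _prompt = next(workflow)  # run until the first pause
    while True:
        try:
            # Resume from the paused state with the human's answer.
            request_id, _prompt = workflow.send(answers[request_id])
        except StopIteration as done:
            return done.value  # the workflow's final output


result = run({"req-1": "Ada Lovelace", "req-2": "2"})
print(result)
```

The driver loop plays the same role as the interactive loop in the full example: it runs until the workflow pauses, supplies answers keyed by request ID, and stops when a final output arrives.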

Implementation Details

import asyncio
import os
from dotenv import load_dotenv
from agent_framework import (
    Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, 
    RequestInfoEvent, WorkflowStatusEvent, WorkflowRunState, handler,
    RequestInfoExecutor, RequestInfoMessage, RequestResponse
)
from typing_extensions import Never
import requests
from pydantic import BaseModel

# Load environment variables
load_dotenv()

# Data classes following Microsoft Agent Framework patterns exactly
class UserSearchRequest(RequestInfoMessage):
    """Request for user search input."""
    def __init__(self, prompt: str):
        super().__init__()
        self.prompt = prompt

class UserSelectionRequest(RequestInfoMessage):
    """Request for user selection from discovered users."""
    def __init__(self, prompt: str, users: list):
        super().__init__()
        self.prompt = prompt
        self.users = users

class SearchData(BaseModel):
    """Search data model."""
    user_name: str
    title_place: str
    search_type: str

class SearchResults(BaseModel):
    """Search results model."""
    summary: str
    users: list

class TurnManager(Executor):
    """Coordinates the workflow between AI search and human input following Microsoft Agent Framework."""

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "turn_manager")
        self.current_search_data = None

    @handler
    async def start(self, _: str, ctx: WorkflowContext[UserSearchRequest]) -> None:
        """Start the workflow by requesting user name."""
        print("🔍 User Search Workflow - Human in the Loop")
        print("=" * 60)
        print("-" * 60)

        prompt = "Enter the name of the person to search for:"
        await ctx.send_message(UserSearchRequest(prompt=prompt))

    @handler
    async def on_user_name_response(self, user_name: RequestResponse[UserSearchRequest, str], ctx: WorkflowContext[SearchData]) -> None:
        """Handle user name input and start search."""
        name = user_name.data.strip() if user_name.data else ""
        if not name:
            await ctx.yield_output("❌ No name provided. Exiting.")
            return

        print(f"\n✅ User name: {name}")

        # Get title/place and search type from user input
        print("\n🏢 Additional information (optional but recommended for better results):")
        title_place = input("👤 Enter title/designation and place/city (e.g., 'Software Engineer, San Francisco'): ").strip()

        print("\n🔍 Search type options:")
        print("1. professional - LinkedIn, executive profiles (DEFAULT)")
        print("2. social - Twitter, Instagram, Facebook")
        print("3. comprehensive - All sources")

        search_type = input("👤 Enter search type (professional/social/comprehensive) [default: professional]: ").strip().lower()
        if not search_type or search_type not in ["professional", "social", "comprehensive"]:
            search_type = "professional"
            print(f"🔄 Using default search type: {search_type} (LinkedIn focus)")

        print(f"✅ Title/Place: {title_place}")
        print(f"✅ Search type: {search_type}")

        # Store search data for later use
        self.current_search_data = SearchData(
            user_name=name,
            title_place=title_place,
            search_type=search_type
        )

        await ctx.send_message(self.current_search_data)

    @handler
    async def on_search_results(self, search_results: SearchResults, ctx: WorkflowContext[UserSelectionRequest]) -> None:
        """Handle search completion and request user selection."""
        print(f"\n🔍 Search completed: {search_results.summary}")

        users = search_results.users
        print(f"👥 Found {len(users)} potential matches:")
        print("=" * 70)

        for i, user in enumerate(users, 1):
            print(f"\n{i}. 👤 {user['name']}")
            print(f"   📋 {user['title']}")
            print(f"   🔗 {user['url']}")
            print(f"   📝 {user['content']}")
            print(f"   ⭐ Score: {user['score']}")
            print("-" * 50)

        # Create user selection request
        prompt = f"Which user would you like to search for in detail? Enter a number (1-{len(users)}) or 'all' to search all users:"
        user_selection_request = UserSelectionRequest(prompt=prompt, users=users)

        await ctx.send_message(user_selection_request)

    @handler
    async def on_user_selection_response(self, selection_data: RequestResponse[UserSelectionRequest, str], ctx: WorkflowContext[WorkflowOutputEvent]) -> None:
        """Handle user selection and perform detailed search with formatted output."""
        user_input = selection_data.data.strip().lower() if selection_data.data else ""
        users = selection_data.original_request.users

        print(f"\n🔍 Processing selection: {user_input}")

        if user_input == "all":
            # Search all users
            print("🔄 Searching for all users...")
            selected_users = users
        else:
            try:
                selection = int(user_input)
                if 1 <= selection <= len(users):
                    selected_user = users[selection - 1]
                    print(f"✅ Selected: {selected_user['name']}")
                    selected_users = [selected_user]
                else:
                    await ctx.yield_output(f"❌ Invalid selection: {user_input}")
                    return
            except ValueError:
                await ctx.yield_output(f"❌ Invalid input: {user_input}")
                return

        # Perform detailed search for selected users
        for user in selected_users:
            await self._perform_detailed_search(user)

        result_text = f"✅ Detailed search completed for {len(selected_users)} user(s)."
        await ctx.yield_output(result_text)

    async def _perform_detailed_search(self, selected_user):
        """Perform detailed search for the selected user and display formatted results."""
        print(f"\n🔍 Performing detailed search for: {selected_user['name']}")

        try:
            api_key = os.getenv("TAVILY_API_KEY")
            if not api_key:
                raise ValueError("TAVILY_API_KEY not found")

            # Create detailed search query focused on the selected user
            user_name = selected_user['name']
            search_type = self.current_search_data.search_type if self.current_search_data else 'professional'

            if search_type == "professional":
                search_query = f'"{user_name}" site:linkedin.com'
            elif search_type == "social":
                search_query = f'"{user_name}" site:twitter.com OR site:instagram.com OR site:facebook.com'
            else:  # comprehensive
                search_query = f'"{user_name}"'

            print(f"🔍 Detailed search query: {search_query}")

            # Make API request
            response = requests.post(
                "https://api.tavily.com/search",
                json={
                    "api_key": api_key,
                    "query": search_query,
                    "search_depth": "basic",
                    "include_answer": False,
                    "max_results": 5
                },
                timeout=10
            )
            response.raise_for_status()

            data = response.json()
            results = data.get("results", [])

            # Display detailed results with formatting
            print(f"\n📋 Detailed Search Results for {user_name}")
            print("=" * 70)
            print(f"📊 Found {len(results)} detailed results")
            print("-" * 70)

            if results:
                for i, result in enumerate(results, 1):
                    title = result.get('title', 'No title')
                    url = result.get('url', 'No URL')
                    content = result.get('content', 'No content')
                    score = result.get('score', 'N/A')

                    # Clean up content
                    content = content.replace('\n', ' ').strip()
                    if len(content) > 250:
                        content = content[:250] + "..."

                    print(f"\n{i}. {title}")
                    print(f"   🔗 {url}")
                    print(f"   📝 {content}")
                    print(f"   ⭐ Score: {score}")
                    print("-" * 50)
            else:
                print("❌ No detailed results found")

            print("\n" + "=" * 70)

        except Exception as e:
            print(f"❌ Error performing detailed search: {e}")

class UserSearchExecutor(Executor):
    """Performs web search for a user using Tavily API."""

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "user_search")

    @handler
    async def handle(self, search_data: SearchData, ctx: WorkflowContext[SearchResults]) -> None:
        """Handle user search request."""
        print(f"🔍 Searching for: {search_data.user_name}")
        if search_data.title_place:
            print(f"🏢 Title/Place: {search_data.title_place}")
        print(f"📊 Search type: {search_data.search_type}")

        try:
            api_key = os.getenv("TAVILY_API_KEY")
            if not api_key:
                raise ValueError("TAVILY_API_KEY not found in environment variables")

            # Create search query using name and title/place
            if search_data.title_place:
                if search_data.search_type == "professional":
                    search_query = f'"{search_data.user_name}" "{search_data.title_place}" site:linkedin.com'
                elif search_data.search_type == "social":
                    search_query = f'"{search_data.user_name}" "{search_data.title_place}" site:twitter.com OR site:instagram.com OR site:facebook.com'
                else:  # comprehensive
                    search_query = f'"{search_data.user_name}" "{search_data.title_place}"'
            else:
                if search_data.search_type == "professional":
                    search_query = f'"{search_data.user_name}" site:linkedin.com'
                elif search_data.search_type == "social":
                    search_query = f'"{search_data.user_name}" site:twitter.com OR site:instagram.com OR site:facebook.com'
                else:  # comprehensive
                    search_query = f'"{search_data.user_name}"'

            print(f"🔍 Search query: {search_query}")

            # Make API request
            response = requests.post(
                "https://api.tavily.com/search",
                json={
                    "api_key": api_key,
                    "query": search_query,
                    "search_depth": "basic",
                    "include_answer": False,
                    "max_results": 10
                },
                timeout=10
            )
            response.raise_for_status()

            data = response.json()
            results = data.get("results", [])

            # Process results and extract users
            users = []
            for result in results:
                users.append({
                    'name': search_data.user_name,
                    'title': result.get('title', 'No title'),
                    'url': result.get('url', 'No URL'),
                    'content': result.get('content', 'No content')[:200],
                    'score': result.get('score', 'N/A')
                })

            # Create summary
            summary = f"Found {len(results)} results for {search_data.user_name}"
            print(f"{summary}")

            # Create search results
            search_results = SearchResults(
                summary=summary,
                users=users
            )
            await ctx.send_message(search_results)

        except Exception as e:
            print(f"❌ Error performing search: {e}")
            error_msg = f"Error searching for {search_data.user_name}: {str(e)}"
            search_results = SearchResults(
                summary=error_msg,
                users=[]
            )
            await ctx.send_message(search_results)

async def run_interactive_workflow(workflow):
    """Run the workflow, handling RequestInfoEvent requests in an event-driven loop."""
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None

    print("🎯 User Search Game")
    print("I'll help you search for users and let you select which one to explore!")
    print("-" * 50)

    while not completed:
        # First iteration uses run_stream("start")
        # Subsequent iterations use send_responses_streaming with pending responses
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("start")
        )

        # Collect events for this turn
        events = [event async for event in stream]
        pending_responses = None

        # Process events to collect requests and detect completion.
        # Named human_requests so it does not shadow the imported `requests` module.
        human_requests: list[tuple[str, str]] = []  # (request_id, prompt)
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Only requests that carry a prompt can be shown to the human
                if hasattr(event.data, 'prompt'):
                    human_requests.append((event.request_id, event.data.prompt))
            elif isinstance(event, WorkflowOutputEvent):
                # Capture workflow output when yielded
                workflow_output = str(event.data)
                completed = True

        # Check workflow status
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )

        if pending_status:
            print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
        if idle_with_requests:
            print("⏸️  State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")

        # Handle human requests if any
        if human_requests and not completed:
            responses: dict[str, str] = {}
            for req_id, prompt in human_requests:
                print(f"\n🤖 {prompt}")
                answer = input("👤 Enter your response: ").strip()

                if answer.lower() == "exit":
                    print("👋 Exiting...")
                    return
                responses[req_id] = answer
            pending_responses = responses

    # Show final result
    if workflow_output:
        print(f"\n🎉 {workflow_output}")

async def main():
    print("=" * 60)
    print("🤖 MICROSOFT AGENT FRAMEWORK WITH DETAILED OUTPUT")
    print("=" * 60)
    print("=" * 60)

    # Create workflow components
    turn_manager = TurnManager(id="turn_manager")
    user_search = UserSearchExecutor(id="user_search")
    request_info_executor = RequestInfoExecutor(id="request_info")

    # Build the workflow graph following Microsoft Agent Framework pattern exactly
    workflow = (
        WorkflowBuilder()
        .set_start_executor(turn_manager)
        .add_edge(turn_manager, request_info_executor)  # Request user input
        .add_edge(request_info_executor, turn_manager)  # User input back to turn manager
        .add_edge(turn_manager, user_search)  # Start search
        .add_edge(user_search, turn_manager)  # Search results back to turn manager
        .build()
    )

    # Execute the interactive workflow
    await run_interactive_workflow(workflow)

if __name__ == "__main__":
    asyncio.run(main())
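
The listing above builds its query strings in two places and parses the user's selection inline. As a possible refactoring, the two helpers below pull that logic into pure functions that can be unit tested without the workflow or the Tavily API. build_search_query, parse_selection, and SITE_FILTERS are hypothetical names introduced here, not part of the original code or of Agent Framework.

```python
SITE_FILTERS = {
    "professional": "site:linkedin.com",
    "social": "site:twitter.com OR site:instagram.com OR site:facebook.com",
    "comprehensive": "",
}


def build_search_query(user_name: str, search_type: str, title_place: str = "") -> str:
    """Build the query for the three supported search types from one place."""
    parts = [f'"{user_name}"']
    if title_place:
        parts.append(f'"{title_place}"')
    # Assumption: unknown search types fall back to "professional",
    # mirroring the default applied when the user's input is read.
    site_filter = SITE_FILTERS.get(search_type, SITE_FILTERS["professional"])
    if site_filter:
        parts.append(site_filter)
    return " ".join(parts)


def parse_selection(user_input: str, count: int) -> list[int]:
    """Return zero-based indices for 'all' or a 1-based number; raise on bad input."""
    text = user_input.strip().lower()
    if text == "all":
        return list(range(count))
    selection = int(text)  # raises ValueError for non-numeric input
    if not 1 <= selection <= count:
        raise ValueError(f"selection must be between 1 and {count}")
    return [selection - 1]


print(build_search_query("Ada Lovelace", "professional", "Mathematician, London"))
print(parse_selection("all", 3))
```

With helpers like these, both executors could call build_search_query, and the selection handler could catch a single ValueError for every kind of invalid input.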

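One thing to be aware of in the listing: the handlers call requests.post() and input() directly inside async methods, and both calls block the event loop while they run. A common way around this is asyncio.to_thread (Python 3.9+), which runs a blocking function in a worker thread. The sketch below is a standalone illustration under that assumption; blocking_search, heartbeat, and demo are hypothetical names, and time.sleep stands in for the network call.

```python
import asyncio
import time


def blocking_search(query: str) -> dict:
    """Stand-in for a blocking HTTP call such as requests.post(...).json()."""
    time.sleep(0.2)  # simulate network latency
    return {"query": query, "results": ["result-1", "result-2"]}


async def heartbeat(ticks: list[int]) -> None:
    """Record ticks while the search runs, showing the loop is not blocked."""
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0.05)


async def demo() -> tuple[dict, list[int]]:
    ticks: list[int] = []
    # to_thread runs the blocking function in a worker thread and awaits its result.
    data, _ = await asyncio.gather(
        asyncio.to_thread(blocking_search, '"Ada Lovelace" site:linkedin.com'),
        heartbeat(ticks),
    )
    return data, ticks


data, ticks = asyncio.run(demo())
print(len(data["results"]), ticks)
```

In the workflow, the same idea would mean wrapping the Tavily request in asyncio.to_thread, or switching to an async HTTP client, so other tasks can keep running during the search.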
Conclusion

This Human-in-the-Loop User Search workflow demonstrates how AI agents and humans can work together using Microsoft’s official Agent Framework design patterns.

By combining RequestInfoMessage / RequestInfoEvent, typed event-driven workflows, state preservation with pause-resume, and AI-human collaboration, you can build workflows that balance automation with human reasoning, a pattern well suited to enterprise agentic AI applications where decisions need human review.

Thanks
Sreeni Ramadorai
