Introduction to AI Agents
AI agents operate autonomously, making decisions and taking actions to achieve specific goals. Their most significant challenge is maintaining state across interruptions – when an agent’s task spans hours or days, any disruption can force it to restart from scratch, losing valuable progress.
The State Problem in Agent Design
State management is crucial for effective agents. Without it, any interruption resets all progress – a minor inconvenience for simple tasks but catastrophic for complex research.
A research agent that loses state midway through a task would forget:
- Sources already consulted
- Information gathered
- Conclusions drawn
- Remaining questions
- Its investigation strategy
This limitation fundamentally restricts what tasks we can reliably assign to AI agents.
Research Agents
Research tasks require methodical, iterative work that builds knowledge progressively. OpenAI’s agent framework provides tooling like web search, computer use, and file search, but, used as is, it provides no way to maintain durable state. If your application crashes or your server restarts, the agent’s state vanishes.
This creates a significant challenge for research agents that need to maintain context over extended periods, especially when investigating complex topics that require multiple steps and substantial information gathering. Human-in-the-loop interactions make this harder still. When a research agent needs to wait for human input – to verify a finding, provide additional context, or make a judgment call – that wait may extend for minutes, hours, or even days. Without proper state management, these collaborative workflows become impractical, as any disruption during the waiting period would reset the entire process and lose all accumulated context.
Choosing the Right Abstraction to Manage State
Research agents require persistence across system disruptions and human interaction delays. Without proper abstractions, the agent state vanishes when applications crash or servers restart, forcing research to begin anew.
Consider structuring the code as Workflows and Activities:
- Workflows are deterministic programs that define the overall orchestration logic and maintain durable state. They automatically persist their execution context, allowing for seamless resumption after interruption through replay. This is the orchestrator part of the agent, which is “pure” as it does not directly cause side effects.
- Activities are non-deterministic operations interacting with external systems (databases, APIs, file systems, human inputs). They are executed by the workflow engine with automatic retry logic and failure handling. Their outputs must be serializable so they can be persisted and cached. This is the part that can cause side effects.
Taking from the Research Agents use case, a somewhat naive structure can be written like so:
# `activity` and `Task` are placeholders for the workflow engine's
# decorator and for a task type.
def research(topic: str) -> str:
    """Research a topic and return report.

    This is the "workflow" part of the code.
    """
    # Plan the research.
    tasks: list[Task] = _plan(topic)
    answers: list[str] = []
    for task in tasks:
        # Execute the given task. This might be costly - so if we have any interruption
        # between steps here, we don't want to repeat previously executed steps.
        # As this is executed as an activity, if it is completed, on replay it will
        # not be executed - we will just reuse the previous result.
        result = _execute(task)
        answers.append(result)
    return _summarize(topic, answers)


@activity
def _plan(topic: str) -> list[Task]:
    """Generate tasks to perform in order to research a given topic"""
    ...


@activity
def _execute(task: Task) -> str:
    """Execute a specific research task"""
    ...


@activity
def _summarize(topic: str, answers: list[str]) -> str:
    """Summarize all the answers in a way that explains the topic"""
    ...
In this manner, a research agent can execute multi-stage investigations over extended periods, wait for human validation at critical decision points, and survive infrastructure disruptions – all while keeping its accumulated knowledge intact. The agent’s core logic remains focused on research methodology rather than state management, making the code more maintainable and the research process more reliable.
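To make the replay idea concrete, here is a toy sketch of how an engine might skip completed activities after a crash. It is not how Temporal or any real engine is implemented: `ToyReplayEngine`, its JSON journal file, and the `plan`/`execute` stand-ins are all hypothetical names invented for illustration. The only point is that a recorded activity result is reused on replay instead of running the activity again.

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory


class ToyReplayEngine:
    """Hypothetical, minimal stand-in for a durable workflow engine."""

    def __init__(self, journal_path: Path) -> None:
        self.journal_path = journal_path
        # Results of activities that completed in an earlier run, in call order.
        self.journal: list = (
            json.loads(journal_path.read_text()) if journal_path.exists() else []
        )
        self.position = 0  # index of the next activity call in this run

    def run_activity(self, fn, *args):
        if self.position < len(self.journal):
            # Replay: reuse the recorded result, do not call fn again.
            result = self.journal[self.position]
        else:
            # First execution: side effects happen here, then we record the result.
            result = fn(*args)
            self.journal.append(result)
            self.journal_path.write_text(json.dumps(self.journal))
        self.position += 1
        return result


executed: list[str] = []  # records which activities really ran


def plan(topic: str) -> list[str]:
    executed.append("plan")
    return [f"{topic}: background", f"{topic}: open questions"]


def execute(task: str) -> str:
    executed.append(f"execute {task}")
    return f"notes on {task}"


def research(engine: ToyReplayEngine, topic: str, crash_before=None) -> list[str]:
    """The deterministic "workflow": it only orchestrates activities."""
    tasks = engine.run_activity(plan, topic)
    answers = []
    for i, task in enumerate(tasks):
        if i == crash_before:
            raise RuntimeError("simulated crash")
        answers.append(engine.run_activity(execute, task))
    return answers


def demo() -> tuple[list[str], list[str]]:
    executed.clear()
    with TemporaryDirectory() as tmp:
        journal = Path(tmp) / "journal.json"
        try:
            research(ToyReplayEngine(journal), "durability", crash_before=1)
        except RuntimeError:
            pass  # the process "died" before the second task
        # A fresh engine replays the journal, then continues with new work.
        answers = research(ToyReplayEngine(journal), "durability")
    return answers, executed
```

Calling `demo()` runs the workflow twice, yet each activity executes only once: the second run takes the plan and the first task's result from the journal. The sketch relies on the workflow calling activities in the same order on every run, which is why the orchestration code has to be deterministic.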
This is hard…
Let’s build an actual research workflow as described above. Traditionally, one would build a system with an explicit persistence layer for the state and asynchronous orchestration using queues. Building this way requires relatively deep expertise in workflow systems and significant infrastructure work.
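For a sense of what that traditional approach involves, here is a rough sketch of a hand-rolled state machine that persists its state to SQLite after every step. Everything in it is an assumption made for illustration: the `runs` table, the phase names, and the placeholder planner and executor. A production version would also need a queue or timer to trigger each step, plus retries and locking.

```python
import json
import sqlite3


def load_state(db: sqlite3.Connection, run_id: str) -> dict:
    """Load the persisted state of a run, or a fresh state if none exists."""
    db.execute("CREATE TABLE IF NOT EXISTS runs (id TEXT PRIMARY KEY, state TEXT)")
    row = db.execute("SELECT state FROM runs WHERE id = ?", (run_id,)).fetchone()
    if row:
        return json.loads(row[0])
    return {"phase": "plan", "tasks": [], "answers": []}


def save_state(db: sqlite3.Connection, run_id: str, state: dict) -> None:
    db.execute(
        "INSERT OR REPLACE INTO runs (id, state) VALUES (?, ?)",
        (run_id, json.dumps(state)),
    )
    db.commit()


def step(db: sqlite3.Connection, run_id: str) -> str:
    """Advance the run by exactly one step, persist it, return the new phase."""
    state = load_state(db, run_id)
    if state["phase"] == "plan":
        state["tasks"] = ["task A", "task B"]  # placeholder for a real planner
        state["phase"] = "execute"
    elif state["phase"] == "execute":
        # The number of stored answers tells us which task comes next.
        task = state["tasks"][len(state["answers"])]
        state["answers"].append(f"result of {task}")  # placeholder for real work
        if len(state["answers"]) == len(state["tasks"]):
            state["phase"] = "summarize"
    elif state["phase"] == "summarize":
        state["report"] = "; ".join(state["answers"])
        state["phase"] = "done"
    save_state(db, run_id, state)
    return state["phase"]


def run_to_completion(db: sqlite3.Connection, run_id: str) -> dict:
    """Keep stepping until the run is done; safe to call again after a crash."""
    while step(db, run_id) != "done":
        pass
    return load_state(db, run_id)
```

Even in this tiny form, the research logic is buried under phase bookkeeping and load/save calls, and the control flow no longer reads like the simple loop it really is.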
… but it doesn’t have to be!
AutoKitteh (built on top of Temporal) dramatically simplifies this process.
AutoKitteh is an OSS serverless platform for durable workflows. It can take vanilla Python code and make it run durably over Temporal, taking advantage of the abstractions described above:
- AutoKitteh knows what should run as a workflow and what should run as an activity. It knows how to parse the program automatically, intercept any non-deterministic functions, and run these as activities. You can read all about it in “Hijacking Function Calls for Durability” and “Hacking the Import System and Rewriting the AST For Durable Execution”.
- It comes with “batteries included” – allowing the trigger of workflows from other systems using integrations such as Slack, JIRA, Linear, and others.
- It is serverless and allows users to “deploy with a click of a button.” Deployments usually take only a few seconds.
- It has a cute cat logo!
- Other things can be discussed in a different article.
A Working Research Agent
Let’s go through a short demo of a working research agent. In this instance, we invoke the agent through the !r command.
As you can see, we can converse with the agent to fine-tune the research plan. Additionally, in this case, we can customize some of the steps by limiting the time allowed to answer a question and the number of tokens allocated for a specific task. We then tell the agent that the plan looks good. This kicks off the next phase, which is the task execution phase.
The agent executed the tasks, including asking Itay according to the plan. When Itay answered, the final phase started.
Above we can see that a final report was composed, allowing us to modify it further to be more… factual.
The captures above demonstrate a full implementation of a research agent connected to Slack using AutoKitteh. You can check it out (among other cool examples) at openai_agent_researcher. It is a somewhat extended version of what’s discussed above, so let’s review it part by part.
The Agents
The agents are defined in ai.py using openai-agents:
from agents import Agent, ModelSettings, WebSearchTool

# ResearchPlan, Report and send_slack_report are defined elsewhere in the project.

_plan_agent = Agent(
    name="PlannerAgent",
    instructions="""
    You are a helpful research assistant. Given a query, come up with a set of tasks
    to perform to best answer the query. Output between 3 and 10 tasks to perform.
    A task can be either:
    - A search task: search the web for a specific term and summarize the results.
    - An ask someone task: ask a specific person a question and summarize the answer.
      If a user explicitly specifies a time limit for a specific user, set it as such.
      Do this only if the user explicitly specifies a person to ask.
    For each task result, if applicable, default max tokens to None, unless user explicitly
    specified otherwise. User cannot be allowed to specify max tokens below 16.
    You can also modify an existing plan, by adding or removing searches.
    Always provide the complete plan as output along with an indication if the user
    considers it final.
    Consider the plan as final only if the user explicitly specifies so.
    """,
    model="gpt-4o",
    output_type=ResearchPlan,
)

_search_agent = Agent(
    name="SearchAgent",
    instructions="""
    You are a research assistant. Given a search term, you search the web for that term and
    produce a concise summary of the results. The summary must be 2-3 paragraphs and less than
    300 words. Capture the main points. Write succinctly, no need to have complete sentences
    or good grammar. This will be consumed by someone synthesizing a report, so it's vital
    you capture the essence and ignore any fluff. Do not include any additional commentary
    other than the summary itself.
    """,
    tools=[WebSearchTool()],
    model_settings=ModelSettings(tool_choice="required"),
    output_type=str,
)

_report_agent = Agent(
    name="ReporterAgent",
    instructions="""
    Given a question and a set of search results, write a short summary of the findings.
    Refine the report per user's feedback.
    If the user wishes to send a slack report, use the appropriate tools to send the slack
    report to the desired user.
    """,
    model="gpt-4o",
    tools=[send_slack_report],
    output_type=Report | str,
)
These agents are configured with:
- PlannerAgent: Creates research plans with detailed tasks. It uses GPT-4o and outputs structured data in the ResearchPlan format. This agent doesn’t have tools – it just plans what needs to be done.
- SearchAgent: Performs web searches. It has access to the WebSearchTool and is configured to always use this tool. It returns plain string summaries of search results.
- ReporterAgent: Synthesizes findings into reports. It has access to the send_slack_report tool and can return either a structured Report object or a string, depending on the conversation flow.
Managing Interactions
The planner agent and the reporter agent allow a user to chat with them, which means invoking the agents multiple times while preserving the chat context.
Let me explain the _chat and _run functions in ai.py, which are crucial for managing agent interactions:
The _run Function
import asyncio
from time import sleep

from agents import RunConfig, Runner
from autokitteh import activity
from openai import RateLimitError

# `send` is the project's helper for posting a message to the user.


@activity
def _run(agent: str, history: list, q: str, rc: RunConfig) -> tuple[str, list]:
    """Run the agent with the given query and history."""
    send("🤔")
    while True:
        try:
            response = asyncio.run(
                Runner.run(
                    agent,
                    history + [{"role": "user", "content": q}],
                    run_config=rc,
                )
            )
            return response.final_output, response.to_input_list()
        except RateLimitError as e:
            # In case of a rate limit error, retry after waiting for 5 seconds.
            send(f"Rate limit error: {e}\n\nWaiting 5 seconds and retrying...")
            sleep(5)
This function handles a single invocation of a given agent. The function takes four parameters:
- agent: The agent to run (planner, searcher, or reporter)
- history: The conversation history so far
- q: The current query/message
- rc: Run configuration for the agent
It then invokes the agent and returns its final output and the “input list,” which is used as history in the next iteration.
Normally, AutoKitteh would automatically run Runner.run as an activity. There is a catch, though: the Runner.run result is not serializable, since one of its fields holds a function. Therefore, we use an “escape hatch”: we decorate the entire function with @activity, which is supplied by the AutoKitteh Python SDK, and return from the activity only the parts we care about, which are serializable.
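The serialization problem can be reproduced without any agent library. In the sketch below, `FakeRunResult` is a hypothetical stand-in for a result object that carries a callable, and JSON stands in for whatever format the engine really uses to persist activity results, which is an assumption. The shape of the fix is the same either way: return only plain data from the activity.

```python
import json
from dataclasses import dataclass
from typing import Callable


@dataclass
class FakeRunResult:
    """Hypothetical result object; the callable field makes it unserializable."""

    final_output: str
    history: list[dict]
    on_done: Callable[[], None]


def is_json_serializable(value) -> bool:
    """Return True if json.dumps accepts the value."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


def run_and_extract() -> tuple[str, list[dict]]:
    """Plays the role of the activity: keep the plain data, drop the rest."""
    result = FakeRunResult(
        final_output="done",
        history=[{"role": "user", "content": "hi"}],
        on_done=lambda: None,
    )
    return result.final_output, result.history
```

Here `is_json_serializable(vars(result))` is false for any `FakeRunResult`, because of the function field, while the tuple returned by `run_and_extract()` serializes without trouble.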
Another cool thing is the handling of OpenAI’s rate limit errors: we simply catch the error, sleep, and then retry. In the traditional event-driven way, we would have needed to persist the current state, set a timer, and all that mess.
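The catch-sleep-retry pattern can also be pulled out into a small helper. This is a generic sketch, not code from the project: `TransientError` stands in for a rate-limit error, and the `max_attempts` cap and the injectable `sleep` parameter are additions made here for safety and testability (the original loop retries indefinitely).

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical stand-in for a rate-limit error."""


def call_with_retry(
    fn: Callable[[], T],
    delay_seconds: float = 5.0,
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying after a fixed delay whenever it raises TransientError."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller see the error
            sleep(delay_seconds)
    raise AssertionError("unreachable")
```

In an ordinary process, a crash during the sleep loses the retry altogether. What makes the plain `sleep` acceptable in the original code is that it runs under a durable engine, which takes care of resuming the work.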
The _chat Function
def _chat(agent, is_final, q: str):
    """Chat with the agent until the response is final.

    An interaction using this function can span some back and forth
    between the user and the agent.

    Args:
        agent: The agent to chat with.
        is_final: A function to check if the response is final.
        q: The initial query.
    """
    history = []
    response = None
    while not (response and is_final(response)):
        if not q:
            q = next_input()
        response, history = _run(agent, history, q, RunConfig())
        send(response)
        q = None
    return response
This function handles multi-turn conversations, utilizing the _run function, where the agent and user might go back and forth several times before reaching a final result. For example, with the planning agent, the user might refine the plan several times before marking it as final.
The combination of these functions with AutoKitteh’s durability features means that:
- If the system crashes during an agent run, it will restart from where it left off
- If the system crashes while waiting for user input, it will continue waiting once restarted
- The entire conversation history is preserved, even across system failures
- Rate limits and temporary errors are handled gracefully
These functions demonstrate how AutoKitteh makes it simple to build durable, stateful workflows that can handle long-running, multi-step processes with human-in-the-loop interactions.
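To see the multi-turn loop in action without Slack or an LLM, here is a self-contained simulation of the same control flow. The `fake_planner` function and the scripted inputs are invented for illustration; only the loop structure mirrors `_chat`.

```python
from collections import deque
from typing import Callable


def chat(run: Callable, is_final: Callable, next_input: Callable, q=None):
    """Same loop shape as _chat, with its collaborators passed in explicitly."""
    history: list[dict] = []
    response = None
    while not (response and is_final(response)):
        if not q:
            q = next_input()  # block until the user says something
        response, history = run(history, q)
        q = None  # the next turn must come from the user
    return response, history


def fake_planner(history: list[dict], q: str):
    """Hypothetical agent: final only once the user says the plan looks good."""
    history = history + [{"role": "user", "content": q}]
    turn = sum(1 for message in history if message["role"] == "user")
    response = {"text": f"plan v{turn}", "is_final": "looks good" in q.lower()}
    return response, history + [{"role": "assistant", "content": response["text"]}]


def demo():
    # Scripted follow-up messages that the "user" sends after the first query.
    inputs = deque(["add a search on pricing", "looks good"])
    return chat(
        fake_planner,
        lambda r: r["is_final"],
        inputs.popleft,
        q="research durable agents",
    )
```

In `demo()`, the loop runs three turns: the initial query, one refinement, and the approval. The history grows by two messages per turn, and that growing list is exactly the state that has to survive a restart.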
Driving the Agents
Using the helpers described above, driving the agents becomes much simpler:
def plan(q: str) -> ResearchPlan:
    """Plan agent driver."""
    return _chat(_plan_agent, lambda x: x.is_final, q)


def search(q: SearchResearchItem) -> str:
    """Search agent driver."""
    return _run(
        _search_agent,
        [],
        q.query,
        RunConfig(
            model_settings=ModelSettings(
                tool_choice="required",
                max_tokens=q.max_tokens,
            ),
        ),
    )[0]


def report(q: str, tasks: dict[str, str]):
    """Report agent driver."""
    q = f"Question: {q}\n\n\nTasks results: \n"
    for key, value in tasks.items():
        q += f"- {key}: {value}\n\n"
    return _chat(_report_agent, lambda _: False, q)
Tying Everything Together – The Workflow Function
The workflow function in workflow.py is the main orchestration function that ties together the entire research process. It’s well-structured and follows a clear three-phase approach. Let’s analyze it:
def workflow(q: str):
    """Run the entire interaction with the user.

    There are three phases:
    1. Plan the search.
    2. Execute the search.
    3. Report the results.
    """
    # Plan the search.
    search_plan = ai.plan(q)

    # Iterate over all tasks in the plan and execute them.
    slack.send("Now I will execute on the plan.\n")
    # Maps a description of each task to its textual result.
    tasks: dict[str, str] = {}
    for t in search_plan.tasks:
        match type(t):
            case data.SearchResearchItem:
                slack.send(f"🔍 Searching for: {t.query}...")
                tasks[f"Search query result for {t.query}"] = ai.search(t)
            case data.AskSomeoneResearchItem:
                slack.send(f"💬 Asking {t.who} the question: {t.question}...")
                who, answer = slack.ask(t.question, t.who, t.wait_time_in_seconds)
                if who:
                    if not answer:
                        slack.send(f"{who['real_name']} did not answer the question.")
                        answer = "No answer"
                    tasks[f"According to the user {who['real_name']}"] = answer
                else:
                    tasks[f"According to the user {t.who}"] = (
                        f"could not figure out which user is {t.who}"
                    )

    # Summarize and report the results.
    slack.send("All tasks complete, summarizing results...")
    ai.report(search_plan.question, tasks)
Here’s what makes this function powerful:
Phase 1: Planning
search_plan = ai.plan(q)
- Calls the planning agent to generate a structured research plan
- This could involve multiple back-and-forth interactions with the user to refine the plan
- Behind the scenes, AutoKitteh preserves the state during this potentially lengthy interaction
Phase 2: Execution
for t in search_plan.tasks:
    match type(t):
        case data.SearchResearchItem:
            ...  # Execute search tasks
        case data.AskSomeoneResearchItem:
            ...  # Execute ask-someone tasks
This phase showcases durability strengths:
- Iterative Execution: It processes each task in the research plan one by one
- Long-running Human Interaction: For AskSomeoneResearchItem tasks, it can wait for human responses, potentially for long periods
- Progress Tracking: Stores all results in the tasks dictionary, which persists across system restarts
The slack.ask() function is particularly important – it sends questions to specific users and waits for their response, which could take minutes, hours, or days. Thanks to durability, the workflow can wait indefinitely for these responses without losing state.
Phase 3: Reporting
ai.report(search_plan.question, tasks)
- Takes all the gathered information and generates a comprehensive report
- May involve additional user interaction to refine the report
What makes this Durable
Durability ensures that:
- If the system crashes during planning, it can resume the planning conversation
- If it crashes during a search, it doesn’t need to redo previous searches
- If it crashes while waiting for a human response, it continues waiting
- If it crashes during report generation, it doesn’t lose the collected data
The workflow function doesn’t need to implement any of this durability logic itself – AutoKitteh handles it automatically. This is why the code is so clean and focused on business logic rather than error handling and state management.
This is an example of how workflow-based programming simplifies complex, long-running processes. The developer can focus on the “happy path” while the underlying system handles failures, retries, and state persistence.
Try it Yourself!
Start a project in the free AutoKitteh cloud: Start from Template, then just initialize the connections and deploy.