Production Patterns: State Management, Sessions, and Observability
Part 4 of 5 in the Azure AI Agent Service series
You've built your agent, added tools, and everything works beautifully in development. Then you deploy to production and reality hits: conversations get lost, errors cascade silently, costs spiral, and debugging becomes archaeology.
Production-ready agents need more than clever prompts and tool integrations. They need robust state management, proper observability, and defensive patterns that gracefully handle the chaos of real-world usage.
In this article, we'll cover the infrastructure patterns that separate weekend projects from production systems.
Understanding Threads and Sessions
Azure AI Agent Service uses threads as the fundamental unit of conversation state. A thread maintains the full message history, context, and any attached files for a conversation.
public class ConversationManager
{
    private readonly AgentsClient _client;
    private readonly IDistributedCache _cache;
    private readonly ILogger<ConversationManager> _logger;

    public ConversationManager(
        AgentsClient client,
        IDistributedCache cache,
        ILogger<ConversationManager> logger)
    {
        _client = client;
        _cache = cache;
        _logger = logger;
    }

    public async Task<string> GetOrCreateThreadAsync(
        string userId,
        string conversationId,
        CancellationToken ct = default)
    {
        var cacheKey = $"thread:{userId}:{conversationId}";

        // Check cache first
        var threadId = await _cache.GetStringAsync(cacheKey, ct);
        if (!string.IsNullOrEmpty(threadId))
        {
            // Verify the thread still exists on the service
            try
            {
                await _client.GetThreadAsync(threadId, ct);
                return threadId;
            }
            catch (RequestFailedException ex) when (ex.Status == 404)
            {
                _logger.LogWarning("Cached thread {ThreadId} no longer exists", threadId);
            }
        }

        // Create a new thread (named argument so ct doesn't bind to an optional parameter)
        var thread = await _client.CreateThreadAsync(cancellationToken: ct);
        threadId = thread.Value.Id;

        // Cache with an expiration matching your retention policy
        await _cache.SetStringAsync(
            cacheKey,
            threadId,
            new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(7)
            },
            ct);

        _logger.LogInformation(
            "Created new thread {ThreadId} for user {UserId}",
            threadId, userId);

        return threadId;
    }
}
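Resolving the thread then becomes a one-liner per turn, assuming userId and conversationId come from your auth and routing layers:

// Sketch: resolve the conversation's thread before each turn
var threadId = await conversationManager.GetOrCreateThreadAsync(
    userId, conversationId, ct);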
Thread Lifecycle Patterns
Threads aren't free—they consume storage and count toward your quotas. Design your lifecycle strategy early:
public class ThreadLifecycleService : BackgroundService
{
    private readonly AgentsClient _client;
    private readonly IThreadRepository _repository;
    private readonly ILogger<ThreadLifecycleService> _logger;
    private readonly TimeSpan _maxThreadAge = TimeSpan.FromDays(30);
    private readonly TimeSpan _inactivityThreshold = TimeSpan.FromDays(7);

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await CleanupStaleThreadsAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Thread cleanup failed");
            }
            await Task.Delay(TimeSpan.FromHours(6), stoppingToken);
        }
    }

    private async Task CleanupStaleThreadsAsync(CancellationToken ct)
    {
        var staleThreads = await _repository.GetThreadsOlderThanAsync(
            _inactivityThreshold, ct);

        foreach (var thread in staleThreads)
        {
            try
            {
                // Archive conversations worth keeping before deletion
                if (thread.MessageCount > 5)
                {
                    await ArchiveThreadAsync(thread, ct);
                }

                await _client.DeleteThreadAsync(thread.ThreadId, ct);
                await _repository.MarkDeletedAsync(thread.ThreadId, ct);

                _logger.LogInformation(
                    "Cleaned up thread {ThreadId} (inactive {Days} days)",
                    thread.ThreadId,
                    (int)(DateTime.UtcNow - thread.LastActivity).TotalDays);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "Failed to clean up thread {ThreadId}", thread.ThreadId);
            }
        }
    }
}
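Wiring it up is standard hosted-service registration. SqlThreadRepository here is a hypothetical implementation of IThreadRepository; swap in whatever your persistence layer provides:

// Program.cs - SqlThreadRepository is a placeholder implementation
builder.Services.AddSingleton<IThreadRepository, SqlThreadRepository>();
builder.Services.AddHostedService<ThreadLifecycleService>();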
State Persistence Patterns
Agents often need to maintain state beyond the conversation—user preferences, accumulated context, or workflow progress. Here's a pattern that layers application state on top of thread state:
public class AgentSession
{
    public string SessionId { get; set; } = string.Empty;
    public string ThreadId { get; set; } = string.Empty;
    public string UserId { get; set; } = string.Empty;
    public Dictionary<string, object> Metadata { get; set; } = new();
    public List<string> ActiveWorkflows { get; set; } = new();
    public DateTimeOffset CreatedAt { get; set; }
    public DateTimeOffset LastActivity { get; set; }
    public int TotalTokensUsed { get; set; }
    public decimal EstimatedCost { get; set; }
}

public class SessionStateManager
{
    private readonly IDistributedCache _cache;
    private readonly ISessionRepository _repository;
    private readonly JsonSerializerOptions _jsonOptions;

    public SessionStateManager(
        IDistributedCache cache,
        ISessionRepository repository,
        JsonSerializerOptions jsonOptions)
    {
        _cache = cache;
        _repository = repository;
        _jsonOptions = jsonOptions;
    }

    public async Task<AgentSession> GetSessionAsync(
        string sessionId,
        CancellationToken ct = default)
    {
        // Try cache first
        var cached = await _cache.GetStringAsync($"session:{sessionId}", ct);
        if (!string.IsNullOrEmpty(cached))
        {
            return JsonSerializer.Deserialize<AgentSession>(cached, _jsonOptions)!;
        }

        // Fall back to persistent storage
        var session = await _repository.GetAsync(sessionId, ct);
        if (session != null)
        {
            await CacheSessionAsync(session, ct);
        }
        return session ?? throw new SessionNotFoundException(sessionId);
    }

    public async Task UpdateSessionAsync(
        AgentSession session,
        CancellationToken ct = default)
    {
        session.LastActivity = DateTimeOffset.UtcNow;

        // Update cache immediately for fast reads
        await CacheSessionAsync(session, ct);

        // Persist asynchronously (eventual consistency is usually fine)
        _ = _repository.UpsertAsync(session, ct);
    }

    public async Task<T?> GetMetadataAsync<T>(
        string sessionId,
        string key,
        CancellationToken ct = default)
    {
        var session = await GetSessionAsync(sessionId, ct);
        if (session.Metadata.TryGetValue(key, out var value))
        {
            if (value is JsonElement element)
            {
                return element.Deserialize<T>(_jsonOptions);
            }
            return (T)value;
        }
        return default;
    }

    private async Task CacheSessionAsync(AgentSession session, CancellationToken ct)
    {
        var json = JsonSerializer.Serialize(session, _jsonOptions);
        // Key by SessionId so reads and writes agree on the cache key,
        // even after a recovery swaps out the underlying thread
        await _cache.SetStringAsync(
            $"session:{session.SessionId}",
            json,
            new DistributedCacheEntryOptions
            {
                SlidingExpiration = TimeSpan.FromHours(2)
            },
            ct);
    }
}
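One detail glossed over above: SessionNotFoundException is just a small custom exception. A minimal definition might look like this:

// Minimal sketch of the custom exception referenced above
public class SessionNotFoundException : Exception
{
    public SessionNotFoundException(string sessionId)
        : base($"Session '{sessionId}' was not found") { }
}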
Handling Session Recovery
When things go wrong (and they will), you need graceful recovery:
public class ResilientAgentService
{
    private readonly AgentsClient _client;
    private readonly SessionStateManager _sessions;
    private readonly ILogger<ResilientAgentService> _logger;

    public async Task<AgentResponse> ProcessMessageAsync(
        string sessionId,
        string userMessage,
        CancellationToken ct = default)
    {
        AgentSession session;
        try
        {
            session = await _sessions.GetSessionAsync(sessionId, ct);
        }
        catch (SessionNotFoundException)
        {
            _logger.LogWarning("Session {SessionId} not found, creating new", sessionId);
            session = await CreateNewSessionAsync(sessionId, ct);
        }

        // Verify the thread is still valid
        try
        {
            await _client.GetThreadAsync(session.ThreadId, ct);
        }
        catch (RequestFailedException ex) when (ex.Status == 404)
        {
            _logger.LogWarning(
                "Thread {ThreadId} for session {SessionId} not found, recreating",
                session.ThreadId, sessionId);
            session = await RecoverSessionAsync(session, ct);
        }

        return await ExecuteWithRetryAsync(session, userMessage, ct);
    }

    private async Task<AgentSession> RecoverSessionAsync(
        AgentSession oldSession,
        CancellationToken ct)
    {
        // Create a new thread
        var thread = await _client.CreateThreadAsync(cancellationToken: ct);

        // Preserve session identity and metadata, but reset the thread
        var newSession = new AgentSession
        {
            SessionId = oldSession.SessionId,
            ThreadId = thread.Value.Id,
            UserId = oldSession.UserId,
            Metadata = oldSession.Metadata,
            CreatedAt = DateTimeOffset.UtcNow,
            LastActivity = DateTimeOffset.UtcNow
        };

        // Optionally inject context about the recovery
        await _client.CreateMessageAsync(
            newSession.ThreadId,
            MessageRole.User,
            "System note: This is a recovered session. Previous context may be limited.",
            cancellationToken: ct);

        await _sessions.UpdateSessionAsync(newSession, ct);
        return newSession;
    }
}
Observability with Azure Monitor
You can't fix what you can't see. Production agents need comprehensive observability:
public class ObservableAgentService
{
    private readonly AgentsClient _client;
    private readonly ILogger<ObservableAgentService> _logger;
    private readonly TelemetryClient _telemetry;
    private readonly Meter _meter;
    private readonly Counter<long> _messageCounter;
    private readonly Histogram<double> _responseLatency;
    private readonly Counter<long> _tokenCounter;
    private readonly Counter<long> _errorCounter;

    public ObservableAgentService(
        AgentsClient client,
        ILogger<ObservableAgentService> logger,
        TelemetryClient telemetry,
        IMeterFactory meterFactory)
    {
        _client = client;
        _logger = logger;
        _telemetry = telemetry;
        _meter = meterFactory.Create("AgentService");

        _messageCounter = _meter.CreateCounter<long>(
            "agent.messages.total",
            description: "Total messages processed");
        _responseLatency = _meter.CreateHistogram<double>(
            "agent.response.duration",
            unit: "ms",
            description: "Agent response time in milliseconds");
        _tokenCounter = _meter.CreateCounter<long>(
            "agent.tokens.total",
            description: "Total tokens consumed");
        _errorCounter = _meter.CreateCounter<long>(
            "agent.errors.total",
            description: "Total errors encountered");
    }

    public async Task<AgentResponse> ProcessWithTelemetryAsync(
        string agentId,
        string threadId,
        string message,
        CancellationToken ct = default)
    {
        var stopwatch = Stopwatch.StartNew();
        var operationId = Activity.Current?.Id ?? Guid.NewGuid().ToString();

        using var operation = _telemetry.StartOperation<RequestTelemetry>(
            "AgentInteraction");
        operation.Telemetry.Properties["AgentId"] = agentId;
        operation.Telemetry.Properties["ThreadId"] = threadId;

        _logger.LogInformation(
            "Processing message for agent {AgentId} on thread {ThreadId}. " +
            "OperationId: {OperationId}",
            agentId, threadId, operationId);

        try
        {
            // Create the user message
            await _client.CreateMessageAsync(
                threadId,
                MessageRole.User,
                message,
                cancellationToken: ct);

            // Run the agent
            var run = await _client.CreateRunAsync(threadId, agentId, cancellationToken: ct);

            // Poll for completion with telemetry
            var result = await PollRunWithTelemetryAsync(
                threadId, run.Value.Id, ct);

            stopwatch.Stop();

            // Record metrics
            _messageCounter.Add(1,
                new KeyValuePair<string, object?>("agent_id", agentId),
                new KeyValuePair<string, object?>("status", "success"));
            _responseLatency.Record(stopwatch.ElapsedMilliseconds,
                new KeyValuePair<string, object?>("agent_id", agentId));

            if (result.Usage != null)
            {
                _tokenCounter.Add(result.Usage.TotalTokens,
                    new KeyValuePair<string, object?>("agent_id", agentId),
                    new KeyValuePair<string, object?>("type", "total"));
            }

            _logger.LogInformation(
                "Completed agent interaction in {ElapsedMs}ms. " +
                "Tokens: {Tokens}. OperationId: {OperationId}",
                stopwatch.ElapsedMilliseconds,
                result.Usage?.TotalTokens ?? 0,
                operationId);

            return result;
        }
        catch (Exception ex)
        {
            _errorCounter.Add(1,
                new KeyValuePair<string, object?>("agent_id", agentId),
                new KeyValuePair<string, object?>("error_type", ex.GetType().Name));

            _telemetry.TrackException(ex, new Dictionary<string, string>
            {
                ["AgentId"] = agentId,
                ["ThreadId"] = threadId,
                ["OperationId"] = operationId
            });

            _logger.LogError(ex,
                "Agent interaction failed. AgentId: {AgentId}, " +
                "ThreadId: {ThreadId}, OperationId: {OperationId}",
                agentId, threadId, operationId);
            throw;
        }
    }
}
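A rough wiring sketch for the dependencies above: AddApplicationInsightsTelemetry supplies the TelemetryClient and AddMetrics (.NET 8+) supplies IMeterFactory. Register your AgentsClient however your SDK version constructs it.

// Program.cs sketch - telemetry and metrics plumbing for ObservableAgentService
builder.Services.AddApplicationInsightsTelemetry();
builder.Services.AddMetrics(); // registers IMeterFactory
builder.Services.AddSingleton<ObservableAgentService>();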
Custom Metrics Dashboard
Set up Application Insights queries for your agent metrics:
// Agent Response Time Percentiles
customMetrics
| where name == "agent.response.duration"
| summarize
    p50 = percentile(value, 50),
    p95 = percentile(value, 95),
    p99 = percentile(value, 99)
    by bin(timestamp, 1h), tostring(customDimensions.agent_id)
| render timechart

// Token Consumption by Agent
customMetrics
| where name == "agent.tokens.total"
| summarize TotalTokens = sum(value)
    by bin(timestamp, 1d), tostring(customDimensions.agent_id)
| render columnchart

// Error Rate
customMetrics
| where name == "agent.errors.total"
| summarize Errors = sum(value)
    by bin(timestamp, 1h), tostring(customDimensions.error_type)
| render timechart
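These queries pair naturally with log-based alert rules. For example, a candidate alert query for error spikes (the threshold is illustrative; tune it to your traffic):

// Alert candidate: more than 50 errors in an hour
customMetrics
| where name == "agent.errors.total"
| summarize Errors = sum(value) by bin(timestamp, 1h)
| where Errors > 50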
Error Handling and Retry Patterns
Azure AI Agent Service calls can fail for various reasons. Here's a robust retry strategy using Polly:
public static class AgentResiliencePolicy
{
    public static IAsyncPolicy<T> CreatePolicy<T>(ILogger logger)
    {
        // Retry with exponential backoff for transient failures
        var retryPolicy = Policy<T>
            .Handle<RequestFailedException>(ex => IsTransient(ex))
            .Or<HttpRequestException>()
            .Or<TaskCanceledException>()
            .WaitAndRetryAsync(
                retryCount: 3,
                sleepDurationProvider: attempt =>
                    TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                onRetry: (outcome, timespan, attempt, context) =>
                {
                    logger.LogWarning(
                        "Retry {Attempt} after {Delay}ms due to {Error}",
                        attempt, timespan.TotalMilliseconds,
                        outcome.Exception?.Message);
                });

        // Circuit breaker for sustained failures
        var circuitBreaker = Policy<T>
            .Handle<RequestFailedException>()
            .CircuitBreakerAsync(
                handledEventsAllowedBeforeBreaking: 5,
                durationOfBreak: TimeSpan.FromMinutes(1),
                onBreak: (outcome, breakDelay) =>
                {
                    logger.LogError(
                        "Circuit breaker opened for {Duration}ms",
                        breakDelay.TotalMilliseconds);
                },
                onReset: () => logger.LogInformation("Circuit breaker reset"));

        // Timeout for hanging requests
        var timeout = Policy.TimeoutAsync<T>(
            TimeSpan.FromMinutes(5),
            TimeoutStrategy.Pessimistic);

        // Outermost first: the timeout bounds the entire retry sequence
        return Policy.WrapAsync(timeout, retryPolicy, circuitBreaker);
    }

    private static bool IsTransient(RequestFailedException ex)
    {
        return ex.Status switch
        {
            429 => true, // Rate limited
            500 => true, // Internal server error
            502 => true, // Bad gateway
            503 => true, // Service unavailable
            504 => true, // Gateway timeout
            _ => false
        };
    }
}

// Usage in your service
public class ResilientAgentClient
{
    private readonly AgentsClient _client;
    private readonly IAsyncPolicy<ThreadRun> _runPolicy;

    public ResilientAgentClient(AgentsClient client, ILogger<ResilientAgentClient> logger)
    {
        _client = client;
        _runPolicy = AgentResiliencePolicy.CreatePolicy<ThreadRun>(logger);
    }

    public async Task<ThreadRun> CreateRunWithResilienceAsync(
        string threadId,
        string agentId,
        CancellationToken ct = default)
    {
        // Flow the caller's token through the policy so retries stay cancellable
        return await _runPolicy.ExecuteAsync(async token =>
        {
            var response = await _client.CreateRunAsync(
                threadId, agentId, cancellationToken: token);
            return response.Value;
        }, ct);
    }
}
Handling Rate Limits Gracefully
public class RateLimitHandler
{
    private readonly SemaphoreSlim _semaphore;
    private readonly ILogger<RateLimitHandler> _logger;
    private DateTime _retryAfter = DateTime.MinValue;

    public RateLimitHandler(int maxConcurrency, ILogger<RateLimitHandler> logger)
    {
        _semaphore = new SemaphoreSlim(maxConcurrency);
        _logger = logger;
    }

    public async Task<T> ExecuteAsync<T>(
        Func<Task<T>> operation,
        CancellationToken ct = default)
    {
        // Wait if we're in a rate limit cooldown
        var waitTime = _retryAfter - DateTime.UtcNow;
        if (waitTime > TimeSpan.Zero)
        {
            _logger.LogInformation(
                "Rate limit active, waiting {WaitMs}ms",
                waitTime.TotalMilliseconds);
            await Task.Delay(waitTime, ct);
        }

        await _semaphore.WaitAsync(ct);
        try
        {
            return await operation();
        }
        catch (RequestFailedException ex) when (ex.Status == 429)
        {
            // Record the cooldown from the Retry-After header
            var retryAfterSeconds = ParseRetryAfter(ex);
            _retryAfter = DateTime.UtcNow.AddSeconds(retryAfterSeconds);
            _logger.LogWarning(
                "Rate limited. Retry after {Seconds} seconds",
                retryAfterSeconds);
            throw;
        }
        finally
        {
            _semaphore.Release();
        }
    }

    private static int ParseRetryAfter(RequestFailedException ex)
    {
        // Honor the service's Retry-After header when present (seconds form);
        // fall back to 60 seconds if it's missing or unparseable
        var response = ex.GetRawResponse();
        if (response != null &&
            response.Headers.TryGetValue("Retry-After", out var value) &&
            int.TryParse(value, out var seconds))
        {
            return seconds;
        }
        return 60;
    }
}
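Register a single shared handler so the cooldown applies process-wide. The concurrency value below is a placeholder; tune it against your model deployment's rate limits:

// Hedged sketch: one shared handler, concurrency tuned to your quotas
builder.Services.AddSingleton(sp => new RateLimitHandler(
    maxConcurrency: 8,
    sp.GetRequiredService<ILogger<RateLimitHandler>>()));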
Cost Management and Token Tracking
Agents can get expensive fast. Build cost awareness into your system from day one:
public class CostTracker
{
    private readonly IMetricsService _metrics;
    private readonly ICostRepository _repository;
    private readonly ILogger<CostTracker> _logger;

    // Pricing as of 2024 - verify current rates!
    // Rates are stored per token (USD per 1K tokens, divided by 1,000).
    // Example: gpt-4o with 2,000 input + 500 output tokens
    //   = (2,000 * $0.005/1K) + (500 * $0.015/1K) = $0.0175
    private static readonly Dictionary<string, (decimal Input, decimal Output)> Pricing = new()
    {
        ["gpt-4o"] = (0.005m / 1000, 0.015m / 1000),
        ["gpt-4o-mini"] = (0.00015m / 1000, 0.0006m / 1000),
        ["gpt-4-turbo"] = (0.01m / 1000, 0.03m / 1000)
    };

    public async Task<UsageReport> TrackUsageAsync(
        string userId,
        string agentId,
        string model,
        int inputTokens,
        int outputTokens,
        CancellationToken ct = default)
    {
        // Unknown models fall back to the highest (gpt-4-turbo) rates
        var (inputRate, outputRate) = Pricing.GetValueOrDefault(
            model, (0.01m / 1000, 0.03m / 1000));
        var cost = (inputTokens * inputRate) + (outputTokens * outputRate);

        var usage = new UsageRecord
        {
            UserId = userId,
            AgentId = agentId,
            Model = model,
            InputTokens = inputTokens,
            OutputTokens = outputTokens,
            EstimatedCost = cost,
            Timestamp = DateTimeOffset.UtcNow
        };

        await _repository.RecordUsageAsync(usage, ct);

        // Check if the user is approaching limits
        var dailyUsage = await _repository.GetDailyUsageAsync(userId, ct);
        var monthlyUsage = await _repository.GetMonthlyUsageAsync(userId, ct);

        if (dailyUsage.TotalCost > 10m) // $10 daily threshold
        {
            _logger.LogWarning(
                "User {UserId} exceeded daily cost threshold: ${Cost:F2}",
                userId, dailyUsage.TotalCost);
        }

        return new UsageReport
        {
            CurrentRequest = usage,
            DailyTotal = dailyUsage,
            MonthlyTotal = monthlyUsage
        };
    }
}
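The supporting types are deliberately boring. Here's one possible shape for them (assumed, since the article doesn't define them):

// Assumed minimal shapes for the types CostTracker works with
public class UsageRecord
{
    public string UserId { get; set; } = string.Empty;
    public string AgentId { get; set; } = string.Empty;
    public string Model { get; set; } = string.Empty;
    public int InputTokens { get; set; }
    public int OutputTokens { get; set; }
    public decimal EstimatedCost { get; set; }
    public DateTimeOffset Timestamp { get; set; }
}

public class UsageSummary
{
    public decimal TotalCost { get; set; }
    public long TotalTokens { get; set; }
}

public class UsageReport
{
    public UsageRecord CurrentRequest { get; set; } = new();
    public UsageSummary DailyTotal { get; set; } = new();
    public UsageSummary MonthlyTotal { get; set; } = new();
}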
public class BudgetEnforcementMiddleware
{
    private readonly ICostRepository _repository;
    private readonly ILogger<BudgetEnforcementMiddleware> _logger;

    public async Task<bool> CanProcessRequestAsync(
        string userId,
        CancellationToken ct = default)
    {
        var budget = await _repository.GetUserBudgetAsync(userId, ct);
        var usage = await _repository.GetMonthlyUsageAsync(userId, ct);

        if (usage.TotalCost >= budget.MonthlyLimit)
        {
            _logger.LogWarning(
                "User {UserId} blocked - monthly budget exhausted. " +
                "Used: ${Used:F2}, Limit: ${Limit:F2}",
                userId, usage.TotalCost, budget.MonthlyLimit);
            return false;
        }

        // Warn at 80% threshold
        if (usage.TotalCost >= budget.MonthlyLimit * 0.8m)
        {
            _logger.LogInformation(
                "User {UserId} approaching budget limit: " +
                "${Used:F2} of ${Limit:F2}",
                userId, usage.TotalCost, budget.MonthlyLimit);
        }

        return true;
    }
}
Optimizing Token Usage
public class TokenOptimizer
{
    private readonly int _maxContextTokens;
    private readonly ITokenCounter _tokenCounter;

    public TokenOptimizer(int maxContextTokens, ITokenCounter tokenCounter)
    {
        _maxContextTokens = maxContextTokens;
        _tokenCounter = tokenCounter;
    }

    public async Task<string> OptimizeContextAsync(
        IList<Message> messages,
        string systemPrompt,
        CancellationToken ct = default)
    {
        var systemTokens = await _tokenCounter.CountAsync(systemPrompt, ct);
        var availableTokens = _maxContextTokens - systemTokens - 1000; // Reserve for response

        var optimizedMessages = new List<Message>();
        var currentTokens = 0;

        // Always keep the most recent messages
        // (assumes messages are ordered oldest to newest)
        foreach (var message in messages.Reverse())
        {
            var messageTokens = await _tokenCounter.CountAsync(message.Content, ct);
            if (currentTokens + messageTokens > availableTokens)
            {
                // Summarize older messages instead of dropping them
                var summary = await SummarizeOlderMessagesAsync(
                    messages.Take(messages.Count - optimizedMessages.Count).ToList(),
                    ct);
                optimizedMessages.Insert(0, new Message
                {
                    Role = "system",
                    Content = $"[Earlier conversation summary: {summary}]"
                });
                break;
            }
            optimizedMessages.Insert(0, message);
            currentTokens += messageTokens;
        }

        return JsonSerializer.Serialize(optimizedMessages);
    }
}
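The ITokenCounter abstraction can wrap any tokenizer. Here's a minimal sketch backed by the Microsoft.ML.Tokenizers package; the CreateForModel and CountTokens calls reflect that library's API, but verify them against the version you install:

// Hedged sketch of an ITokenCounter backed by Microsoft.ML.Tokenizers
using Microsoft.ML.Tokenizers;

public interface ITokenCounter
{
    Task<int> CountAsync(string text, CancellationToken ct = default);
}

public class TiktokenCounter : ITokenCounter
{
    private readonly Tokenizer _tokenizer =
        TiktokenTokenizer.CreateForModel("gpt-4o"); // match your deployed model

    public Task<int> CountAsync(string text, CancellationToken ct = default)
        => Task.FromResult(_tokenizer.CountTokens(text));
}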
Putting It All Together
Here's a complete production-ready agent service:
public class ProductionAgentService
{
    private readonly AgentsClient _client;
    private readonly SessionStateManager _sessions;
    private readonly CostTracker _costTracker;
    private readonly ObservableAgentService _telemetry;
    private readonly RateLimitHandler _rateLimiter;
    private readonly BudgetEnforcementMiddleware _budgetEnforcer;
    private readonly ILogger<ProductionAgentService> _logger;

    public async Task<AgentResponse> ProcessAsync(
        AgentRequest request,
        CancellationToken ct = default)
    {
        // 1. Budget check
        if (!await _budgetEnforcer.CanProcessRequestAsync(request.UserId, ct))
        {
            throw new BudgetExceededException(request.UserId);
        }

        // 2. Get or create session
        var session = await _sessions.GetSessionAsync(request.SessionId, ct);

        // 3. Execute with rate limiting and telemetry
        var response = await _rateLimiter.ExecuteAsync(async () =>
        {
            return await _telemetry.ProcessWithTelemetryAsync(
                request.AgentId,
                session.ThreadId,
                request.Message,
                ct);
        }, ct);

        // 4. Track costs and roll them into the session
        if (response.Usage != null)
        {
            var report = await _costTracker.TrackUsageAsync(
                request.UserId,
                request.AgentId,
                response.Model,
                response.Usage.PromptTokens,
                response.Usage.CompletionTokens,
                ct);
            session.EstimatedCost += report.CurrentRequest.EstimatedCost;
        }

        // 5. Update session state
        session.TotalTokensUsed += response.Usage?.TotalTokens ?? 0;
        await _sessions.UpdateSessionAsync(session, ct);

        return response;
    }
}
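Exposed through a minimal API endpoint, the whole pipeline looks like this (a sketch; the status-code mapping is one reasonable choice, not gospel):

// Minimal API sketch - AgentRequest/AgentResponse are the types used above
app.MapPost("/agents/chat", async (
    AgentRequest request,
    ProductionAgentService agents,
    CancellationToken ct) =>
{
    try
    {
        var response = await agents.ProcessAsync(request, ct);
        return Results.Ok(response);
    }
    catch (BudgetExceededException)
    {
        // 402 Payment Required signals an exhausted budget
        return Results.StatusCode(StatusCodes.Status402PaymentRequired);
    }
});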
Key Takeaways
Threads are your conversation state - Cache IDs, verify existence, and clean up stale threads regularly
Layer application state - Use sessions to track metadata, costs, and workflow state on top of thread state
Instrument everything - Response times, token counts, error rates, and costs should all be observable
Build resilience in - Retry transient failures, respect rate limits, and implement circuit breakers
Track costs religiously - Token usage can surprise you; build budgets and alerts from day one
Plan for recovery - Sessions will get corrupted, threads will disappear—handle it gracefully
In the final part of this series, we'll cover testing strategies and deployment patterns for Azure AI Agent Service, including integration testing, load testing, and CI/CD pipelines.
Coming up next: Part 5 - "Testing and Deployment: CI/CD for AI Agents"
Found this helpful? Follow me for more Azure AI content, and drop a comment with your production war stories!