Brian Spann
Azure AI Agent Service Part 4: Production Patterns—State Management, Sessions, and Observability

Production Patterns: State Management, Sessions, and Observability

Part 4 of 5 in the Azure AI Agent Service series


You've built your agent, added tools, and everything works beautifully in development. Then you deploy to production and reality hits: conversations get lost, errors cascade silently, costs spiral, and debugging becomes archaeology.

Production-ready agents need more than clever prompts and tool integrations. They need robust state management, proper observability, and defensive patterns that gracefully handle the chaos of real-world usage.

In this article, we'll cover the infrastructure patterns that separate weekend projects from production systems.

Understanding Threads and Sessions

Azure AI Agent Service uses threads as the fundamental unit of conversation state. A thread maintains the full message history, context, and any attached files for a conversation.

public class ConversationManager
{
    private readonly AgentsClient _client;
    private readonly IDistributedCache _cache;
    private readonly ILogger<ConversationManager> _logger;

    public ConversationManager(
        AgentsClient client,
        IDistributedCache cache,
        ILogger<ConversationManager> logger)
    {
        _client = client;
        _cache = cache;
        _logger = logger;
    }

    public async Task<string> GetOrCreateThreadAsync(
        string userId,
        string conversationId,
        CancellationToken ct = default)
    {
        var cacheKey = $"thread:{userId}:{conversationId}";

        // Check cache first
        var threadId = await _cache.GetStringAsync(cacheKey, ct);
        if (!string.IsNullOrEmpty(threadId))
        {
            // Verify thread still exists
            try
            {
                await _client.GetThreadAsync(threadId, ct);
                return threadId;
            }
            catch (RequestFailedException ex) when (ex.Status == 404)
            {
                _logger.LogWarning("Cached thread {ThreadId} no longer exists", threadId);
            }
        }

        // Create new thread
        var thread = await _client.CreateThreadAsync(ct);
        threadId = thread.Value.Id;

        // Cache with expiration matching your retention policy
        await _cache.SetStringAsync(
            cacheKey,
            threadId,
            new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(7)
            },
            ct);

        _logger.LogInformation(
            "Created new thread {ThreadId} for user {UserId}",
            threadId, userId);

        return threadId;
    }
}
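If you're hosting this in ASP.NET Core, the wiring might look like the sketch below. `CreateAgentsClient` is a placeholder for however you construct the client from your project endpoint and credential; the in-memory cache is only suitable for local development.

```csharp
// Hypothetical DI wiring for ConversationManager. The in-memory
// distributed cache works for local dev; swap in Redis
// (AddStackExchangeRedisCache) for production so cached thread IDs
// survive restarts and are shared across instances.
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddDistributedMemoryCache();
builder.Services.AddSingleton(sp => CreateAgentsClient()); // your factory
builder.Services.AddScoped<ConversationManager>();

var app = builder.Build();
```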

Thread Lifecycle Patterns

Threads aren't free—they consume storage and count toward your quotas. Design your lifecycle strategy early:

public class ThreadLifecycleService : BackgroundService
{
    private readonly AgentsClient _client;
    private readonly IThreadRepository _repository;
    private readonly ILogger<ThreadLifecycleService> _logger;
    private readonly TimeSpan _inactivityThreshold = TimeSpan.FromDays(7);

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await CleanupStaleThreadsAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Thread cleanup failed");
            }

            await Task.Delay(TimeSpan.FromHours(6), stoppingToken);
        }
    }

    private async Task CleanupStaleThreadsAsync(CancellationToken ct)
    {
        var staleThreads = await _repository.GetThreadsOlderThanAsync(
            _inactivityThreshold, ct);

        foreach (var thread in staleThreads)
        {
            try
            {
                // Archive conversation before deletion if needed
                if (thread.MessageCount > 5)
                {
                    await ArchiveThreadAsync(thread, ct);
                }

                await _client.DeleteThreadAsync(thread.ThreadId, ct);
                await _repository.MarkDeletedAsync(thread.ThreadId, ct);

                _logger.LogInformation(
                    "Cleaned up thread {ThreadId} (inactive {Days} days)",
                    thread.ThreadId, 
                    (DateTime.UtcNow - thread.LastActivity).TotalDays);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, 
                    "Failed to cleanup thread {ThreadId}", thread.ThreadId);
            }
        }
    }
}
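Because `ThreadLifecycleService` derives from `BackgroundService`, it plugs into the generic host as a hosted service; in your host setup that's a one-liner:

```csharp
// Register the cleanup loop so it starts and stops with the app
builder.Services.AddHostedService<ThreadLifecycleService>();
```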

State Persistence Patterns

Agents often need to maintain state beyond the conversation—user preferences, accumulated context, or workflow progress. Here's a pattern that layers application state on top of thread state:

public class AgentSession
{
    public string SessionId { get; set; } = string.Empty;
    public string ThreadId { get; set; } = string.Empty;
    public string UserId { get; set; } = string.Empty;
    public Dictionary<string, object> Metadata { get; set; } = new();
    public List<string> ActiveWorkflows { get; set; } = new();
    public DateTimeOffset CreatedAt { get; set; }
    public DateTimeOffset LastActivity { get; set; }
    public int TotalTokensUsed { get; set; }
    public decimal EstimatedCost { get; set; }
}

public class SessionStateManager
{
    private readonly IDistributedCache _cache;
    private readonly ISessionRepository _repository;
    private readonly JsonSerializerOptions _jsonOptions;

    public async Task<AgentSession> GetSessionAsync(
        string sessionId,
        CancellationToken ct = default)
    {
        // Try cache first
        var cached = await _cache.GetStringAsync($"session:{sessionId}", ct);
        if (!string.IsNullOrEmpty(cached))
        {
            return JsonSerializer.Deserialize<AgentSession>(cached, _jsonOptions)!;
        }

        // Fall back to persistent storage
        var session = await _repository.GetAsync(sessionId, ct);
        if (session != null)
        {
            await CacheSessionAsync(session, ct);
        }

        return session ?? throw new SessionNotFoundException(sessionId);
    }

    public async Task UpdateSessionAsync(
        AgentSession session,
        CancellationToken ct = default)
    {
        session.LastActivity = DateTimeOffset.UtcNow;

        // Update cache immediately for fast reads
        await CacheSessionAsync(session, ct);

        // Persist asynchronously (eventual consistency is usually fine).
        // Use CancellationToken.None so the write isn't cancelled when
        // the originating request completes; failures here are unobserved,
        // so consider logging them in a continuation.
        _ = _repository.UpsertAsync(session, CancellationToken.None);
    }

    public async Task<T?> GetMetadataAsync<T>(
        string sessionId,
        string key,
        CancellationToken ct = default)
    {
        var session = await GetSessionAsync(sessionId, ct);

        if (session.Metadata.TryGetValue(key, out var value))
        {
            if (value is JsonElement element)
            {
                return element.Deserialize<T>(_jsonOptions);
            }
            return (T)value;
        }

        return default;
    }

    private async Task CacheSessionAsync(AgentSession session, CancellationToken ct)
    {
        // Key by SessionId so writes land on the same entry that
        // GetSessionAsync reads (the thread ID can change on recovery)
        var json = JsonSerializer.Serialize(session, _jsonOptions);
        await _cache.SetStringAsync(
            $"session:{session.SessionId}",
            json,
            new DistributedCacheEntryOptions
            {
                SlidingExpiration = TimeSpan.FromHours(2)
            },
            ct);
    }
}

Handling Session Recovery

When things go wrong (and they will), you need graceful recovery:

public class ResilientAgentService
{
    private readonly AgentsClient _client;
    private readonly SessionStateManager _sessions;
    private readonly ILogger<ResilientAgentService> _logger;

    public async Task<AgentResponse> ProcessMessageAsync(
        string sessionId,
        string userMessage,
        CancellationToken ct = default)
    {
        AgentSession session;

        try
        {
            session = await _sessions.GetSessionAsync(sessionId, ct);
        }
        catch (SessionNotFoundException)
        {
            _logger.LogWarning("Session {SessionId} not found, creating new", sessionId);
            session = await CreateNewSessionAsync(sessionId, ct);
        }

        // Verify thread is still valid
        try
        {
            await _client.GetThreadAsync(session.ThreadId, ct);
        }
        catch (RequestFailedException ex) when (ex.Status == 404)
        {
            _logger.LogWarning(
                "Thread {ThreadId} for session {SessionId} not found, recreating",
                session.ThreadId, sessionId);

            session = await RecoverSessionAsync(session, ct);
        }

        return await ExecuteWithRetryAsync(session, userMessage, ct);
    }

    private async Task<AgentSession> RecoverSessionAsync(
        AgentSession oldSession,
        CancellationToken ct)
    {
        // Create new thread
        var thread = await _client.CreateThreadAsync(ct);

        // Preserve session metadata but reset thread
        var newSession = new AgentSession
        {
            ThreadId = thread.Value.Id,
            UserId = oldSession.UserId,
            Metadata = oldSession.Metadata,
            CreatedAt = DateTimeOffset.UtcNow,
            LastActivity = DateTimeOffset.UtcNow
        };

        // Optionally inject context about the recovery
        await _client.CreateMessageAsync(
            newSession.ThreadId,
            MessageRole.User,
            "System note: This is a recovered session. Previous context may be limited.",
            cancellationToken: ct);

        await _sessions.UpdateSessionAsync(newSession, ct);
        return newSession;
    }
}

Observability with Azure Monitor

You can't fix what you can't see. Production agents need comprehensive observability:

public class ObservableAgentService
{
    private readonly AgentsClient _client;
    private readonly ILogger<ObservableAgentService> _logger;
    private readonly TelemetryClient _telemetry;
    private readonly Meter _meter;

    private readonly Counter<long> _messageCounter;
    private readonly Histogram<double> _responseLatency;
    private readonly Counter<long> _tokenCounter;
    private readonly Counter<long> _errorCounter;

    public ObservableAgentService(
        AgentsClient client,
        ILogger<ObservableAgentService> logger,
        TelemetryClient telemetry,
        IMeterFactory meterFactory)
    {
        _client = client;
        _logger = logger;
        _telemetry = telemetry;
        _meter = meterFactory.Create("AgentService");

        _messageCounter = _meter.CreateCounter<long>(
            "agent.messages.total",
            description: "Total messages processed");

        _responseLatency = _meter.CreateHistogram<double>(
            "agent.response.duration",
            unit: "ms",
            description: "Agent response time in milliseconds");

        _tokenCounter = _meter.CreateCounter<long>(
            "agent.tokens.total",
            description: "Total tokens consumed");

        _errorCounter = _meter.CreateCounter<long>(
            "agent.errors.total",
            description: "Total errors encountered");
    }

    public async Task<AgentResponse> ProcessWithTelemetryAsync(
        string agentId,
        string threadId,
        string message,
        CancellationToken ct = default)
    {
        var stopwatch = Stopwatch.StartNew();
        var operationId = Activity.Current?.Id ?? Guid.NewGuid().ToString();

        using var operation = _telemetry.StartOperation<RequestTelemetry>(
            "AgentInteraction");
        operation.Telemetry.Properties["AgentId"] = agentId;
        operation.Telemetry.Properties["ThreadId"] = threadId;

        _logger.LogInformation(
            "Processing message for agent {AgentId} on thread {ThreadId}. " +
            "OperationId: {OperationId}",
            agentId, threadId, operationId);

        try
        {
            // Create message
            await _client.CreateMessageAsync(
                threadId,
                MessageRole.User,
                message,
                cancellationToken: ct);

            // Run agent
            var run = await _client.CreateRunAsync(threadId, agentId, ct);

            // Poll for completion with telemetry
            var result = await PollRunWithTelemetryAsync(
                threadId, run.Value.Id, ct);

            stopwatch.Stop();

            // Record metrics
            _messageCounter.Add(1, 
                new KeyValuePair<string, object?>("agent_id", agentId),
                new KeyValuePair<string, object?>("status", "success"));

            _responseLatency.Record(stopwatch.ElapsedMilliseconds,
                new KeyValuePair<string, object?>("agent_id", agentId));

            if (result.Usage != null)
            {
                _tokenCounter.Add(result.Usage.TotalTokens,
                    new KeyValuePair<string, object?>("agent_id", agentId),
                    new KeyValuePair<string, object?>("type", "total"));
            }

            _logger.LogInformation(
                "Completed agent interaction in {ElapsedMs}ms. " +
                "Tokens: {Tokens}. OperationId: {OperationId}",
                stopwatch.ElapsedMilliseconds,
                result.Usage?.TotalTokens ?? 0,
                operationId);

            return result;
        }
        catch (Exception ex)
        {
            _errorCounter.Add(1,
                new KeyValuePair<string, object?>("agent_id", agentId),
                new KeyValuePair<string, object?>("error_type", ex.GetType().Name));

            _telemetry.TrackException(ex, new Dictionary<string, string>
            {
                ["AgentId"] = agentId,
                ["ThreadId"] = threadId,
                ["OperationId"] = operationId
            });

            _logger.LogError(ex,
                "Agent interaction failed. AgentId: {AgentId}, " +
                "ThreadId: {ThreadId}, OperationId: {OperationId}",
                agentId, threadId, operationId);

            throw;
        }
    }
}

Custom Metrics Dashboard

Set up Application Insights queries for your agent metrics:

// Agent Response Time Percentiles
customMetrics
| where name == "agent.response.duration"
| summarize 
    p50 = percentile(value, 50),
    p95 = percentile(value, 95),
    p99 = percentile(value, 99)
    by bin(timestamp, 1h), tostring(customDimensions.agent_id)
| render timechart

// Token Consumption by Agent
customMetrics
| where name == "agent.tokens.total"
| summarize TotalTokens = sum(value) 
    by bin(timestamp, 1d), tostring(customDimensions.agent_id)
| render columnchart

// Error Rate
customMetrics
| where name == "agent.errors.total"
| summarize Errors = sum(value) 
    by bin(timestamp, 1h), tostring(customDimensions.error_type)
| render timechart
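For these queries to return rows, the custom `Meter` has to actually be exported to Application Insights. One way to do that, assuming you're using the `Azure.Monitor.OpenTelemetry.AspNetCore` distro, is:

```csharp
// Export the "AgentService" meter (created via IMeterFactory above)
// to Application Insights through the Azure Monitor OpenTelemetry distro
builder.Services.AddOpenTelemetry()
    .UseAzureMonitor() // reads APPLICATIONINSIGHTS_CONNECTION_STRING
    .WithMetrics(metrics => metrics.AddMeter("AgentService"));
```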

Error Handling and Retry Patterns

Azure AI Agent Service calls can fail for various reasons. Here's a robust retry strategy using Polly:

public static class AgentResiliencePolicy
{
    public static IAsyncPolicy<T> CreatePolicy<T>(ILogger logger)
    {
        // Retry with exponential backoff for transient failures
        var retryPolicy = Policy<T>
            .Handle<RequestFailedException>(ex => IsTransient(ex))
            .Or<HttpRequestException>()
            .Or<TaskCanceledException>()
            .WaitAndRetryAsync(
                retryCount: 3,
                sleepDurationProvider: attempt => 
                    TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                onRetry: (outcome, timespan, attempt, context) =>
                {
                    logger.LogWarning(
                        "Retry {Attempt} after {Delay}ms due to {Error}",
                        attempt, timespan.TotalMilliseconds,
                        outcome.Exception?.Message);
                });

        // Circuit breaker for sustained failures
        var circuitBreaker = Policy<T>
            .Handle<RequestFailedException>()
            .CircuitBreakerAsync(
                handledEventsAllowedBeforeBreaking: 5,
                durationOfBreak: TimeSpan.FromMinutes(1),
                onBreak: (outcome, breakDelay) =>
                {
                    logger.LogError(
                        "Circuit breaker opened for {Duration}ms",
                        breakDelay.TotalMilliseconds);
                },
                onReset: () => logger.LogInformation("Circuit breaker reset"));

        // Timeout for hanging requests
        var timeout = Policy.TimeoutAsync<T>(
            TimeSpan.FromMinutes(5),
            TimeoutStrategy.Pessimistic);

        return Policy.WrapAsync(timeout, retryPolicy, circuitBreaker);
    }

    private static bool IsTransient(RequestFailedException ex)
    {
        return ex.Status switch
        {
            429 => true,  // Rate limited
            500 => true,  // Internal server error
            502 => true,  // Bad gateway
            503 => true,  // Service unavailable
            504 => true,  // Gateway timeout
            _ => false
        };
    }
}

// Usage in your service
public class ResilientAgentClient
{
    private readonly AgentsClient _client;
    private readonly IAsyncPolicy<ThreadRun> _runPolicy;

    public async Task<ThreadRun> CreateRunWithResilienceAsync(
        string threadId,
        string agentId,
        CancellationToken ct = default)
    {
        return await _runPolicy.ExecuteAsync(
            async token =>
            {
                var response = await _client.CreateRunAsync(threadId, agentId, token);
                return response.Value;
            },
            ct);
    }
}

Handling Rate Limits Gracefully

public class RateLimitHandler
{
    private readonly SemaphoreSlim _semaphore;
    private readonly ILogger<RateLimitHandler> _logger;
    private DateTime _retryAfter = DateTime.MinValue;

    public RateLimitHandler(int maxConcurrency, ILogger<RateLimitHandler> logger)
    {
        _semaphore = new SemaphoreSlim(maxConcurrency);
        _logger = logger;
    }

    public async Task<T> ExecuteAsync<T>(
        Func<Task<T>> operation,
        CancellationToken ct = default)
    {
        // Wait if we're in a rate limit cooldown
        var waitTime = _retryAfter - DateTime.UtcNow;
        if (waitTime > TimeSpan.Zero)
        {
            _logger.LogInformation(
                "Rate limit active, waiting {WaitMs}ms",
                waitTime.TotalMilliseconds);
            await Task.Delay(waitTime, ct);
        }

        await _semaphore.WaitAsync(ct);
        try
        {
            return await operation();
        }
        catch (RequestFailedException ex) when (ex.Status == 429)
        {
            // Parse retry-after header
            var retryAfterSeconds = ParseRetryAfter(ex);
            _retryAfter = DateTime.UtcNow.AddSeconds(retryAfterSeconds);

            _logger.LogWarning(
                "Rate limited. Retry after {Seconds} seconds",
                retryAfterSeconds);

            throw;
        }
        finally
        {
            _semaphore.Release();
        }
    }

    private int ParseRetryAfter(RequestFailedException ex)
    {
        // Pull Retry-After from the raw response when available;
        // fall back to 60 seconds if the header is missing
        var response = ex.GetRawResponse();
        if (response is not null &&
            response.Headers.TryGetValue("Retry-After", out var value) &&
            int.TryParse(value, out var seconds))
        {
            return seconds;
        }

        return 60;
    }
}

Cost Management and Token Tracking

Agents can get expensive fast. Build cost awareness into your system from day one:

public class CostTracker
{
    private readonly IMetricsService _metrics;
    private readonly ICostRepository _repository;
    private readonly ILogger<CostTracker> _logger;

    // Pricing as of 2024 - verify current rates!
    private static readonly Dictionary<string, (decimal Input, decimal Output)> Pricing = new()
    {
        ["gpt-4o"] = (0.005m / 1000, 0.015m / 1000),
        ["gpt-4o-mini"] = (0.00015m / 1000, 0.0006m / 1000),
        ["gpt-4-turbo"] = (0.01m / 1000, 0.03m / 1000)
    };

    public async Task<UsageReport> TrackUsageAsync(
        string userId,
        string agentId,
        string model,
        int inputTokens,
        int outputTokens,
        CancellationToken ct = default)
    {
        var (inputRate, outputRate) = Pricing.GetValueOrDefault(
            model, (0.01m / 1000, 0.03m / 1000));

        var cost = (inputTokens * inputRate) + (outputTokens * outputRate);

        var usage = new UsageRecord
        {
            UserId = userId,
            AgentId = agentId,
            Model = model,
            InputTokens = inputTokens,
            OutputTokens = outputTokens,
            EstimatedCost = cost,
            Timestamp = DateTimeOffset.UtcNow
        };

        await _repository.RecordUsageAsync(usage, ct);

        // Check if user is approaching limits
        var dailyUsage = await _repository.GetDailyUsageAsync(userId, ct);
        var monthlyUsage = await _repository.GetMonthlyUsageAsync(userId, ct);

        if (dailyUsage.TotalCost > 10m) // $10 daily threshold
        {
            _logger.LogWarning(
                "User {UserId} exceeded daily cost threshold: ${Cost:F2}",
                userId, dailyUsage.TotalCost);
        }

        return new UsageReport
        {
            CurrentRequest = usage,
            DailyTotal = dailyUsage,
            MonthlyTotal = monthlyUsage
        };
    }
}

public class BudgetEnforcementMiddleware
{
    private readonly ICostRepository _repository;
    private readonly ILogger<BudgetEnforcementMiddleware> _logger;

    public async Task<bool> CanProcessRequestAsync(
        string userId,
        CancellationToken ct = default)
    {
        var budget = await _repository.GetUserBudgetAsync(userId, ct);
        var usage = await _repository.GetMonthlyUsageAsync(userId, ct);

        if (usage.TotalCost >= budget.MonthlyLimit)
        {
            _logger.LogWarning(
                "User {UserId} blocked - monthly budget exhausted. " +
                "Used: ${Used:F2}, Limit: ${Limit:F2}",
                userId, usage.TotalCost, budget.MonthlyLimit);
            return false;
        }

        // Warn at 80% threshold
        if (usage.TotalCost >= budget.MonthlyLimit * 0.8m)
        {
            _logger.LogInformation(
                "User {UserId} approaching budget limit: " +
                "${Used:F2} of ${Limit:F2}",
                userId, usage.TotalCost, budget.MonthlyLimit);
        }

        return true;
    }
}

Optimizing Token Usage

public class TokenOptimizer
{
    private readonly int _maxContextTokens;
    private readonly ITokenCounter _tokenCounter;

    public TokenOptimizer(int maxContextTokens, ITokenCounter tokenCounter)
    {
        _maxContextTokens = maxContextTokens;
        _tokenCounter = tokenCounter;
    }

    public async Task<string> OptimizeContextAsync(
        IList<Message> messages,
        string systemPrompt,
        CancellationToken ct = default)
    {
        var systemTokens = await _tokenCounter.CountAsync(systemPrompt, ct);
        var availableTokens = _maxContextTokens - systemTokens - 1000; // Reserve for response

        var optimizedMessages = new List<Message>();
        var currentTokens = 0;

        // Always keep the most recent messages
        foreach (var message in messages.Reverse())
        {
            var messageTokens = await _tokenCounter.CountAsync(message.Content, ct);

            if (currentTokens + messageTokens > availableTokens)
            {
                // Summarize older messages instead of dropping them
                var summary = await SummarizeOlderMessagesAsync(
                    messages.Take(messages.Count - optimizedMessages.Count).ToList(),
                    ct);

                optimizedMessages.Insert(0, new Message
                {
                    Role = "system",
                    Content = $"[Earlier conversation summary: {summary}]"
                });
                break;
            }

            optimizedMessages.Insert(0, message);
            currentTokens += messageTokens;
        }

        return JsonSerializer.Serialize(optimizedMessages);
    }
}

Putting It All Together

Here's a complete production-ready agent service:

public class ProductionAgentService
{
    private readonly AgentsClient _client;
    private readonly SessionStateManager _sessions;
    private readonly CostTracker _costTracker;
    private readonly ObservableAgentService _telemetry;
    private readonly RateLimitHandler _rateLimiter;
    private readonly BudgetEnforcementMiddleware _budgetEnforcer;
    private readonly ILogger<ProductionAgentService> _logger;

    public async Task<AgentResponse> ProcessAsync(
        AgentRequest request,
        CancellationToken ct = default)
    {
        // 1. Budget check
        if (!await _budgetEnforcer.CanProcessRequestAsync(request.UserId, ct))
        {
            throw new BudgetExceededException(request.UserId);
        }

        // 2. Get or create session
        var session = await _sessions.GetSessionAsync(request.SessionId, ct);

        // 3. Execute with rate limiting and telemetry
        var response = await _rateLimiter.ExecuteAsync(async () =>
        {
            return await _telemetry.ProcessWithTelemetryAsync(
                request.AgentId,
                session.ThreadId,
                request.Message,
                ct);
        }, ct);

        // 4. Track costs
        if (response.Usage != null)
        {
            await _costTracker.TrackUsageAsync(
                request.UserId,
                request.AgentId,
                response.Model,
                response.Usage.PromptTokens,
                response.Usage.CompletionTokens,
                ct);
        }

        // 5. Update session state
        session.TotalTokensUsed += response.Usage?.TotalTokens ?? 0;
        await _sessions.UpdateSessionAsync(session, ct);

        return response;
    }
}

Key Takeaways

  1. Threads are your conversation state - Cache IDs, verify existence, and clean up stale threads regularly

  2. Layer application state - Use sessions to track metadata, costs, and workflow state on top of thread state

  3. Instrument everything - Response times, token counts, error rates, and costs should all be observable

  4. Build resilience in - Retry transient failures, respect rate limits, and implement circuit breakers

  5. Track costs religiously - Token usage can surprise you; build budgets and alerts from day one

  6. Plan for recovery - Sessions will get corrupted, threads will disappear—handle it gracefully

In the final part of this series, we'll cover testing strategies and deployment patterns for Azure AI Agent Service, including integration testing, load testing, and CI/CD pipelines.


Coming up next: Part 5 - "Testing and Deployment: CI/CD for AI Agents"

Found this helpful? Follow me for more Azure AI content, and drop a comment with your production war stories!
