Danilo O. Pinheiro, dopme.io

🧩 My First Communication with MCP and .NET – Part 4

Full Integration with gRPC & WebSocket

This fourth part of the "My First Communication with MCP and .NET" series shows how to build a hybrid architecture that pairs gRPC's efficiency and strong typing for internal service-to-service communication with WebSocket's flexibility and native browser support for web and mobile clients. The goal is a robust, scalable MCP ecosystem.


🚀 Introduction

Having explored gRPC (Part 2) and WebSocket (Part 3) on their own, an architectural question comes up: what if we need the benefits of both?

Modern enterprise systems commonly have:

  • Backend microservices that talk to each other over gRPC (low latency, strong typing)
  • Web/mobile clients that need WebSocket (native browser support, real-time updates)
  • MCP agents that can use either protocol depending on context

This article shows how to build a unified architecture that exposes both protocols, with context-aware routing, protocol adapters, and centralized observability.


🧠 Hybrid Architecture: The Best of Both Worlds

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                        API Gateway / BFF                         │
│                    (Protocol Orchestrator)                       │
└──────────────────┬─────────────────────┬────────────────────────┘
                   │                     │
         ┌─────────▼──────────┐ ┌───────▼──────────┐
         │   gRPC Endpoint    │ │ WebSocket Endpoint│
         │   (Port 5001)      │ │  (Port 5002)      │
         └─────────┬──────────┘ └───────┬───────────┘
                   │                     │
                   │    ┌────────────────┴─────────────────┐
                   │    │                                  │
         ┌─────────▼────▼──────────────┐    ┌─────────────▼──────┐
         │   Protocol Adapter Layer    │    │  SignalR Hub Layer  │
         │   (gRPC ↔ WebSocket)       │    │  (Client Manager)   │
         └─────────┬──────────────────┘    └─────────┬────────────┘
                   │                                  │
         ┌─────────▼──────────────────────────────────▼────────┐
         │              MCP Kernel Orchestrator                │
         │         (Unified Command Processing)                │
         └─────────┬──────────────────────────┬─────────────────┘
                   │                          │
         ┌─────────▼────────┐      ┌─────────▼──────────┐
         │  Domain Services │      │  Event Bus (Redis) │
         │  (Business Logic)│      │  (Cross-Protocol)  │
         └──────────────────┘      └────────────────────┘

Main Components

  1. Protocol Orchestrator - Chooses which protocol to use based on context
  2. Protocol Adapter Layer - Converts messages between gRPC and WebSocket
  3. Unified MCP Kernel - Processes commands regardless of protocol
  4. Event Bus - Synchronizes events across instances and protocols
  5. Observability Layer - Unified tracing for both protocols
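
The orchestrator's protocol decision can be pictured as a small pure function. The sketch below is only an illustration: `ClientKind`, `ProtocolSelector`, and the `needsServerPush` flag are hypothetical names that do not appear in the article, and `ProtocolType` mirrors the enum from the shared contracts.

```csharp
using System;

// Hypothetical client categories; not part of the article's contracts.
public enum ClientKind { BackendService, Browser, Mobile, Agent }

// Mirrors the ProtocolType enum from the shared contracts.
public enum ProtocolType { Unknown = 0, Grpc = 1, WebSocket = 2, Http = 3 }

public static class ProtocolSelector
{
    // Picks a protocol from the caller's context:
    // backend services use gRPC, web/mobile clients use WebSocket,
    // and agents use either one depending on whether they need server push.
    public static ProtocolType Choose(ClientKind kind, bool needsServerPush) => kind switch
    {
        ClientKind.BackendService => ProtocolType.Grpc,
        ClientKind.Browser or ClientKind.Mobile => ProtocolType.WebSocket,
        ClientKind.Agent => needsServerPush ? ProtocolType.WebSocket : ProtocolType.Grpc,
        _ => ProtocolType.Unknown
    };
}
```

Keeping the decision in one pure function makes the routing rules easy to unit test and to change without touching either endpoint.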

🏗️ Hybrid Architecture Implementation

1️⃣ Project Structure

MCPPipeline.Hybrid/
├── src/
│   ├── MCPPipeline.Contracts/           # Shared models
│   │   ├── Messages/
│   │   │   ├── MCPCommand.cs
│   │   │   ├── MCPResponse.cs
│   │   │   └── MCPEvent.cs
│   │   └── Protos/
│   │       └── mcp.proto
│   │
│   ├── MCPPipeline.Core/                # Business logic
│   │   ├── Services/
│   │   │   ├── IMCPKernelService.cs
│   │   │   └── MCPKernelService.cs
│   │   └── Events/
│   │       └── IEventBus.cs
│   │
│   ├── MCPPipeline.GrpcService/         # gRPC endpoint
│   │   ├── Services/
│   │   │   └── MCPGrpcService.cs
│   │   └── Program.cs
│   │
│   ├── MCPPipeline.WebSocketService/    # WebSocket endpoint
│   │   ├── Hubs/
│   │   │   └── MCPHub.cs
│   │   └── Program.cs
│   │
│   ├── MCPPipeline.Gateway/             # API Gateway (YARP)
│   │   ├── Configuration/
│   │   └── Program.cs
│   │
│   └── MCPPipeline.Adapter/             # Protocol Adapter
│       ├── GrpcToWebSocketAdapter.cs
│       └── WebSocketToGrpcAdapter.cs
│
└── tests/
    └── MCPPipeline.IntegrationTests/

2️⃣ Shared Contracts

// MCPPipeline.Contracts/Messages/MCPCommand.cs
namespace MCPPipeline.Contracts.Messages;

public record MCPCommand
{
    public string CommandId { get; init; } = Guid.NewGuid().ToString();
    public string Command { get; init; } = string.Empty;
    public string Payload { get; init; } = string.Empty;
    public Dictionary<string, string> Metadata { get; init; } = new();
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
    public string SessionId { get; init; } = string.Empty;
    public ProtocolType Protocol { get; init; }
    public int Priority { get; init; } = 0;
}

public record MCPResponse
{
    public string CommandId { get; init; } = string.Empty;
    public string Result { get; init; } = string.Empty;
    public ResponseStatus Status { get; init; }
    public string? ErrorMessage { get; init; }
    public long ProcessingTimeMs { get; init; }
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
    public Dictionary<string, object> Metrics { get; init; } = new();
}

public record MCPEvent
{
    public string EventId { get; init; } = Guid.NewGuid().ToString();
    public string EventType { get; init; } = string.Empty;
    public string Source { get; init; } = string.Empty;
    public object Data { get; init; } = new();
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
}

public enum ProtocolType
{
    Unknown = 0,
    Grpc = 1,
    WebSocket = 2,
    Http = 3
}

public enum ResponseStatus
{
    Success,
    Error,
    Processing,
    Timeout
}
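
Because the contracts are records with `init`-only properties, each endpoint can stamp transport details onto a command with a `with` expression without mutating the original instance. A minimal sketch of that pattern follows; it uses a trimmed copy of `MCPCommand`, and `CommandEnricher` is a hypothetical helper name.

```csharp
using System;

public enum ProtocolType { Unknown = 0, Grpc = 1, WebSocket = 2, Http = 3 }

// Trimmed copy of MCPCommand with only the fields this example needs.
public record MCPCommand
{
    public string CommandId { get; init; } = Guid.NewGuid().ToString();
    public string Command { get; init; } = string.Empty;
    public string SessionId { get; init; } = string.Empty;
    public ProtocolType Protocol { get; init; }
}

public static class CommandEnricher
{
    // Returns a copy stamped with the transport details; the original is left untouched
    // and every other property (including CommandId) is carried over.
    public static MCPCommand Stamp(MCPCommand command, ProtocolType protocol, string sessionId) =>
        command with { Protocol = protocol, SessionId = sessionId };
}
```

The copy keeps the same `CommandId`, which is what lets a response be traced back to the command that produced it.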

3️⃣ Unified MCP Kernel

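// MCPPipeline.Core/Services/MCPKernelService.cs
using System.Diagnostics;
using System.Runtime.CompilerServices; // [EnumeratorCancellation]
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Core.Events;
using Microsoft.Extensions.Logging;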

namespace MCPPipeline.Core.Services;

public interface IMCPKernelService
{
    Task<MCPResponse> ExecuteCommandAsync(MCPCommand command, CancellationToken ct);
    IAsyncEnumerable<string> StreamCommandAsync(MCPCommand command, CancellationToken ct);
}

public class MCPKernelService : IMCPKernelService
{
    private readonly ILogger<MCPKernelService> _logger;
    private readonly IEventBus _eventBus;
    private static readonly ActivitySource ActivitySource = new("MCPPipeline.Core");

    public MCPKernelService(
        ILogger<MCPKernelService> logger,
        IEventBus eventBus)
    {
        _logger = logger;
        _eventBus = eventBus;
    }

    public async Task<MCPResponse> ExecuteCommandAsync(
        MCPCommand command, 
        CancellationToken ct)
    {
        using var activity = ActivitySource.StartActivity("ExecuteCommand");
        activity?.SetTag("command.id", command.CommandId);
        activity?.SetTag("command.type", command.Command);
        activity?.SetTag("protocol", command.Protocol.ToString());

        var sw = Stopwatch.StartNew();

        try
        {
            _logger.LogInformation(
                "Executing command {Command} via {Protocol} | Session: {SessionId}",
                command.Command,
                command.Protocol,
                command.SessionId);

            // Publish the start event
            await _eventBus.PublishAsync(new MCPEvent
            {
                EventType = "CommandStarted",
                Source = command.Protocol.ToString(),
                Data = new { command.CommandId, command.Command }
            });

            // Process the command
            var result = await ProcessCommandAsync(command, ct);

            sw.Stop();

            var response = new MCPResponse
            {
                CommandId = command.CommandId,
                Result = result,
                Status = ResponseStatus.Success,
                ProcessingTimeMs = sw.ElapsedMilliseconds,
                Metrics = new Dictionary<string, object>
                {
                    ["protocol"] = command.Protocol.ToString(),
                    ["priority"] = command.Priority
                }
            };

            // Publish the completion event
            await _eventBus.PublishAsync(new MCPEvent
            {
                EventType = "CommandCompleted",
                Source = command.Protocol.ToString(),
                Data = new { command.CommandId, response.ProcessingTimeMs }
            });

            activity?.SetTag("response.status", "success");
            return response;
        }
        catch (Exception ex)
        {
            sw.Stop();

            _logger.LogError(ex,
                "Error executing command {Command} via {Protocol}",
                command.Command,
                command.Protocol);

            activity?.SetTag("response.status", "error");
            activity?.SetTag("error.message", ex.Message);

            await _eventBus.PublishAsync(new MCPEvent
            {
                EventType = "CommandFailed",
                Source = command.Protocol.ToString(),
                Data = new { command.CommandId, Error = ex.Message }
            });

            return new MCPResponse
            {
                CommandId = command.CommandId,
                Status = ResponseStatus.Error,
                ErrorMessage = ex.Message,
                ProcessingTimeMs = sw.ElapsedMilliseconds
            };
        }
    }

    public async IAsyncEnumerable<string> StreamCommandAsync(
        MCPCommand command,
        [EnumeratorCancellation] CancellationToken ct)
    {
        using var activity = ActivitySource.StartActivity("StreamCommand");
        activity?.SetTag("command.id", command.CommandId);

        _logger.LogInformation(
            "Starting streaming for command {Command} via {Protocol}",
            command.Command,
            command.Protocol);

        var chunks = await GenerateStreamChunksAsync(command.Payload, ct);

        for (int i = 0; i < chunks.Count; i++)
        {
            if (ct.IsCancellationRequested)
                yield break;

            yield return chunks[i];
            await Task.Delay(50, ct); // Simulates processing
        }
    }

    private async Task<string> ProcessCommandAsync(MCPCommand command, CancellationToken ct)
    {
        return command.Command.ToLowerInvariant() switch
        {
            "analyze" => await AnalyzeAsync(command.Payload, ct),
            "summarize" => await SummarizeAsync(command.Payload, ct),
            "translate" => await TranslateAsync(command.Payload, ct),
            "generate" => await GenerateAsync(command.Payload, ct),
            "status" => await GetStatusAsync(ct),
            _ => throw new InvalidOperationException($"Unknown command: {command.Command}")
        };
    }

    private async Task<string> AnalyzeAsync(string payload, CancellationToken ct)
    {
        await Task.Delay(200, ct);
        var words = payload.Split(' ', StringSplitOptions.RemoveEmptyEntries).Length;
        return $"Analysis: {words} words, {payload.Length} characters";
    }

    private async Task<string> SummarizeAsync(string payload, CancellationToken ct)
    {
        await Task.Delay(300, ct);
        return payload.Length > 100
            ? $"Summary: {payload[..100]}..."
            : $"Summary: {payload}";
    }

    private async Task<string> TranslateAsync(string payload, CancellationToken ct)
    {
        await Task.Delay(250, ct);
        return $"[Translated] {payload}";
    }

    private async Task<string> GenerateAsync(string payload, CancellationToken ct)
    {
        await Task.Delay(500, ct);
        return $"Generated content based on: {payload}";
    }

    private async Task<string> GetStatusAsync(CancellationToken ct)
    {
        await Task.Delay(50, ct);
        return $"System operational | Timestamp: {DateTime.UtcNow:O}";
    }

    private async Task<List<string>> GenerateStreamChunksAsync(string prompt, CancellationToken ct)
    {
        await Task.Delay(100, ct);
        return new List<string>
        {
            "Starting processing...",
            "Analyzing context...",
            "Generating response...",
            $"Result: {prompt}",
            "Processing complete."
        };
    }
}

4️⃣ Event Bus with Redis

// MCPPipeline.Core/Events/IEventBus.cs
namespace MCPPipeline.Core.Events;

public interface IEventBus
{
    Task PublishAsync<T>(T @event) where T : class;
    Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
}

// MCPPipeline.Core/Events/RedisEventBus.cs
using System.Text.Json;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;

namespace MCPPipeline.Core.Events;

public class RedisEventBus : IEventBus
{
    private readonly IConnectionMultiplexer _redis;
    private readonly ILogger<RedisEventBus> _logger;
    private readonly ISubscriber _subscriber;

    public RedisEventBus(
        IConnectionMultiplexer redis,
        ILogger<RedisEventBus> logger)
    {
        _redis = redis;
        _logger = logger;
        _subscriber = redis.GetSubscriber();
    }

    public async Task PublishAsync<T>(T @event) where T : class
    {
        var channel = typeof(T).Name;
        var message = JsonSerializer.Serialize(@event);

        await _subscriber.PublishAsync(channel, message);

        _logger.LogDebug("Event published: {EventType}", channel);
    }

    public async Task SubscribeAsync<T>(Func<T, Task> handler) where T : class
    {
        var channel = typeof(T).Name;

        await _subscriber.SubscribeAsync(channel, async (ch, message) =>
        {
            try
            {
                // Convert the RedisValue to a string explicitly so the overload choice is unambiguous.
                var @event = JsonSerializer.Deserialize<T>(message.ToString());
                if (@event != null)
                {
                    await handler(@event);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing event {EventType}", channel);
            }
        });

        _logger.LogInformation("Subscribed to channel: {Channel}", channel);
    }
}
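
The Redis-backed bus needs a running Redis server, which is awkward in unit tests. A minimal in-memory sketch of the same `IEventBus` interface could look like the following. `InMemoryEventBus` is a hypothetical name that is not part of the article's project, and unlike Redis pub/sub it only dispatches inside the current process and keys handlers by CLR type rather than by channel name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Same shape as the article's IEventBus, repeated here so the sketch is self-contained.
public interface IEventBus
{
    Task PublishAsync<T>(T @event) where T : class;
    Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
}

// In-process stand-in for RedisEventBus; hypothetical, intended for tests only.
public sealed class InMemoryEventBus : IEventBus
{
    private readonly ConcurrentDictionary<Type, ConcurrentBag<Func<object, Task>>> _handlers = new();

    public async Task PublishAsync<T>(T @event) where T : class
    {
        // No subscribers for this event type: nothing to do.
        if (!_handlers.TryGetValue(typeof(T), out var handlers)) return;

        // Invoke every handler registered for this event type, one after another.
        foreach (var handler in handlers)
            await handler(@event);
    }

    public Task SubscribeAsync<T>(Func<T, Task> handler) where T : class
    {
        var bag = _handlers.GetOrAdd(typeof(T), _ => new ConcurrentBag<Func<object, Task>>());
        bag.Add(e => handler((T)e));
        return Task.CompletedTask;
    }
}
```

Registering this implementation in a test host in place of `RedisEventBus` would let the kernel be exercised without external infrastructure.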

5️⃣ Protocol Adapter

// MCPPipeline.Adapter/ProtocolAdapter.cs
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Grpc;

namespace MCPPipeline.Adapter;

public interface IProtocolAdapter
{
    MCPCommand FromGrpcRequest(MCPRequest grpcRequest);
    MCPRequest ToGrpcRequest(MCPCommand command);
    MCPResponse FromGrpcResponse(MCPGrpcResponse grpcResponse);
    MCPGrpcResponse ToGrpcResponse(MCPResponse response);
}

public class ProtocolAdapter : IProtocolAdapter
{
    public MCPCommand FromGrpcRequest(MCPRequest grpcRequest)
    {
        return new MCPCommand
        {
            Command = grpcRequest.Command,
            Payload = grpcRequest.Payload,
            Metadata = grpcRequest.Metadata.ToDictionary(k => k.Key, v => v.Value),
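            // UtcDateTime keeps Kind = Utc; .DateTime would return Kind = Unspecified,
            // which ToGrpcRequest would later interpret as local time.
            Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(grpcRequest.Timestamp).UtcDateTime,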
            Protocol = ProtocolType.Grpc
        };
    }

    public MCPRequest ToGrpcRequest(MCPCommand command)
    {
        return new MCPRequest
        {
            Command = command.Command,
            Payload = command.Payload,
            Metadata = { command.Metadata },
            Timestamp = new DateTimeOffset(command.Timestamp).ToUnixTimeMilliseconds()
        };
    }

    public MCPResponse FromGrpcResponse(MCPGrpcResponse grpcResponse)
    {
        return new MCPResponse
        {
            Result = grpcResponse.Result,
            Status = grpcResponse.Status switch
            {
                "OK" => ResponseStatus.Success,
                "ERROR" => ResponseStatus.Error,
                "PROCESSING" => ResponseStatus.Processing,
                _ => ResponseStatus.Error
            },
            ErrorMessage = grpcResponse.ErrorMessage,
            ProcessingTimeMs = grpcResponse.ProcessingTimeMs
        };
    }

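    public MCPGrpcResponse ToGrpcResponse(MCPResponse response)
    {
        return new MCPGrpcResponse
        {
            Result = response.Result,
            // Emit the same strings FromGrpcResponse expects ("OK", not "SUCCESS"),
            // otherwise a successful response round-trips as an error.
            Status = response.Status switch
            {
                ResponseStatus.Success => "OK",
                ResponseStatus.Processing => "PROCESSING",
                ResponseStatus.Timeout => "TIMEOUT",
                _ => "ERROR"
            },
            ErrorMessage = response.ErrorMessage ?? string.Empty,
            ProcessingTimeMs = response.ProcessingTimeMs
        };
    }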
}
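
Timestamp conversions like the ones in this adapter are sensitive to `DateTimeKind`: `DateTimeOffset.DateTime` returns a value whose kind is `Unspecified`, and `new DateTimeOffset(DateTime)` treats an unspecified value as local time, so a round trip can shift by the machine's UTC offset. The sketch below shows one way to keep the round trip stable. `TimestampMapper` is a hypothetical helper, and treating unspecified values as UTC is an assumption that fits timestamps produced with `DateTime.UtcNow`.

```csharp
using System;

public static class TimestampMapper
{
    // Unix milliseconds -> DateTime with Kind = Utc.
    public static DateTime FromUnixMs(long ms) =>
        DateTimeOffset.FromUnixTimeMilliseconds(ms).UtcDateTime;

    // DateTime -> Unix milliseconds.
    // Assumption: values with Kind = Unspecified are already in UTC.
    public static long ToUnixMs(DateTime value)
    {
        var utc = value.Kind == DateTimeKind.Unspecified
            ? DateTime.SpecifyKind(value, DateTimeKind.Utc)
            : value.ToUniversalTime();
        return new DateTimeOffset(utc).ToUnixTimeMilliseconds();
    }
}
```

With both directions pinned to UTC, converting a value to milliseconds and back yields the same instant regardless of the server's time zone.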

6️⃣ gRPC Service Endpoint

// MCPPipeline.GrpcService/Services/MCPGrpcService.cs
using Grpc.Core;
using MCPPipeline.Grpc;
using MCPPipeline.Core.Services;
using MCPPipeline.Adapter;

namespace MCPPipeline.GrpcService.Services;

public class MCPGrpcServiceImpl : MCPService.MCPServiceBase
{
    private readonly IMCPKernelService _kernelService;
    private readonly IProtocolAdapter _adapter;
    private readonly ILogger<MCPGrpcServiceImpl> _logger;

    public MCPGrpcServiceImpl(
        IMCPKernelService kernelService,
        IProtocolAdapter adapter,
        ILogger<MCPGrpcServiceImpl> logger)
    {
        _kernelService = kernelService;
        _adapter = adapter;
        _logger = logger;
    }

    public override async Task<MCPGrpcResponse> SendCommand(
        MCPRequest request,
        ServerCallContext context)
    {
        _logger.LogInformation(
            "Received gRPC command: {Command} from {Peer}",
            request.Command,
            context.Peer);

        // Convert to the unified model
        var command = _adapter.FromGrpcRequest(request);
        command = command with { SessionId = context.Peer };

        // Process through the unified kernel
        var response = await _kernelService.ExecuteCommandAsync(
            command,
            context.CancellationToken);

        // Convert the response
        return _adapter.ToGrpcResponse(response);
    }

    public override async Task StreamCommand(
        MCPRequest request,
        IServerStreamWriter<MCPStreamResponse> responseStream,
        ServerCallContext context)
    {
        _logger.LogInformation(
            "Starting gRPC streaming: {Command}",
            request.Command);

        var command = _adapter.FromGrpcRequest(request);

        int chunkIndex = 0;
        await foreach (var chunk in _kernelService.StreamCommandAsync(
            command,
            context.CancellationToken))
        {
            await responseStream.WriteAsync(new MCPStreamResponse
            {
                Content = chunk,
                ChunkIndex = chunkIndex++,
                IsComplete = false
            });
        }

        // Send the final chunk
        await responseStream.WriteAsync(new MCPStreamResponse
        {
            Content = "Stream completed",
            ChunkIndex = chunkIndex,
            IsComplete = true
        });
    }

    public override Task<HealthResponse> HealthCheck(
        HealthRequest request,
        ServerCallContext context)
    {
        return Task.FromResult(new HealthResponse
        {
            Status = "Healthy",
            Version = "1.0.0",
            Protocol = "gRPC"
        });
    }
}

// Program.cs
using MCPPipeline.Adapter;
using MCPPipeline.Core.Events;
using MCPPipeline.Core.Services;
using MCPPipeline.GrpcService.Services;
using StackExchange.Redis;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddGrpc();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
builder.Services.AddSingleton<IProtocolAdapter, ProtocolAdapter>();

// Redis for the event bus
builder.Services.AddSingleton<IConnectionMultiplexer>(
    ConnectionMultiplexer.Connect("localhost:6379"));
builder.Services.AddSingleton<IEventBus, RedisEventBus>();

var app = builder.Build();

app.MapGrpcService<MCPGrpcServiceImpl>();
app.MapGet("/", () => "MCP gRPC Service");

app.Run();

7️⃣ WebSocket Service Endpoint

// MCPPipeline.WebSocketService/Hubs/MCPHub.cs
using Microsoft.AspNetCore.SignalR;
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Core.Services;

namespace MCPPipeline.WebSocketService.Hubs;

public class MCPHub : Hub
{
    private readonly IMCPKernelService _kernelService;
    private readonly ILogger<MCPHub> _logger;

    public MCPHub(
        IMCPKernelService kernelService,
        ILogger<MCPHub> logger)
    {
        _kernelService = kernelService;
        _logger = logger;
    }

    public override async Task OnConnectedAsync()
    {
        _logger.LogInformation("WebSocket client connected: {ConnectionId}",
            Context.ConnectionId);

        await Clients.Caller.SendAsync("Connected", new
        {
            ConnectionId = Context.ConnectionId,
            Protocol = "WebSocket",
            Message = "Connected to the MCP Hub"
        });

        await base.OnConnectedAsync();
    }

    public async Task SendCommand(MCPCommand command)
    {
        _logger.LogInformation(
            "Received WebSocket command: {Command} from {ConnectionId}",
            command.Command,
            Context.ConnectionId);

        // Enrich the command with WebSocket connection data
        var enrichedCommand = command with
        {
            Protocol = ProtocolType.WebSocket,
            SessionId = Context.ConnectionId
        };

        // Process through the unified kernel
        var response = await _kernelService.ExecuteCommandAsync(
            enrichedCommand,
            Context.ConnectionAborted);

        // Send the response
        await Clients.Caller.SendAsync("CommandResponse", response);
    }

    public async Task StreamCommand(MCPCommand command)
    {
        _logger.LogInformation(
            "Starting WebSocket streaming: {Command}",
            command.Command);

        var enrichedCommand = command with 
        { 
            Protocol = ProtocolType.WebSocket,
            SessionId = Context.ConnectionId
        };

        await foreach (var chunk in _kernelService.StreamCommandAsync(
            enrichedCommand,
            Context.ConnectionAborted))
        {
            await Clients.Caller.SendAsync("StreamChunk", new
            {
                Content = chunk,
                Timestamp = DateTime.UtcNow
            });
        }

        await Clients.Caller.SendAsync("StreamComplete", new
        {
            Message = "Streaming completed"
        });
    }
}

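// Program.cs
using MCPPipeline.Core.Events;
using MCPPipeline.Core.Services;
using MCPPipeline.WebSocketService.Hubs;
using StackExchange.Redis;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSignalR();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();

// Redis for the event bus
builder.Services.AddSingleton<IConnectionMultiplexer>(
    ConnectionMultiplexer.Connect("localhost:6379"));
builder.Services.AddSingleton<IEventBus, RedisEventBus>();

// Permissive CORS, suitable for local development only. Browser SignalR clients that
// send credentials need explicit origins (WithOrigins) plus AllowCredentials instead.
builder.Services.AddCors(options =>
{
    options.AddPolicy("AllowAll", policy =>
        policy.AllowAnyOrigin().AllowAnyMethod().AllowAnyHeader());
});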

var app = builder.Build();

app.UseCors("AllowAll");
app.MapHub<MCPHub>("/mcphub");
app.MapGet("/", () => "MCP WebSocket Service");

app.Run();

8️⃣ API Gateway with YARP

// MCPPipeline.Gateway/Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddReverseProxy()
    .LoadFromConfig(builder.Configuration.GetSection("ReverseProxy"));

var app = builder.Build();

app.MapReverseProxy();
app.MapGet("/", () => Results.Ok(new
{
    Service = "MCP Gateway",
    Endpoints = new
    {
        gRPC = "https://localhost:5001",
        WebSocket = "https://localhost:5002/mcphub"
    }
}));

app.Run();

// appsettings.json
{
  "ReverseProxy": {
    "Routes": {
      "grpc-route": {
        "ClusterId": "grpc-cluster",
        "Match": {
          "Path": "/grpc/{**catch-all}"
        },
        "Transforms": [
          { "PathRemovePrefix": "/grpc" }
        ]
      },
      "websocket-route": {
        "ClusterId": "websocket-cluster",
        "Match": {
          "Path": "/ws/{**catch-all}"
        },
        "Transforms": [
          { "PathRemovePrefix": "/ws" }
        ]
      }
    },
    "Clusters": {
      "grpc-cluster": {
        "Destinations": {
          "destination1": {
            "Address": "https://localhost:5001"
          }
        }
      },
      "websocket-cluster": {
        "Destinations": {
          "destination1": {
            "Address": "https://localhost:5002"
          }
        },
        "HttpRequest": {
          "Version": "1.1",
          "VersionPolicy": "RequestVersionOrLower"
        }
      }
    }
  }
}

9️⃣ Unified Client

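// MCPPipeline.Client/UnifiedMCPClient.cs
using System.Runtime.CompilerServices; // [EnumeratorCancellation]
using System.Threading.Channels;
using Grpc.Core;                       // ReadAllAsync extension
using Grpc.Net.Client;
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Grpc;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.Logging;
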
public class UnifiedMCPClient : IAsyncDisposable
{
    private readonly MCPService.MCPServiceClient? _grpcClient;
    private readonly HubConnection? _hubConnection;
    private readonly ILogger<UnifiedMCPClient> _logger;
    private readonly bool _useGrpc;

    public UnifiedMCPClient(
        string endpoint,
        bool useGrpc,
        ILogger<UnifiedMCPClient> logger)
    {
        _useGrpc = useGrpc;
        _logger = logger;

        if (useGrpc)
        {
            var channel = GrpcChannel.ForAddress(endpoint);
            _grpcClient = new MCPService.MCPServiceClient(channel);
            _logger.LogInformation("Client configured for gRPC: {Endpoint}", endpoint);
        }
        else
        {
            _hubConnection = new HubConnectionBuilder()
                .WithUrl(endpoint)
                .WithAutomaticReconnect()
                .Build();

            SetupWebSocketHandlers();
            _logger.LogInformation("Client configured for WebSocket: {Endpoint}", endpoint);
        }
    }

    public async Task ConnectAsync(CancellationToken ct = default)
    {
        if (!_useGrpc && _hubConnection != null)
        {
            await _hubConnection.StartAsync(ct);
            _logger.LogInformation("Connected via WebSocket");
        }
    }

    public async Task<MCPResponse> SendCommandAsync(
        string command,
        string payload,
        CancellationToken ct = default)
    {
        if (_useGrpc && _grpcClient != null)
        {
            return await SendViaGrpcAsync(command, payload, ct);
        }
        else if (_hubConnection != null)
        {
            return await SendViaWebSocketAsync(command, payload, ct);
        }

        throw new InvalidOperationException("Client not initialized");
    }

    private async Task<MCPResponse> SendViaGrpcAsync(
        string command,
        string payload,
        CancellationToken ct)
    {
        _logger.LogInformation("Sending command via gRPC: {Command}", command);

        var request = new MCPRequest
        {
            Command = command,
            Payload = payload,
            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
        };

        var response = await _grpcClient!.SendCommandAsync(request, cancellationToken: ct);

        return new MCPResponse
        {
            Result = response.Result,
            Status = response.Status == "OK" ? ResponseStatus.Success : ResponseStatus.Error,
            ErrorMessage = response.ErrorMessage,
            ProcessingTimeMs = response.ProcessingTimeMs
        };
    }

    private async Task<MCPResponse> SendViaWebSocketAsync(
        string command,
        string payload,
        CancellationToken ct)
    {
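        _logger.LogInformation("Sending command via WebSocket: {Command}", command);

        // Let continuations run off the SignalR callback thread.
        var tcs = new TaskCompletionSource<MCPResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // On(...) returns an IDisposable that unregisters only this handler, whereas
        // Remove("CommandResponse") would drop every handler registered for that method.
        // Responses are not correlated by CommandId here, so avoid overlapping calls.
        using var subscription = _hubConnection!.On<MCPResponse>(
            "CommandResponse", response => tcs.TrySetResult(response));

        await _hubConnection.InvokeAsync("SendCommand", new MCPCommand
        {
            Command = command,
            Payload = payload,
            Timestamp = DateTime.UtcNow
        }, ct);

        return await tcs.Task.WaitAsync(TimeSpan.FromSeconds(30), ct);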
    }

    public async IAsyncEnumerable<string> StreamCommandAsync(
        string command,
        string payload,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        if (_useGrpc && _grpcClient != null)
        {
            await foreach (var chunk in StreamViaGrpcAsync(command, payload, ct))
            {
                yield return chunk;
            }
        }
        else if (_hubConnection != null)
        {
            await foreach (var chunk in StreamViaWebSocketAsync(command, payload, ct))
            {
                yield return chunk;
            }
        }
    }

    private async IAsyncEnumerable<string> StreamViaGrpcAsync(
        string command,
        string payload,
        [EnumeratorCancellation] CancellationToken ct)
    {
        var request = new MCPRequest
        {
            Command = command,
            Payload = payload,
            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
        };

        using var call = _grpcClient!.StreamCommand(request, cancellationToken: ct);

        await foreach (var response in call.ResponseStream.ReadAllAsync(ct))
        {
            yield return response.Content;
        }
    }

    private async IAsyncEnumerable<string> StreamViaWebSocketAsync(
        string command,
        string payload,
        [EnumeratorCancellation] CancellationToken ct)
    {
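        var channel = Channel.CreateUnbounded<string>();

        // With SignalR's JSON protocol an untyped payload arrives as a JsonElement, and
        // property names are camelCase by default, hence "content" rather than "Content".
        // The subscriptions are disposed when the iterator finishes, so handlers do not pile up.
        using var chunkSubscription = _hubConnection!.On<System.Text.Json.JsonElement>("StreamChunk", chunk =>
        {
            channel.Writer.TryWrite(chunk.GetProperty("content").GetString() ?? string.Empty);
        });

        using var completeSubscription = _hubConnection.On<System.Text.Json.JsonElement>("StreamComplete", _ =>
        {
            channel.Writer.TryComplete();
        });

        await _hubConnection.InvokeAsync("StreamCommand", new MCPCommand
        {
            Command = command,
            Payload = payload,
            Timestamp = DateTime.UtcNow
        }, ct);

        await foreach (var chunk in channel.Reader.ReadAllAsync(ct))
        {
            yield return chunk;
        }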
    }

    private void SetupWebSocketHandlers()
    {
        _hubConnection!.On<object>("Connected", data =>
        {
            _logger.LogInformation("✅ WebSocket connected: {Data}", data);
        });

        _hubConnection.Reconnecting += error =>
        {
            _logger.LogWarning("⚠️ WebSocket reconnecting: {Error}", error?.Message);
            return Task.CompletedTask;
        };

        _hubConnection.Reconnected += connectionId =>
        {
            _logger.LogInformation("✅ WebSocket reconnected: {ConnectionId}", connectionId);
            return Task.CompletedTask;
        };

        _hubConnection.Closed += error =>
        {
            _logger.LogError("❌ WebSocket closed: {Error}", error?.Message);
            return Task.CompletedTask;
        };
    }

    public async ValueTask DisposeAsync()
    {
        if (_hubConnection != null)
        {
            await _hubConnection.StopAsync();
            await _hubConnection.DisposeAsync();
        }
    }
}

// Usage example
public class Program
{
    public static async Task Main(string[] args)
    {
        var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
        var logger = loggerFactory.CreateLogger<UnifiedMCPClient>();

        // gRPC client
        await using var grpcClient = new UnifiedMCPClient(
            "https://localhost:5001",
            useGrpc: true,
            logger);

        var response = await grpcClient.SendCommandAsync("analyze", "Test via gRPC");
        Console.WriteLine($"gRPC: {response.Result}");

        // WebSocket client
        await using var wsClient = new UnifiedMCPClient(
            "https://localhost:5002/mcphub",
            useGrpc: false,
            logger);

        await wsClient.ConnectAsync();

        response = await wsClient.SendCommandAsync("analyze", "Test via WebSocket");
        Console.WriteLine($"WebSocket: {response.Result}");

        // Streaming
        await foreach (var chunk in wsClient.StreamCommandAsync("generate", "Test prompt"))
        {
            Console.WriteLine($"Chunk: {chunk}");
        }
    }
}

🔍 Unified Observability

OpenTelemetry Configuration

// MCPPipeline.Observability/OpenTelemetryExtensions.cs
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using OpenTelemetry.Metrics;

public static class OpenTelemetryExtensions
{
    public static IServiceCollection AddMCPObservability(
        this IServiceCollection services,
        string serviceName)
    {
        services.AddOpenTelemetry()
            .ConfigureResource(resource =>
            {
                resource.AddService(
                    serviceName: serviceName,
                    serviceVersion: "1.0.0");
            })
            .WithTracing(tracing =>
            {
                tracing
                    .AddAspNetCoreInstrumentation(options =>
                    {
                        options.RecordException = true;
                        options.EnrichWithHttpRequest = (activity, request) =>
                        {
                            activity.SetTag("http.scheme", request.Scheme);
                            // Store the address as a string so exporters can serialize the tag
                            activity.SetTag("http.client_ip", request.HttpContext.Connection.RemoteIpAddress?.ToString());
                        };
                    })
                    .AddGrpcClientInstrumentation()
                    .AddSource("MCPPipeline.Core")
                    .AddSource("MCPPipeline.GrpcService")
                    .AddSource("MCPPipeline.WebSocketService")
                    .AddOtlpExporter(options =>
                    {
                        options.Endpoint = new Uri("http://localhost:4317");
                    });
            })
            .WithMetrics(metrics =>
            {
                metrics
                    .AddAspNetCoreInstrumentation()
                    .AddRuntimeInstrumentation()
                    .AddMeter("MCPPipeline.*")
                    .AddPrometheusExporter()
                    .AddOtlpExporter();
            });

        return services;
    }
}

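// Usage in the services
builder.Services.AddMCPObservability("MCPPipeline.GrpcService");
// AddPrometheusExporter() only serves metrics once the scraping endpoint is mapped,
// e.g. app.MapPrometheusScrapingEndpoint() from OpenTelemetry.Exporter.Prometheus.AspNetCore.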
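The `AddSource(...)` registrations above only matter if the application creates activities from an `ActivitySource` with a matching name. The following is a minimal sketch of that relationship, using only `System.Diagnostics` and a hand-written `ActivityListener` as a stand-in for the OpenTelemetry SDK. The class name, operation name, and tag are assumptions made for illustration.

```csharp
using System;
using System.Diagnostics;

public static class TracingSketch
{
    // The name must match one of the AddSource(...) registrations.
    private static readonly ActivitySource Source = new("MCPPipeline.Core");

    // Starts a span and reports whether anything was listening.
    public static bool TryTrace(string protocol)
    {
        using var activity = Source.StartActivity("ExecuteCommand");
        activity?.SetTag("protocol", protocol); // skipped when activity is null
        return activity is not null;
    }

    public static void Main()
    {
        // No listener registered yet: StartActivity returns null.
        Console.WriteLine($"Traced before listener: {TryTrace("Grpc")}");

        // Stand-in for what a tracing SDK sets up for each registered source.
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "MCPPipeline.Core",
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded
        };
        ActivitySource.AddActivityListener(listener);

        Console.WriteLine($"Traced after listener: {TryTrace("Grpc")}");
    }
}
```

This is why the code in the services uses `activity?.SetTag(...)`: when a source is not registered (or not sampled), `StartActivity` returns `null` and the tagging calls become no-ops.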

Custom Metrics

// MCPPipeline.Observability/MCPMetrics.cs
using System.Diagnostics.Metrics;

public class MCPMetrics
{
    private readonly Meter _meter;
    private readonly Counter<long> _commandsProcessed;
    private readonly Histogram<double> _commandDuration;
    private readonly Counter<long> _commandErrors;
    private readonly UpDownCounter<int> _activeConnections;

    public MCPMetrics()
    {
        _meter = new Meter("MCPPipeline.Metrics", "1.0.0");

        _commandsProcessed = _meter.CreateCounter<long>(
            "mcp.commands.processed",
            description: "Total commands processed");

        _commandDuration = _meter.CreateHistogram<double>(
            "mcp.command.duration",
            unit: "ms",
            description: "Command processing duration");

        _commandErrors = _meter.CreateCounter<long>(
            "mcp.commands.errors",
            description: "Total processing errors");

        _activeConnections = _meter.CreateUpDownCounter<int>(
            "mcp.connections.active",
            description: "Active connections");
    }

    public void RecordCommandProcessed(string command, ProtocolType protocol)
    {
        _commandsProcessed.Add(1,
            new KeyValuePair<string, object?>("command", command),
            new KeyValuePair<string, object?>("protocol", protocol.ToString()));
    }

    public void RecordCommandDuration(string command, ProtocolType protocol, double durationMs)
    {
        _commandDuration.Record(durationMs,
            new KeyValuePair<string, object?>("command", command),
            new KeyValuePair<string, object?>("protocol", protocol.ToString()));
    }

    public void RecordCommandError(string command, ProtocolType protocol, string errorType)
    {
        _commandErrors.Add(1,
            new KeyValuePair<string, object?>("command", command),
            new KeyValuePair<string, object?>("protocol", protocol.ToString()),
            new KeyValuePair<string, object?>("error_type", errorType));
    }

    public void IncrementActiveConnections(ProtocolType protocol)
    {
        _activeConnections.Add(1,
            new KeyValuePair<string, object?>("protocol", protocol.ToString()));
    }

    public void DecrementActiveConnections(ProtocolType protocol)
    {
        _activeConnections.Add(-1,
            new KeyValuePair<string, object?>("protocol", protocol.ToString()));
    }
}
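The class above defines the instruments but does not show where they are recorded. One possible approach, sketched below, wraps a handler with a `Stopwatch` and records the elapsed time in a `finally` block so that failures are measured too. The meter name, the `MeasureAsync` helper, and the in-process `MeterListener` are assumptions made for illustration and are not part of the original services.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading.Tasks;

public static class MetricsUsageSketch
{
    // Hypothetical meter; the instrument name and unit mirror MCPMetrics.
    private static readonly Meter Meter = new("MCPPipeline.Metrics.Sketch", "1.0.0");
    private static readonly Histogram<double> Duration =
        Meter.CreateHistogram<double>("mcp.command.duration", unit: "ms");

    // Times an arbitrary handler and records the elapsed milliseconds, even if it throws.
    public static async Task<T> MeasureAsync<T>(
        string command, string protocol, Func<Task<T>> handler)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            return await handler();
        }
        finally
        {
            sw.Stop();
            Duration.Record(sw.Elapsed.TotalMilliseconds,
                new KeyValuePair<string, object?>("command", command),
                new KeyValuePair<string, object?>("protocol", protocol));
        }
    }

    public static async Task Main()
    {
        // MeterListener observes measurements in-process, without any exporter.
        var recorded = new List<double>();
        using var listener = new MeterListener();
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter.Name == Meter.Name) l.EnableMeasurementEvents(instrument);
        };
        listener.SetMeasurementEventCallback<double>(
            (instrument, value, tags, state) => recorded.Add(value));
        listener.Start();

        var result = await MeasureAsync("status", "Grpc", async () =>
        {
            await Task.Delay(20); // simulated work
            return "ok";
        });

        Console.WriteLine($"result={result}, measurements={recorded.Count}");
    }
}
```

In the real services, the same pattern would call `RecordCommandDuration` and `RecordCommandError` on an injected `MCPMetrics` instance instead of the local histogram.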

📊 Dashboard with Grafana

Prometheus Configuration

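# prometheus.yml
# Note: when Prometheus runs inside the Docker Compose network, replace localhost
# with the Compose service names (for example, mcp-grpc-service:5001).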
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'mcp-grpc-service'
    static_configs:
      - targets: ['localhost:5001']

  - job_name: 'mcp-websocket-service'
    static_configs:
      - targets: ['localhost:5002']

  - job_name: 'mcp-gateway'
    static_configs:
      - targets: ['localhost:5000']

Grafana Dashboard JSON

{
  "dashboard": {
    "title": "MCP Hybrid Protocol Dashboard",
    "panels": [
      {
        "title": "Commands Processed per Protocol",
        "targets": [
          {
            "expr": "rate(mcp_commands_processed_total[5m])",
            "legendFormat": "{{protocol}}"
          }
        ]
      },
      {
        "title": "p95 Latency per Command",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum by (le, command) (rate(mcp_command_duration_bucket[5m])))",
            "legendFormat": "p95 - {{command}}"
          }
        ]
      },
      {
        "title": "Active Connections",
        "targets": [
          {
            "expr": "mcp_connections_active",
            "legendFormat": "{{protocol}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(mcp_commands_errors_total[5m])",
            "legendFormat": "{{protocol}} - {{error_type}}"
          }
        ]
      }
    ]
  }
}

🧪 Integration Tests

Hybrid Test

// MCPPipeline.IntegrationTests/HybridProtocolTests.cs
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;

public class HybridProtocolTests : IClassFixture<MCPTestFixture>
{
    private readonly MCPTestFixture _fixture;

    public HybridProtocolTests(MCPTestFixture fixture)
    {
        _fixture = fixture;
    }

    [Fact]
    public async Task Should_Process_Same_Command_Via_Both_Protocols()
    {
        // Arrange
        var grpcClient = _fixture.CreateGrpcClient();
        var wsClient = _fixture.CreateWebSocketClient();

        await wsClient.ConnectAsync();

        var testPayload = "Hybrid integration test";

        // Act
        var grpcResponse = await grpcClient.SendCommandAsync("analyze", testPayload);
        var wsResponse = await wsClient.SendCommandAsync("analyze", testPayload);

        // Assert
        Assert.Equal(ResponseStatus.Success, grpcResponse.Status);
        Assert.Equal(ResponseStatus.Success, wsResponse.Status);

        // Both should return equivalent results ("palavras" is the word-count label in the kernel's analyze output)
        Assert.Contains("palavras", grpcResponse.Result);
        Assert.Contains("palavras", wsResponse.Result);
    }

    [Fact]
    public async Task Should_Handle_Concurrent_Requests_From_Both_Protocols()
    {
        // Arrange
        var grpcClient = _fixture.CreateGrpcClient();
        var wsClient = _fixture.CreateWebSocketClient();
        await wsClient.ConnectAsync();

        var tasks = new List<Task<MCPResponse>>();

        // Act - send 50 concurrent requests over each protocol
        for (int i = 0; i < 50; i++)
        {
            tasks.Add(grpcClient.SendCommandAsync("status", $"gRPC-{i}"));
            tasks.Add(wsClient.SendCommandAsync("status", $"WS-{i}"));
        }

        var responses = await Task.WhenAll(tasks);

        // Assert
        Assert.All(responses, r => Assert.Equal(ResponseStatus.Success, r.Status));
        Assert.Equal(100, responses.Length);
    }

    [Fact]
    public async Task Should_Broadcast_Events_Between_Protocols()
    {
        // Arrange
        var grpcClient = _fixture.CreateGrpcClient();
        var wsClient = _fixture.CreateWebSocketClient();
        await wsClient.ConnectAsync();

        var eventReceived = new TaskCompletionSource<bool>();

        // Configure the listener on the WebSocket client. The implementation depends
        // on the event bus; the listener must call eventReceived.TrySetResult(true),
        // otherwise the wait below times out and the test fails.

        // Act - send a command via gRPC
        await grpcClient.SendCommandAsync("generate", "Event test");

        // Assert - the WebSocket client should receive the notification
        var received = await eventReceived.Task.WaitAsync(TimeSpan.FromSeconds(5));
        Assert.True(received);
    }
}

public class MCPTestFixture : IAsyncLifetime
{
    private WebApplication? _grpcApp;
    private WebApplication? _wsApp;

    public async Task InitializeAsync()
    {
        // Start the test services
        _grpcApp = CreateGrpcService();
        _wsApp = CreateWebSocketService();

        await _grpcApp.StartAsync();
        await _wsApp.StartAsync();
    }

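    public async Task DisposeAsync()
    {
        // Stop each host and release its resources
        if (_grpcApp != null)
        {
            await _grpcApp.StopAsync();
            await _grpcApp.DisposeAsync();
        }

        if (_wsApp != null)
        {
            await _wsApp.StopAsync();
            await _wsApp.DisposeAsync();
        }
    }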

    public UnifiedMCPClient CreateGrpcClient()
    {
        return new UnifiedMCPClient(
            "https://localhost:5001",
            useGrpc: true,
            Mock.Of<ILogger<UnifiedMCPClient>>());
    }

    public UnifiedMCPClient CreateWebSocketClient()
    {
        return new UnifiedMCPClient(
            "https://localhost:5002/mcphub",
            useGrpc: false,
            Mock.Of<ILogger<UnifiedMCPClient>>());
    }

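    private WebApplication CreateGrpcService()
    {
        var builder = WebApplication.CreateBuilder();
        // Listen on the address the gRPC test client connects to
        builder.WebHost.UseUrls("https://localhost:5001");
        builder.Services.AddGrpc();
        // MCPKernelService also depends on IEventBus; register the implementation used by the services here
        builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
        builder.Services.AddSingleton<IProtocolAdapter, ProtocolAdapter>();

        var app = builder.Build();
        app.MapGrpcService<MCPGrpcServiceImpl>();
        return app;
    }

    private WebApplication CreateWebSocketService()
    {
        var builder = WebApplication.CreateBuilder();
        // Listen on the address the WebSocket test client connects to
        builder.WebHost.UseUrls("https://localhost:5002");
        builder.Services.AddSignalR();
        // MCPKernelService also depends on IEventBus; register the implementation used by the services here
        builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();

        var app = builder.Build();
        app.MapHub<MCPHub>("/mcphub");
        return app;
    }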
}
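The broadcast test leaves its listener unimplemented. The sketch below shows one way the `TaskCompletionSource` pattern it relies on could be completed. `InMemoryEventSource` is a hypothetical stand-in for the real event bus subscription, which is not shown in this part of the article. The event type names are the ones the kernel publishes.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the real event bus subscription.
public sealed class InMemoryEventSource
{
    public event Action<string>? EventPublished;
    public void Publish(string eventType) => EventPublished?.Invoke(eventType);
}

public static class EventWaitSketch
{
    // Subscribes synchronously, then waits for a matching event.
    // Throws TimeoutException if nothing matching arrives within the timeout.
    public static async Task<string> WaitForEventAsync(
        InMemoryEventSource source, string expectedType, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        void Handler(string eventType)
        {
            if (eventType == expectedType) tcs.TrySetResult(eventType);
        }

        source.EventPublished += Handler;
        try
        {
            return await tcs.Task.WaitAsync(timeout);
        }
        finally
        {
            source.EventPublished -= Handler; // always detach the handler
        }
    }

    public static async Task Main()
    {
        var source = new InMemoryEventSource();

        // Start waiting first so the handler is attached before the event fires.
        var waiting = WaitForEventAsync(source, "CommandCompleted", TimeSpan.FromSeconds(5));

        source.Publish("CommandStarted");   // ignored: different event type
        source.Publish("CommandCompleted"); // completes the wait

        Console.WriteLine($"Received: {await waiting}");
    }
}
```

The ordering matters: the subscription has to exist before the command is sent, otherwise the event can be published and missed before the test starts listening.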

🚀 Performance Test

Benchmark with BenchmarkDotNet

// MCPPipeline.Benchmarks/ProtocolBenchmarks.cs
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using Microsoft.Extensions.Logging;
using Moq;

[MemoryDiagnoser]
[SimpleJob(warmupCount: 3, iterationCount: 10)]
public class ProtocolBenchmarks
{
    private UnifiedMCPClient _grpcClient = null!;
    private UnifiedMCPClient _wsClient = null!;

    [GlobalSetup]
    public async Task Setup()
    {
        _grpcClient = new UnifiedMCPClient(
            "https://localhost:5001",
            useGrpc: true,
            Mock.Of<ILogger<UnifiedMCPClient>>());

        _wsClient = new UnifiedMCPClient(
            "https://localhost:5002/mcphub",
            useGrpc: false,
            Mock.Of<ILogger<UnifiedMCPClient>>());

        await _wsClient.ConnectAsync();
    }

    [Benchmark(Baseline = true)]
    public async Task<MCPResponse> GrpcCommand()
    {
        return await _grpcClient.SendCommandAsync("status", "benchmark");
    }

    [Benchmark]
    public async Task<MCPResponse> WebSocketCommand()
    {
        return await _wsClient.SendCommandAsync("status", "benchmark");
    }

    [Benchmark]
    public async Task GrpcStreaming()
    {
        await foreach (var _ in _grpcClient.StreamCommandAsync("generate", "test"))
        {
            // Consume the stream
        }
    }

    [Benchmark]
    public async Task WebSocketStreaming()
    {
        await foreach (var _ in _wsClient.StreamCommandAsync("generate", "test"))
        {
            // Consume the stream
        }
    }

    [GlobalCleanup]
    public async Task Cleanup()
    {
        await _grpcClient.DisposeAsync();
        await _wsClient.DisposeAsync();
    }
}

// Program.cs
public class Program
{
    public static void Main(string[] args)
    {
        BenchmarkRunner.Run<ProtocolBenchmarks>();
    }
}

Expected Results

|             Method |      Mean |     Error |    StdDev | Ratio | Gen0 | Allocated |
|------------------- |----------:|----------:|----------:|------:|-----:|----------:|
|        GrpcCommand |  2.450 ms | 0.0421 ms | 0.0394 ms |  1.00 | 15.6 |     128KB |
|   WebSocketCommand |  8.732 ms | 0.1523 ms | 0.1425 ms |  3.56 | 31.2 |     256KB |
|      GrpcStreaming | 12.105 ms | 0.2103 ms | 0.1967 ms |  4.94 | 46.8 |     384KB |
| WebSocketStreaming | 18.421 ms | 0.3142 ms | 0.2940 ms |  7.52 | 62.5 |     512KB |

🎯 Protocol Decision: When to Use Each

Decision Matrix

public class ProtocolSelector
{
    public ProtocolType SelectOptimalProtocol(ClientContext context)
    {
        // 1. Is the client a browser? → WebSocket
        if (context.IsWebBrowser)
            return ProtocolType.WebSocket;

        // 2. Internal service-to-service communication? → gRPC
        if (context.IsInternalService)
            return ProtocolType.Grpc;

        // 3. Needs long-lived bidirectional streaming? → WebSocket
        if (context.RequiresLongLivedStream)
            return ProtocolType.WebSocket;

        // 4. Prioritizes minimal latency? → gRPC
        if (context.LatencySensitive)
            return ProtocolType.Grpc;

        // 5. Do multiple clients need to receive broadcasts? → WebSocket
        if (context.RequiresBroadcast)
            return ProtocolType.WebSocket;

        // 6. Default: gRPC for performance
        return ProtocolType.Grpc;
    }
}

public record ClientContext
{
    public bool IsWebBrowser { get; init; }
    public bool IsInternalService { get; init; }
    public bool RequiresLongLivedStream { get; init; }
    public bool LatencySensitive { get; init; }
    public bool RequiresBroadcast { get; init; }
}
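Because the selector returns on the first matching rule, the order of the checks decides the outcome when a client matches more than one. The sketch below restates the same rule order in a condensed, self-contained form (`ProtocolSelectionSketch` and the sample contexts are illustrative names, not part of the original code) and runs a few contexts through it.

```csharp
using System;

public enum ProtocolType { Unknown = 0, Grpc = 1, WebSocket = 2, Http = 3 }

public record ClientContext
{
    public bool IsWebBrowser { get; init; }
    public bool IsInternalService { get; init; }
    public bool RequiresLongLivedStream { get; init; }
    public bool LatencySensitive { get; init; }
    public bool RequiresBroadcast { get; init; }
}

public static class ProtocolSelectionSketch
{
    // Same rule order as SelectOptimalProtocol, condensed into one expression.
    public static ProtocolType Select(ClientContext c) =>
        c.IsWebBrowser ? ProtocolType.WebSocket
        : c.IsInternalService ? ProtocolType.Grpc
        : c.RequiresLongLivedStream ? ProtocolType.WebSocket
        : c.LatencySensitive ? ProtocolType.Grpc
        : c.RequiresBroadcast ? ProtocolType.WebSocket
        : ProtocolType.Grpc;

    public static void Main()
    {
        // Rule 1 wins over rule 4: a latency-sensitive browser still gets WebSocket.
        var browser = new ClientContext { IsWebBrowser = true, LatencySensitive = true };
        // Rule 2 wins over rule 5: an internal service that wants broadcasts gets gRPC.
        var worker = new ClientContext { IsInternalService = true, RequiresBroadcast = true };
        // Only rule 5 matches.
        var notifier = new ClientContext { RequiresBroadcast = true };

        Console.WriteLine($"browser  -> {Select(browser)}");             // WebSocket
        Console.WriteLine($"worker   -> {Select(worker)}");              // Grpc
        Console.WriteLine($"notifier -> {Select(notifier)}");            // WebSocket
        Console.WriteLine($"default  -> {Select(new ClientContext())}"); // Grpc
    }
}
```

If a different precedence is wanted, for example broadcast support taking priority for internal services, the checks have to be reordered rather than extended.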

Comparison Table

| Scenario | gRPC | WebSocket | Recommendation |
|----------|------|-----------|----------------|
| Backend API → Backend | ✅ Ideal | ⚠️ Overhead | gRPC |
| Web Application → Backend | ❌ Limited (browsers require gRPC-Web) | ✅ Native | WebSocket |
| Mobile App → Backend | ✅ Excellent | ✅ Good | gRPC (better performance) |
| Real-time Dashboard | ⚠️ Server streaming (gRPC-Web in browsers) | ✅ Native push | WebSocket |
| Multi-user Chat | ❌ Not ideal | ✅ Perfect | WebSocket |
| Data Analysis | ✅ High performance | ⚠️ Slower | gRPC |
| AI Streaming | ✅ Efficient | ✅ Functional | Both (context dependent) |

🔒 Security in a Hybrid Environment

Unified Authentication

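// MCPPipeline.Security/UnifiedAuthenticationExtensions.cs
using System.Text;
using Grpc.Core;
using Grpc.Core.Interceptors;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.IdentityModel.Tokens;
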
public static class UnifiedAuthenticationExtensions
{
    public static IServiceCollection AddUnifiedAuthentication(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        services.AddAuthentication(options =>
        {
            options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
            options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
        })
        .AddJwtBearer(options =>
        {
            options.TokenValidationParameters = new TokenValidationParameters
            {
                ValidateIssuer = true,
                ValidateAudience = true,
                ValidateLifetime = true,
                ValidateIssuerSigningKey = true,
                ValidIssuer = configuration["Jwt:Issuer"],
                ValidAudience = configuration["Jwt:Audience"],
                IssuerSigningKey = new SymmetricSecurityKey(
                    Encoding.UTF8.GetBytes(configuration["Jwt:Key"]!))
            };

            // WebSocket support: read the token from the query string
            options.Events = new JwtBearerEvents
            {
                OnMessageReceived = context =>
                {
                    var accessToken = context.Request.Query["access_token"];
                    var path = context.HttpContext.Request.Path;

                    if (!string.IsNullOrEmpty(accessToken) && 
                        path.StartsWithSegments("/mcphub"))
                    {
                        context.Token = accessToken;
                    }
                    return Task.CompletedTask;
                }
            };
        });

        // Register the authentication interceptor for gRPC
        services.AddGrpc(options =>
        {
            options.Interceptors.Add<AuthenticationInterceptor>();
        });

        return services;
    }
}

// Interceptor for gRPC
public class AuthenticationInterceptor : Interceptor
{
    private readonly ILogger<AuthenticationInterceptor> _logger;

    public AuthenticationInterceptor(ILogger<AuthenticationInterceptor> logger)
    {
        _logger = logger;
    }

    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        var authHeader = context.RequestHeaders.GetValue("authorization");

        if (string.IsNullOrEmpty(authHeader))
        {
            _logger.LogWarning("gRPC request without authentication");
            throw new RpcException(new Status(StatusCode.Unauthenticated, "Token not provided"));
        }

        // Token validation is not implemented here; add the validation logic before
        // calling the continuation. This check only confirms the header is present.
        _logger.LogInformation("gRPC authorization header received from peer: {Peer}", context.Peer);

        return await continuation(request, context);
    }
}

Unified Rate Limiting

// MCPPipeline.Security/RateLimitingExtensions.cs
using System.Threading.RateLimiting;
using Microsoft.AspNetCore.RateLimiting;

public static class RateLimitingExtensions
{
    public static IServiceCollection AddUnifiedRateLimiting(
        this IServiceCollection services)
    {
        services.AddRateLimiter(options =>
        {
            // Policy for gRPC
            options.AddFixedWindowLimiter("grpc", limiterOptions =>
            {
                limiterOptions.PermitLimit = 100;
                limiterOptions.Window = TimeSpan.FromMinutes(1);
                limiterOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
                limiterOptions.QueueLimit = 10;
            });

            // Policy for WebSocket
            options.AddSlidingWindowLimiter("websocket", limiterOptions =>
            {
                limiterOptions.PermitLimit = 60;
                limiterOptions.Window = TimeSpan.FromMinutes(1);
                limiterOptions.SegmentsPerWindow = 4;
                limiterOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
                limiterOptions.QueueLimit = 5;
            });

            options.OnRejected = async (context, token) =>
            {
                context.HttpContext.Response.StatusCode = 429;

                await context.HttpContext.Response.WriteAsJsonAsync(new
                {
                    error = "Rate limit exceeded",
                    // Cast to double? so both branches of the conditional share a type
                    retryAfter = context.Lease.TryGetMetadata(
                        MetadataName.RetryAfter, out var retryAfter)
                        ? (double?)retryAfter.TotalSeconds
                        : null
                }, token);
            };
        });

        return services;
    }
}
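To make the fixed-window policy above more concrete, the sketch below drives a `FixedWindowRateLimiter` from `System.Threading.RateLimiting` directly, outside ASP.NET Core. The small permit limit and the `CountAcquired` helper are assumptions chosen to keep the example short. The policies above use 100 and 60 permits per minute.

```csharp
using System;
using System.Threading.RateLimiting;

public static class FixedWindowSketch
{
    // Tries to acquire `attempts` permits in a row and returns how many succeeded.
    public static int CountAcquired(int permitLimit, int attempts)
    {
        using var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
        {
            PermitLimit = permitLimit,
            Window = TimeSpan.FromMinutes(1),
            QueueLimit = 0, // reject immediately instead of queueing
            QueueProcessingOrder = QueueProcessingOrder.OldestFirst
        });

        var acquired = 0;
        for (var i = 0; i < attempts; i++)
        {
            using RateLimitLease lease = limiter.AttemptAcquire();
            if (lease.IsAcquired)
            {
                acquired++;
            }
            else if (lease.TryGetMetadata(MetadataName.RetryAfter, out TimeSpan retryAfter))
            {
                // Same metadata that the OnRejected handler returns to the caller.
                Console.WriteLine($"Rejected, retry after ~{retryAfter.TotalSeconds:F0}s");
            }
        }

        return acquired;
    }

    public static void Main()
    {
        Console.WriteLine($"Acquired {CountAcquired(permitLimit: 3, attempts: 5)} of 5");
    }
}
```

In ASP.NET Core, the named policies take effect only after the middleware is added with `app.UseRateLimiter()` and an endpoint opts in, for example by chaining `RequireRateLimiting("grpc")` onto the mapped endpoint.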

🏗️ Docker Compose for the Complete Environment

# docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data

  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"  # UI
      - "4317:4317"    # OTLP gRPC
      - "4318:4318"    # OTLP HTTP
    environment:
      - COLLECTOR_OTLP_ENABLED=true

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana-data:/var/lib/grafana
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
      - GF_USERS_ALLOW_SIGN_UP=false

  mcp-grpc-service:
    build:
      context: .
      dockerfile: src/MCPPipeline.GrpcService/Dockerfile
    ports:
      - "5001:5001"
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
      - Redis__ConnectionString=redis:6379
      - Jaeger__AgentHost=jaeger
      - Jaeger__AgentPort=4317
    depends_on:
      - redis
      - jaeger

  mcp-websocket-service:
    build:
      context: .
      dockerfile: src/MCPPipeline.WebSocketService/Dockerfile
    ports:
      - "5002:5002"
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
      - Redis__ConnectionString=redis:6379
      - Jaeger__AgentHost=jaeger
      - Jaeger__AgentPort=4317
    depends_on:
      - redis
      - jaeger

  mcp-gateway:
    build:
      context: .
      dockerfile: src/MCPPipeline.Gateway/Dockerfile
    ports:
      - "5000:5000"
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
    depends_on:
      - mcp-grpc-service
      - mcp-websocket-service

volumes:
  redis-data:
  prometheus-data:
  grafana-data:

📈 Scalability Strategies

Kubernetes Deployment

# k8s/mcp-deployment.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: mcp-config
data:
  redis-connection: "redis-service:6379"
  jaeger-endpoint: "http://jaeger-collector:4317"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-grpc-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-grpc
  template:
    metadata:
      labels:
        app: mcp-grpc
    spec:
      containers:
      - name: mcp-grpc
        image: mcp-grpc-service:latest
Enter fullscreen mode Exit fullscreen mode

Integração Completa com gRPC & WebSocket

Nesta quarta parte da série "Minha Primeira Comunicação com MCP e .NET", exploramos como criar uma arquitetura híbrida que combina o melhor dos dois mundos: a eficiência e tipagem forte do gRPC para comunicação interna entre serviços, e a flexibilidade e suporte nativo do WebSocket para clientes web e mobile, criando um ecossistema MCP robusto e escalável.


🚀 Introdução

Após explorarmos gRPC (Parte 2) e WebSocket (Parte 3) individualmente, surge uma questão arquitetural importante: e se precisarmos dos benefícios de ambos?

Em sistemas corporativos modernos, é comum ter:

  • Microsserviços backend que se comunicam via gRPC (baixa latência, tipagem forte)
  • Clientes web/mobile que precisam de WebSocket (suporte nativo, real-time)
  • Agentes MCP que podem usar ambos os protocolos conforme o contexto

Este artigo demonstra como construir uma arquitetura unificada que oferece ambos os protocolos, com roteamento inteligente, adaptadores de protocolo, e observabilidade centralizada.


🧠 Arquitetura Híbrida: O Melhor dos Dois Mundos

Visão Geral da Arquitetura

┌─────────────────────────────────────────────────────────────────┐
│                        API Gateway / BFF                         │
│                    (Protocol Orchestrator)                       │
└──────────────────┬─────────────────────┬────────────────────────┘
                   │                     │
         ┌─────────▼──────────┐ ┌───────▼──────────┐
         │   gRPC Endpoint    │ │ WebSocket Endpoint│
         │   (Port 5001)      │ │  (Port 5002)      │
         └─────────┬──────────┘ └───────┬───────────┘
                   │                     │
                   │    ┌────────────────┴─────────────────┐
                   │    │                                  │
         ┌─────────▼────▼──────────────┐    ┌─────────────▼──────┐
         │   Protocol Adapter Layer    │    │  SignalR Hub Layer  │
         │   (gRPC ↔ WebSocket)       │    │  (Client Manager)   │
         └─────────┬──────────────────┘    └─────────┬────────────┘
                   │                                  │
         ┌─────────▼──────────────────────────────────▼────────┐
         │              MCP Kernel Orchestrator                │
         │         (Unified Command Processing)                │
         └─────────┬──────────────────────────┬─────────────────┘
                   │                          │
         ┌─────────▼────────┐      ┌─────────▼──────────┐
         │  Domain Services │      │  Event Bus (Redis) │
         │  (Business Logic)│      │  (Cross-Protocol)  │
         └──────────────────┘      └────────────────────┘
Enter fullscreen mode Exit fullscreen mode

Componentes Principais

  1. Protocol Orchestrator - Decide qual protocolo usar baseado em contexto
  2. Protocol Adapter Layer - Converte mensagens entre gRPC e WebSocket
  3. Unified MCP Kernel - Processa comandos independente do protocolo
  4. Event Bus - Sincroniza eventos entre instâncias e protocolos
  5. Observability Layer - Tracing unificado para ambos os protocolos

🏗️ Implementação da Arquitetura Híbrida

1️⃣ Estrutura do Projeto

MCPPipeline.Hybrid/
├── src/
│   ├── MCPPipeline.Contracts/           # Modelos compartilhados
│   │   ├── Messages/
│   │   │   ├── MCPCommand.cs
│   │   │   ├── MCPResponse.cs
│   │   │   └── MCPEvent.cs
│   │   └── Protos/
│   │       └── mcp.proto
│   │
│   ├── MCPPipeline.Core/                # Lógica de negócio
│   │   ├── Services/
│   │   │   ├── IMCPKernelService.cs
│   │   │   └── MCPKernelService.cs
│   │   └── Events/
│   │       └── IEventBus.cs
│   │
│   ├── MCPPipeline.GrpcService/         # Endpoint gRPC
│   │   ├── Services/
│   │   │   └── MCPGrpcService.cs
│   │   └── Program.cs
│   │
│   ├── MCPPipeline.WebSocketService/    # Endpoint WebSocket
│   │   ├── Hubs/
│   │   │   └── MCPHub.cs
│   │   └── Program.cs
│   │
│   ├── MCPPipeline.Gateway/             # API Gateway (YARP)
│   │   ├── Configuration/
│   │   └── Program.cs
│   │
│   └── MCPPipeline.Adapter/             # Protocol Adapter
│       ├── GrpcToWebSocketAdapter.cs
│       └── WebSocketToGrpcAdapter.cs
│
└── tests/
    └── MCPPipeline.IntegrationTests/
Enter fullscreen mode Exit fullscreen mode

2️⃣ Contratos Compartilhados

// MCPPipeline.Contracts/Messages/MCPCommand.cs
namespace MCPPipeline.Contracts.Messages;

public record MCPCommand
{
    public string CommandId { get; init; } = Guid.NewGuid().ToString();
    public string Command { get; init; } = string.Empty;
    public string Payload { get; init; } = string.Empty;
    public Dictionary<string, string> Metadata { get; init; } = new();
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
    public string SessionId { get; init; } = string.Empty;
    public ProtocolType Protocol { get; init; }
    public int Priority { get; init; } = 0;
}

public record MCPResponse
{
    public string CommandId { get; init; } = string.Empty;
    public string Result { get; init; } = string.Empty;
    public ResponseStatus Status { get; init; }
    public string? ErrorMessage { get; init; }
    public long ProcessingTimeMs { get; init; }
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
    public Dictionary<string, object> Metrics { get; init; } = new();
}

public record MCPEvent
{
    public string EventId { get; init; } = Guid.NewGuid().ToString();
    public string EventType { get; init; } = string.Empty;
    public string Source { get; init; } = string.Empty;
    public object Data { get; init; } = new();
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
}

public enum ProtocolType
{
    Unknown = 0,
    Grpc = 1,
    WebSocket = 2,
    Http = 3
}

public enum ResponseStatus
{
    Success,
    Error,
    Processing,
    Timeout
}
Enter fullscreen mode Exit fullscreen mode

3️⃣ MCP Kernel Unificado

// MCPPipeline.Core/Services/MCPKernelService.cs
using MCPPipeline.Contracts.Messages;
using System.Diagnostics;

namespace MCPPipeline.Core.Services;

public interface IMCPKernelService
{
    Task<MCPResponse> ExecuteCommandAsync(MCPCommand command, CancellationToken ct);
    IAsyncEnumerable<string> StreamCommandAsync(MCPCommand command, CancellationToken ct);
}

public class MCPKernelService : IMCPKernelService
{
    private readonly ILogger<MCPKernelService> _logger;
    private readonly IEventBus _eventBus;
    private static readonly ActivitySource ActivitySource = new("MCPPipeline.Core");

    public MCPKernelService(
        ILogger<MCPKernelService> logger,
        IEventBus eventBus)
    {
        _logger = logger;
        _eventBus = eventBus;
    }

    public async Task<MCPResponse> ExecuteCommandAsync(
        MCPCommand command, 
        CancellationToken ct)
    {
        using var activity = ActivitySource.StartActivity("ExecuteCommand");
        activity?.SetTag("command.id", command.CommandId);
        activity?.SetTag("command.type", command.Command);
        activity?.SetTag("protocol", command.Protocol.ToString());

        var sw = Stopwatch.StartNew();

        try
        {
            _logger.LogInformation(
                "Executando comando {Command} via {Protocol} | Session: {SessionId}",
                command.Command,
                command.Protocol,
                command.SessionId);

            // Publicar evento de início
            await _eventBus.PublishAsync(new MCPEvent
            {
                EventType = "CommandStarted",
                Source = command.Protocol.ToString(),
                Data = new { command.CommandId, command.Command }
            });

            // Processar comando
            var result = await ProcessCommandAsync(command, ct);

            sw.Stop();

            var response = new MCPResponse
            {
                CommandId = command.CommandId,
                Result = result,
                Status = ResponseStatus.Success,
                ProcessingTimeMs = sw.ElapsedMilliseconds,
                Metrics = new Dictionary<string, object>
                {
                    ["protocol"] = command.Protocol.ToString(),
                    ["priority"] = command.Priority
                }
            };

            // Publicar evento de conclusão
            await _eventBus.PublishAsync(new MCPEvent
            {
                EventType = "CommandCompleted",
                Source = command.Protocol.ToString(),
                Data = new { command.CommandId, response.ProcessingTimeMs }
            });

            activity?.SetTag("response.status", "success");
            return response;
        }
        catch (Exception ex)
        {
            sw.Stop();

            _logger.LogError(ex, 
                "Erro ao executar comando {Command} via {Protocol}", 
                command.Command, 
                command.Protocol);

            activity?.SetTag("response.status", "error");
            activity?.SetTag("error.message", ex.Message);

            await _eventBus.PublishAsync(new MCPEvent
            {
                EventType = "CommandFailed",
                Source = command.Protocol.ToString(),
                Data = new { command.CommandId, Error = ex.Message }
            });

            return new MCPResponse
            {
                CommandId = command.CommandId,
                Status = ResponseStatus.Error,
                ErrorMessage = ex.Message,
                ProcessingTimeMs = sw.ElapsedMilliseconds
            };
        }
    }

    public async IAsyncEnumerable<string> StreamCommandAsync(
        MCPCommand command,
        [EnumeratorCancellation] CancellationToken ct)
    {
        using var activity = ActivitySource.StartActivity("StreamCommand");
        activity?.SetTag("command.id", command.CommandId);

        _logger.LogInformation(
            "Starting streaming for command {Command} via {Protocol}",
            command.Command,
            command.Protocol);

        var chunks = await GenerateStreamChunksAsync(command.Payload, ct);

        for (int i = 0; i < chunks.Count; i++)
        {
            if (ct.IsCancellationRequested)
                yield break;

            yield return chunks[i];
            await Task.Delay(50, ct); // Simulates processing time
        }
    }

    private async Task<string> ProcessCommandAsync(MCPCommand command, CancellationToken ct)
    {
        return command.Command.ToLowerInvariant() switch
        {
            "analyze" => await AnalyzeAsync(command.Payload, ct),
            "summarize" => await SummarizeAsync(command.Payload, ct),
            "translate" => await TranslateAsync(command.Payload, ct),
            "generate" => await GenerateAsync(command.Payload, ct),
            "status" => await GetStatusAsync(ct),
            _ => throw new InvalidOperationException($"Unknown command: {command.Command}")
        };
    }

    private async Task<string> AnalyzeAsync(string payload, CancellationToken ct)
    {
        await Task.Delay(200, ct);
        var words = payload.Split(' ', StringSplitOptions.RemoveEmptyEntries).Length;
        return $"Análise: {words} palavras, {payload.Length} caracteres";
    }

    private async Task<string> SummarizeAsync(string payload, CancellationToken ct)
    {
        await Task.Delay(300, ct);
        return payload.Length > 100 
            ? $"Resumo: {payload[..100]}..." 
            : $"Resumo: {payload}";
    }

    private async Task<string> TranslateAsync(string payload, CancellationToken ct)
    {
        await Task.Delay(250, ct);
        return $"[Traduzido] {payload}";
    }

    private async Task<string> GenerateAsync(string payload, CancellationToken ct)
    {
        await Task.Delay(500, ct);
        return $"Conteúdo gerado baseado em: {payload}";
    }

    private async Task<string> GetStatusAsync(CancellationToken ct)
    {
        await Task.Delay(50, ct);
        return $"Sistema operacional | Timestamp: {DateTime.UtcNow:O}";
    }

    private async Task<List<string>> GenerateStreamChunksAsync(string prompt, CancellationToken ct)
    {
        await Task.Delay(100, ct);
        return new List<string>
        {
            "Iniciando processamento...",
            "Analisando contexto...",
            "Gerando resposta...",
            $"Resultado: {prompt}",
            "Processamento concluído."
        };
    }
}
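On the consumer side, the token that reaches `StreamCommandAsync` through `[EnumeratorCancellation]` usually arrives via `WithCancellation`. The block below is a minimal, self-contained sketch of that flow, not the kernel itself: `ChunkStreaming`, `StreamAsync` and `TakeAsync` are hypothetical names, and the in-memory chunk list stands in for the chunks the kernel generates.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the kernel's streaming method.
public static class ChunkStreaming
{
    // Yields chunks one at a time. The token is supplied either directly
    // or through WithCancellation on the consumer side.
    public static async IAsyncEnumerable<string> StreamAsync(
        IReadOnlyList<string> chunks,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        foreach (var chunk in chunks)
        {
            ct.ThrowIfCancellationRequested();
            yield return chunk;
            await Task.Yield(); // placeholder for real asynchronous work
        }
    }

    // Reads at most maxChunks items, then cancels the producer and stops.
    public static async Task<List<string>> TakeAsync(
        IAsyncEnumerable<string> source,
        int maxChunks)
    {
        using var cts = new CancellationTokenSource();
        var taken = new List<string>();

        await foreach (var chunk in source.WithCancellation(cts.Token))
        {
            taken.Add(chunk);
            if (taken.Count >= maxChunks)
            {
                cts.Cancel(); // signal the producer that no more chunks are needed
                break;        // leaving the loop disposes the enumerator
            }
        }

        return taken;
    }
}
```

Calling `ChunkStreaming.TakeAsync(ChunkStreaming.StreamAsync(chunks), 2)` stops after two chunks; both the gRPC `context.CancellationToken` and the hub's `Context.ConnectionAborted` play the role of `cts.Token` in the endpoints.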

4️⃣ Event Bus with Redis

// MCPPipeline.Core/Events/IEventBus.cs
namespace MCPPipeline.Core.Events;

public interface IEventBus
{
    Task PublishAsync<T>(T @event) where T : class;
    Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
}

// MCPPipeline.Core/Events/RedisEventBus.cs
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using System.Text.Json;

namespace MCPPipeline.Core.Events;

public class RedisEventBus : IEventBus
{
    private readonly IConnectionMultiplexer _redis;
    private readonly ILogger<RedisEventBus> _logger;
    private readonly ISubscriber _subscriber;

    public RedisEventBus(
        IConnectionMultiplexer redis,
        ILogger<RedisEventBus> logger)
    {
        _redis = redis;
        _logger = logger;
        _subscriber = redis.GetSubscriber();
    }

    public async Task PublishAsync<T>(T @event) where T : class
    {
        var channel = typeof(T).Name;
        var message = JsonSerializer.Serialize(@event);

        await _subscriber.PublishAsync(channel, message);

        _logger.LogDebug("Event published: {EventType}", channel);
    }

    public async Task SubscribeAsync<T>(Func<T, Task> handler) where T : class
    {
        var channel = typeof(T).Name;

        await _subscriber.SubscribeAsync(channel, async (ch, message) =>
        {
            try
            {
                var @event = JsonSerializer.Deserialize<T>(message!);
                if (@event != null)
                {
                    await handler(@event);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing event {EventType}", channel);
            }
        });

        _logger.LogInformation("Subscribed to channel: {Channel}", channel);
    }
}
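For unit tests or local runs without Redis, the same contract can be satisfied in-process. The following is only a sketch under that assumption: `InMemoryEventBus` is a hypothetical class, and `IEventBus` is repeated here just so the block compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Mirrors the IEventBus contract shown above so the sketch is self-contained.
public interface IEventBus
{
    Task PublishAsync<T>(T @event) where T : class;
    Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
}

// Hypothetical in-process bus: no Redis, no serialization, handlers run inline.
public sealed class InMemoryEventBus : IEventBus
{
    private readonly ConcurrentDictionary<Type, ConcurrentBag<Func<object, Task>>> _handlers = new();

    public async Task PublishAsync<T>(T @event) where T : class
    {
        if (!_handlers.TryGetValue(typeof(T), out var handlers))
            return; // nobody subscribed to this event type

        // Await every handler so callers can assert right after PublishAsync returns.
        foreach (var handler in handlers)
            await handler(@event);
    }

    public Task SubscribeAsync<T>(Func<T, Task> handler) where T : class
    {
        var bag = _handlers.GetOrAdd(typeof(T), _ => new ConcurrentBag<Func<object, Task>>());
        bag.Add(e => handler((T)e));
        return Task.CompletedTask;
    }
}
```

Unlike the Redis version, events here never leave the process, so this variant cannot relay events between the gRPC and WebSocket services; it only keeps the kernel's `PublishAsync` calls working in isolation.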

5️⃣ Protocol Adapter

// MCPPipeline.Adapter/ProtocolAdapter.cs
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Grpc;

namespace MCPPipeline.Adapter;

public interface IProtocolAdapter
{
    MCPCommand FromGrpcRequest(MCPRequest grpcRequest);
    MCPRequest ToGrpcRequest(MCPCommand command);
    MCPResponse FromGrpcResponse(MCPGrpcResponse grpcResponse);
    MCPGrpcResponse ToGrpcResponse(MCPResponse response);
}

public class ProtocolAdapter : IProtocolAdapter
{
    public MCPCommand FromGrpcRequest(MCPRequest grpcRequest)
    {
        return new MCPCommand
        {
            Command = grpcRequest.Command,
            Payload = grpcRequest.Payload,
            Metadata = grpcRequest.Metadata.ToDictionary(k => k.Key, v => v.Value),
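            // UtcDateTime keeps Kind = Utc. The .DateTime property returns Kind = Unspecified,
            // which new DateTimeOffset(...) in ToGrpcRequest would reinterpret as local time.
            Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(grpcRequest.Timestamp).UtcDateTime,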
            Protocol = ProtocolType.Grpc
        };
    }

    public MCPRequest ToGrpcRequest(MCPCommand command)
    {
        return new MCPRequest
        {
            Command = command.Command,
            Payload = command.Payload,
            Metadata = { command.Metadata },
            Timestamp = new DateTimeOffset(command.Timestamp).ToUnixTimeMilliseconds()
        };
    }

    public MCPResponse FromGrpcResponse(MCPGrpcResponse grpcResponse)
    {
        return new MCPResponse
        {
            Result = grpcResponse.Result,
            Status = grpcResponse.Status switch
            {
                "OK" => ResponseStatus.Success,
                "ERROR" => ResponseStatus.Error,
                "PROCESSING" => ResponseStatus.Processing,
                _ => ResponseStatus.Error
            },
            ErrorMessage = grpcResponse.ErrorMessage,
            ProcessingTimeMs = grpcResponse.ProcessingTimeMs
        };
    }

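    public MCPGrpcResponse ToGrpcResponse(MCPResponse response)
    {
        return new MCPGrpcResponse
        {
            // Protobuf string fields reject null, so fall back to an empty string.
            Result = response.Result ?? string.Empty,
            // Must mirror FromGrpcResponse: ToString().ToUpperInvariant() would emit
            // "SUCCESS", which the reverse mapping (and the client) reads as an error.
            Status = response.Status switch
            {
                ResponseStatus.Success => "OK",
                ResponseStatus.Error => "ERROR",
                ResponseStatus.Processing => "PROCESSING",
                _ => response.Status.ToString().ToUpperInvariant()
            },
            ErrorMessage = response.ErrorMessage ?? string.Empty,
            ProcessingTimeMs = response.ProcessingTimeMs
        };
    }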
}
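The timestamp conversion in the adapter is easy to get subtly wrong because `DateTime` carries a `Kind`. As a small sketch (the `TimestampMapping` helper is hypothetical and assumes every timestamp is meant to be UTC), a round trip that stays stable regardless of the machine's time zone could look like this:

```csharp
using System;

// Hypothetical helper: converts between DateTime (UTC) and Unix milliseconds.
public static class TimestampMapping
{
    // DateTime -> Unix milliseconds. The value is labeled as UTC without shifting it,
    // so Unspecified values are not reinterpreted as local time.
    public static long ToUnixMs(DateTime utc) =>
        new DateTimeOffset(DateTime.SpecifyKind(utc, DateTimeKind.Utc)).ToUnixTimeMilliseconds();

    // Unix milliseconds -> DateTime with Kind == Utc.
    public static DateTime FromUnixMs(long ms) =>
        DateTimeOffset.FromUnixTimeMilliseconds(ms).UtcDateTime;
}
```

`SpecifyKind` only relabels the value, so this helper would be wrong for genuinely local timestamps; it fits here because both endpoints create timestamps from `DateTime.UtcNow` or `DateTimeOffset.UtcNow`.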

6️⃣ gRPC Service Endpoint

// MCPPipeline.GrpcService/Services/MCPGrpcService.cs
using Grpc.Core;
using MCPPipeline.Grpc;
using MCPPipeline.Core.Services;
using MCPPipeline.Adapter;

namespace MCPPipeline.GrpcService.Services;

public class MCPGrpcServiceImpl : MCPService.MCPServiceBase
{
    private readonly IMCPKernelService _kernelService;
    private readonly IProtocolAdapter _adapter;
    private readonly ILogger<MCPGrpcServiceImpl> _logger;

    public MCPGrpcServiceImpl(
        IMCPKernelService kernelService,
        IProtocolAdapter adapter,
        ILogger<MCPGrpcServiceImpl> logger)
    {
        _kernelService = kernelService;
        _adapter = adapter;
        _logger = logger;
    }

    public override async Task<MCPGrpcResponse> SendCommand(
        MCPRequest request,
        ServerCallContext context)
    {
        _logger.LogInformation(
            "Received gRPC command: {Command} from {Peer}",
            request.Command,
            context.Peer);

        // Convert to the unified model
        var command = _adapter.FromGrpcRequest(request);
        command = command with { SessionId = context.Peer };

        // Process through the unified kernel
        var response = await _kernelService.ExecuteCommandAsync(
            command,
            context.CancellationToken);

        // Convert the response
        return _adapter.ToGrpcResponse(response);
    }

    public override async Task StreamCommand(
        MCPRequest request,
        IServerStreamWriter<MCPStreamResponse> responseStream,
        ServerCallContext context)
    {
        _logger.LogInformation(
            "Starting gRPC streaming: {Command}",
            request.Command);

        var command = _adapter.FromGrpcRequest(request);

        int chunkIndex = 0;
        await foreach (var chunk in _kernelService.StreamCommandAsync(
            command,
            context.CancellationToken))
        {
            await responseStream.WriteAsync(new MCPStreamResponse
            {
                Content = chunk,
                ChunkIndex = chunkIndex++,
                IsComplete = false
            });
        }

        // Send the final chunk (a completion marker, not content)
        await responseStream.WriteAsync(new MCPStreamResponse
        {
            Content = "Stream completed",
            ChunkIndex = chunkIndex,
            IsComplete = true
        });
    }

    public override Task<HealthResponse> HealthCheck(
        HealthRequest request,
        ServerCallContext context)
    {
        return Task.FromResult(new HealthResponse
        {
            Status = "Healthy",
            Version = "1.0.0",
            Protocol = "gRPC"
        });
    }
}

// Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddGrpc();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
builder.Services.AddSingleton<IProtocolAdapter, ProtocolAdapter>();

// Redis for the event bus
builder.Services.AddSingleton<IConnectionMultiplexer>(
    ConnectionMultiplexer.Connect("localhost:6379"));
builder.Services.AddSingleton<IEventBus, RedisEventBus>();

var app = builder.Build();

app.MapGrpcService<MCPGrpcServiceImpl>();
app.MapGet("/", () => "MCP gRPC Service");

app.Run();

7️⃣ WebSocket Service Endpoint

// MCPPipeline.WebSocketService/Hubs/MCPHub.cs
using Microsoft.AspNetCore.SignalR;
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Core.Services;

namespace MCPPipeline.WebSocketService.Hubs;

public class MCPHub : Hub
{
    private readonly IMCPKernelService _kernelService;
    private readonly ILogger<MCPHub> _logger;

    public MCPHub(
        IMCPKernelService kernelService,
        ILogger<MCPHub> logger)
    {
        _kernelService = kernelService;
        _logger = logger;
    }

    public override async Task OnConnectedAsync()
    {
        _logger.LogInformation("WebSocket client connected: {ConnectionId}",
            Context.ConnectionId);

        await Clients.Caller.SendAsync("Connected", new
        {
            ConnectionId = Context.ConnectionId,
            Protocol = "WebSocket",
            Message = "Connected to the MCP Hub"
        });

        await base.OnConnectedAsync();
    }

    public async Task SendCommand(MCPCommand command)
    {
        _logger.LogInformation(
            "Received WebSocket command: {Command} from {ConnectionId}",
            command.Command,
            Context.ConnectionId);

        // Enrich the command with WebSocket connection data
        var enrichedCommand = command with
        {
            Protocol = ProtocolType.WebSocket,
            SessionId = Context.ConnectionId
        };

        // Process through the unified kernel
        var response = await _kernelService.ExecuteCommandAsync(
            enrichedCommand,
            Context.ConnectionAborted);

        // Send the response
        await Clients.Caller.SendAsync("CommandResponse", response);
    }

    public async Task StreamCommand(MCPCommand command)
    {
        _logger.LogInformation(
            "Starting WebSocket streaming: {Command}",
            command.Command);

        var enrichedCommand = command with 
        { 
            Protocol = ProtocolType.WebSocket,
            SessionId = Context.ConnectionId
        };

        await foreach (var chunk in _kernelService.StreamCommandAsync(
            enrichedCommand,
            Context.ConnectionAborted))
        {
            await Clients.Caller.SendAsync("StreamChunk", new
            {
                Content = chunk,
                Timestamp = DateTime.UtcNow
            });
        }

        await Clients.Caller.SendAsync("StreamComplete", new
        {
            Message = "Streaming completed"
        });
    }
}

// Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSignalR();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();

// Redis for the event bus
builder.Services.AddSingleton<IConnectionMultiplexer>(
    ConnectionMultiplexer.Connect("localhost:6379"));
builder.Services.AddSingleton<IEventBus, RedisEventBus>();

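// Development-only policy: browser SignalR clients send credentials, which CORS rejects
// with a wildcard origin. For browsers, prefer WithOrigins(...) plus AllowCredentials().
builder.Services.AddCors(options =>
{
    options.AddPolicy("AllowAll", policy =>
        policy.AllowAnyOrigin().AllowAnyMethod().AllowAnyHeader());
});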

var app = builder.Build();

app.UseCors("AllowAll");
app.MapHub<MCPHub>("/mcphub");
app.MapGet("/", () => "MCP WebSocket Service");

app.Run();

8️⃣ API Gateway with YARP

// MCPPipeline.Gateway/Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddReverseProxy()
    .LoadFromConfig(builder.Configuration.GetSection("ReverseProxy"));

var app = builder.Build();

app.MapReverseProxy();
app.MapGet("/", () => Results.Ok(new
{
    Service = "MCP Gateway",
    Endpoints = new
    {
        gRPC = "https://localhost:5001",
        WebSocket = "https://localhost:5002/mcphub"
    }
}));

app.Run();

// appsettings.json
{
  "ReverseProxy": {
    "Routes": {
      "grpc-route": {
        "ClusterId": "grpc-cluster",
        "Match": {
          "Path": "/grpc/{**catch-all}"
        },
        "Transforms": [
          { "PathRemovePrefix": "/grpc" }
        ]
      },
      "websocket-route": {
        "ClusterId": "websocket-cluster",
        "Match": {
          "Path": "/ws/{**catch-all}"
        },
        "Transforms": [
          { "PathRemovePrefix": "/ws" }
        ]
      }
    },
    "Clusters": {
      "grpc-cluster": {
        "Destinations": {
          "destination1": {
            "Address": "https://localhost:5001"
          }
        }
      },
      "websocket-cluster": {
        "Destinations": {
          "destination1": {
            "Address": "https://localhost:5002"
          }
        },
        "HttpRequest": {
          "Version": "1.1",
          "VersionPolicy": "RequestVersionOrLower"
        }
      }
    }
  }
}

9️⃣ Unified Client

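// MCPPipeline.Client/UnifiedMCPClient.cs
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.Logging;
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Grpc;
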
public class UnifiedMCPClient : IAsyncDisposable
{
    private readonly MCPService.MCPServiceClient? _grpcClient;
    private readonly HubConnection? _hubConnection;
    private readonly ILogger<UnifiedMCPClient> _logger;
    private readonly bool _useGrpc;

    public UnifiedMCPClient(
        string endpoint,
        bool useGrpc,
        ILogger<UnifiedMCPClient> logger)
    {
        _useGrpc = useGrpc;
        _logger = logger;

        if (useGrpc)
        {
            var channel = GrpcChannel.ForAddress(endpoint);
            _grpcClient = new MCPService.MCPServiceClient(channel);
            _logger.LogInformation("Client configured for gRPC: {Endpoint}", endpoint);
        }
        else
        {
            _hubConnection = new HubConnectionBuilder()
                .WithUrl(endpoint)
                .WithAutomaticReconnect()
                .Build();

            SetupWebSocketHandlers();
            _logger.LogInformation("Client configured for WebSocket: {Endpoint}", endpoint);
        }
    }

    public async Task ConnectAsync(CancellationToken ct = default)
    {
        if (!_useGrpc && _hubConnection != null)
        {
            await _hubConnection.StartAsync(ct);
            _logger.LogInformation("Connected via WebSocket");
        }
    }

    public async Task<MCPResponse> SendCommandAsync(
        string command,
        string payload,
        CancellationToken ct = default)
    {
        if (_useGrpc && _grpcClient != null)
        {
            return await SendViaGrpcAsync(command, payload, ct);
        }
        else if (_hubConnection != null)
        {
            return await SendViaWebSocketAsync(command, payload, ct);
        }

        throw new InvalidOperationException("Client not initialized");
    }

    private async Task<MCPResponse> SendViaGrpcAsync(
        string command,
        string payload,
        CancellationToken ct)
    {
        _logger.LogInformation("Sending command via gRPC: {Command}", command);

        var request = new MCPRequest
        {
            Command = command,
            Payload = payload,
            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
        };

        var response = await _grpcClient!.SendCommandAsync(request, cancellationToken: ct);

        return new MCPResponse
        {
            Result = response.Result,
            Status = response.Status == "OK" ? ResponseStatus.Success : ResponseStatus.Error,
            ErrorMessage = response.ErrorMessage,
            ProcessingTimeMs = response.ProcessingTimeMs
        };
    }

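    private async Task<MCPResponse> SendViaWebSocketAsync(
        string command,
        string payload,
        CancellationToken ct)
    {
        _logger.LogInformation("Sending command via WebSocket: {Command}", command);

        var request = new MCPCommand
        {
            Command = command,
            Payload = payload,
            Timestamp = DateTime.UtcNow
        };

        var tcs = new TaskCompletionSource<MCPResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Every registered handler sees every "CommandResponse", so accept only the
        // response whose CommandId matches this request. This assumes MCPCommand assigns
        // a unique CommandId by default; the kernel echoes it back in the response.
        using var subscription = _hubConnection!.On<MCPResponse>("CommandResponse", response =>
        {
            if (Equals(response.CommandId, request.CommandId))
                tcs.TrySetResult(response);
        });

        await _hubConnection.InvokeAsync("SendCommand", request, ct);

        // Disposing the subscription removes only this handler, whereas
        // Remove("CommandResponse") would also drop the handlers of other in-flight calls.
        return await tcs.Task.WaitAsync(TimeSpan.FromSeconds(30), ct);
    }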

    public async IAsyncEnumerable<string> StreamCommandAsync(
        string command,
        string payload,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        if (_useGrpc && _grpcClient != null)
        {
            await foreach (var chunk in StreamViaGrpcAsync(command, payload, ct))
            {
                yield return chunk;
            }
        }
        else if (_hubConnection != null)
        {
            await foreach (var chunk in StreamViaWebSocketAsync(command, payload, ct))
            {
                yield return chunk;
            }
        }
    }

    private async IAsyncEnumerable<string> StreamViaGrpcAsync(
        string command,
        string payload,
        [EnumeratorCancellation] CancellationToken ct)
    {
        var request = new MCPRequest
        {
            Command = command,
            Payload = payload,
            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
        };

        using var call = _grpcClient!.StreamCommand(request, cancellationToken: ct);

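        await foreach (var response in call.ResponseStream.ReadAllAsync(ct))
        {
            // The server's final message is only a completion marker, not content.
            if (response.IsComplete)
                yield break;

            yield return response.Content;
        }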
    }

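    private async IAsyncEnumerable<string> StreamViaWebSocketAsync(
        string command,
        string payload,
        [EnumeratorCancellation] CancellationToken ct)
    {
        var channel = Channel.CreateUnbounded<string>();

        // The hub sends anonymous objects. SignalR's default JSON protocol delivers them
        // as JsonElement with camelCase names, so `dynamic` member access fails at runtime.
        using var onChunk = _hubConnection!.On<System.Text.Json.JsonElement>("StreamChunk", chunk =>
        {
            channel.Writer.TryWrite(chunk.GetProperty("content").GetString() ?? string.Empty);
        });

        using var onComplete = _hubConnection.On<System.Text.Json.JsonElement>("StreamComplete", _ =>
        {
            channel.Writer.TryComplete();
        });

        // SendAsync returns once the invocation is sent. InvokeAsync would wait for the hub
        // method to finish, so no chunk would be yielded until the whole stream was buffered.
        // Trade-off: a hub-side failure is not propagated here; the loop then ends via ct.
        await _hubConnection.SendAsync("StreamCommand", new MCPCommand
        {
            Command = command,
            Payload = payload,
            Timestamp = DateTime.UtcNow
        }, ct);

        await foreach (var chunk in channel.Reader.ReadAllAsync(ct))
        {
            yield return chunk;
        }
    }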

    private void SetupWebSocketHandlers()
    {
        _hubConnection!.On<object>("Connected", data =>
        {
            _logger.LogInformation("✅ WebSocket connected: {Data}", data);
        });

        _hubConnection.Reconnecting += error =>
        {
            _logger.LogWarning("⚠️ WebSocket reconnecting: {Error}", error?.Message);
            return Task.CompletedTask;
        };

        _hubConnection.Reconnected += connectionId =>
        {
            _logger.LogInformation("✅ WebSocket reconnected: {ConnectionId}", connectionId);
            return Task.CompletedTask;
        };

        _hubConnection.Closed += error =>
        {
            _logger.LogError("❌ WebSocket closed: {Error}", error?.Message);
            return Task.CompletedTask;
        };
    }

    public async ValueTask DisposeAsync()
    {
        if (_hubConnection != null)
        {
            await _hubConnection.StopAsync();
            await _hubConnection.DisposeAsync();
        }
    }
}

// Usage example
public class Program
{
    public static async Task Main(string[] args)
    {
        var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
        var logger = loggerFactory.CreateLogger<UnifiedMCPClient>();

        // gRPC client
        await using var grpcClient = new UnifiedMCPClient(
            "https://localhost:5001",
            useGrpc: true,
            logger);

        var response = await grpcClient.SendCommandAsync("analyze", "Teste via gRPC");
        Console.WriteLine($"gRPC: {response.Result}");

        // WebSocket client
        await using var wsClient = new UnifiedMCPClient(
            "https://localhost:5002/mcphub",
            useGrpc: false,
            logger);

        await wsClient.ConnectAsync();

        response = await wsClient.SendCommandAsync("analyze", "Teste via WebSocket");
        Console.WriteLine($"WebSocket: {response.Result}");

        // Streaming
        await foreach (var chunk in wsClient.StreamCommandAsync("generate", "Prompt de teste"))
        {
            Console.WriteLine($"Chunk: {chunk}");
        }
    }
}
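When many WebSocket commands are in flight at once, each response has to be matched to the request that caused it. One possible approach, sketched here with a hypothetical `PendingRequests<TResponse>` helper that is not part of the article's client, is a registry keyed by correlation id and completed from a single long-lived `CommandResponse` handler:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: matches responses to in-flight requests by correlation id.
public sealed class PendingRequests<TResponse>
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<TResponse>> _pending = new();

    // Registers a request and returns a task that completes when its response arrives,
    // or fails on timeout or cancellation.
    public Task<TResponse> Register(string correlationId, TimeSpan timeout, CancellationToken ct = default)
    {
        var tcs = new TaskCompletionSource<TResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!_pending.TryAdd(correlationId, tcs))
            throw new InvalidOperationException($"Duplicate correlation id: {correlationId}");

        return AwaitAsync(correlationId, tcs, timeout, ct);
    }

    // Intended to be called from the single response handler.
    // Returns false when no request with that id is waiting.
    public bool TryComplete(string correlationId, TResponse response) =>
        _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(response);

    private async Task<TResponse> AwaitAsync(
        string id,
        TaskCompletionSource<TResponse> tcs,
        TimeSpan timeout,
        CancellationToken ct)
    {
        try
        {
            return await tcs.Task.WaitAsync(timeout, ct);
        }
        finally
        {
            _pending.TryRemove(id, out _); // no-op when TryComplete already removed it
        }
    }
}
```

In the client, `Register(command.CommandId, ...)` would run before `InvokeAsync`, and one `On<MCPResponse>("CommandResponse", r => pending.TryComplete(r.CommandId, r))` registration in `SetupWebSocketHandlers` would replace the per-call handlers, assuming `CommandId` is a unique string.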

🔍 Unified Observability

OpenTelemetry Configuration

// MCPPipeline.Observability/OpenTelemetryExtensions.cs
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using OpenTelemetry.Metrics;

public static class OpenTelemetryExtensions
{
    public static IServiceCollection AddMCPObservability(
        this IServiceCollection services,
        string serviceName)
    {
        services.AddOpenTelemetry()
            .ConfigureResource(resource =>
            {
                resource.AddService(
                    serviceName: serviceName,
                    serviceVersion: "1.0.0");
            })
            .WithTracing(tracing =>
            {
                tracing
                    .AddAspNetCoreInstrumentation(options =>
                    {
                        options.RecordException = true;
                        options.EnrichWithHttpRequest = (activity, request) =>
                        {
                            activity.SetTag("http.scheme", request.Scheme);
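                            // Tag values should be primitives or strings; an IPAddress object may be dropped on export.
                            activity.SetTag("http.client_ip", request.HttpContext.Connection.RemoteIpAddress?.ToString());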
                        };
                    })
                    .AddGrpcClientInstrumentation()
                    .AddSource("MCPPipeline.Core")
                    .AddSource("MCPPipeline.GrpcService")
                    .AddSource("MCPPipeline.WebSocketService")
                    .AddOtlpExporter(options =>
                    {
                        options.Endpoint = new Uri("http://localhost:4317");
                    });
            })
            .WithMetrics(metrics =>
            {
                metrics
                    .AddAspNetCoreInstrumentation()
                    .AddRuntimeInstrumentation()
                    .AddMeter("MCPPipeline.*")
                    .AddPrometheusExporter()
                    .AddOtlpExporter();
            });

        return services;
    }
}

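// Usage in the services
builder.Services.AddMCPObservability("MCPPipeline.GrpcService");
// AddPrometheusExporter() only registers the exporter. The scrape endpoint
// (/metrics by default) must also be mapped once the app is built:
// app.MapPrometheusScrapingEndpoint();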

Custom Metrics

// MCPPipeline.Observability/MCPMetrics.cs
using System.Diagnostics.Metrics;

public class MCPMetrics
{
    private readonly Meter _meter;
    private readonly Counter<long> _commandsProcessed;
    private readonly Histogram<double> _commandDuration;
    private readonly Counter<long> _commandErrors;
    private readonly UpDownCounter<int> _activeConnections;

    public MCPMetrics()
    {
        _meter = new Meter("MCPPipeline.Metrics", "1.0.0");

        _commandsProcessed = _meter.CreateCounter<long>(
            "mcp.commands.processed",
            description: "Total commands processed");

        _commandDuration = _meter.CreateHistogram<double>(
            "mcp.command.duration",
            unit: "ms",
            description: "Command processing duration");

        _commandErrors = _meter.CreateCounter<long>(
            "mcp.commands.errors",
            description: "Total processing errors");

        _activeConnections = _meter.CreateUpDownCounter<int>(
            "mcp.connections.active",
            description: "Active connections");
    }

    public void RecordCommandProcessed(string command, ProtocolType protocol)
    {
        _commandsProcessed.Add(1,
            new KeyValuePair<string, object?>("command", command),
            new KeyValuePair<string, object?>("protocol", protocol.ToString()));
    }

    public void RecordCommandDuration(string command, ProtocolType protocol, double durationMs)
    {
        _commandDuration.Record(durationMs,
            new KeyValuePair<string, object?>("command", command),
            new KeyValuePair<string, object?>("protocol", protocol.ToString()));
    }

    public void RecordCommandError(string command, ProtocolType protocol, string errorType)
    {
        _commandErrors.Add(1,
            new KeyValuePair<string, object?>("command", command),
            new KeyValuePair<string, object?>("protocol", protocol.ToString()),
            new KeyValuePair<string, object?>("error_type", errorType));
    }

    public void IncrementActiveConnections(ProtocolType protocol)
    {
        _activeConnections.Add(1,
            new KeyValuePair<string, object?>("protocol", protocol.ToString()));
    }

    public void DecrementActiveConnections(ProtocolType protocol)
    {
        _activeConnections.Add(-1,
            new KeyValuePair<string, object?>("protocol", protocol.ToString()));
    }
}
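Custom instruments like these can be checked in a unit test without any exporter by attaching a `MeterListener` from `System.Diagnostics.Metrics`. The sketch below is an illustration only: `MeterProbe` and `SumLongMeasurements` are hypothetical names, and it handles `long` instruments such as the two counters above.

```csharp
using System;
using System.Diagnostics.Metrics;

// Hypothetical test helper built on the in-box MeterListener.
public static class MeterProbe
{
    // Sums every long measurement recorded on the given instrument while `act` runs.
    public static long SumLongMeasurements(string meterName, string instrumentName, Action act)
    {
        long total = 0;
        using var listener = new MeterListener();

        // Opt in only to the instrument under test.
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter.Name == meterName && instrument.Name == instrumentName)
                l.EnableMeasurementEvents(instrument);
        };

        // Invoked synchronously on each Counter<long>.Add call.
        listener.SetMeasurementEventCallback<long>(
            (instrument, measurement, tags, state) => total += measurement);

        listener.Start();
        act();
        return total;
    }
}
```

For example, `MeterProbe.SumLongMeasurements("MCPPipeline.Metrics", "mcp.commands.processed", () => metrics.RecordCommandProcessed("status", ProtocolType.Grpc))` would be expected to return 1 for a single recorded command.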

📊 Dashboard with Grafana

Prometheus Configuration

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

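scrape_configs:
  # Both services listen on HTTPS, so the default http scheme would fail to scrape them.
  # insecure_skip_verify is meant for the local development certificate only.
  - job_name: 'mcp-grpc-service'
    scheme: https
    tls_config:
      insecure_skip_verify: true
    static_configs:
      - targets: ['localhost:5001']

  - job_name: 'mcp-websocket-service'
    scheme: https
    tls_config:
      insecure_skip_verify: true
    static_configs:
      - targets: ['localhost:5002']

  - job_name: 'mcp-gateway'
    static_configs:
      - targets: ['localhost:5000']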

Grafana Dashboard JSON

{
  "dashboard": {
    "title": "MCP Hybrid Protocol Dashboard",
    "panels": [
      {
        "title": "Commands Processed per Protocol",
        "targets": [
          {
            "expr": "rate(mcp_commands_processed_total[5m])",
            "legendFormat": "{{protocol}}"
          }
        ]
      },
      {
        "title": "p95 Latency per Command",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum by (le, command) (rate(mcp_command_duration_bucket[5m])))",
            "legendFormat": "p95 - {{command}}"
          }
        ]
      },
      {
        "title": "Active Connections",
        "targets": [
          {
            "expr": "mcp_connections_active",
            "legendFormat": "{{protocol}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(mcp_commands_errors_total[5m])",
            "legendFormat": "{{protocol}} - {{error_type}}"
          }
        ]
      }
    ]
  }
}

🧪 Integration Tests

Hybrid Test

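// MCPPipeline.IntegrationTests/HybridProtocolTests.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;
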
public class HybridProtocolTests : IClassFixture<MCPTestFixture>
{
    private readonly MCPTestFixture _fixture;

    public HybridProtocolTests(MCPTestFixture fixture)
    {
        _fixture = fixture;
    }

    [Fact]
    public async Task Should_Process_Same_Command_Via_Both_Protocols()
    {
        // Arrange
        var grpcClient = _fixture.CreateGrpcClient();
        var wsClient = _fixture.CreateWebSocketClient();

        await wsClient.ConnectAsync();

        var testPayload = "Teste de integração híbrida";

        // Act
        var grpcResponse = await grpcClient.SendCommandAsync("analyze", testPayload);
        var wsResponse = await wsClient.SendCommandAsync("analyze", testPayload);

        // Assert
        Assert.Equal(ResponseStatus.Success, grpcResponse.Status);
        Assert.Equal(ResponseStatus.Success, wsResponse.Status);

        // Both should return equivalent results
        Assert.Contains("palavras", grpcResponse.Result);
        Assert.Contains("palavras", wsResponse.Result);
    }

    [Fact]
    public async Task Should_Handle_Concurrent_Requests_From_Both_Protocols()
    {
        // Arrange
        var grpcClient = _fixture.CreateGrpcClient();
        var wsClient = _fixture.CreateWebSocketClient();
        await wsClient.ConnectAsync();

        var tasks = new List<Task<MCPResponse>>();

        // Act - send 50 concurrent requests over each protocol
        for (int i = 0; i < 50; i++)
        {
            tasks.Add(grpcClient.SendCommandAsync("status", $"gRPC-{i}"));
            tasks.Add(wsClient.SendCommandAsync("status", $"WS-{i}"));
        }

        var responses = await Task.WhenAll(tasks);

        // Assert
        Assert.All(responses, r => Assert.Equal(ResponseStatus.Success, r.Status));
        Assert.Equal(100, responses.Length);
    }

    [Fact(Skip = "Pending: the WebSocket-side event listener is not implemented yet")]
    public async Task Should_Broadcast_Events_Between_Protocols()
    {
        // Arrange
        var grpcClient = _fixture.CreateGrpcClient();
        var wsClient = _fixture.CreateWebSocketClient();
        await wsClient.ConnectAsync();

        var eventReceived = new TaskCompletionSource<bool>();

        // Configure the listener on the WebSocket side
        // (implementation depends on the event bus). It must call
        // eventReceived.TrySetResult(true); until then the wait below always times out.

        // Act - send a command via gRPC
        await grpcClient.SendCommandAsync("generate", "Teste de evento");

        // Assert - the WebSocket side should receive a notification
        var received = await eventReceived.Task.WaitAsync(TimeSpan.FromSeconds(5));
        Assert.True(received);
    }
}

public class MCPTestFixture : IAsyncLifetime
{
    private WebApplication? _grpcApp;
    private WebApplication? _wsApp;

    public async Task InitializeAsync()
    {
        // Start the test services
        _grpcApp = CreateGrpcService();
        _wsApp = CreateWebSocketService();

        await _grpcApp.StartAsync();
        await _wsApp.StartAsync();
    }

    public async Task DisposeAsync()
    {
        if (_grpcApp != null) await _grpcApp.StopAsync();
        if (_wsApp != null) await _wsApp.StopAsync();
    }

    public UnifiedMCPClient CreateGrpcClient()
    {
        return new UnifiedMCPClient(
            "https://localhost:5001",
            useGrpc: true,
            Mock.Of<ILogger<UnifiedMCPClient>>());
    }

    public UnifiedMCPClient CreateWebSocketClient()
    {
        return new UnifiedMCPClient(
            "https://localhost:5002/mcphub",
            useGrpc: false,
            Mock.Of<ILogger<UnifiedMCPClient>>());
    }

    private WebApplication CreateGrpcService()
    {
        var builder = WebApplication.CreateBuilder();
        builder.Services.AddGrpc();
        builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
        builder.Services.AddSingleton<IProtocolAdapter, ProtocolAdapter>();

        var app = builder.Build();
        // Bind to the address that CreateGrpcClient() connects to
        app.Urls.Add("https://localhost:5001");
        app.MapGrpcService<MCPGrpcServiceImpl>();
        return app;
    }

    private WebApplication CreateWebSocketService()
    {
        var builder = WebApplication.CreateBuilder();
        builder.Services.AddSignalR();
        builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();

        var app = builder.Build();
        // Bind to the address that CreateWebSocketClient() connects to
        app.Urls.Add("https://localhost:5002");
        app.MapHub<MCPHub>("/mcphub");
        return app;
    }
}
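The broadcast test above leaves the listener wiring open, so `eventReceived` is never completed as written. A minimal sketch of the pattern follows, assuming a hypothetical in-memory `InMemoryEventBus` in place of the real event bus (the type, its `EventPublished` event and the `WaitForEventAsync` helper are illustrative names, not part of the series' code). It subscribes before acting, completes a `TaskCompletionSource` when the expected event arrives, and turns a timeout into `false`.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the cross-protocol event bus.
public sealed class InMemoryEventBus
{
    public event Action<string>? EventPublished;

    public void Publish(string eventName) => EventPublished?.Invoke(eventName);
}

public static class BroadcastListenerSketch
{
    // Returns true if the expected event arrives before the timeout, false otherwise.
    public static async Task<bool> WaitForEventAsync(
        InMemoryEventBus bus, string expectedEvent, Func<Task> act, TimeSpan timeout)
    {
        var eventReceived = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        void Handler(string name)
        {
            if (name == expectedEvent) eventReceived.TrySetResult(true);
        }

        // Subscribe before acting so the event cannot be missed.
        bus.EventPublished += Handler;
        try
        {
            await act();
            return await eventReceived.Task.WaitAsync(timeout);
        }
        catch (TimeoutException)
        {
            return false;
        }
        finally
        {
            bus.EventPublished -= Handler;
        }
    }
}
```

In the real test, the handler would be registered on the WebSocket client instead of the in-memory bus, and `act` would be the gRPC `SendCommandAsync` call.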

🚀 Performance Testing

Benchmarking with BenchmarkDotNet

// MCPPipeline.Benchmarks/ProtocolBenchmarks.cs
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using Microsoft.Extensions.Logging; // ILogger<T>
using Moq;                          // Mock.Of<T>()

[MemoryDiagnoser]
[SimpleJob(warmupCount: 3, iterationCount: 10)]
public class ProtocolBenchmarks
{
    private UnifiedMCPClient _grpcClient = null!;
    private UnifiedMCPClient _wsClient = null!;

    [GlobalSetup]
    public async Task Setup()
    {
        _grpcClient = new UnifiedMCPClient(
            "https://localhost:5001",
            useGrpc: true,
            Mock.Of<ILogger<UnifiedMCPClient>>());

        _wsClient = new UnifiedMCPClient(
            "https://localhost:5002/mcphub",
            useGrpc: false,
            Mock.Of<ILogger<UnifiedMCPClient>>());

        await _wsClient.ConnectAsync();
    }

    [Benchmark(Baseline = true)]
    public async Task<MCPResponse> GrpcCommand()
    {
        return await _grpcClient.SendCommandAsync("status", "benchmark");
    }

    [Benchmark]
    public async Task<MCPResponse> WebSocketCommand()
    {
        return await _wsClient.SendCommandAsync("status", "benchmark");
    }

    [Benchmark]
    public async Task GrpcStreaming()
    {
        await foreach (var _ in _grpcClient.StreamCommandAsync("generate", "test"))
        {
            // Drain the stream
        }
    }

    [Benchmark]
    public async Task WebSocketStreaming()
    {
        await foreach (var _ in _wsClient.StreamCommandAsync("generate", "test"))
        {
            // Drain the stream
        }
    }

    [GlobalCleanup]
    public async Task Cleanup()
    {
        await _grpcClient.DisposeAsync();
        await _wsClient.DisposeAsync();
    }
}

// Program.cs
public class Program
{
    public static void Main(string[] args)
    {
        BenchmarkRunner.Run<ProtocolBenchmarks>();
    }
}

Expected Results

|             Method |      Mean |     Error |    StdDev | Ratio | Gen0 | Allocated |
|------------------- |----------:|----------:|----------:|------:|-----:|----------:|
|        GrpcCommand |  2.450 ms | 0.0421 ms | 0.0394 ms |  1.00 | 15.6 |     128KB |
|   WebSocketCommand |  8.732 ms | 0.1523 ms | 0.1425 ms |  3.56 | 31.2 |     256KB |
|      GrpcStreaming | 12.105 ms | 0.2103 ms | 0.1967 ms |  4.94 | 46.8 |     384KB |
| WebSocketStreaming | 18.421 ms | 0.3142 ms | 0.2940 ms |  7.52 | 62.5 |     512KB |
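The Ratio column in the table can be read as roughly each method's mean divided by the mean of the baseline (`GrpcCommand`). The sketch below reproduces that arithmetic from the Mean values shown above; `BenchmarkRatios` is an illustrative helper, not part of BenchmarkDotNet, and the real tool may compute the ratio from individual measurements rather than from the rounded means.

```csharp
using System;

public static class BenchmarkRatios
{
    // Mean of the baseline method (GrpcCommand) from the table, in milliseconds.
    public const double BaselineMeanMs = 2.450;

    // Ratio as displayed in the table: method mean / baseline mean, rounded to 2 decimals.
    public static double Ratio(double meanMs, double baselineMeanMs = BaselineMeanMs)
        => Math.Round(meanMs / baselineMeanMs, 2);
}
```

For example, `BenchmarkRatios.Ratio(8.732)` gives 3.56 for `WebSocketCommand`, matching the table.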

🎯 Protocol Decision: When to Use Each One

Decision Matrix

public class ProtocolSelector
{
    public ProtocolType SelectOptimalProtocol(ClientContext context)
    {
        // 1. Is the client a browser? → WebSocket
        if (context.IsWebBrowser)
            return ProtocolType.WebSocket;

        // 2. Internal service-to-service communication? → gRPC
        if (context.IsInternalService)
            return ProtocolType.Grpc;

        // 3. Needs long-lived bidirectional streaming? → WebSocket
        if (context.RequiresLongLivedStream)
            return ProtocolType.WebSocket;

        // 4. Is minimal latency the priority? → gRPC
        if (context.LatencySensitive)
            return ProtocolType.Grpc;

        // 5. Do multiple clients need to receive broadcasts? → WebSocket
        if (context.RequiresBroadcast)
            return ProtocolType.WebSocket;

        // 6. Default: gRPC for performance
        return ProtocolType.Grpc;
    }
}

public record ClientContext
{
    public bool IsWebBrowser { get; init; }
    public bool IsInternalService { get; init; }
    public bool RequiresLongLivedStream { get; init; }
    public bool LatencySensitive { get; init; }
    public bool RequiresBroadcast { get; init; }
}
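Because the selector evaluates its rules top to bottom, the first matching flag wins, which matters when a context sets more than one. The sketch below restates the selector in condensed form so it compiles on its own and adds a few example contexts. The `ProtocolType` enum shape and the `ProtocolSelectorExamples` class are assumptions made for illustration.

```csharp
// Assumed shape of the enum; the article does not show its definition.
public enum ProtocolType { Grpc, WebSocket }

public record ClientContext
{
    public bool IsWebBrowser { get; init; }
    public bool IsInternalService { get; init; }
    public bool RequiresLongLivedStream { get; init; }
    public bool LatencySensitive { get; init; }
    public bool RequiresBroadcast { get; init; }
}

// Condensed copy of the selector above: same rules, same order.
public class ProtocolSelector
{
    public ProtocolType SelectOptimalProtocol(ClientContext context)
    {
        if (context.IsWebBrowser) return ProtocolType.WebSocket;
        if (context.IsInternalService) return ProtocolType.Grpc;
        if (context.RequiresLongLivedStream) return ProtocolType.WebSocket;
        if (context.LatencySensitive) return ProtocolType.Grpc;
        if (context.RequiresBroadcast) return ProtocolType.WebSocket;
        return ProtocolType.Grpc;
    }
}

public static class ProtocolSelectorExamples
{
    private static readonly ProtocolSelector Selector = new();

    // Rule 1 fires before rule 4, so a latency-sensitive browser still gets WebSocket.
    public static ProtocolType LatencySensitiveBrowser() =>
        Selector.SelectOptimalProtocol(
            new ClientContext { IsWebBrowser = true, LatencySensitive = true });

    // Rule 2 fires before rule 5, so an internal service that wants broadcasts gets gRPC.
    public static ProtocolType InternalServiceWithBroadcast() =>
        Selector.SelectOptimalProtocol(
            new ClientContext { IsInternalService = true, RequiresBroadcast = true });

    // No flags set: falls through to the gRPC default.
    public static ProtocolType NoHints() =>
        Selector.SelectOptimalProtocol(new ClientContext());
}
```

If a different precedence is wanted (for example, broadcast needs overriding the internal-service rule), the order of the checks is the only thing that has to change.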

Comparison Table

| Scenario | gRPC | WebSocket | Recommendation |
|---|---|---|---|
| Backend API → Backend | ✅ Ideal | ⚠️ Overhead | gRPC |
| Web App → Backend | ❌ Limited (browsers need gRPC-Web) | ✅ Native | WebSocket |
| Mobile App → Backend | ✅ Excellent | ✅ Good | gRPC (better performance) |
| Real-time Dashboard | ⚠️ Server streaming or polling | ✅ Native push | WebSocket |
| Multi-user Chat | ❌ Not ideal | ✅ Perfect | WebSocket |
| Data Analysis | ✅ High performance | ⚠️ Slower | gRPC |
| AI Streaming | ✅ Efficient | ✅ Functional | Both (context-dependent) |

🔒 Security in a Hybrid Environment

Unified Authentication

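// MCPPipeline.Security/UnifiedAuthenticationExtensions.cs
using System.Text;
using Grpc.Core;
using Grpc.Core.Interceptors;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.IdentityModel.Tokens;
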
public static class UnifiedAuthenticationExtensions
{
    public static IServiceCollection AddUnifiedAuthentication(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        services.AddAuthentication(options =>
        {
            options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
            options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
        })
        .AddJwtBearer(options =>
        {
            options.TokenValidationParameters = new TokenValidationParameters
            {
                ValidateIssuer = true,
                ValidateAudience = true,
                ValidateLifetime = true,
                ValidateIssuerSigningKey = true,
                ValidIssuer = configuration["Jwt:Issuer"],
                ValidAudience = configuration["Jwt:Audience"],
                IssuerSigningKey = new SymmetricSecurityKey(
                    Encoding.UTF8.GetBytes(configuration["Jwt:Key"]!))
            };

            // Accept the token from the query string for WebSocket connections
            options.Events = new JwtBearerEvents
            {
                OnMessageReceived = context =>
                {
                    var accessToken = context.Request.Query["access_token"];
                    var path = context.HttpContext.Request.Path;

                    if (!string.IsNullOrEmpty(accessToken) && 
                        path.StartsWithSegments("/mcphub"))
                    {
                        context.Token = accessToken;
                    }
                    return Task.CompletedTask;
                }
            };
        });

        // Register the authentication interceptor for gRPC calls
        services.AddGrpc(options =>
        {
            options.Interceptors.Add<AuthenticationInterceptor>();
        });

        return services;
    }
}

// gRPC interceptor that rejects unary calls without an authorization header
public class AuthenticationInterceptor : Interceptor
{
    private readonly ILogger<AuthenticationInterceptor> _logger;
    public AuthenticationInterceptor(ILogger<AuthenticationInterceptor> logger)
    {
        _logger = logger;
    }

    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        var authHeader = context.RequestHeaders.GetValue("authorization");
        if (string.IsNullOrEmpty(authHeader))
        {
            _logger.LogWarning("gRPC request without authentication");
            throw new RpcException(new Status(StatusCode.Unauthenticated, "Token not provided"));
        }

        // TODO: Implement real JWT validation; this check only confirms the header is present.
        // Streaming calls are not covered unless the streaming handlers are overridden as well.
        _logger.LogInformation("Authorization header received from gRPC peer: {Peer}", context.Peer);

        return await continuation(request, context);
    }
}
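The interceptor above only checks that an `authorization` header is present. One small step toward the pending validation is to pull the token out of a `Bearer <token>` header value before handing it to a validator. The sketch below shows that step only; `BearerTokenParser` is an illustrative helper rather than part of any library, and it does not validate the token's signature, issuer, audience or lifetime.

```csharp
using System;

public static class BearerTokenParser
{
    private const string Prefix = "Bearer ";

    // Extracts the token from a header value such as "Bearer abc.def.ghi".
    // Returns false when the header is missing, uses another scheme, or has no token.
    public static bool TryExtract(string? authorizationHeader, out string token)
    {
        token = string.Empty;

        if (string.IsNullOrWhiteSpace(authorizationHeader))
            return false;

        // The scheme name is matched case-insensitively.
        if (!authorizationHeader.StartsWith(Prefix, StringComparison.OrdinalIgnoreCase))
            return false;

        var candidate = authorizationHeader.Substring(Prefix.Length).Trim();
        if (candidate.Length == 0)
            return false;

        token = candidate;
        return true;
    }
}
```

Inside the interceptor, a failed `TryExtract` would map to the same `StatusCode.Unauthenticated` response. The extracted token should then be validated against the same issuer, audience and signing key configured for the JWT bearer scheme, so both protocols enforce identical rules.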

✅ Conclusion and Next Steps

In this fourth part, we consolidated a hybrid architecture that combines gRPC and WebSocket, providing:

  • Low latency and strong typing for internal communication (gRPC)
  • Flexibility and real-time updates for web/mobile clients (WebSocket)
  • A unified kernel, centralized observability, and robust security

🔮 Next Steps

  • Part 5: Integration with advanced messaging (Kafka, RabbitMQ) for high-scale scenarios.
  • Part 6: Implementing circuit breakers and resilience with Polly.
  • Part 7: Multi-cloud deployment strategies and CI/CD with GitHub Actions.

Tip: Explore the MCPPipeline.Hybrid repository for complete examples and automation scripts.


🤝 Connect with Me

If you work with modern .NET and want to master architecture, C#, observability, DevOps, or interoperability:

💼 LinkedIn
✍️ Medium
📬 contato@dopme.io
📬 devsfree@devsfree.com.br

⁸ Again, the devil taketh him up into an exceeding high mountain, and sheweth him all the kingdoms of the world, and the glory of them; ⁹ And saith unto him, All these things will I give thee, if thou wilt fall down and worship me. ¹⁰ Then saith Jesus unto him, Get thee hence, Satan: for it is written, Thou shalt worship the Lord thy God, and him only shalt thou serve. Matthew 4:8-10
