Complete Integration with gRPC & WebSocket
In this fourth part of the series "My First Communication with MCP and .NET", we explore how to build a hybrid architecture that combines the strengths of both protocols: the efficiency and strong typing of gRPC for internal service-to-service communication, and the flexibility and native support of WebSocket for web and mobile clients. The goal is a robust and scalable MCP ecosystem.
🚀 Introduction
After exploring gRPC (Part 2) and WebSocket (Part 3) individually, an important architectural question comes up: what if we need the benefits of both?
In modern enterprise systems, it is common to have:
- Backend microservices that communicate via gRPC (low latency, strong typing)
- Web/mobile clients that need WebSocket (native support, real-time)
- MCP agents that can use either protocol depending on the context
This article shows how to build a unified architecture that exposes both protocols, with intelligent routing, protocol adapters, and centralized observability.
🧠 Hybrid Architecture: The Best of Both Worlds
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│                        API Gateway / BFF                        │
│                     (Protocol Orchestrator)                     │
└──────────────────┬─────────────────────┬────────────────────────┘
                   │                     │
         ┌─────────▼──────────┐  ┌───────▼────────────┐
         │   gRPC Endpoint    │  │ WebSocket Endpoint │
         │    (Port 5001)     │  │    (Port 5002)     │
         └─────────┬──────────┘  └───────┬────────────┘
                   │                     │
                   │    ┌────────────────┴─────────────────┐
                   │    │                                  │
         ┌─────────▼────▼──────────────┐     ┌─────────────▼──────┐
         │   Protocol Adapter Layer    │     │ SignalR Hub Layer  │
         │     (gRPC ↔ WebSocket)      │     │  (Client Manager)  │
         └─────────┬───────────────────┘     └─────────┬──────────┘
                   │                                   │
         ┌─────────▼───────────────────────────────────▼──────────┐
         │                MCP Kernel Orchestrator                 │
         │              (Unified Command Processing)              │
         └─────────┬──────────────────────────┬───────────────────┘
                   │                          │
         ┌─────────▼────────┐       ┌─────────▼──────────┐
         │ Domain Services  │       │ Event Bus (Redis)  │
         │ (Business Logic) │       │  (Cross-Protocol)  │
         └──────────────────┘       └────────────────────┘
Main Components
- Protocol Orchestrator - Decides which protocol to use based on context
- Protocol Adapter Layer - Converts messages between gRPC and WebSocket
- Unified MCP Kernel - Processes commands regardless of the protocol
- Event Bus - Synchronizes events across instances and protocols
- Observability Layer - Unified tracing for both protocols
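As a rough illustration of the Protocol Orchestrator's job, the choice of transport can be written as a small pure function. This is only a sketch under assumptions: `ClientKind`, `ProtocolSelector`, and the rules themselves (backend services get gRPC, browsers and mobile apps get WebSocket, agents choose based on whether they need server push) are hypothetical and not taken from the series' code. `ProtocolType` is redeclared so the snippet compiles on its own.

```csharp
using System;

// Redeclared from the shared contracts so the sketch compiles on its own.
public enum ProtocolType { Unknown = 0, Grpc = 1, WebSocket = 2, Http = 3 }

// Hypothetical classification of callers; not part of the original contracts.
public enum ClientKind { BackendService, Browser, MobileApp, McpAgent }

public static class ProtocolSelector
{
    // Picks a transport from the caller's kind and whether it needs server push.
    public static ProtocolType Select(ClientKind kind, bool needsServerPush) =>
        kind switch
        {
            ClientKind.BackendService => ProtocolType.Grpc,
            ClientKind.Browser or ClientKind.MobileApp => ProtocolType.WebSocket,
            ClientKind.McpAgent => needsServerPush ? ProtocolType.WebSocket : ProtocolType.Grpc,
            _ => ProtocolType.Http // assumed fallback for unclassified callers
        };
}
```

For example, `ProtocolSelector.Select(ClientKind.Browser, needsServerPush: false)` yields `ProtocolType.WebSocket`. A real orchestrator would probably also weigh headers, authentication, or load before deciding.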
🏗️ Implementing the Hybrid Architecture
1️⃣ Project Structure
MCPPipeline.Hybrid/
├── src/
│   ├── MCPPipeline.Contracts/          # Shared models
│   │   ├── Messages/
│   │   │   ├── MCPCommand.cs
│   │   │   ├── MCPResponse.cs
│   │   │   └── MCPEvent.cs
│   │   └── Protos/
│   │       └── mcp.proto
│   │
│   ├── MCPPipeline.Core/               # Business logic
│   │   ├── Services/
│   │   │   ├── IMCPKernelService.cs
│   │   │   └── MCPKernelService.cs
│   │   └── Events/
│   │       └── IEventBus.cs
│   │
│   ├── MCPPipeline.GrpcService/        # gRPC endpoint
│   │   ├── Services/
│   │   │   └── MCPGrpcService.cs
│   │   └── Program.cs
│   │
│   ├── MCPPipeline.WebSocketService/   # WebSocket endpoint
│   │   ├── Hubs/
│   │   │   └── MCPHub.cs
│   │   └── Program.cs
│   │
│   ├── MCPPipeline.Gateway/            # API Gateway (YARP)
│   │   ├── Configuration/
│   │   └── Program.cs
│   │
│   └── MCPPipeline.Adapter/            # Protocol Adapter
│       ├── GrpcToWebSocketAdapter.cs
│       └── WebSocketToGrpcAdapter.cs
│
└── tests/
    └── MCPPipeline.IntegrationTests/
2️⃣ Shared Contracts
// MCPPipeline.Contracts/Messages/MCPCommand.cs
namespace MCPPipeline.Contracts.Messages;
public record MCPCommand
{
public string CommandId { get; init; } = Guid.NewGuid().ToString();
public string Command { get; init; } = string.Empty;
public string Payload { get; init; } = string.Empty;
public Dictionary<string, string> Metadata { get; init; } = new();
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
public string SessionId { get; init; } = string.Empty;
public ProtocolType Protocol { get; init; }
public int Priority { get; init; } = 0;
}
public record MCPResponse
{
public string CommandId { get; init; } = string.Empty;
public string Result { get; init; } = string.Empty;
public ResponseStatus Status { get; init; }
public string? ErrorMessage { get; init; }
public long ProcessingTimeMs { get; init; }
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
public Dictionary<string, object> Metrics { get; init; } = new();
}
public record MCPEvent
{
public string EventId { get; init; } = Guid.NewGuid().ToString();
public string EventType { get; init; } = string.Empty;
public string Source { get; init; } = string.Empty;
public object Data { get; init; } = new();
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
}
public enum ProtocolType
{
Unknown = 0,
Grpc = 1,
WebSocket = 2,
Http = 3
}
public enum ResponseStatus
{
Success,
Error,
Processing,
Timeout
}
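3️⃣ Unified MCP Kernel
// MCPPipeline.Core/Services/MCPKernelService.cs
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Core.Events;
using Microsoft.Extensions.Logging;
using System.Diagnostics;
using System.Runtime.CompilerServices; // required for [EnumeratorCancellation]
namespace MCPPipeline.Core.Services;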
public interface IMCPKernelService
{
Task<MCPResponse> ExecuteCommandAsync(MCPCommand command, CancellationToken ct);
IAsyncEnumerable<string> StreamCommandAsync(MCPCommand command, CancellationToken ct);
}
public class MCPKernelService : IMCPKernelService
{
private readonly ILogger<MCPKernelService> _logger;
private readonly IEventBus _eventBus;
private static readonly ActivitySource ActivitySource = new("MCPPipeline.Core");
public MCPKernelService(
ILogger<MCPKernelService> logger,
IEventBus eventBus)
{
_logger = logger;
_eventBus = eventBus;
}
public async Task<MCPResponse> ExecuteCommandAsync(
MCPCommand command,
CancellationToken ct)
{
using var activity = ActivitySource.StartActivity("ExecuteCommand");
activity?.SetTag("command.id", command.CommandId);
activity?.SetTag("command.type", command.Command);
activity?.SetTag("protocol", command.Protocol.ToString());
var sw = Stopwatch.StartNew();
try
{
_logger.LogInformation(
"Executando comando {Command} via {Protocol} | Session: {SessionId}",
command.Command,
command.Protocol,
command.SessionId);
// Publish the start event
await _eventBus.PublishAsync(new MCPEvent
{
EventType = "CommandStarted",
Source = command.Protocol.ToString(),
Data = new { command.CommandId, command.Command }
});
// Process the command
var result = await ProcessCommandAsync(command, ct);
sw.Stop();
var response = new MCPResponse
{
CommandId = command.CommandId,
Result = result,
Status = ResponseStatus.Success,
ProcessingTimeMs = sw.ElapsedMilliseconds,
Metrics = new Dictionary<string, object>
{
["protocol"] = command.Protocol.ToString(),
["priority"] = command.Priority
}
};
// Publish the completion event
await _eventBus.PublishAsync(new MCPEvent
{
EventType = "CommandCompleted",
Source = command.Protocol.ToString(),
Data = new { command.CommandId, response.ProcessingTimeMs }
});
activity?.SetTag("response.status", "success");
return response;
}
catch (Exception ex)
{
sw.Stop();
_logger.LogError(ex,
"Erro ao executar comando {Command} via {Protocol}",
command.Command,
command.Protocol);
activity?.SetTag("response.status", "error");
activity?.SetTag("error.message", ex.Message);
await _eventBus.PublishAsync(new MCPEvent
{
EventType = "CommandFailed",
Source = command.Protocol.ToString(),
Data = new { command.CommandId, Error = ex.Message }
});
return new MCPResponse
{
CommandId = command.CommandId,
Status = ResponseStatus.Error,
ErrorMessage = ex.Message,
ProcessingTimeMs = sw.ElapsedMilliseconds
};
}
}
public async IAsyncEnumerable<string> StreamCommandAsync(
MCPCommand command,
[EnumeratorCancellation] CancellationToken ct)
{
using var activity = ActivitySource.StartActivity("StreamCommand");
activity?.SetTag("command.id", command.CommandId);
_logger.LogInformation(
"Iniciando streaming para comando {Command} via {Protocol}",
command.Command,
command.Protocol);
var chunks = await GenerateStreamChunksAsync(command.Payload, ct);
for (int i = 0; i < chunks.Count; i++)
{
if (ct.IsCancellationRequested)
yield break;
yield return chunks[i];
await Task.Delay(50, ct); // Simulates processing work
}
}
private async Task<string> ProcessCommandAsync(MCPCommand command, CancellationToken ct)
{
return command.Command.ToLowerInvariant() switch
{
"analyze" => await AnalyzeAsync(command.Payload, ct),
"summarize" => await SummarizeAsync(command.Payload, ct),
"translate" => await TranslateAsync(command.Payload, ct),
"generate" => await GenerateAsync(command.Payload, ct),
"status" => await GetStatusAsync(ct),
_ => throw new InvalidOperationException($"Comando desconhecido: {command.Command}")
};
}
private async Task<string> AnalyzeAsync(string payload, CancellationToken ct)
{
await Task.Delay(200, ct);
var words = payload.Split(' ', StringSplitOptions.RemoveEmptyEntries).Length;
return $"Análise: {words} palavras, {payload.Length} caracteres";
}
private async Task<string> SummarizeAsync(string payload, CancellationToken ct)
{
await Task.Delay(300, ct);
return payload.Length > 100
? $"Resumo: {payload[..100]}..."
: $"Resumo: {payload}";
}
private async Task<string> TranslateAsync(string payload, CancellationToken ct)
{
await Task.Delay(250, ct);
return $"[Traduzido] {payload}";
}
private async Task<string> GenerateAsync(string payload, CancellationToken ct)
{
await Task.Delay(500, ct);
return $"Conteúdo gerado baseado em: {payload}";
}
private async Task<string> GetStatusAsync(CancellationToken ct)
{
await Task.Delay(50, ct);
return $"Sistema operacional | Timestamp: {DateTime.UtcNow:O}";
}
private async Task<List<string>> GenerateStreamChunksAsync(string prompt, CancellationToken ct)
{
await Task.Delay(100, ct);
return new List<string>
{
"Iniciando processamento...",
"Analisando contexto...",
"Gerando resposta...",
$"Resultado: {prompt}",
"Processamento concluído."
};
}
}
4️⃣ Event Bus with Redis
// MCPPipeline.Core/Events/IEventBus.cs
namespace MCPPipeline.Core.Events;
public interface IEventBus
{
Task PublishAsync<T>(T @event) where T : class;
Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
}
// MCPPipeline.Core/Events/RedisEventBus.cs
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using System.Text.Json;
namespace MCPPipeline.Core.Events;
public class RedisEventBus : IEventBus
{
private readonly IConnectionMultiplexer _redis;
private readonly ILogger<RedisEventBus> _logger;
private readonly ISubscriber _subscriber;
public RedisEventBus(
IConnectionMultiplexer redis,
ILogger<RedisEventBus> logger)
{
_redis = redis;
_logger = logger;
_subscriber = redis.GetSubscriber();
}
public async Task PublishAsync<T>(T @event) where T : class
{
var channel = typeof(T).Name;
var message = JsonSerializer.Serialize(@event);
await _subscriber.PublishAsync(channel, message);
_logger.LogDebug("Evento publicado: {EventType}", channel);
}
public async Task SubscribeAsync<T>(Func<T, Task> handler) where T : class
{
var channel = typeof(T).Name;
await _subscriber.SubscribeAsync(channel, async (ch, message) =>
{
try
{
var @event = JsonSerializer.Deserialize<T>(message!);
if (@event != null)
{
await handler(@event);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Erro ao processar evento {EventType}", channel);
}
});
_logger.LogInformation("Inscrito no canal: {Channel}", channel);
}
}
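For unit tests that should not depend on a running Redis server, the same interface can be backed by an in-process implementation. The sketch below is an assumption and not part of the series' code: `InMemoryEventBus` is a hypothetical name, it keys handlers by CLR type instead of a Redis channel, and unlike Redis Pub/Sub it awaits every handler before `PublishAsync` returns. The interface is redeclared so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Same shape as the IEventBus shown above, redeclared for self-containment.
public interface IEventBus
{
    Task PublishAsync<T>(T @event) where T : class;
    Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
}

// Hypothetical in-process bus for tests; it does not cross process boundaries.
public sealed class InMemoryEventBus : IEventBus
{
    private readonly ConcurrentDictionary<Type, ConcurrentBag<Func<object, Task>>> _handlers = new();

    public Task SubscribeAsync<T>(Func<T, Task> handler) where T : class
    {
        var bag = _handlers.GetOrAdd(typeof(T), _ => new ConcurrentBag<Func<object, Task>>());
        // Wrap the typed handler so all handlers share one delegate type.
        bag.Add(e => handler((T)e));
        return Task.CompletedTask;
    }

    public async Task PublishAsync<T>(T @event) where T : class
    {
        if (!_handlers.TryGetValue(typeof(T), out var bag))
        {
            return; // nobody subscribed to this event type
        }

        // Await every handler so a test can assert right after PublishAsync returns.
        await Task.WhenAll(bag.Select(h => h(@event)));
    }
}
```

Registering it with `AddSingleton<IEventBus, InMemoryEventBus>()` in a test host would let the kernel publish events without Redis, at the cost of not exercising cross-instance delivery.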
5️⃣ Protocol Adapter
// MCPPipeline.Adapter/ProtocolAdapter.cs
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Grpc;
namespace MCPPipeline.Adapter;
public interface IProtocolAdapter
{
MCPCommand FromGrpcRequest(MCPRequest grpcRequest);
MCPRequest ToGrpcRequest(MCPCommand command);
MCPResponse FromGrpcResponse(MCPGrpcResponse grpcResponse);
MCPGrpcResponse ToGrpcResponse(MCPResponse response);
}
public class ProtocolAdapter : IProtocolAdapter
{
public MCPCommand FromGrpcRequest(MCPRequest grpcRequest)
{
return new MCPCommand
{
Command = grpcRequest.Command,
Payload = grpcRequest.Payload,
Metadata = grpcRequest.Metadata.ToDictionary(k => k.Key, v => v.Value),
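// UtcDateTime keeps Kind = Utc; .DateTime returns Kind = Unspecified, which
// ToGrpcRequest would later interpret as local time.
Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(grpcRequest.Timestamp).UtcDateTime,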
Protocol = ProtocolType.Grpc
};
}
public MCPRequest ToGrpcRequest(MCPCommand command)
{
return new MCPRequest
{
Command = command.Command,
Payload = command.Payload,
Metadata = { command.Metadata },
Timestamp = new DateTimeOffset(command.Timestamp).ToUnixTimeMilliseconds()
};
}
public MCPResponse FromGrpcResponse(MCPGrpcResponse grpcResponse)
{
return new MCPResponse
{
Result = grpcResponse.Result,
Status = grpcResponse.Status switch
{
"OK" => ResponseStatus.Success,
"ERROR" => ResponseStatus.Error,
"PROCESSING" => ResponseStatus.Processing,
"TIMEOUT" => ResponseStatus.Timeout,
_ => ResponseStatus.Error
},
ErrorMessage = grpcResponse.ErrorMessage,
ProcessingTimeMs = grpcResponse.ProcessingTimeMs
};
}
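public MCPGrpcResponse ToGrpcResponse(MCPResponse response)
{
return new MCPGrpcResponse
{
Result = response.Result,
// Success must travel as "OK" (not "SUCCESS"), because FromGrpcResponse and the
// client treat any status other than "OK" as an error.
Status = response.Status switch
{
ResponseStatus.Success => "OK",
ResponseStatus.Processing => "PROCESSING",
ResponseStatus.Timeout => "TIMEOUT",
_ => "ERROR"
},
ErrorMessage = response.ErrorMessage ?? string.Empty,
ProcessingTimeMs = response.ProcessingTimeMs
};
}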
}
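When mapping Unix-millisecond timestamps to `DateTime`, the `Kind` of the result matters. `DateTimeOffset.DateTime` returns a value with `Kind = Unspecified`, and `new DateTimeOffset(DateTime)` treats an unspecified value as local time, so a round trip can drift by the machine's UTC offset. `DateTimeOffset.UtcDateTime` avoids that. The helper names below (`TimestampRoundTrip` and its methods) are made up for this sketch.

```csharp
using System;

public static class TimestampRoundTrip
{
    // Converts to Unix milliseconds via DateTimeOffset, as an adapter typically would.
    public static long ToUnixMs(DateTime timestamp) =>
        new DateTimeOffset(timestamp).ToUnixTimeMilliseconds();

    // Returns a DateTime with Kind = Utc, which round-trips on any machine.
    public static DateTime FromUnixMsUtc(long unixMs) =>
        DateTimeOffset.FromUnixTimeMilliseconds(unixMs).UtcDateTime;

    // Returns a DateTime with Kind = Unspecified; converting it back through
    // new DateTimeOffset(...) applies the local offset and may shift the instant.
    public static DateTime FromUnixMsUnspecified(long unixMs) =>
        DateTimeOffset.FromUnixTimeMilliseconds(unixMs).DateTime;
}
```

With the UTC variant, `ToUnixMs(FromUnixMsUtc(ms))` returns `ms` again regardless of the server's time zone, which is the property a protocol adapter needs.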
6️⃣ gRPC Service Endpoint
// MCPPipeline.GrpcService/Services/MCPGrpcService.cs
using Grpc.Core;
using MCPPipeline.Grpc;
using MCPPipeline.Core.Services;
using MCPPipeline.Adapter;
namespace MCPPipeline.GrpcService.Services;
public class MCPGrpcServiceImpl : MCPService.MCPServiceBase
{
private readonly IMCPKernelService _kernelService;
private readonly IProtocolAdapter _adapter;
private readonly ILogger<MCPGrpcServiceImpl> _logger;
public MCPGrpcServiceImpl(
IMCPKernelService kernelService,
IProtocolAdapter adapter,
ILogger<MCPGrpcServiceImpl> logger)
{
_kernelService = kernelService;
_adapter = adapter;
_logger = logger;
}
public override async Task<MCPGrpcResponse> SendCommand(
MCPRequest request,
ServerCallContext context)
{
_logger.LogInformation(
"Recebido comando gRPC: {Command} de {Peer}",
request.Command,
context.Peer);
// Convert to the unified model
var command = _adapter.FromGrpcRequest(request);
command = command with { SessionId = context.Peer };
// Process through the unified kernel
var response = await _kernelService.ExecuteCommandAsync(
command,
context.CancellationToken);
// Convert the response back to the gRPC type
return _adapter.ToGrpcResponse(response);
}
public override async Task StreamCommand(
MCPRequest request,
IServerStreamWriter<MCPStreamResponse> responseStream,
ServerCallContext context)
{
_logger.LogInformation(
"Iniciando streaming gRPC: {Command}",
request.Command);
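// Set SessionId here too, so streaming calls are attributed like SendCommand.
var command = _adapter.FromGrpcRequest(request) with { SessionId = context.Peer };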
int chunkIndex = 0;
await foreach (var chunk in _kernelService.StreamCommandAsync(
command,
context.CancellationToken))
{
await responseStream.WriteAsync(new MCPStreamResponse
{
Content = chunk,
ChunkIndex = chunkIndex++,
IsComplete = false
});
}
// Send the final chunk
await responseStream.WriteAsync(new MCPStreamResponse
{
Content = "Stream concluído",
ChunkIndex = chunkIndex,
IsComplete = true
});
}
public override Task<HealthResponse> HealthCheck(
HealthRequest request,
ServerCallContext context)
{
return Task.FromResult(new HealthResponse
{
Status = "Healthy",
Version = "1.0.0",
Protocol = "gRPC"
});
}
}
// Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
builder.Services.AddSingleton<IProtocolAdapter, ProtocolAdapter>();
// Redis for the event bus
builder.Services.AddSingleton<IConnectionMultiplexer>(
ConnectionMultiplexer.Connect("localhost:6379"));
builder.Services.AddSingleton<IEventBus, RedisEventBus>();
var app = builder.Build();
app.MapGrpcService<MCPGrpcServiceImpl>();
app.MapGet("/", () => "MCP gRPC Service");
app.Run();
7️⃣ WebSocket Service Endpoint
// MCPPipeline.WebSocketService/Hubs/MCPHub.cs
using Microsoft.AspNetCore.SignalR;
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Core.Services;
namespace MCPPipeline.WebSocketService.Hubs;
public class MCPHub : Hub
{
private readonly IMCPKernelService _kernelService;
private readonly ILogger<MCPHub> _logger;
public MCPHub(
IMCPKernelService kernelService,
ILogger<MCPHub> logger)
{
_kernelService = kernelService;
_logger = logger;
}
public override async Task OnConnectedAsync()
{
_logger.LogInformation("Cliente WebSocket conectado: {ConnectionId}",
Context.ConnectionId);
await Clients.Caller.SendAsync("Connected", new
{
ConnectionId = Context.ConnectionId,
Protocol = "WebSocket",
Message = "Conectado ao MCP Hub"
});
await base.OnConnectedAsync();
}
public async Task SendCommand(MCPCommand command)
{
_logger.LogInformation(
"Recebido comando WebSocket: {Command} de {ConnectionId}",
command.Command,
Context.ConnectionId);
// Enrich the command with WebSocket connection data
var enrichedCommand = command with
{
Protocol = ProtocolType.WebSocket,
SessionId = Context.ConnectionId
};
// Process through the unified kernel
var response = await _kernelService.ExecuteCommandAsync(
enrichedCommand,
Context.ConnectionAborted);
// Send the response back to the caller
await Clients.Caller.SendAsync("CommandResponse", response);
}
public async Task StreamCommand(MCPCommand command)
{
_logger.LogInformation(
"Iniciando streaming WebSocket: {Command}",
command.Command);
var enrichedCommand = command with
{
Protocol = ProtocolType.WebSocket,
SessionId = Context.ConnectionId
};
await foreach (var chunk in _kernelService.StreamCommandAsync(
enrichedCommand,
Context.ConnectionAborted))
{
await Clients.Caller.SendAsync("StreamChunk", new
{
Content = chunk,
Timestamp = DateTime.UtcNow
});
}
await Clients.Caller.SendAsync("StreamComplete", new
{
Message = "Streaming concluído"
});
}
}
// Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSignalR();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
// Redis for the event bus
builder.Services.AddSingleton<IConnectionMultiplexer>(
ConnectionMultiplexer.Connect("localhost:6379"));
builder.Services.AddSingleton<IEventBus, RedisEventBus>();
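builder.Services.AddCors(options =>
{
// SignalR browser clients send credentials by default, and browsers reject
// credentialed requests when the allowed origin is a wildcard, so list origins explicitly.
// "https://localhost:3000" is a placeholder for your front-end origin.
options.AddPolicy("McpClients", policy =>
policy.WithOrigins("https://localhost:3000")
.AllowAnyMethod().AllowAnyHeader().AllowCredentials());
});
var app = builder.Build();
app.UseCors("McpClients");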
app.MapHub<MCPHub>("/mcphub");
app.MapGet("/", () => "MCP WebSocket Service");
app.Run();
8️⃣ API Gateway with YARP
// MCPPipeline.Gateway/Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddReverseProxy()
.LoadFromConfig(builder.Configuration.GetSection("ReverseProxy"));
var app = builder.Build();
app.MapReverseProxy();
app.MapGet("/", () => Results.Ok(new
{
Service = "MCP Gateway",
Endpoints = new
{
gRPC = "https://localhost:5001",
WebSocket = "https://localhost:5002/mcphub"
}
}));
app.Run();
// appsettings.json
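{
"ReverseProxy": {
"Routes": {
// PathRemovePrefix strips the gateway prefix so the backends receive the paths they map
// (for example /ws/mcphub is forwarded as /mcphub).
"grpc-route": {
"ClusterId": "grpc-cluster",
"Match": {
"Path": "/grpc/{**catch-all}"
},
"Transforms": [
{ "PathRemovePrefix": "/grpc" }
]
},
"websocket-route": {
"ClusterId": "websocket-cluster",
"Match": {
"Path": "/ws/{**catch-all}"
},
"Transforms": [
{ "PathRemovePrefix": "/ws" }
]
}
},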
"Clusters": {
"grpc-cluster": {
"Destinations": {
"destination1": {
"Address": "https://localhost:5001"
}
}
},
"websocket-cluster": {
"Destinations": {
"destination1": {
"Address": "https://localhost:5002"
}
},
"HttpRequest": {
"Version": "1.1",
"VersionPolicy": "RequestVersionOrLower"
}
}
}
}
}
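9️⃣ Unified Client
// MCPPipeline.Client/UnifiedMCPClient.cs
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading.Channels;
using Grpc.Core;
using Grpc.Net.Client;
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Grpc;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.Logging;
public class UnifiedMCPClient : IAsyncDisposable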
{
private readonly MCPService.MCPServiceClient? _grpcClient;
private readonly HubConnection? _hubConnection;
private readonly ILogger<UnifiedMCPClient> _logger;
private readonly bool _useGrpc;
public UnifiedMCPClient(
string endpoint,
bool useGrpc,
ILogger<UnifiedMCPClient> logger)
{
_useGrpc = useGrpc;
_logger = logger;
if (useGrpc)
{
var channel = GrpcChannel.ForAddress(endpoint);
_grpcClient = new MCPService.MCPServiceClient(channel);
_logger.LogInformation("Cliente configurado para gRPC: {Endpoint}", endpoint);
}
else
{
_hubConnection = new HubConnectionBuilder()
.WithUrl(endpoint)
.WithAutomaticReconnect()
.Build();
SetupWebSocketHandlers();
_logger.LogInformation("Cliente configurado para WebSocket: {Endpoint}", endpoint);
}
}
public async Task ConnectAsync(CancellationToken ct = default)
{
if (!_useGrpc && _hubConnection != null)
{
await _hubConnection.StartAsync(ct);
_logger.LogInformation("Conectado via WebSocket");
}
}
public async Task<MCPResponse> SendCommandAsync(
string command,
string payload,
CancellationToken ct = default)
{
if (_useGrpc && _grpcClient != null)
{
return await SendViaGrpcAsync(command, payload, ct);
}
else if (_hubConnection != null)
{
return await SendViaWebSocketAsync(command, payload, ct);
}
throw new InvalidOperationException("Cliente não inicializado");
}
private async Task<MCPResponse> SendViaGrpcAsync(
string command,
string payload,
CancellationToken ct)
{
_logger.LogInformation("Enviando comando via gRPC: {Command}", command);
var request = new MCPRequest
{
Command = command,
Payload = payload,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
var response = await _grpcClient!.SendCommandAsync(request, cancellationToken: ct);
return new MCPResponse
{
Result = response.Result,
Status = response.Status == "OK" ? ResponseStatus.Success : ResponseStatus.Error,
ErrorMessage = response.ErrorMessage,
ProcessingTimeMs = response.ProcessingTimeMs
};
}
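private async Task<MCPResponse> SendViaWebSocketAsync(
string command,
string payload,
CancellationToken ct)
{
_logger.LogInformation("Enviando comando via WebSocket: {Command}", command);
var request = new MCPCommand
{
Command = command,
Payload = payload,
Timestamp = DateTime.UtcNow
};
var tcs = new TaskCompletionSource<MCPResponse>(
TaskCreationOptions.RunContinuationsAsynchronously);
// Complete only on the response that echoes this request's CommandId, so concurrent
// calls on the same connection do not pick up each other's replies.
// Disposing the subscription removes just this handler; Remove("CommandResponse")
// would also drop the handlers of other in-flight calls.
using var subscription = _hubConnection!.On<MCPResponse>("CommandResponse", response =>
{
if (response.CommandId == request.CommandId)
{
tcs.TrySetResult(response);
}
});
await _hubConnection.InvokeAsync("SendCommand", request, ct);
return await tcs.Task.WaitAsync(TimeSpan.FromSeconds(30), ct);
}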
public async IAsyncEnumerable<string> StreamCommandAsync(
string command,
string payload,
[EnumeratorCancellation] CancellationToken ct = default)
{
if (_useGrpc && _grpcClient != null)
{
await foreach (var chunk in StreamViaGrpcAsync(command, payload, ct))
{
yield return chunk;
}
}
else if (_hubConnection != null)
{
await foreach (var chunk in StreamViaWebSocketAsync(command, payload, ct))
{
yield return chunk;
}
}
}
private async IAsyncEnumerable<string> StreamViaGrpcAsync(
string command,
string payload,
[EnumeratorCancellation] CancellationToken ct)
{
var request = new MCPRequest
{
Command = command,
Payload = payload,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
using var call = _grpcClient!.StreamCommand(request, cancellationToken: ct);
await foreach (var response in call.ResponseStream.ReadAllAsync(ct))
{
yield return response.Content;
}
}
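private async IAsyncEnumerable<string> StreamViaWebSocketAsync(
string command,
string payload,
[EnumeratorCancellation] CancellationToken ct)
{
var channel = Channel.CreateUnbounded<string>();
// With SignalR's default JSON protocol, untyped payloads arrive as JsonElement with
// camelCase property names, so read "content" explicitly instead of using dynamic.
using var chunkSubscription = _hubConnection!.On<JsonElement>("StreamChunk", chunk =>
{
channel.Writer.TryWrite(chunk.GetProperty("content").GetString() ?? string.Empty);
});
using var completeSubscription = _hubConnection.On<JsonElement>("StreamComplete", _ =>
{
// TryComplete does not throw if the channel was already completed.
channel.Writer.TryComplete();
});
await _hubConnection.InvokeAsync("StreamCommand", new MCPCommand
{
Command = command,
Payload = payload,
Timestamp = DateTime.UtcNow
}, ct);
await foreach (var chunk in channel.Reader.ReadAllAsync(ct))
{
yield return chunk;
}
// The subscriptions are disposed when the iterator finishes, so repeated calls
// do not accumulate handlers on the connection.
}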
private void SetupWebSocketHandlers()
{
_hubConnection!.On<object>("Connected", data =>
{
_logger.LogInformation("✅ WebSocket conectado: {Data}", data);
});
_hubConnection.Reconnecting += error =>
{
_logger.LogWarning("⚠️ WebSocket reconectando: {Error}", error?.Message);
return Task.CompletedTask;
};
_hubConnection.Reconnected += connectionId =>
{
_logger.LogInformation("✅ WebSocket reconectado: {ConnectionId}", connectionId);
return Task.CompletedTask;
};
_hubConnection.Closed += error =>
{
_logger.LogError("❌ WebSocket fechado: {Error}", error?.Message);
return Task.CompletedTask;
};
}
public async ValueTask DisposeAsync()
{
if (_hubConnection != null)
{
await _hubConnection.StopAsync();
await _hubConnection.DisposeAsync();
}
}
}
// Usage example
public class Program
{
public static async Task Main(string[] args)
{
var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger<UnifiedMCPClient>();
// gRPC client
await using var grpcClient = new UnifiedMCPClient(
"https://localhost:5001",
useGrpc: true,
logger);
var response = await grpcClient.SendCommandAsync("analyze", "Teste via gRPC");
Console.WriteLine($"gRPC: {response.Result}");
// WebSocket client
await using var wsClient = new UnifiedMCPClient(
"https://localhost:5002/mcphub",
useGrpc: false,
logger);
await wsClient.ConnectAsync();
response = await wsClient.SendCommandAsync("analyze", "Teste via WebSocket");
Console.WriteLine($"WebSocket: {response.Result}");
// Streaming
await foreach (var chunk in wsClient.StreamCommandAsync("generate", "Prompt de teste"))
{
Console.WriteLine($"Chunk: {chunk}");
}
}
}
🔍 Unified Observability
OpenTelemetry Configuration
// MCPPipeline.Observability/OpenTelemetryExtensions.cs
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using OpenTelemetry.Metrics;
public static class OpenTelemetryExtensions
{
public static IServiceCollection AddMCPObservability(
this IServiceCollection services,
string serviceName)
{
services.AddOpenTelemetry()
.ConfigureResource(resource =>
{
resource.AddService(
serviceName: serviceName,
serviceVersion: "1.0.0");
})
.WithTracing(tracing =>
{
tracing
.AddAspNetCoreInstrumentation(options =>
{
options.RecordException = true;
options.EnrichWithHttpRequest = (activity, request) =>
{
activity.SetTag("http.scheme", request.Scheme);
// Tag values should be primitives or strings, so convert the IPAddress.
activity.SetTag("http.client_ip", request.HttpContext.Connection.RemoteIpAddress?.ToString());
};
})
.AddGrpcClientInstrumentation()
.AddSource("MCPPipeline.Core")
.AddSource("MCPPipeline.GrpcService")
.AddSource("MCPPipeline.WebSocketService")
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("http://localhost:4317");
});
})
.WithMetrics(metrics =>
{
metrics
.AddAspNetCoreInstrumentation()
.AddRuntimeInstrumentation()
.AddMeter("MCPPipeline.*")
.AddPrometheusExporter()
.AddOtlpExporter();
});
return services;
}
}
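// Usage in the services
builder.Services.AddMCPObservability("MCPPipeline.GrpcService");
// After builder.Build(), expose the scrape endpoint that Prometheus reads (/metrics by default):
// app.MapPrometheusScrapingEndpoint();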
Custom Metrics
// MCPPipeline.Observability/MCPMetrics.cs
using System.Diagnostics.Metrics;
using MCPPipeline.Contracts.Messages; // ProtocolType
public class MCPMetrics
{
private readonly Meter _meter;
private readonly Counter<long> _commandsProcessed;
private readonly Histogram<double> _commandDuration;
private readonly Counter<long> _commandErrors;
private readonly UpDownCounter<int> _activeConnections;
public MCPMetrics()
{
_meter = new Meter("MCPPipeline.Metrics", "1.0.0");
_commandsProcessed = _meter.CreateCounter<long>(
"mcp.commands.processed",
description: "Total de comandos processados");
_commandDuration = _meter.CreateHistogram<double>(
"mcp.command.duration",
unit: "ms",
description: "Duração do processamento de comandos");
_commandErrors = _meter.CreateCounter<long>(
"mcp.commands.errors",
description: "Total de erros no processamento");
_activeConnections = _meter.CreateUpDownCounter<int>(
"mcp.connections.active",
description: "Conexões ativas");
}
public void RecordCommandProcessed(string command, ProtocolType protocol)
{
_commandsProcessed.Add(1,
new KeyValuePair<string, object?>("command", command),
new KeyValuePair<string, object?>("protocol", protocol.ToString()));
}
public void RecordCommandDuration(string command, ProtocolType protocol, double durationMs)
{
_commandDuration.Record(durationMs,
new KeyValuePair<string, object?>("command", command),
new KeyValuePair<string, object?>("protocol", protocol.ToString()));
}
public void RecordCommandError(string command, ProtocolType protocol, string errorType)
{
_commandErrors.Add(1,
new KeyValuePair<string, object?>("command", command),
new KeyValuePair<string, object?>("protocol", protocol.ToString()),
new KeyValuePair<string, object?>("error_type", errorType));
}
public void IncrementActiveConnections(ProtocolType protocol)
{
_activeConnections.Add(1,
new KeyValuePair<string, object?>("protocol", protocol.ToString()));
}
public void DecrementActiveConnections(ProtocolType protocol)
{
_activeConnections.Add(-1,
new KeyValuePair<string, object?>("protocol", protocol.ToString()));
}
}
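To check that instruments like these emit what you expect without running Prometheus, `System.Diagnostics.Metrics.MeterListener` can capture measurements in-process. The sketch below is an illustration only: `MetricsProbe` and the meter name `MCPPipeline.Metrics.Sketch` are invented for the example, and it records on a throwaway meter instead of the `MCPMetrics` class.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

public static class MetricsProbe
{
    // Records one counter increment on a throwaway meter and returns what a listener observed.
    public static (long Total, string? Protocol) CaptureOneIncrement()
    {
        long total = 0;
        string? protocol = null;

        using var meter = new Meter("MCPPipeline.Metrics.Sketch"); // hypothetical meter name
        var counter = meter.CreateCounter<long>("mcp.commands.processed");

        using var listener = new MeterListener();
        listener.InstrumentPublished = (instrument, l) =>
        {
            // Subscribe only to instruments that belong to the meter under test.
            if (instrument.Meter.Name == "MCPPipeline.Metrics.Sketch")
            {
                l.EnableMeasurementEvents(instrument);
            }
        };
        listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
        {
            total += value;
            foreach (var tag in tags)
            {
                if (tag.Key == "protocol") protocol = tag.Value?.ToString();
            }
        });
        listener.Start();

        // Same tagging style as RecordCommandProcessed above.
        counter.Add(1, new KeyValuePair<string, object?>("protocol", "Grpc"));
        return (total, protocol);
    }
}
```

The same listener pattern can be pointed at the `MCPPipeline.Metrics` meter in a unit test to assert on tags such as `command` and `protocol`.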
📊 Dashboard with Grafana
Prometheus Configuration
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
scrape_configs:
  - job_name: 'mcp-grpc-service'
    static_configs:
      - targets: ['localhost:5001']
  - job_name: 'mcp-websocket-service'
    static_configs:
      - targets: ['localhost:5002']
  - job_name: 'mcp-gateway'
    static_configs:
      - targets: ['localhost:5000']
Grafana Dashboard JSON
{
"dashboard": {
"title": "MCP Hybrid Protocol Dashboard",
"panels": [
{
"title": "Commands Processed per Protocol",
"targets": [
{
"expr": "rate(mcp_commands_processed_total[5m])",
"legendFormat": "{{protocol}}"
}
]
},
{
"title": "p95 Latency per Command",
"targets": [
{
"expr": "histogram_quantile(0.95, sum by (le, command) (rate(mcp_command_duration_bucket[5m])))",
"legendFormat": "p95 - {{command}}"
}
]
},
{
"title": "Active Connections",
"targets": [
{
"expr": "mcp_connections_active",
"legendFormat": "{{protocol}}"
}
]
},
{
"title": "Error Rate",
"targets": [
{
"expr": "rate(mcp_commands_errors_total[5m])",
"legendFormat": "{{protocol}} - {{error_type}}"
}
]
}
]
}
}
🧪 Integration Tests
Hybrid Test
// MCPPipeline.IntegrationTests/HybridProtocolTests.cs
public class HybridProtocolTests : IClassFixture<MCPTestFixture>
{
private readonly MCPTestFixture _fixture;
public HybridProtocolTests(MCPTestFixture fixture)
{
_fixture = fixture;
}
[Fact]
public async Task Should_Process_Same_Command_Via_Both_Protocols()
{
// Arrange
var grpcClient = _fixture.CreateGrpcClient();
var wsClient = _fixture.CreateWebSocketClient();
await wsClient.ConnectAsync();
var testPayload = "Teste de integração híbrida";
// Act
var grpcResponse = await grpcClient.SendCommandAsync("analyze", testPayload);
var wsResponse = await wsClient.SendCommandAsync("analyze", testPayload);
// Assert
Assert.Equal(ResponseStatus.Success, grpcResponse.Status);
Assert.Equal(ResponseStatus.Success, wsResponse.Status);
// Both should return equivalent results
Assert.Contains("palavras", grpcResponse.Result);
Assert.Contains("palavras", wsResponse.Result);
}
[Fact]
public async Task Should_Handle_Concurrent_Requests_From_Both_Protocols()
{
// Arrange
var grpcClient = _fixture.CreateGrpcClient();
var wsClient = _fixture.CreateWebSocketClient();
await wsClient.ConnectAsync();
var tasks = new List<Task<MCPResponse>>();
// Act - send 50 concurrent requests over each protocol
for (int i = 0; i < 50; i++)
{
tasks.Add(grpcClient.SendCommandAsync("status", $"gRPC-{i}"));
tasks.Add(wsClient.SendCommandAsync("status", $"WS-{i}"));
}
var responses = await Task.WhenAll(tasks);
// Assert
Assert.All(responses, r => Assert.Equal(ResponseStatus.Success, r.Status));
Assert.Equal(100, responses.Length);
}
[Fact(Skip = "Placeholder: needs a WebSocket-side listener wired to the event bus")]
public async Task Should_Broadcast_Events_Between_Protocols()
{
// Arrange
var grpcClient = _fixture.CreateGrpcClient();
var wsClient = _fixture.CreateWebSocketClient();
await wsClient.ConnectAsync();
var eventReceived = new TaskCompletionSource<bool>();
// Configure a listener on the WebSocket side that calls eventReceived.TrySetResult(true).
// (The implementation depends on the event bus; until it exists, the wait below times out.)
// Act - send a command via gRPC
await grpcClient.SendCommandAsync("generate", "Teste de evento");
// Assert - the WebSocket side should receive a notification
var received = await eventReceived.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.True(received);
}
}
public class MCPTestFixture : IAsyncLifetime
{
private WebApplication? _grpcApp;
private WebApplication? _wsApp;
public async Task InitializeAsync()
{
// Start the test services
_grpcApp = CreateGrpcService();
_wsApp = CreateWebSocketService();
await _grpcApp.StartAsync();
await _wsApp.StartAsync();
}
public async Task DisposeAsync()
{
if (_grpcApp != null) await _grpcApp.StopAsync();
if (_wsApp != null) await _wsApp.StopAsync();
}
public UnifiedMCPClient CreateGrpcClient()
{
return new UnifiedMCPClient(
"https://localhost:5001",
useGrpc: true,
Mock.Of<ILogger<UnifiedMCPClient>>());
}
public UnifiedMCPClient CreateWebSocketClient()
{
return new UnifiedMCPClient(
"https://localhost:5002/mcphub",
useGrpc: false,
Mock.Of<ILogger<UnifiedMCPClient>>());
}
private WebApplication CreateGrpcService()
{
var builder = WebApplication.CreateBuilder();
builder.Services.AddGrpc();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
builder.Services.AddSingleton<IProtocolAdapter, ProtocolAdapter>();
var app = builder.Build();
app.MapGrpcService<MCPGrpcServiceImpl>();
return app;
}
private WebApplication CreateWebSocketService()
{
var builder = WebApplication.CreateBuilder();
builder.Services.AddSignalR();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
var app = builder.Build();
app.MapHub<MCPHub>("/mcphub");
return app;
}
}
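The broadcast test leaves the listener wiring open. As a minimal sketch of one way to bridge a callback-style notification to an awaitable with a timeout, the helper below wraps a `TaskCompletionSource` using only the base class library. `EventWaiter<T>` and `Notify` are hypothetical names introduced for illustration; they are not part of the project shown in this article.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: turns a callback-style notification into an awaitable.
public sealed class EventWaiter<T>
{
    // RunContinuationsAsynchronously keeps the notifier from running the awaiting code inline.
    private readonly TaskCompletionSource<T> _tcs =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // Call this from the event handler (for example a hub callback) when the event arrives.
    // Only the first notification is kept; later calls are ignored.
    public void Notify(T value) => _tcs.TrySetResult(value);

    // Waits for the first notification; throws TimeoutException if none arrives in time.
    public Task<T> WaitAsync(TimeSpan timeout) => _tcs.Task.WaitAsync(timeout);
}
```

In the test, the listener registered on the WebSocket client would call `Notify`, and the assertion would await `WaitAsync(TimeSpan.FromSeconds(5))`.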
🚀 Performance Testing
Benchmark with BenchmarkDotNet
// MCPPipeline.Benchmarks/ProtocolBenchmarks.cs
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using Microsoft.Extensions.Logging;
using Moq;

[MemoryDiagnoser]
[SimpleJob(warmupCount: 3, iterationCount: 10)]
public class ProtocolBenchmarks
{
    private UnifiedMCPClient _grpcClient = null!;
    private UnifiedMCPClient _wsClient = null!;

    // Both services must already be running on these ports before the benchmark starts
    [GlobalSetup]
    public async Task Setup()
    {
        _grpcClient = new UnifiedMCPClient(
            "https://localhost:5001",
            useGrpc: true,
            Mock.Of<ILogger<UnifiedMCPClient>>());
        _wsClient = new UnifiedMCPClient(
            "https://localhost:5002/mcphub",
            useGrpc: false,
            Mock.Of<ILogger<UnifiedMCPClient>>());
        await _wsClient.ConnectAsync();
    }

    [Benchmark(Baseline = true)]
    public async Task<MCPResponse> GrpcCommand()
    {
        return await _grpcClient.SendCommandAsync("status", "benchmark");
    }

    [Benchmark]
    public async Task<MCPResponse> WebSocketCommand()
    {
        return await _wsClient.SendCommandAsync("status", "benchmark");
    }

    [Benchmark]
    public async Task GrpcStreaming()
    {
        await foreach (var _ in _grpcClient.StreamCommandAsync("generate", "test"))
        {
            // Drain the stream
        }
    }

    [Benchmark]
    public async Task WebSocketStreaming()
    {
        await foreach (var _ in _wsClient.StreamCommandAsync("generate", "test"))
        {
            // Drain the stream
        }
    }

    [GlobalCleanup]
    public async Task Cleanup()
    {
        await _grpcClient.DisposeAsync();
        await _wsClient.DisposeAsync();
    }
}

// Program.cs
public class Program
{
    public static void Main(string[] args)
    {
        BenchmarkRunner.Run<ProtocolBenchmarks>();
    }
}
Expected Results
| Method | Mean | Error | StdDev | Ratio | Gen0 | Allocated |
|------------------ |----------:|----------:|----------:|------:|-----:|----------:|
| GrpcCommand | 2.450 ms | 0.0421 ms | 0.0394 ms | 1.00 | 15.6 | 128KB |
| WebSocketCommand | 8.732 ms | 0.1523 ms | 0.1425 ms | 3.56 | 31.2 | 256KB |
| GrpcStreaming | 12.105 ms | 0.2103 ms | 0.1967 ms | 4.94 | 46.8 | 384KB |
| WebSocketStreaming| 18.421 ms | 0.3142 ms | 0.2940 ms | 7.52 | 62.5 | 512KB |
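To read the Ratio column: it expresses each method's time relative to the baseline (`GrpcCommand`). The sketch below approximates it by dividing the means from the table, which reproduces the listed values. `BenchmarkRatios` is a hypothetical helper written for this explanation, not something BenchmarkDotNet provides.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper that recomputes the Ratio column from the Mean column.
public static class BenchmarkRatios
{
    // Ratio is approximated as method mean / baseline mean, rounded to two decimals.
    public static double Ratio(double meanMs, double baselineMeanMs) =>
        Math.Round(meanMs / baselineMeanMs, 2);

    public static Dictionary<string, double> FromTable()
    {
        const double baseline = 2.450; // GrpcCommand mean in ms, taken from the table
        return new Dictionary<string, double>
        {
            ["WebSocketCommand"] = Ratio(8.732, baseline),
            ["GrpcStreaming"] = Ratio(12.105, baseline),
            ["WebSocketStreaming"] = Ratio(18.421, baseline),
        };
    }
}
```

Read this way, a WebSocket command in the table takes roughly three and a half times as long as the gRPC baseline.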
🎯 Protocol Decision: When to Use Each One
Decision Matrix
using MCPPipeline.Contracts.Messages;

public class ProtocolSelector
{
    // Rules are evaluated top to bottom; the first one that matches wins.
    public ProtocolType SelectOptimalProtocol(ClientContext context)
    {
        // 1. Is the client a browser? → WebSocket
        if (context.IsWebBrowser)
            return ProtocolType.WebSocket;

        // 2. Internal service-to-service communication? → gRPC
        if (context.IsInternalService)
            return ProtocolType.Grpc;

        // 3. Needs long-lived bidirectional streaming? → WebSocket
        if (context.RequiresLongLivedStream)
            return ProtocolType.WebSocket;

        // 4. Is minimal latency the priority? → gRPC
        if (context.LatencySensitive)
            return ProtocolType.Grpc;

        // 5. Do multiple clients need to receive broadcasts? → WebSocket
        if (context.RequiresBroadcast)
            return ProtocolType.WebSocket;

        // 6. Default: gRPC for performance
        return ProtocolType.Grpc;
    }
}

public record ClientContext
{
    public bool IsWebBrowser { get; init; }
    public bool IsInternalService { get; init; }
    public bool RequiresLongLivedStream { get; init; }
    public bool LatencySensitive { get; init; }
    public bool RequiresBroadcast { get; init; }
}
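Because the selector returns on the first matching check, the order of the checks is itself part of the policy. One way to make that explicit is an ordered rule table, sketched below. `Protocol`, `SelectionContext`, and `RuleBasedSelector` are hypothetical stand-ins for the article's `ProtocolType`, `ClientContext`, and `ProtocolSelector`, defined here only so the example is self-contained.

```csharp
using System;

// Stand-in types so the sketch compiles on its own (hypothetical names).
public enum Protocol { Grpc, WebSocket }

public record SelectionContext(
    bool IsWebBrowser = false,
    bool IsInternalService = false,
    bool RequiresLongLivedStream = false,
    bool LatencySensitive = false,
    bool RequiresBroadcast = false);

public static class RuleBasedSelector
{
    // Ordered rules: the first predicate that matches decides the protocol.
    private static readonly (Func<SelectionContext, bool> Matches, Protocol Result)[] Rules =
    {
        (c => c.IsWebBrowser, Protocol.WebSocket),
        (c => c.IsInternalService, Protocol.Grpc),
        (c => c.RequiresLongLivedStream, Protocol.WebSocket),
        (c => c.LatencySensitive, Protocol.Grpc),
        (c => c.RequiresBroadcast, Protocol.WebSocket),
    };

    public static Protocol Select(SelectionContext context)
    {
        foreach (var (matches, result) in Rules)
        {
            if (matches(context)) return result;
        }
        return Protocol.Grpc; // same default as the selector in the article
    }
}
```

For example, a latency-sensitive browser client still gets WebSocket, because the browser rule is listed before the latency rule.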
Comparison Table
| Scenario | gRPC | WebSocket | Recommendation |
|---|---|---|---|
| Backend API → Backend | ✅ Ideal | ⚠️ Overhead | gRPC |
| Web Application → Backend | ❌ Limited (browsers need gRPC-Web) | ✅ Native | WebSocket |
| Mobile App → Backend | ✅ Excellent | ✅ Good | gRPC (better performance) |
| Real-time Dashboard | ⚠️ Server streaming, no native browser support | ✅ Native push | WebSocket |
| Multi-user Chat | ❌ Not ideal | ✅ Perfect | WebSocket |
| Data Analysis | ✅ High performance | ⚠️ Slower | gRPC |
| AI Streaming | ✅ Efficient | ✅ Functional | Both (depends on context) |
🔒 Security in a Hybrid Environment
Unified Authentication
// MCPPipeline.Security/UnifiedAuthenticationExtensions.cs
using System.Text;
using Grpc.Core;
using Grpc.Core.Interceptors;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.IdentityModel.Tokens;

public static class UnifiedAuthenticationExtensions
{
    public static IServiceCollection AddUnifiedAuthentication(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        services.AddAuthentication(options =>
        {
            options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
            options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
        })
        .AddJwtBearer(options =>
        {
            options.TokenValidationParameters = new TokenValidationParameters
            {
                ValidateIssuer = true,
                ValidateAudience = true,
                ValidateLifetime = true,
                ValidateIssuerSigningKey = true,
                ValidIssuer = configuration["Jwt:Issuer"],
                ValidAudience = configuration["Jwt:Audience"],
                IssuerSigningKey = new SymmetricSecurityKey(
                    Encoding.UTF8.GetBytes(configuration["Jwt:Key"]!))
            };

            // WebSocket clients send the token in the query string
            options.Events = new JwtBearerEvents
            {
                OnMessageReceived = context =>
                {
                    var accessToken = context.Request.Query["access_token"];
                    var path = context.HttpContext.Request.Path;
                    if (!string.IsNullOrEmpty(accessToken) &&
                        path.StartsWithSegments("/mcphub"))
                    {
                        context.Token = accessToken;
                    }
                    return Task.CompletedTask;
                }
            };
        });

        // Register the authentication interceptor for gRPC
        services.AddGrpc(options =>
        {
            options.Interceptors.Add<AuthenticationInterceptor>();
        });

        return services;
    }
}

// gRPC interceptor
public class AuthenticationInterceptor : Interceptor
{
    private readonly ILogger<AuthenticationInterceptor> _logger;

    public AuthenticationInterceptor(ILogger<AuthenticationInterceptor> logger)
    {
        _logger = logger;
    }

    // Only unary calls pass through this override; streaming calls need the
    // corresponding streaming handler overrides.
    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        var authHeader = context.RequestHeaders.GetValue("authorization");
        if (string.IsNullOrEmpty(authHeader))
        {
            _logger.LogWarning("gRPC request without authentication");
            throw new RpcException(new Status(StatusCode.Unauthenticated, "Token not provided"));
        }

        // Token validation still has to be implemented; at this point only the
        // presence of the header has been checked.
        _logger.LogInformation("gRPC authorization header received from peer: {Peer}", context.Peer);
        return await continuation(request, context);
    }
}
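The interceptor leaves token validation open. A first step, sketched here under the assumption that clients send the usual `Bearer <token>` form in the `authorization` header, is to extract the raw token before handing it to whatever validator is used. `BearerTokenParser` is a hypothetical helper; signature and lifetime checks are deliberately not shown.

```csharp
using System;

// Hypothetical helper: extracts the raw token from an "authorization" header value.
public static class BearerTokenParser
{
    private const string Prefix = "Bearer ";

    // Returns true and the raw token when the header has the form "Bearer <token>".
    // This only parses the header; it does not validate the token itself.
    public static bool TryGetToken(string? authorizationHeader, out string token)
    {
        token = string.Empty;
        if (string.IsNullOrWhiteSpace(authorizationHeader)) return false;
        if (!authorizationHeader.StartsWith(Prefix, StringComparison.OrdinalIgnoreCase)) return false;

        var candidate = authorizationHeader.Substring(Prefix.Length).Trim();
        if (candidate.Length == 0) return false;

        token = candidate;
        return true;
    }
}
```

Inside `UnaryServerHandler`, a failed `TryGetToken` could be mapped to the same `StatusCode.Unauthenticated` error that is thrown for a missing header.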
Unified Rate Limiting
// MCPPipeline.Security/RateLimitingExtensions.cs
using System.Threading.RateLimiting;
using Microsoft.AspNetCore.RateLimiting;

public static class RateLimitingExtensions
{
    public static IServiceCollection AddUnifiedRateLimiting(
        this IServiceCollection services)
    {
        services.AddRateLimiter(options =>
        {
            // Policy for gRPC
            options.AddFixedWindowLimiter("grpc", limiterOptions =>
            {
                limiterOptions.PermitLimit = 100;
                limiterOptions.Window = TimeSpan.FromMinutes(1);
                limiterOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
                limiterOptions.QueueLimit = 10;
            });

            // Policy for WebSocket
            options.AddSlidingWindowLimiter("websocket", limiterOptions =>
            {
                limiterOptions.PermitLimit = 60;
                limiterOptions.Window = TimeSpan.FromMinutes(1);
                limiterOptions.SegmentsPerWindow = 4;
                limiterOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
                limiterOptions.QueueLimit = 5;
            });

            options.OnRejected = async (context, token) =>
            {
                context.HttpContext.Response.StatusCode = 429;
                await context.HttpContext.Response.WriteAsJsonAsync(new
                {
                    error = "Rate limit exceeded",
                    // The cast gives both branches of the conditional a common type (double?)
                    retryAfter = context.Lease.TryGetMetadata(
                        MetadataName.RetryAfter, out var retryAfter)
                        ? (double?)retryAfter.TotalSeconds
                        : null
                }, token);
            };
        });

        return services;
    }
}
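To make `PermitLimit` and `Window` concrete, here is a hand-written fixed-window counter. It is a deliberately simplified illustration of the idea, not the framework's limiter: there is no queueing and no thread safety, and `FixedWindowCounter` with its injected clock is a hypothetical type introduced so the behavior can be checked deterministically.

```csharp
using System;

// Hypothetical illustration of fixed-window semantics; not the framework's limiter.
// Not thread-safe: a real limiter must synchronize access to the counter.
public sealed class FixedWindowCounter
{
    private readonly int _permitLimit;
    private readonly TimeSpan _window;
    private readonly Func<DateTimeOffset> _clock;
    private DateTimeOffset _windowStart;
    private int _used;

    public FixedWindowCounter(int permitLimit, TimeSpan window, Func<DateTimeOffset> clock)
    {
        _permitLimit = permitLimit;
        _window = window;
        _clock = clock;
        _windowStart = clock();
    }

    // Returns true if the request fits in the current window, false if it should be rejected.
    public bool TryAcquire()
    {
        var now = _clock();
        if (now - _windowStart >= _window)
        {
            // A new window starts: the counter resets all at once.
            _windowStart = now;
            _used = 0;
        }

        if (_used >= _permitLimit) return false;
        _used++;
        return true;
    }
}
```

With a limit of 100 per minute, the 101st request inside the same minute is rejected and the next window admits requests again. A sliding window spreads that reset over several segments, which is the role of `SegmentsPerWindow` in the WebSocket policy.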
🏗️ Docker Compose for the Complete Environment
# docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data

  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686" # UI
      - "4317:4317"   # OTLP gRPC
      - "4318:4318"   # OTLP HTTP
    environment:
      - COLLECTOR_OTLP_ENABLED=true

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana-data:/var/lib/grafana
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
      - GF_USERS_ALLOW_SIGN_UP=false

  mcp-grpc-service:
    build:
      context: .
      dockerfile: src/MCPPipeline.GrpcService/Dockerfile
    ports:
      - "5001:5001"
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
      - Redis__ConnectionString=redis:6379
      - Jaeger__AgentHost=jaeger
      - Jaeger__AgentPort=4317
    depends_on:
      - redis
      - jaeger

  mcp-websocket-service:
    build:
      context: .
      dockerfile: src/MCPPipeline.WebSocketService/Dockerfile
    ports:
      - "5002:5002"
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
      - Redis__ConnectionString=redis:6379
      - Jaeger__AgentHost=jaeger
      - Jaeger__AgentPort=4317
    depends_on:
      - redis
      - jaeger

  mcp-gateway:
    build:
      context: .
      dockerfile: src/MCPPipeline.Gateway/Dockerfile
    ports:
      - "5000:5000"
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
    depends_on:
      - mcp-grpc-service
      - mcp-websocket-service

volumes:
  redis-data:
  prometheus-data:
  grafana-data:
📈 Scalability Strategies
Kubernetes Deployment
# k8s/mcp-deployment.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: mcp-config
data:
  redis-connection: "redis-service:6379"
  jaeger-endpoint: "http://jaeger-collector:4317"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-grpc-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-grpc
  template:
    metadata:
      labels:
        app: mcp-grpc
    spec:
      containers:
        - name: mcp-grpc
image: mcp-grpc-service:latest# 🧩 Minha Primeira Comunicação com MCP e .NET – Parte 4
Integração Completa com gRPC & WebSocket
Nesta quarta parte da série "Minha Primeira Comunicação com MCP e .NET", exploramos como criar uma arquitetura híbrida que combina o melhor dos dois mundos: a eficiência e tipagem forte do gRPC para comunicação interna entre serviços, e a flexibilidade e suporte nativo do WebSocket para clientes web e mobile, criando um ecossistema MCP robusto e escalável.
🚀 Introdução
Após explorarmos gRPC (Parte 2) e WebSocket (Parte 3) individualmente, surge uma questão arquitetural importante: e se precisarmos dos benefícios de ambos?
Em sistemas corporativos modernos, é comum ter:
- Microsserviços backend que se comunicam via gRPC (baixa latência, tipagem forte)
- Clientes web/mobile que precisam de WebSocket (suporte nativo, real-time)
- Agentes MCP que podem usar ambos os protocolos conforme o contexto
Este artigo demonstra como construir uma arquitetura unificada que oferece ambos os protocolos, com roteamento inteligente, adaptadores de protocolo, e observabilidade centralizada.
🧠 Arquitetura Híbrida: O Melhor dos Dois Mundos
Visão Geral da Arquitetura
┌─────────────────────────────────────────────────────────────────┐
│ API Gateway / BFF │
│ (Protocol Orchestrator) │
└──────────────────┬─────────────────────┬────────────────────────┘
│ │
┌─────────▼──────────┐ ┌───────▼──────────┐
│ gRPC Endpoint │ │ WebSocket Endpoint│
│ (Port 5001) │ │ (Port 5002) │
└─────────┬──────────┘ └───────┬───────────┘
│ │
│ ┌────────────────┴─────────────────┐
│ │ │
┌─────────▼────▼──────────────┐ ┌─────────────▼──────┐
│ Protocol Adapter Layer │ │ SignalR Hub Layer │
│ (gRPC ↔ WebSocket) │ │ (Client Manager) │
└─────────┬──────────────────┘ └─────────┬────────────┘
│ │
┌─────────▼──────────────────────────────────▼────────┐
│ MCP Kernel Orchestrator │
│ (Unified Command Processing) │
└─────────┬──────────────────────────┬─────────────────┘
│ │
┌─────────▼────────┐ ┌─────────▼──────────┐
│ Domain Services │ │ Event Bus (Redis) │
│ (Business Logic)│ │ (Cross-Protocol) │
└──────────────────┘ └────────────────────┘
Componentes Principais
- Protocol Orchestrator - Decide qual protocolo usar baseado em contexto
- Protocol Adapter Layer - Converte mensagens entre gRPC e WebSocket
- Unified MCP Kernel - Processa comandos independente do protocolo
- Event Bus - Sincroniza eventos entre instâncias e protocolos
- Observability Layer - Tracing unificado para ambos os protocolos
🏗️ Implementação da Arquitetura Híbrida
1️⃣ Estrutura do Projeto
MCPPipeline.Hybrid/
├── src/
│ ├── MCPPipeline.Contracts/ # Modelos compartilhados
│ │ ├── Messages/
│ │ │ ├── MCPCommand.cs
│ │ │ ├── MCPResponse.cs
│ │ │ └── MCPEvent.cs
│ │ └── Protos/
│ │ └── mcp.proto
│ │
│ ├── MCPPipeline.Core/ # Lógica de negócio
│ │ ├── Services/
│ │ │ ├── IMCPKernelService.cs
│ │ │ └── MCPKernelService.cs
│ │ └── Events/
│ │ └── IEventBus.cs
│ │
│ ├── MCPPipeline.GrpcService/ # Endpoint gRPC
│ │ ├── Services/
│ │ │ └── MCPGrpcService.cs
│ │ └── Program.cs
│ │
│ ├── MCPPipeline.WebSocketService/ # Endpoint WebSocket
│ │ ├── Hubs/
│ │ │ └── MCPHub.cs
│ │ └── Program.cs
│ │
│ ├── MCPPipeline.Gateway/ # API Gateway (YARP)
│ │ ├── Configuration/
│ │ └── Program.cs
│ │
│ └── MCPPipeline.Adapter/ # Protocol Adapter
│ ├── GrpcToWebSocketAdapter.cs
│ └── WebSocketToGrpcAdapter.cs
│
└── tests/
└── MCPPipeline.IntegrationTests/
2️⃣ Contratos Compartilhados
// MCPPipeline.Contracts/Messages/MCPCommand.cs
namespace MCPPipeline.Contracts.Messages;
public record MCPCommand
{
public string CommandId { get; init; } = Guid.NewGuid().ToString();
public string Command { get; init; } = string.Empty;
public string Payload { get; init; } = string.Empty;
public Dictionary<string, string> Metadata { get; init; } = new();
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
public string SessionId { get; init; } = string.Empty;
public ProtocolType Protocol { get; init; }
public int Priority { get; init; } = 0;
}
public record MCPResponse
{
public string CommandId { get; init; } = string.Empty;
public string Result { get; init; } = string.Empty;
public ResponseStatus Status { get; init; }
public string? ErrorMessage { get; init; }
public long ProcessingTimeMs { get; init; }
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
public Dictionary<string, object> Metrics { get; init; } = new();
}
public record MCPEvent
{
public string EventId { get; init; } = Guid.NewGuid().ToString();
public string EventType { get; init; } = string.Empty;
public string Source { get; init; } = string.Empty;
public object Data { get; init; } = new();
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
}
public enum ProtocolType
{
Unknown = 0,
Grpc = 1,
WebSocket = 2,
Http = 3
}
public enum ResponseStatus
{
Success,
Error,
Processing,
Timeout
}
3️⃣ MCP Kernel Unificado
// MCPPipeline.Core/Services/MCPKernelService.cs
using MCPPipeline.Contracts.Messages;
using System.Diagnostics;
namespace MCPPipeline.Core.Services;
public interface IMCPKernelService
{
Task<MCPResponse> ExecuteCommandAsync(MCPCommand command, CancellationToken ct);
IAsyncEnumerable<string> StreamCommandAsync(MCPCommand command, CancellationToken ct);
}
public class MCPKernelService : IMCPKernelService
{
private readonly ILogger<MCPKernelService> _logger;
private readonly IEventBus _eventBus;
private static readonly ActivitySource ActivitySource = new("MCPPipeline.Core");
public MCPKernelService(
ILogger<MCPKernelService> logger,
IEventBus eventBus)
{
_logger = logger;
_eventBus = eventBus;
}
public async Task<MCPResponse> ExecuteCommandAsync(
MCPCommand command,
CancellationToken ct)
{
using var activity = ActivitySource.StartActivity("ExecuteCommand");
activity?.SetTag("command.id", command.CommandId);
activity?.SetTag("command.type", command.Command);
activity?.SetTag("protocol", command.Protocol.ToString());
var sw = Stopwatch.StartNew();
try
{
_logger.LogInformation(
"Executando comando {Command} via {Protocol} | Session: {SessionId}",
command.Command,
command.Protocol,
command.SessionId);
// Publicar evento de início
await _eventBus.PublishAsync(new MCPEvent
{
EventType = "CommandStarted",
Source = command.Protocol.ToString(),
Data = new { command.CommandId, command.Command }
});
// Processar comando
var result = await ProcessCommandAsync(command, ct);
sw.Stop();
var response = new MCPResponse
{
CommandId = command.CommandId,
Result = result,
Status = ResponseStatus.Success,
ProcessingTimeMs = sw.ElapsedMilliseconds,
Metrics = new Dictionary<string, object>
{
["protocol"] = command.Protocol.ToString(),
["priority"] = command.Priority
}
};
// Publicar evento de conclusão
await _eventBus.PublishAsync(new MCPEvent
{
EventType = "CommandCompleted",
Source = command.Protocol.ToString(),
Data = new { command.CommandId, response.ProcessingTimeMs }
});
activity?.SetTag("response.status", "success");
return response;
}
catch (Exception ex)
{
sw.Stop();
_logger.LogError(ex,
"Erro ao executar comando {Command} via {Protocol}",
command.Command,
command.Protocol);
activity?.SetTag("response.status", "error");
activity?.SetTag("error.message", ex.Message);
await _eventBus.PublishAsync(new MCPEvent
{
EventType = "CommandFailed",
Source = command.Protocol.ToString(),
Data = new { command.CommandId, Error = ex.Message }
});
return new MCPResponse
{
CommandId = command.CommandId,
Status = ResponseStatus.Error,
ErrorMessage = ex.Message,
ProcessingTimeMs = sw.ElapsedMilliseconds
};
}
}
public async IAsyncEnumerable<string> StreamCommandAsync(
MCPCommand command,
[EnumeratorCancellation] CancellationToken ct)
{
using var activity = ActivitySource.StartActivity("StreamCommand");
activity?.SetTag("command.id", command.CommandId);
_logger.LogInformation(
"Iniciando streaming para comando {Command} via {Protocol}",
command.Command,
command.Protocol);
var chunks = await GenerateStreamChunksAsync(command.Payload, ct);
for (int i = 0; i < chunks.Count; i++)
{
if (ct.IsCancellationRequested)
yield break;
yield return chunks[i];
await Task.Delay(50, ct); // Simula processamento
}
}
private async Task<string> ProcessCommandAsync(MCPCommand command, CancellationToken ct)
{
return command.Command.ToLowerInvariant() switch
{
"analyze" => await AnalyzeAsync(command.Payload, ct),
"summarize" => await SummarizeAsync(command.Payload, ct),
"translate" => await TranslateAsync(command.Payload, ct),
"generate" => await GenerateAsync(command.Payload, ct),
"status" => await GetStatusAsync(ct),
_ => throw new InvalidOperationException($"Comando desconhecido: {command.Command}")
};
}
private async Task<string> AnalyzeAsync(string payload, CancellationToken ct)
{
await Task.Delay(200, ct);
var words = payload.Split(' ', StringSplitOptions.RemoveEmptyEntries).Length;
return $"Análise: {words} palavras, {payload.Length} caracteres";
}
private async Task<string> SummarizeAsync(string payload, CancellationToken ct)
{
await Task.Delay(300, ct);
return payload.Length > 100
? $"Resumo: {payload[..100]}..."
: $"Resumo: {payload}";
}
private async Task<string> TranslateAsync(string payload, CancellationToken ct)
{
await Task.Delay(250, ct);
return $"[Traduzido] {payload}";
}
private async Task<string> GenerateAsync(string payload, CancellationToken ct)
{
await Task.Delay(500, ct);
return $"Conteúdo gerado baseado em: {payload}";
}
private async Task<string> GetStatusAsync(CancellationToken ct)
{
await Task.Delay(50, ct);
return $"Sistema operacional | Timestamp: {DateTime.UtcNow:O}";
}
private async Task<List<string>> GenerateStreamChunksAsync(string prompt, CancellationToken ct)
{
await Task.Delay(100, ct);
return new List<string>
{
"Iniciando processamento...",
"Analisando contexto...",
"Gerando resposta...",
$"Resultado: {prompt}",
"Processamento concluído."
};
}
}
4️⃣ Event Bus com Redis
// MCPPipeline.Core/Events/IEventBus.cs
namespace MCPPipeline.Core.Events;
public interface IEventBus
{
Task PublishAsync<T>(T @event) where T : class;
Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
}
// MCPPipeline.Core/Events/RedisEventBus.cs
using StackExchange.Redis;
using System.Text.Json;
public class RedisEventBus : IEventBus
{
private readonly IConnectionMultiplexer _redis;
private readonly ILogger<RedisEventBus> _logger;
private readonly ISubscriber _subscriber;
public RedisEventBus(
IConnectionMultiplexer redis,
ILogger<RedisEventBus> logger)
{
_redis = redis;
_logger = logger;
_subscriber = redis.GetSubscriber();
}
public async Task PublishAsync<T>(T @event) where T : class
{
var channel = typeof(T).Name;
var message = JsonSerializer.Serialize(@event);
await _subscriber.PublishAsync(channel, message);
_logger.LogDebug("Evento publicado: {EventType}", channel);
}
public async Task SubscribeAsync<T>(Func<T, Task> handler) where T : class
{
var channel = typeof(T).Name;
await _subscriber.SubscribeAsync(channel, async (ch, message) =>
{
try
{
var @event = JsonSerializer.Deserialize<T>(message!);
if (@event != null)
{
await handler(@event);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Erro ao processar evento {EventType}", channel);
}
});
_logger.LogInformation("Inscrito no canal: {Channel}", channel);
}
}
5️⃣ Protocol Adapter
// MCPPipeline.Adapter/ProtocolAdapter.cs
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Grpc;
namespace MCPPipeline.Adapter;
public interface IProtocolAdapter
{
MCPCommand FromGrpcRequest(MCPRequest grpcRequest);
MCPRequest ToGrpcRequest(MCPCommand command);
MCPResponse FromGrpcResponse(MCPGrpcResponse grpcResponse);
MCPGrpcResponse ToGrpcResponse(MCPResponse response);
}
public class ProtocolAdapter : IProtocolAdapter
{
public MCPCommand FromGrpcRequest(MCPRequest grpcRequest)
{
return new MCPCommand
{
Command = grpcRequest.Command,
Payload = grpcRequest.Payload,
Metadata = grpcRequest.Metadata.ToDictionary(k => k.Key, v => v.Value),
Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(grpcRequest.Timestamp).DateTime,
Protocol = ProtocolType.Grpc
};
}
public MCPRequest ToGrpcRequest(MCPCommand command)
{
return new MCPRequest
{
Command = command.Command,
Payload = command.Payload,
Metadata = { command.Metadata },
Timestamp = new DateTimeOffset(command.Timestamp).ToUnixTimeMilliseconds()
};
}
public MCPResponse FromGrpcResponse(MCPGrpcResponse grpcResponse)
{
return new MCPResponse
{
Result = grpcResponse.Result,
Status = grpcResponse.Status switch
{
"OK" => ResponseStatus.Success,
"ERROR" => ResponseStatus.Error,
"PROCESSING" => ResponseStatus.Processing,
_ => ResponseStatus.Error
},
ErrorMessage = grpcResponse.ErrorMessage,
ProcessingTimeMs = grpcResponse.ProcessingTimeMs
};
}
public MCPGrpcResponse ToGrpcResponse(MCPResponse response)
{
return new MCPGrpcResponse
{
Result = response.Result,
Status = response.Status.ToString().ToUpperInvariant(),
ErrorMessage = response.ErrorMessage ?? string.Empty,
ProcessingTimeMs = response.ProcessingTimeMs
};
}
}
6️⃣ gRPC Service Endpoint
// MCPPipeline.GrpcService/Services/MCPGrpcService.cs
using Grpc.Core;
using MCPPipeline.Grpc;
using MCPPipeline.Core.Services;
using MCPPipeline.Adapter;
namespace MCPPipeline.GrpcService.Services;
public class MCPGrpcServiceImpl : MCPService.MCPServiceBase
{
private readonly IMCPKernelService _kernelService;
private readonly IProtocolAdapter _adapter;
private readonly ILogger<MCPGrpcServiceImpl> _logger;
public MCPGrpcServiceImpl(
IMCPKernelService kernelService,
IProtocolAdapter adapter,
ILogger<MCPGrpcServiceImpl> logger)
{
_kernelService = kernelService;
_adapter = adapter;
_logger = logger;
}
public override async Task<MCPGrpcResponse> SendCommand(
MCPRequest request,
ServerCallContext context)
{
_logger.LogInformation(
"Recebido comando gRPC: {Command} de {Peer}",
request.Command,
context.Peer);
// Converter para modelo unificado
var command = _adapter.FromGrpcRequest(request);
command = command with { SessionId = context.Peer };
// Processar via kernel unificado
var response = await _kernelService.ExecuteCommandAsync(
command,
context.CancellationToken);
// Converter resposta
return _adapter.ToGrpcResponse(response);
}
public override async Task StreamCommand(
MCPRequest request,
IServerStreamWriter<MCPStreamResponse> responseStream,
ServerCallContext context)
{
_logger.LogInformation(
"Iniciando streaming gRPC: {Command}",
request.Command);
var command = _adapter.FromGrpcRequest(request);
int chunkIndex = 0;
await foreach (var chunk in _kernelService.StreamCommandAsync(
command,
context.CancellationToken))
{
await responseStream.WriteAsync(new MCPStreamResponse
{
Content = chunk,
ChunkIndex = chunkIndex++,
IsComplete = false
});
}
// Enviar chunk final
await responseStream.WriteAsync(new MCPStreamResponse
{
Content = "Stream concluído",
ChunkIndex = chunkIndex,
IsComplete = true
});
}
public override Task<HealthResponse> HealthCheck(
HealthRequest request,
ServerCallContext context)
{
return Task.FromResult(new HealthResponse
{
Status = "Healthy",
Version = "1.0.0",
Protocol = "gRPC"
});
}
}
// Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
builder.Services.AddSingleton<IProtocolAdapter, ProtocolAdapter>();
// Redis para event bus
builder.Services.AddSingleton<IConnectionMultiplexer>(
ConnectionMultiplexer.Connect("localhost:6379"));
builder.Services.AddSingleton<IEventBus, RedisEventBus>();
var app = builder.Build();
app.MapGrpcService<MCPGrpcServiceImpl>();
app.MapGet("/", () => "MCP gRPC Service");
app.Run();
7️⃣ WebSocket Service Endpoint
// MCPPipeline.WebSocketService/Hubs/MCPHub.cs
using Microsoft.AspNetCore.SignalR;
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Core.Services;
namespace MCPPipeline.WebSocketService.Hubs;
public class MCPHub : Hub
{
private readonly IMCPKernelService _kernelService;
private readonly ILogger<MCPHub> _logger;
public MCPHub(
IMCPKernelService kernelService,
ILogger<MCPHub> logger)
{
_kernelService = kernelService;
_logger = logger;
}
public override async Task OnConnectedAsync()
{
_logger.LogInformation("Cliente WebSocket conectado: {ConnectionId}",
Context.ConnectionId);
await Clients.Caller.SendAsync("Connected", new
{
ConnectionId = Context.ConnectionId,
Protocol = "WebSocket",
Message = "Conectado ao MCP Hub"
});
await base.OnConnectedAsync();
}
public async Task SendCommand(MCPCommand command)
{
_logger.LogInformation(
"Recebido comando WebSocket: {Command} de {ConnectionId}",
command.Command,
Context.ConnectionId);
// Enriquecer comando com dados do WebSocket
var enrichedCommand = command with
{
Protocol = ProtocolType.WebSocket,
SessionId = Context.ConnectionId
};
// Processar via kernel unificado
var response = await _kernelService.ExecuteCommandAsync(
enrichedCommand,
Context.ConnectionAborted);
// Enviar resposta
await Clients.Caller.SendAsync("CommandResponse", response);
}
public async Task StreamCommand(MCPCommand command)
{
_logger.LogInformation(
"Iniciando streaming WebSocket: {Command}",
command.Command);
var enrichedCommand = command with
{
Protocol = ProtocolType.WebSocket,
SessionId = Context.ConnectionId
};
await foreach (var chunk in _kernelService.StreamCommandAsync(
enrichedCommand,
Context.ConnectionAborted))
{
await Clients.Caller.SendAsync("StreamChunk", new
{
Content = chunk,
Timestamp = DateTime.UtcNow
});
}
await Clients.Caller.SendAsync("StreamComplete", new
{
Message = "Streaming concluído"
});
}
}
// Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSignalR();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
// Redis para event bus
builder.Services.AddSingleton<IConnectionMultiplexer>(
ConnectionMultiplexer.Connect("localhost:6379"));
builder.Services.AddSingleton<IEventBus, RedisEventBus>();
builder.Services.AddCors(options =>
{
options.AddPolicy("AllowAll", policy =>
policy.AllowAnyOrigin().AllowAnyMethod().AllowAnyHeader());
});
var app = builder.Build();
app.UseCors("AllowAll");
app.MapHub<MCPHub>("/mcphub");
app.MapGet("/", () => "MCP WebSocket Service");
app.Run();
8️⃣ API Gateway com YARP
// MCPPipeline.Gateway/Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddReverseProxy()
.LoadFromConfig(builder.Configuration.GetSection("ReverseProxy"));
var app = builder.Build();
app.MapReverseProxy();
app.MapGet("/", () => Results.Ok(new
{
Service = "MCP Gateway",
Endpoints = new
{
gRPC = "https://localhost:5001",
WebSocket = "https://localhost:5002/mcphub"
}
}));
app.Run();
// appsettings.json
{
"ReverseProxy": {
"Routes": {
"grpc-route": {
"ClusterId": "grpc-cluster",
"Match": {
"Path": "/grpc/{**catch-all}"
}
},
"websocket-route": {
"ClusterId": "websocket-cluster",
"Match": {
"Path": "/ws/{**catch-all}"
}
}
},
"Clusters": {
"grpc-cluster": {
"Destinations": {
"destination1": {
"Address": "https://localhost:5001"
}
}
},
"websocket-cluster": {
"Destinations": {
"destination1": {
"Address": "https://localhost:5002"
}
},
"HttpRequest": {
"Version": "1.1",
"VersionPolicy": "RequestVersionOrLower"
}
}
}
}
}
9️⃣ Cliente Unificado
// MCPPipeline.Client/UnifiedMCPClient.cs
public class UnifiedMCPClient : IAsyncDisposable
{
private readonly MCPService.MCPServiceClient? _grpcClient;
private readonly HubConnection? _hubConnection;
private readonly ILogger<UnifiedMCPClient> _logger;
private readonly bool _useGrpc;
public UnifiedMCPClient(
string endpoint,
bool useGrpc,
ILogger<UnifiedMCPClient> logger)
{
_useGrpc = useGrpc;
_logger = logger;
if (useGrpc)
{
var channel = GrpcChannel.ForAddress(endpoint);
_grpcClient = new MCPService.MCPServiceClient(channel);
_logger.LogInformation("Cliente configurado para gRPC: {Endpoint}", endpoint);
}
else
{
_hubConnection = new HubConnectionBuilder()
.WithUrl(endpoint)
.WithAutomaticReconnect()
.Build();
SetupWebSocketHandlers();
_logger.LogInformation("Cliente configurado para WebSocket: {Endpoint}", endpoint);
}
}
public async Task ConnectAsync(CancellationToken ct = default)
{
if (!_useGrpc && _hubConnection != null)
{
await _hubConnection.StartAsync(ct);
_logger.LogInformation("Conectado via WebSocket");
}
}
public async Task<MCPResponse> SendCommandAsync(
string command,
string payload,
CancellationToken ct = default)
{
if (_useGrpc && _grpcClient != null)
{
return await SendViaGrpcAsync(command, payload, ct);
}
else if (_hubConnection != null)
{
return await SendViaWebSocketAsync(command, payload, ct);
}
throw new InvalidOperationException("Cliente não inicializado");
}
private async Task<MCPResponse> SendViaGrpcAsync(
string command,
string payload,
CancellationToken ct)
{
_logger.LogInformation("Enviando comando via gRPC: {Command}", command);
var request = new MCPRequest
{
Command = command,
Payload = payload,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
var response = await _grpcClient!.SendCommandAsync(request, cancellationToken: ct);
return new MCPResponse
{
Result = response.Result,
Status = response.Status == "OK" ? ResponseStatus.Success : ResponseStatus.Error,
ErrorMessage = response.ErrorMessage,
ProcessingTimeMs = response.ProcessingTimeMs
};
}
private async Task<MCPResponse> SendViaWebSocketAsync(
string command,
string payload,
CancellationToken ct)
{
_logger.LogInformation("Enviando comando via WebSocket: {Command}", command);
var tcs = new TaskCompletionSource<MCPResponse>();
void handler(MCPResponse response)
{
tcs.TrySetResult(response);
}
_hubConnection!.On<MCPResponse>("CommandResponse", handler);
try
{
await _hubConnection.InvokeAsync("SendCommand", new MCPCommand
{
Command = command,
Payload = payload,
Timestamp = DateTime.UtcNow
}, ct);
return await tcs.Task.WaitAsync(TimeSpan.FromSeconds(30), ct);
}
finally
{
_hubConnection.Remove("CommandResponse");
}
}
public async IAsyncEnumerable<string> StreamCommandAsync(
string command,
string payload,
[EnumeratorCancellation] CancellationToken ct = default)
{
if (_useGrpc && _grpcClient != null)
{
await foreach (var chunk in StreamViaGrpcAsync(command, payload, ct))
{
yield return chunk;
}
}
else if (_hubConnection != null)
{
await foreach (var chunk in StreamViaWebSocketAsync(command, payload, ct))
{
yield return chunk;
}
}
}
private async IAsyncEnumerable<string> StreamViaGrpcAsync(
string command,
string payload,
[EnumeratorCancellation] CancellationToken ct)
{
var request = new MCPRequest
{
Command = command,
Payload = payload,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
using var call = _grpcClient!.StreamCommand(request, cancellationToken: ct);
await foreach (var response in call.ResponseStream.ReadAllAsync(ct))
{
yield return response.Content;
}
}
private async IAsyncEnumerable<string> StreamViaWebSocketAsync(
string command,
string payload,
[EnumeratorCancellation] CancellationToken ct)
{
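var channel = Channel.CreateUnbounded<string>();
// With the default JSON protocol the payload arrives as a JsonElement (requires
// using System.Text.Json;), which does not support dynamic member access,
// so the property is read explicitly.
// Assumption: the hub serializes the chunk text as "content" (SignalR's default camelCase).
using var chunkSubscription = _hubConnection!.On<JsonElement>("StreamChunk", chunk =>
{
if (chunk.ValueKind == JsonValueKind.Object &&
chunk.TryGetProperty("content", out var content))
{
channel.Writer.TryWrite(content.ToString());
}
});
// Disposing both subscriptions when the iterator ends avoids leaking one handler pair per call.
using var completeSubscription = _hubConnection.On<JsonElement>("StreamComplete", _ =>
{
channel.Writer.TryComplete();
});
await _hubConnection.InvokeAsync("StreamCommand", new MCPCommand
{
Command = command,
Payload = payload,
Timestamp = DateTime.UtcNow
}, ct);
await foreach (var chunk in channel.Reader.ReadAllAsync(ct))
{
yield return chunk;
}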
}
private void SetupWebSocketHandlers()
{
_hubConnection!.On<object>("Connected", data =>
{
_logger.LogInformation("✅ WebSocket connected: {Data}", data);
});
_hubConnection.Reconnecting += error =>
{
_logger.LogWarning("⚠️ WebSocket reconnecting: {Error}", error?.Message);
return Task.CompletedTask;
};
_hubConnection.Reconnected += connectionId =>
{
_logger.LogInformation("✅ WebSocket reconnected: {ConnectionId}", connectionId);
return Task.CompletedTask;
};
_hubConnection.Closed += error =>
{
_logger.LogError("❌ WebSocket closed: {Error}", error?.Message);
return Task.CompletedTask;
};
}
public async ValueTask DisposeAsync()
{
if (_hubConnection != null)
{
await _hubConnection.StopAsync();
await _hubConnection.DisposeAsync();
}
}
}
// Usage example
public class Program
{
public static async Task Main(string[] args)
{
// Disposing the factory on exit flushes any queued console log entries.
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger<UnifiedMCPClient>();
// gRPC client
await using var grpcClient = new UnifiedMCPClient(
"https://localhost:5001",
useGrpc: true,
logger);
var response = await grpcClient.SendCommandAsync("analyze", "Test via gRPC");
Console.WriteLine($"gRPC: {response.Result}");
// WebSocket client
await using var wsClient = new UnifiedMCPClient(
"https://localhost:5002/mcphub",
useGrpc: false,
logger);
await wsClient.ConnectAsync();
response = await wsClient.SendCommandAsync("analyze", "Test via WebSocket");
Console.WriteLine($"WebSocket: {response.Result}");
// Streaming
await foreach (var chunk in wsClient.StreamCommandAsync("generate", "Test prompt"))
{
Console.WriteLine($"Chunk: {chunk}");
}
}
}
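Because the WebSocket path of `SendCommandAsync` listens for a generic `CommandResponse` event, two calls in flight on the same connection cannot tell their responses apart. One common way around this is to tag each request with a correlation id and keep a map of pending completions. The sketch below shows only that bookkeeping. `PendingRequests<T>` is a hypothetical helper, and it assumes the hub would echo the id back with each response, which is an assumption about the contracts rather than something shown in this article.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Usage: two in-flight requests complete independently, in any order.
var pending = new PendingRequests<string>();
var (idA, taskA) = pending.Register();
var (idB, taskB) = pending.Register();

// A single long-lived "CommandResponse" handler would make these calls.
pending.TryComplete(idB, "response B");
pending.TryComplete(idA, "response A");

Console.WriteLine(await taskA); // prints "response A"
Console.WriteLine(await taskB); // prints "response B"

// Hypothetical helper: maps a correlation id to the caller awaiting its response.
public sealed class PendingRequests<TResponse>
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<TResponse>> _pending = new();

    // Creates a correlation id and the task that completes when its response arrives.
    public (string CorrelationId, Task<TResponse> Response) Register()
    {
        var id = Guid.NewGuid().ToString("N");
        var tcs = new TaskCompletionSource<TResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[id] = tcs;
        return (id, tcs.Task);
    }

    // Completes the matching caller; returns false for unknown or already-completed ids.
    public bool TryComplete(string correlationId, TResponse response) =>
        _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(response);

    // Call on timeout or cancellation so abandoned entries do not accumulate.
    public bool TryCancel(string correlationId) =>
        _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetCanceled();
}
```

With this in place, the client would register the `CommandResponse` handler once at connection time instead of once per call, and each `SendCommandAsync` would only call `Register()` and await the returned task.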
🔍 Unified Observability
OpenTelemetry Configuration
// MCPPipeline.Observability/OpenTelemetryExtensions.cs
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using OpenTelemetry.Metrics;
public static class OpenTelemetryExtensions
{
public static IServiceCollection AddMCPObservability(
this IServiceCollection services,
string serviceName)
{
services.AddOpenTelemetry()
.ConfigureResource(resource =>
{
resource.AddService(
serviceName: serviceName,
serviceVersion: "1.0.0");
})
.WithTracing(tracing =>
{
tracing
.AddAspNetCoreInstrumentation(options =>
{
options.RecordException = true;
options.EnrichWithHttpRequest = (activity, request) =>
{
activity.SetTag("http.scheme", request.Scheme);
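// Tag values should be primitives or strings; an IPAddress object may be dropped by exporters.
activity.SetTag("http.client_ip", request.HttpContext.Connection.RemoteIpAddress?.ToString());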
};
})
.AddGrpcClientInstrumentation()
.AddSource("MCPPipeline.Core")
.AddSource("MCPPipeline.GrpcService")
.AddSource("MCPPipeline.WebSocketService")
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("http://localhost:4317");
});
})
.WithMetrics(metrics =>
{
metrics
.AddAspNetCoreInstrumentation()
.AddRuntimeInstrumentation()
.AddMeter("MCPPipeline.*")
.AddPrometheusExporter()
.AddOtlpExporter();
});
return services;
}
}
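// Usage in the services
builder.Services.AddMCPObservability("MCPPipeline.GrpcService");
var app = builder.Build();
// AddPrometheusExporter() only registers the exporter; the scrape endpoint
// (default path: /metrics) still has to be mapped for Prometheus to reach it.
app.MapPrometheusScrapingEndpoint();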
Custom Metrics
// MCPPipeline.Observability/MCPMetrics.cs
using System.Diagnostics.Metrics;
public class MCPMetrics
{
private readonly Meter _meter;
private readonly Counter<long> _commandsProcessed;
private readonly Histogram<double> _commandDuration;
private readonly Counter<long> _commandErrors;
private readonly UpDownCounter<int> _activeConnections;
public MCPMetrics()
{
_meter = new Meter("MCPPipeline.Metrics", "1.0.0");
_commandsProcessed = _meter.CreateCounter<long>(
"mcp.commands.processed",
description: "Total commands processed");
_commandDuration = _meter.CreateHistogram<double>(
"mcp.command.duration",
unit: "ms",
description: "Command processing duration");
_commandErrors = _meter.CreateCounter<long>(
"mcp.commands.errors",
description: "Total processing errors");
_activeConnections = _meter.CreateUpDownCounter<int>(
"mcp.connections.active",
description: "Active connections");
}
public void RecordCommandProcessed(string command, ProtocolType protocol)
{
_commandsProcessed.Add(1,
new KeyValuePair<string, object?>("command", command),
new KeyValuePair<string, object?>("protocol", protocol.ToString()));
}
public void RecordCommandDuration(string command, ProtocolType protocol, double durationMs)
{
_commandDuration.Record(durationMs,
new KeyValuePair<string, object?>("command", command),
new KeyValuePair<string, object?>("protocol", protocol.ToString()));
}
public void RecordCommandError(string command, ProtocolType protocol, string errorType)
{
_commandErrors.Add(1,
new KeyValuePair<string, object?>("command", command),
new KeyValuePair<string, object?>("protocol", protocol.ToString()),
new KeyValuePair<string, object?>("error_type", errorType));
}
public void IncrementActiveConnections(ProtocolType protocol)
{
_activeConnections.Add(1,
new KeyValuePair<string, object?>("protocol", protocol.ToString()));
}
public void DecrementActiveConnections(ProtocolType protocol)
{
_activeConnections.Add(-1,
new KeyValuePair<string, object?>("protocol", protocol.ToString()));
}
}
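A command handler would typically wrap its work so that the counter, the error counter, and the histogram are all fed from one place. The following is a minimal sketch of that pattern, not the article's implementation: it creates stand-in instruments with the same names as `MCPMetrics` so it runs on its own, `Measure` is a hypothetical wrapper, and a `MeterListener` is used only to observe the measurements in-process without an exporter.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;

// Stand-in instruments mirroring the names used by MCPMetrics.
using var meter = new Meter("MCPPipeline.Metrics", "1.0.0");
var processed = meter.CreateCounter<long>("mcp.commands.processed");
var errors = meter.CreateCounter<long>("mcp.commands.errors");
var duration = meter.CreateHistogram<double>("mcp.command.duration", unit: "ms");

// Collect the names of recorded instruments so the effect is visible.
var seen = new List<string>();
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "MCPPipeline.Metrics")
        l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<long>((inst, value, tags, state) => seen.Add(inst.Name));
listener.SetMeasurementEventCallback<double>((inst, value, tags, state) => seen.Add(inst.Name));
listener.Start();

// Hypothetical wrapper: times a command and records success or failure.
string Measure(string command, string protocol, Func<string> handler)
{
    var commandTag = new KeyValuePair<string, object?>("command", command);
    var protocolTag = new KeyValuePair<string, object?>("protocol", protocol);
    var stopwatch = Stopwatch.StartNew();
    try
    {
        var result = handler();
        processed.Add(1, commandTag, protocolTag);
        return result;
    }
    catch (Exception ex)
    {
        errors.Add(1, commandTag, protocolTag,
            new KeyValuePair<string, object?>("error_type", ex.GetType().Name));
        throw;
    }
    finally
    {
        // Duration is recorded for both successful and failed commands.
        duration.Record(stopwatch.Elapsed.TotalMilliseconds, commandTag, protocolTag);
    }
}

Measure("analyze", "Grpc", () => "ok");
try { Measure("analyze", "WebSocket", () => throw new InvalidOperationException("boom")); }
catch (InvalidOperationException) { /* expected in this demo */ }

Console.WriteLine(string.Join(", ", seen));
```

Recording the duration in `finally` means failed commands also show up in the latency histogram, which keeps slow failures visible on the dashboard.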
📊 Dashboard with Grafana
Prometheus Configuration
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
scrape_configs:
  - job_name: 'mcp-grpc-service'
    static_configs:
      - targets: ['localhost:5001']
  - job_name: 'mcp-websocket-service'
    static_configs:
      - targets: ['localhost:5002']
  - job_name: 'mcp-gateway'
    static_configs:
      - targets: ['localhost:5000']
Grafana Dashboard JSON
{
  "dashboard": {
    "title": "MCP Hybrid Protocol Dashboard",
    "panels": [
      {
        "title": "Commands Processed per Protocol",
        "targets": [
          {
            "expr": "sum by (protocol) (rate(mcp_commands_processed_total[5m]))",
            "legendFormat": "{{protocol}}"
          }
        ]
      },
      {
        "title": "p95 Latency per Command",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum by (le, command) (rate(mcp_command_duration_bucket[5m])))",
            "legendFormat": "p95 - {{command}}"
          }
        ]
      },
      {
        "title": "Active Connections",
        "targets": [
          {
            "expr": "sum by (protocol) (mcp_connections_active)",
            "legendFormat": "{{protocol}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "sum by (protocol, error_type) (rate(mcp_commands_errors_total[5m]))",
            "legendFormat": "{{protocol}} - {{error_type}}"
          }
        ]
      }
    ]
  }
}
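The dashboard queries use names such as `mcp_commands_processed_total`, while the code registers `mcp.commands.processed`. The Prometheus exporter rewrites instrument names on export. The sketch below captures only the two rules these queries rely on (dots become underscores, counters gain a `_total` suffix). `ToPrometheusName` is a hypothetical helper for illustration, not an exporter API, and it deliberately ignores unit handling: depending on the exporter version, a unit such as `ms` may also be appended to the name, so it is worth checking the actual `/metrics` output before finalizing queries.

```csharp
using System;

// Simplified naming rule (assumption: unit suffixes are not modeled here).
static string ToPrometheusName(string instrumentName, bool isCounter)
{
    var name = instrumentName.Replace('.', '_');
    return isCounter && !name.EndsWith("_total", StringComparison.Ordinal)
        ? name + "_total"
        : name;
}

Console.WriteLine(ToPrometheusName("mcp.commands.processed", isCounter: true));  // mcp_commands_processed_total
Console.WriteLine(ToPrometheusName("mcp.commands.errors", isCounter: true));     // mcp_commands_errors_total
Console.WriteLine(ToPrometheusName("mcp.connections.active", isCounter: false)); // mcp_connections_active
```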
🧪 Integration Tests
Hybrid Test
// MCPPipeline.IntegrationTests/HybridProtocolTests.cs
public class HybridProtocolTests : IClassFixture<MCPTestFixture>
{
private readonly MCPTestFixture _fixture;
public HybridProtocolTests(MCPTestFixture fixture)
{
_fixture = fixture;
}
[Fact]
public async Task Should_Process_Same_Command_Via_Both_Protocols()
{
// Arrange
await using var grpcClient = _fixture.CreateGrpcClient();
await using var wsClient = _fixture.CreateWebSocketClient();
await wsClient.ConnectAsync();
var testPayload = "Hybrid integration test";
// Act
var grpcResponse = await grpcClient.SendCommandAsync("analyze", testPayload);
var wsResponse = await wsClient.SendCommandAsync("analyze", testPayload);
// Assert
Assert.Equal(ResponseStatus.Success, grpcResponse.Status);
Assert.Equal(ResponseStatus.Success, wsResponse.Status);
// Both protocols must return equivalent results
Assert.Contains("palavras", grpcResponse.Result);
Assert.Contains("palavras", wsResponse.Result);
}
[Fact]
public async Task Should_Handle_Concurrent_Requests_From_Both_Protocols()
{
// Arrange
await using var grpcClient = _fixture.CreateGrpcClient();
await using var wsClient = _fixture.CreateWebSocketClient();
await wsClient.ConnectAsync();
var tasks = new List<Task<MCPResponse>>();
// Act - send 50 concurrent requests over each protocol
for (int i = 0; i < 50; i++)
{
tasks.Add(grpcClient.SendCommandAsync("status", $"gRPC-{i}"));
tasks.Add(wsClient.SendCommandAsync("status", $"WS-{i}"));
}
var responses = await Task.WhenAll(tasks);
// Assert
Assert.All(responses, r => Assert.Equal(ResponseStatus.Success, r.Status));
Assert.Equal(100, responses.Length);
}
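[Fact(Skip = "Pending: the WebSocket event listener depends on the event bus implementation")]
public async Task Should_Broadcast_Events_Between_Protocols()
{
// Arrange
await using var grpcClient = _fixture.CreateGrpcClient();
await using var wsClient = _fixture.CreateWebSocketClient();
await wsClient.ConnectAsync();
var eventReceived = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
// Configure the listener on the WebSocket side and have it call
// eventReceived.TrySetResult(true) (the implementation depends on the event bus).
// Until that is wired up, the wait below can only time out, hence the Skip above.
// Act - send the command via gRPC
await grpcClient.SendCommandAsync("generate", "Event test");
// Assert - the WebSocket client must receive the notification
var received = await eventReceived.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.True(received);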
}
}
public class MCPTestFixture : IAsyncLifetime
{
private WebApplication? _grpcApp;
private WebApplication? _wsApp;
public async Task InitializeAsync()
{
// Start the test services
_grpcApp = CreateGrpcService();
_wsApp = CreateWebSocketService();
await _grpcApp.StartAsync();
await _wsApp.StartAsync();
}
public async Task DisposeAsync()
{
// Stop each host, then dispose it to release its services and listeners.
if (_grpcApp != null) { await _grpcApp.StopAsync(); await _grpcApp.DisposeAsync(); }
if (_wsApp != null) { await _wsApp.StopAsync(); await _wsApp.DisposeAsync(); }
}
public UnifiedMCPClient CreateGrpcClient()
{
return new UnifiedMCPClient(
"https://localhost:5001",
useGrpc: true,
Mock.Of<ILogger<UnifiedMCPClient>>());
}
public UnifiedMCPClient CreateWebSocketClient()
{
return new UnifiedMCPClient(
"https://localhost:5002/mcphub",
useGrpc: false,
Mock.Of<ILogger<UnifiedMCPClient>>());
}
private WebApplication CreateGrpcService()
{
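var builder = WebApplication.CreateBuilder();
// Bind to the address the gRPC test client uses; otherwise both apps fall back to the same default URLs.
builder.WebHost.UseUrls("https://localhost:5001");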
builder.Services.AddGrpc();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
builder.Services.AddSingleton<IProtocolAdapter, ProtocolAdapter>();
var app = builder.Build();
app.MapGrpcService<MCPGrpcServiceImpl>();
return app;
}
private WebApplication CreateWebSocketService()
{
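var builder = WebApplication.CreateBuilder();
// Bind to the address the WebSocket test client uses; otherwise both apps fall back to the same default URLs.
builder.WebHost.UseUrls("https://localhost:5002");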
builder.Services.AddSignalR();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
var app = builder.Build();
app.MapHub<MCPHub>("/mcphub");
return app;
}
}
🚀 Performance Test
Benchmark with BenchmarkDotNet
// MCPPipeline.Benchmarks/ProtocolBenchmarks.cs
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
[MemoryDiagnoser]
[SimpleJob(warmupCount: 3, iterationCount: 10)]
public class ProtocolBenchmarks
{
private UnifiedMCPClient _grpcClient = null!;
private UnifiedMCPClient _wsClient = null!;
[GlobalSetup]
public async Task Setup()
{
_grpcClient = new UnifiedMCPClient(
"https://localhost:5001",
useGrpc: true,
Mock.Of<ILogger<UnifiedMCPClient>>());
_wsClient = new UnifiedMCPClient(
"https://localhost:5002/mcphub",
useGrpc: false,
Mock.Of<ILogger<UnifiedMCPClient>>());
await _wsClient.ConnectAsync();
}
[Benchmark(Baseline = true)]
public async Task<MCPResponse> GrpcCommand()
{
return await _grpcClient.SendCommandAsync("status", "benchmark");
}
[Benchmark]
public async Task<MCPResponse> WebSocketCommand()
{
return await _wsClient.SendCommandAsync("status", "benchmark");
}
[Benchmark]
public async Task GrpcStreaming()
{
await foreach (var _ in _grpcClient.StreamCommandAsync("generate", "test"))
{
// Consume the stream
}
}
[Benchmark]
public async Task WebSocketStreaming()
{
await foreach (var _ in _wsClient.StreamCommandAsync("generate", "test"))
{
// Consume the stream
}
}
[GlobalCleanup]
public async Task Cleanup()
{
await _grpcClient.DisposeAsync();
await _wsClient.DisposeAsync();
}
}
// Program.cs
public class Program
{
public static void Main(string[] args)
{
BenchmarkRunner.Run<ProtocolBenchmarks>();
}
}
Expected Results
Illustrative values; actual numbers depend on hardware, payload size, and network conditions.
| Method             | Mean      | Error     | StdDev    | Ratio | Gen0 | Allocated |
|------------------- |----------:|----------:|----------:|------:|-----:|----------:|
| GrpcCommand        |  2.450 ms | 0.0421 ms | 0.0394 ms |  1.00 | 15.6 |    128 KB |
| WebSocketCommand   |  8.732 ms | 0.1523 ms | 0.1425 ms |  3.56 | 31.2 |    256 KB |
| GrpcStreaming      | 12.105 ms | 0.2103 ms | 0.1967 ms |  4.94 | 46.8 |    384 KB |
| WebSocketStreaming | 18.421 ms | 0.3142 ms | 0.2940 ms |  7.52 | 62.5 |    512 KB |
🎯 Protocol Decision: When to Use Each One
Decision Matrix
public class ProtocolSelector
{
public ProtocolType SelectOptimalProtocol(ClientContext context)
{
// 1. Is the client a browser? → WebSocket
if (context.IsWebBrowser)
return ProtocolType.WebSocket;
// 2. Internal service-to-service communication? → gRPC
if (context.IsInternalService)
return ProtocolType.Grpc;
// 3. Requires long-lived bidirectional streaming? → WebSocket
if (context.RequiresLongLivedStream)
return ProtocolType.WebSocket;
// 4. Prioritizes minimal latency? → gRPC
if (context.LatencySensitive)
return ProtocolType.Grpc;
// 5. Multiple clients need to receive broadcasts? → WebSocket
if (context.RequiresBroadcast)
return ProtocolType.WebSocket;
// 6. Default: gRPC for performance
return ProtocolType.Grpc;
}
}
public record ClientContext
{
public bool IsWebBrowser { get; init; }
public bool IsInternalService { get; init; }
public bool RequiresLongLivedStream { get; init; }
public bool LatencySensitive { get; init; }
public bool RequiresBroadcast { get; init; }
}
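The order of the checks in `SelectOptimalProtocol` matters: the first matching rule wins, so a latency-sensitive browser client still ends up on WebSocket. The sketch below makes that precedence explicit by expressing the same rules as an ordered list and returning the reason alongside the protocol. It is an illustrative variant, not the article's implementation: the `rules` array, the `Select` function, and the reason strings are hypothetical, and `ProtocolType` is redeclared here with only the two members this section uses.

```csharp
using System;

// Same precedence as ProtocolSelector, written as an ordered rule list.
var rules = new (string Reason, Func<ClientContext, bool> Matches, ProtocolType Protocol)[]
{
    ("browser client",    c => c.IsWebBrowser,            ProtocolType.WebSocket),
    ("internal service",  c => c.IsInternalService,       ProtocolType.Grpc),
    ("long-lived stream", c => c.RequiresLongLivedStream, ProtocolType.WebSocket),
    ("latency sensitive", c => c.LatencySensitive,        ProtocolType.Grpc),
    ("needs broadcast",   c => c.RequiresBroadcast,       ProtocolType.WebSocket),
};

// Returns the first matching rule, or gRPC as the default.
(ProtocolType Protocol, string Reason) Select(ClientContext context)
{
    foreach (var rule in rules)
    {
        if (rule.Matches(context))
            return (rule.Protocol, rule.Reason);
    }
    return (ProtocolType.Grpc, "default");
}

// A latency-sensitive browser still gets WebSocket because rule 1 comes first.
Console.WriteLine(Select(new ClientContext { IsWebBrowser = true, LatencySensitive = true }));
// prints "(WebSocket, browser client)"

// Latency sensitivity outranks the broadcast requirement.
Console.WriteLine(Select(new ClientContext { LatencySensitive = true, RequiresBroadcast = true }));
// prints "(Grpc, latency sensitive)"

Console.WriteLine(Select(new ClientContext()));
// prints "(Grpc, default)"

public enum ProtocolType { Grpc, WebSocket }

public record ClientContext
{
    public bool IsWebBrowser { get; init; }
    public bool IsInternalService { get; init; }
    public bool RequiresLongLivedStream { get; init; }
    public bool LatencySensitive { get; init; }
    public bool RequiresBroadcast { get; init; }
}
```

Returning the reason is a small design choice that pays off in logs: when a client lands on an unexpected protocol, the matched rule explains why.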
Comparison Table
| Scenario | gRPC | WebSocket | Recommendation |
|---|---|---|---|
| Backend API → Backend | ✅ Ideal | ⚠️ Overhead | gRPC |
| Web Application → Backend | ❌ Limited (requires gRPC-Web) | ✅ Native | WebSocket |
| Mobile App → Backend | ✅ Excellent | ✅ Good | gRPC (better performance) |
| Real-time Dashboard | ⚠️ Server streaming (no native browser support) | ✅ Native push | WebSocket |
| Multi-user Chat | ❌ Not ideal | ✅ Perfect | WebSocket |
| Data Analysis | ✅ High performance | ⚠️ Slower | gRPC |
| AI Streaming | ✅ Efficient | ✅ Functional | Both (depends on context) |
🔒 Security in a Hybrid Environment
Unified Authentication
// MCPPipeline.Security/UnifiedAuthenticationExtensions.cs
public static class UnifiedAuthenticationExtensions
{
public static IServiceCollection AddUnifiedAuthentication(
this IServiceCollection services,
IConfiguration configuration)
{
services.AddAuthentication(options =>
{
options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
})
.AddJwtBearer(options =>
{
options.TokenValidationParameters = new TokenValidationParameters
{
ValidateIssuer = true,
ValidateAudience = true,
ValidateLifetime = true,
ValidateIssuerSigningKey = true,
ValidIssuer = configuration["Jwt:Issuer"],
ValidAudience = configuration["Jwt:Audience"],
IssuerSigningKey = new SymmetricSecurityKey(
Encoding.UTF8.GetBytes(configuration["Jwt:Key"]!))
};
// Browsers cannot set headers on WebSocket requests, so SignalR sends the token via query string
options.Events = new JwtBearerEvents
{
OnMessageReceived = context =>
{
var accessToken = context.Request.Query["access_token"];
var path = context.HttpContext.Request.Path;
if (!string.IsNullOrEmpty(accessToken) &&
path.StartsWithSegments("/mcphub"))
{
context.Token = accessToken;
}
return Task.CompletedTask;
}
};
});
// Register the authentication interceptor for gRPC calls
services.AddGrpc(options =>
{
options.Interceptors.Add<AuthenticationInterceptor>();
});
return services;
}
}
// Interceptor for gRPC
// Note: only unary calls are intercepted here; streaming RPCs such as StreamCommand
// need the same check in ServerStreamingServerHandler.
public class AuthenticationInterceptor : Interceptor
{
private readonly ILogger<AuthenticationInterceptor> _logger;
public AuthenticationInterceptor(ILogger<AuthenticationInterceptor> logger)
{
_logger = logger;
}
public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
TRequest request,
ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
var authHeader = context.RequestHeaders.GetValue("authorization");
if (string.IsNullOrEmpty(authHeader))
{
_logger.LogWarning("gRPC request without authentication");
throw new RpcException(new Status(StatusCode.Unauthenticated, "Token not provided"));
}
// TODO: Implement real JWT validation; so far only the header's presence has been checked.
_logger.LogInformation("Authorization header received from peer: {Peer}", context.Peer);
return await continuation(request, context);
}
}
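The interceptor only checks that an `authorization` header exists. A first step toward the validation left as a TODO is to separate the scheme from the token, so that values like `Basic ...` or an empty `Bearer` are rejected before any JWT parsing. The sketch below shows that step using only the base class library. `TryGetBearerToken` is a hypothetical helper name. Validating the extracted token's signature, issuer, audience, and lifetime would still be needed, for example with the same `TokenValidationParameters` configured for the JWT bearer handler.

```csharp
using System;

// Extracts the token from an "Authorization: Bearer <token>" header value.
// The scheme comparison is case-insensitive, as HTTP auth schemes are.
static bool TryGetBearerToken(string? authorizationHeader, out string token)
{
    const string prefix = "Bearer ";
    token = string.Empty;

    if (string.IsNullOrWhiteSpace(authorizationHeader) ||
        !authorizationHeader.StartsWith(prefix, StringComparison.OrdinalIgnoreCase))
    {
        return false;
    }

    token = authorizationHeader.Substring(prefix.Length).Trim();
    return token.Length > 0;
}

Console.WriteLine(TryGetBearerToken("Bearer abc.def.ghi", out var jwt) + " " + jwt); // True abc.def.ghi
Console.WriteLine(TryGetBearerToken("Basic dXNlcjpwYXNz", out _));                    // False
Console.WriteLine(TryGetBearerToken("Bearer ", out _));                               // False
Console.WriteLine(TryGetBearerToken(null, out _));                                    // False
```

Inside the interceptor, a `false` result would map to the same `StatusCode.Unauthenticated` response already used for a missing header.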
✅ Conclusion and Next Steps
In this fourth part, we consolidated a hybrid architecture that combines gRPC and WebSocket, providing:
- Low latency and strong typing for internal communication (gRPC)
- Flexibility and real-time delivery for web/mobile clients (WebSocket)
- A unified kernel, centralized observability, and a shared JWT-based authentication setup
🔮 Next Steps
- Part 5: Integration with advanced messaging (Kafka, RabbitMQ) for high-scale scenarios.
- Part 6: Implementing circuit breakers and resilience with Polly.
- Part 7: Multi-cloud deployment strategies and CI/CD with GitHub Actions.
Tip: Explore the MCPPipeline.Hybrid repository for complete examples and automation scripts.
🤝 Connect with Me
If you work with modern .NET and want to master architecture, C#, observability, DevOps, or interoperability:
💼 LinkedIn
✍️ Medium
📬 contato@dopme.io
📬 devsfree@devsfree.com.br