DEV Community

Roberto de Vargas Neto
Roberto de Vargas Neto

Posted on • Edited on

Do Stream para o Banco: Processando Market Data com Spring Boot, Redis e Flyway

Olá, pessoal!

Continuando a série My Broker B3, hoje construímos o trading-broker-asset — o serviço Java que consome as cotações publicadas no Kafka pelo trading-broker-market-data e as transforma em dados estruturados no MySQL, com cache de alta performance no Redis.

Se o serviço Python é o coletor, este é o processador.


🏗️ O Papel deste Serviço no Ecossistema

[trading-broker-market-data]
         │
         │ Kafka: trading-assets-market-data-v1
         ▼
[trading-broker-asset]
         │
    ┌────┴────┐
    ▼         ▼
  MySQL     Redis
(assets)  market:price:{TICKER}
              │
    [b3-matching-engine]   (lê daqui)
    [trading-broker-order] (valida ticker daqui)
Enter fullscreen mode Exit fullscreen mode

Dois consumers críticos dependem dos dados que este serviço mantém: o Matching Engine (para decisões de preço) e a Order API (para validar se um ticker existe e está ativo).


🎯 Foco no MVP

Nesta fase, priorizei:

  • Consumer Kafka robusto com tratamento de erros
  • Upsert de ativos: cria na primeira vez, atualiza nas seguintes
  • Cache Redis com TTL de 10 minutos
  • API REST para consulta do catálogo
  • Filtro de status: apenas ativos ACTIVE são retornados

🛠️ Stack Tecnológica

Tecnologia Uso
Java 21 + Spring Boot 3.5.11 Core do serviço
Spring Kafka Consumo de eventos de cotação
MySQL + Flyway Catálogo de ativos versionado
Spring Data Redis Cache de preços em tempo real
SpringDoc OpenAPI Documentação via Swagger UI

🏗️ Os Pilares da Implementação

1. O Schema do Banco (Flyway)

CREATE TABLE assets (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    ticker VARCHAR(20) NOT NULL UNIQUE,
    name VARCHAR(200),
    current_price DECIMAL(19, 4),
    last_update DATETIME,
    status ENUM('ACTIVE', 'INACTIVE') NOT NULL DEFAULT 'ACTIVE',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Enter fullscreen mode Exit fullscreen mode

Por que Flyway? Versionamento do schema garante que qualquer ambiente (local, CI, produção) tenha exatamente a mesma estrutura de banco, sem surpresas.

2. O Consumer Kafka

@KafkaListener(
        topics = "trading-assets-market-data-v1",
        groupId = "trading-broker-asset"
)
public void consume(AssetMarketDataDTO dto) {
    log.info("Market data received for ticker: {}", dto.getTicker());
    try {
        assetService.updateAsset(dto);
    } catch (Exception e) {
        log.error("Failed to process market data for ticker {}: {}",
                dto.getTicker(), e.getMessage(), e);
        // Rethrow so Kafka can apply retry policy instead of silently discarding
        throw e;
    }
}
Enter fullscreen mode Exit fullscreen mode

Por que relançar a exceção? Se o processamento falhar silenciosamente, o offset é confirmado e a mensagem é descartada — o ativo nunca é atualizado. Relançar permite que o Kafka aplique sua política de retry.

3. O Upsert de Ativos

@Transactional
public void updateAsset(AssetMarketDataDTO dto) {
    Asset asset = assetRepository.findByTicker(dto.getTicker())
            .orElse(Asset.builder()
                    .ticker(dto.getTicker())
                    .status(AssetStatus.ACTIVE)
                    .build());

    asset.setName(dto.getName());
    asset.setCurrentPrice(dto.getPrice());
    // Uses real market timestamp from event — not LocalDateTime.now()
    asset.setLastUpdate(dto.getUpdatedAt() != null ? dto.getUpdatedAt() : LocalDateTime.now());

    assetRepository.save(asset);

    // Update Redis cache after persisting
    marketPriceCacheService.updatePrice(dto.getTicker(), dto.getPrice());
}
Enter fullscreen mode Exit fullscreen mode

Decisão importante: usamos o updatedAt real do evento Kafka, não LocalDateTime.now(). Isso garante que o lastUpdate no banco reflete quando o preço foi gerado pelo mercado, não quando foi processado pelo serviço.

4. O Cache Redis com TTL

private static final Duration CACHE_TTL = Duration.ofMinutes(10);

public void updatePrice(String ticker, BigDecimal price) {
    String key = "market:price:" + ticker;
    // TTL of 10 minutes — prevents stale prices for inactive or delisted assets
    redisTemplate.opsForValue().set(key, price.toString(), CACHE_TTL);
}
Enter fullscreen mode Exit fullscreen mode

Por que TTL? Se um ativo for inativado ou deslistado, seu preço vai expirar do Redis em 10 minutos. Sem TTL, o Matching Engine poderia usar preços de ativos que não existem mais.

5. Filtro de Status nas Queries

public List<AssetDTO> findAllActive() {
    // Query directly by status — not findAll() + filter in memory
    return assetRepository.findAllByStatus(AssetStatus.ACTIVE).stream()
            .map(AssetDTO::fromEntity)
            .toList();
}

public Optional<AssetDTO> findByTicker(String ticker) {
    // Only returns ACTIVE assets — inactive returns 404
    return assetRepository.findByTickerAndStatus(ticker.toUpperCase(), AssetStatus.ACTIVE)
            .map(AssetDTO::fromEntity);
}
Enter fullscreen mode Exit fullscreen mode

Quando a trading-broker-order valida um ticker antes de criar uma ordem, ela chama GET /api/v1/assets/{ticker}. Se o ativo estiver inativo, recebe 404 e rejeita a ordem. Correto e previsível.


🌐 API REST

Método Endpoint Descrição
GET /api/v1/assets Lista todos os ativos ativos
GET /api/v1/assets/{ticker} Busca ativo por ticker (404 se inativo)

📄 Swagger UI: http://localhost:8083/swagger-ui.html


✅ Validando a Execução

Com a aplicação rodando e o trading-broker-market-data publicando no Kafka:

  • ✅ Consumer conectado ao tópico trading-assets-market-data-v1
  • ✅ Ativos sendo criados/atualizados no MySQL em tempo real
  • ✅ Redis sendo atualizado com TTL após cada evento
  • ✅ Logs mostrando Updating ticker PETR4 with price R$ 47.37

🚀 O que vem a seguir?

Com o catálogo de ativos funcionando, o próximo passo é o b3-market-sync-api — que sincroniza os preços diretamente para o Redis da B3 — seguido do b3-matching-engine-api, que usa esses preços para executar as ordens.


🔎 Sobre a série

⬅️ Post Anterior: Dica de Ferramentas: MongoDB e Kafka

➡️ Próximo Post: Sincronizando o Mercado Real: Consumindo a Brapi e Alimentando o Redis com Spring Boot

📘 Índice da Série: Guia da Série


Links:

Top comments (0)