Olá, pessoal!
Continuando a série My Broker B3, hoje construímos o trading-broker-asset — o serviço Java que consome as cotações publicadas no Kafka pelo trading-broker-market-data e as transforma em dados estruturados no MySQL, com cache de alta performance no Redis.
Se o serviço Python é o coletor, este é o processador.
🏗️ O Papel deste Serviço no Ecossistema
[trading-broker-market-data]
│
│ Kafka: trading-assets-market-data-v1
▼
[trading-broker-asset]
│
┌────┴────┐
▼ ▼
MySQL Redis
(assets) market:price:{TICKER}
│
[b3-matching-engine] (lê daqui)
[trading-broker-order] (valida ticker daqui)
Dois consumers críticos dependem dos dados que este serviço mantém: o Matching Engine (para decisões de preço) e a Order API (para validar se um ticker existe e está ativo).
🎯 Foco no MVP
Nesta fase, priorizei:
- Consumer Kafka robusto com tratamento de erros
- Upsert de ativos: cria na primeira vez, atualiza nas seguintes
- Cache Redis com TTL de 10 minutos
- API REST para consulta do catálogo
- Filtro de status: apenas ativos
ACTIVEsão retornados
🛠️ Stack Tecnológica
| Tecnologia | Uso |
|---|---|
| Java 21 + Spring Boot 3.5.11 | Core do serviço |
| Spring Kafka | Consumo de eventos de cotação |
| MySQL + Flyway | Catálogo de ativos versionado |
| Spring Data Redis | Cache de preços em tempo real |
| SpringDoc OpenAPI | Documentação via Swagger UI |
🏗️ Os Pilares da Implementação
1. O Schema do Banco (Flyway)
CREATE TABLE assets (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
ticker VARCHAR(20) NOT NULL UNIQUE,
name VARCHAR(200),
current_price DECIMAL(19, 4),
last_update DATETIME,
status ENUM('ACTIVE', 'INACTIVE') NOT NULL DEFAULT 'ACTIVE',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Por que Flyway? Versionamento do schema garante que qualquer ambiente (local, CI, produção) tenha exatamente a mesma estrutura de banco, sem surpresas.
2. O Consumer Kafka
@KafkaListener(
topics = "trading-assets-market-data-v1",
groupId = "trading-broker-asset"
)
public void consume(AssetMarketDataDTO dto) {
log.info("Market data received for ticker: {}", dto.getTicker());
try {
assetService.updateAsset(dto);
} catch (Exception e) {
log.error("Failed to process market data for ticker {}: {}",
dto.getTicker(), e.getMessage(), e);
// Rethrow so Kafka can apply retry policy instead of silently discarding
throw e;
}
}
Por que relançar a exceção? Se o processamento falhar silenciosamente, o offset é confirmado e a mensagem é descartada — o ativo nunca é atualizado. Relançar permite que o Kafka aplique sua política de retry.
3. O Upsert de Ativos
@Transactional
public void updateAsset(AssetMarketDataDTO dto) {
Asset asset = assetRepository.findByTicker(dto.getTicker())
.orElse(Asset.builder()
.ticker(dto.getTicker())
.status(AssetStatus.ACTIVE)
.build());
asset.setName(dto.getName());
asset.setCurrentPrice(dto.getPrice());
// Uses real market timestamp from event — not LocalDateTime.now()
asset.setLastUpdate(dto.getUpdatedAt() != null ? dto.getUpdatedAt() : LocalDateTime.now());
assetRepository.save(asset);
// Update Redis cache after persisting
marketPriceCacheService.updatePrice(dto.getTicker(), dto.getPrice());
}
Decisão importante: usamos o updatedAt real do evento Kafka, não LocalDateTime.now(). Isso garante que o lastUpdate no banco reflete quando o preço foi gerado pelo mercado, não quando foi processado pelo serviço.
4. O Cache Redis com TTL
private static final Duration CACHE_TTL = Duration.ofMinutes(10);
public void updatePrice(String ticker, BigDecimal price) {
String key = "market:price:" + ticker;
// TTL of 10 minutes — prevents stale prices for inactive or delisted assets
redisTemplate.opsForValue().set(key, price.toString(), CACHE_TTL);
}
Por que TTL? Se um ativo for inativado ou deslistado, seu preço vai expirar do Redis em 10 minutos. Sem TTL, o Matching Engine poderia usar preços de ativos que não existem mais.
5. Filtro de Status nas Queries
public List<AssetDTO> findAllActive() {
// Query directly by status — not findAll() + filter in memory
return assetRepository.findAllByStatus(AssetStatus.ACTIVE).stream()
.map(AssetDTO::fromEntity)
.toList();
}
public Optional<AssetDTO> findByTicker(String ticker) {
// Only returns ACTIVE assets — inactive returns 404
return assetRepository.findByTickerAndStatus(ticker.toUpperCase(), AssetStatus.ACTIVE)
.map(AssetDTO::fromEntity);
}
Quando a trading-broker-order valida um ticker antes de criar uma ordem, ela chama GET /api/v1/assets/{ticker}. Se o ativo estiver inativo, recebe 404 e rejeita a ordem. Correto e previsível.
🌐 API REST
| Método | Endpoint | Descrição |
|---|---|---|
| GET | /api/v1/assets |
Lista todos os ativos ativos |
| GET | /api/v1/assets/{ticker} |
Busca ativo por ticker (404 se inativo) |
📄 Swagger UI: http://localhost:8083/swagger-ui.html
✅ Validando a Execução
Com a aplicação rodando e o trading-broker-market-data publicando no Kafka:
- ✅ Consumer conectado ao tópico
trading-assets-market-data-v1 - ✅ Ativos sendo criados/atualizados no MySQL em tempo real
- ✅ Redis sendo atualizado com TTL após cada evento
- ✅ Logs mostrando
Updating ticker PETR4 with price R$ 47.37
🚀 O que vem a seguir?
Com o catálogo de ativos funcionando, o próximo passo é o b3-market-sync-api — que sincroniza os preços diretamente para o Redis da B3 — seguido do b3-matching-engine-api, que usa esses preços para executar as ordens.
🔎 Sobre a série
⬅️ Post Anterior: Dica de Ferramentas: MongoDB e Kafka
➡️ Próximo Post: Sincronizando o Mercado Real: Consumindo a Brapi e Alimentando o Redis com Spring Boot
📘 Índice da Série: Guia da Série
Links:
Top comments (0)