Introdução
O Syros é uma plataforma de coordenação distribuída construída em Rust que oferece soluções robustas para sistemas distribuídos modernos. Neste artigo, vamos construir uma aplicação prática que demonstra como usar o Syros para gerenciar locks distribuídos em um sistema de inventário.
O que é o Syros?
O Syros é um crate Rust disponível em crates.io que implementa:
- Lock Manager: Locks distribuídos para coordenação de recursos
- Saga Orchestrator: Transações distribuídas com compensação
- Event Store: Armazenamento de eventos para auditoria
- Cache Manager: Cache distribuído para performance
- APIs Múltiplas: REST, gRPC, WebSocket e GraphQL
Cenário: Sistema de Inventário Distribuído
Vamos construir um sistema onde múltiplas aplicações (microserviços) precisam atualizar o inventário de produtos simultaneamente, evitando condições de corrida e garantindo consistência dos dados.
Problema
- Múltiplas aplicações atualizando a mesma tabela
inventory
- Necessidade de evitar overselling (vender mais do que tem em estoque)
- Garantir consistência em operações concorrentes
Solução
Usar locks distribuídos do Syros para coordenar o acesso ao inventário.
Passo a Passo: Implementação
1. Configuração do Projeto
Primeiro, vamos criar um novo projeto Rust:
cargo new inventory-system
cd inventory-system
Adicione as dependências no Cargo.toml
:
[package]
name = "inventory-system"
version = "0.1.0"
edition = "2021"
[dependencies]
syros = "1.0.0"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.0", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
anyhow = "1.0"
2. Estrutura do Sistema
// src/main.rs
use syros::core::lock_manager::{LockManager, LockRequest, ReleaseLockRequest};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use chrono::{DateTime, Utc};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Product {
pub id: String,
pub name: String,
pub price: f64,
pub stock: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InventoryUpdate {
pub product_id: String,
pub quantity: i32,
pub operation: String, // "add", "subtract", "set"
pub timestamp: DateTime<Utc>,
pub app_name: String,
}
pub struct InventoryService {
products: Arc<RwLock<HashMap<String, Product>>>,
lock_manager: Arc<LockManager>,
}
impl InventoryService {
pub fn new(lock_manager: Arc<LockManager>) -> Self {
let mut products = HashMap::new();
// Inicializar alguns produtos de exemplo
products.insert("prod_001".to_string(), Product {
id: "prod_001".to_string(),
name: "Smartphone".to_string(),
price: 899.99,
stock: 100,
});
products.insert("prod_002".to_string(), Product {
id: "prod_002".to_string(),
name: "Notebook".to_string(),
price: 1299.99,
stock: 50,
});
Self {
products: Arc::new(RwLock::new(products)),
lock_manager,
}
}
pub async fn update_inventory(
&self,
update: InventoryUpdate,
) -> Result<String, anyhow::Error> {
let lock_key = format!("inventory_{}", update.product_id);
let app_name = update.app_name.clone();
// 1. Adquirir lock para o produto específico
let lock_request = LockRequest {
key: lock_key.clone(),
owner: app_name.clone(),
ttl: std::time::Duration::from_secs(30),
metadata: Some(format!("Atualizando inventário do produto {}", update.product_id)),
wait_timeout: Some(std::time::Duration::from_secs(10)),
};
let lock_response = self.lock_manager.acquire_lock(lock_request).await?;
if !lock_response.success {
return Err(anyhow::anyhow!("Falha ao adquirir lock: {}", lock_response.message));
}
let lock_id = lock_response.lock_id;
// 2. Executar operação crítica
let result = self.execute_inventory_update(update).await;
// 3. Liberar lock
let release_request = ReleaseLockRequest {
key: lock_key,
lock_id,
owner: app_name,
};
self.lock_manager.release_lock(release_request).await?;
result
}
async fn execute_inventory_update(
&self,
update: InventoryUpdate,
) -> Result<String, anyhow::Error> {
let mut products = self.products.write().await;
if let Some(product) = products.get_mut(&update.product_id) {
match update.operation.as_str() {
"add" => {
product.stock += update.quantity;
}
"subtract" => {
if product.stock < update.quantity {
return Err(anyhow::anyhow!(
"Estoque insuficiente. Disponível: {}, Solicitado: {}",
product.stock,
update.quantity
));
}
product.stock -= update.quantity;
}
"set" => {
product.stock = update.quantity;
}
_ => {
return Err(anyhow::anyhow!("Operação inválida: {}", update.operation));
}
}
println!(
"{}: Produto {} - Estoque atualizado para {} unidades",
update.app_name, product.name, product.stock
);
Ok(format!(
"Inventário atualizado: {} unidades de {}",
product.stock,
product.name
))
} else {
Err(anyhow::anyhow!("Produto não encontrado: {}", update.product_id))
}
}
pub async fn get_product(&self, product_id: &str) -> Option<Product> {
let products = self.products.read().await;
products.get(product_id).cloned()
}
pub async fn list_products(&self) -> Vec<Product> {
let products = self.products.read().await;
products.values().cloned().collect()
}
}
3. Simulador de Aplicações Concorrentes
// src/simulator.rs
use crate::{InventoryService, InventoryUpdate};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use chrono::Utc;
use uuid::Uuid;
pub struct AppSimulator {
inventory_service: Arc<InventoryService>,
app_name: String,
}
impl AppSimulator {
pub fn new(inventory_service: Arc<InventoryService>, app_name: String) -> Self {
Self {
inventory_service,
app_name,
}
}
pub async fn simulate_workload(&self, product_id: &str, operations: Vec<i32>) {
println!("{} iniciando operações no produto {}", self.app_name, product_id);
for (i, quantity) in operations.iter().enumerate() {
let update = InventoryUpdate {
product_id: product_id.to_string(),
quantity: *quantity,
operation: "subtract".to_string(),
timestamp: Utc::now(),
app_name: self.app_name.clone(),
};
match self.inventory_service.update_inventory(update).await {
Ok(result) => {
println!("{} - Operação {}: {}", self.app_name, i + 1, result);
}
Err(e) => {
println!("{} - Operação {} falhou: {}", self.app_name, i + 1, e);
}
}
// Simular tempo de processamento
sleep(Duration::from_millis(100)).await;
}
println!("{} finalizou operações", self.app_name);
}
}
4. Função Principal
// src/main.rs (continuação)
mod simulator;
use simulator::AppSimulator;
use syros::core::lock_manager::LockManager;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
// Inicializar logging
tracing_subscriber::fmt::init();
println!("Sistema de Inventário Distribuído com Syros");
println!("================================================");
// 1. Inicializar o Lock Manager do Syros
let lock_manager = Arc::new(LockManager::new());
// 2. Criar o serviço de inventário
let inventory_service = Arc::new(InventoryService::new(lock_manager));
// 3. Mostrar estado inicial
println!("\nEstado inicial do inventário:");
for product in inventory_service.list_products().await {
println!(" - {}: {} unidades (R$ {:.2})",
product.name, product.stock, product.price);
}
// 4. Simular múltiplas aplicações trabalhando simultaneamente
println!("\nSimulando operações concorrentes...");
let product_id = "prod_001"; // Smartphone
// Criar simuladores para diferentes aplicações
let order_service = AppSimulator::new(
inventory_service.clone(),
"OrderService".to_string(),
);
let return_service = AppSimulator::new(
inventory_service.clone(),
"ReturnService".to_string(),
);
let stock_service = AppSimulator::new(
inventory_service.clone(),
"StockService".to_string(),
);
// Executar operações concorrentes
let tasks = vec![
tokio::spawn(async move {
order_service.simulate_workload(product_id, vec![5, 3, 2, 1]).await;
}),
tokio::spawn(async move {
sleep(Duration::from_millis(50)).await; // Delay para simular concorrência
return_service.simulate_workload(product_id, vec![2, 1]).await;
}),
tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
stock_service.simulate_workload(product_id, vec![10, 5]).await;
}),
];
// Aguardar todas as operações
for task in tasks {
task.await?;
}
// 5. Mostrar estado final
println!("\nEstado final do inventário:");
for product in inventory_service.list_products().await {
println!(" - {}: {} unidades (R$ {:.2})",
product.name, product.stock, product.price);
}
println!("\nSimulação concluída com sucesso!");
println!("💡 Observe como o Syros coordenou as operações concorrentes");
Ok(())
}
5. Executando a Aplicação
# Compilar e executar
cargo run
# Ou em modo release para melhor performance
cargo run --release
Saída Esperada
Sistema de Inventário Distribuído com Syros
================================================
Estado inicial do inventário:
- Smartphone: 100 unidades (R$ 899.99)
- Notebook: 50 unidades (R$ 1299.99)
Simulando operações concorrentes...
OrderService iniciando operações no produto prod_001
OrderService - Operação 1: Inventário atualizado: 95 unidades de Smartphone
OrderService - Operação 2: Inventário atualizado: 92 unidades de Smartphone
OrderService - Operação 3: Inventário atualizado: 90 unidades de Smartphone
OrderService - Operação 4: Inventário atualizado: 89 unidades de Smartphone
OrderService finalizou operações
ReturnService iniciando operações no produto prod_001
ReturnService - Operação 1: Inventário atualizado: 91 unidades de Smartphone
ReturnService - Operação 2: Inventário atualizado: 92 unidades de Smartphone
ReturnService finalizou operações
StockService iniciando operações no produto prod_001
StockService - Operação 1: Inventário atualizado: 82 unidades de Smartphone
StockService - Operação 2: Inventário atualizado: 77 unidades de Smartphone
StockService finalizou operações
Estado final do inventário:
- Smartphone: 77 unidades (R$ 899.99)
- Notebook: 50 unidades (R$ 1299.99)
Simulação concluída com sucesso!
Observe como o Syros coordenou as operações concorrentes
Funcionalidades Avançadas do Syros
1. Usando a API REST
use reqwest;
use serde_json::json;
async fn update_inventory_via_rest(product_id: &str, quantity: i32) -> Result<(), anyhow::Error> {
let client = reqwest::Client::new();
// Adquirir lock via REST API
let lock_request = json!({
"key": format!("inventory_{}", product_id),
"owner": "rest-client",
"ttl_seconds": 30,
"metadata": "Atualização via REST API"
});
let response = client
.post("http://localhost:8080/api/v1/locks")
.json(&lock_request)
.send()
.await?;
if response.status().is_success() {
let lock_data: serde_json::Value = response.json().await?;
let lock_id = lock_data["lock_id"].as_str().unwrap();
// Fazer operação crítica
println!("Executando operação crítica...");
// Liberar lock
client
.delete(&format!("http://localhost:8080/api/v1/locks/inventory_{}", product_id))
.json(&json!({
"lock_id": lock_id,
"owner": "rest-client"
}))
.send()
.await?;
}
Ok(())
}
2. Monitoramento e Métricas
use syros::api::metrics::MetricsCollector;
async fn monitor_system() {
let metrics = MetricsCollector::new();
// Verificar métricas de locks
let lock_metrics = metrics.get_lock_metrics().await;
println!("Locks ativos: {}", lock_metrics.active_locks);
println!("Locks adquiridos: {}", lock_metrics.total_acquired);
println!("Locks liberados: {}", lock_metrics.total_released);
}
3. Configuração Avançada
use syros::config::Config;
async fn setup_advanced_config() -> Result<(), anyhow::Error> {
let config = Config::builder()
.lock_manager()
.default_ttl(Duration::from_secs(60))
.max_wait_time(Duration::from_secs(30))
.build()
.build();
let lock_manager = LockManager::with_config(config.lock_manager);
// Usar o lock manager configurado...
Ok(())
}
Benefícios do Syros
1. Prevenção de Race Conditions
- Garante que apenas uma aplicação modifique um recurso por vez
- Evita condições de corrida em operações críticas
2. Consistência de Dados
- Operações atômicas garantem integridade dos dados
- Prevenção de overselling e inconsistências
3. Escalabilidade
- Funciona com múltiplas instâncias de aplicação
- Coordenação distribuída sem ponto único de falha
4. Observabilidade
- Métricas detalhadas para monitoramento
- Logs estruturados para debugging
5. Flexibilidade
- Múltiplas APIs (REST, gRPC, WebSocket)
- SDKs para diferentes linguagens
- Configuração flexível
Próximos Passos
- Explore os SDKs: O Syros oferece SDKs para Python, Node.js, Java, C# e Go
- Implemente Sagas: Use o Saga Orchestrator para transações distribuídas complexas
- Configure Observabilidade: Integre com Prometheus e Grafana
- Deploy em Produção: Use Docker e Kubernetes para deployment
Conclusão
O Syros oferece uma solução robusta e eficiente para coordenação em sistemas distribuídos. Através deste exemplo prático, vimos como implementar locks distribuídos para garantir consistência em operações concorrentes, evitando problemas comuns como condições de corrida e inconsistências de dados.
A plataforma é especialmente útil em cenários de microserviços onde múltiplas aplicações precisam coordenar o acesso a recursos compartilhados, garantindo a integridade e consistência dos dados em todo o sistema.
Para mais informações, consulte:
Top comments (0)