DEV Community

Jackson Wendel Santos Sá
Jackson Wendel Santos Sá

Posted on

Construindo uma Aplicação com Syros: Sistema de Inventário Distribuído

Introdução

O Syros é uma plataforma de coordenação distribuída construída em Rust que oferece soluções robustas para sistemas distribuídos modernos. Neste artigo, vamos construir uma aplicação prática que demonstra como usar o Syros para gerenciar locks distribuídos em um sistema de inventário.

O que é o Syros?

O Syros é um crate Rust disponível em crates.io que implementa:

  • Lock Manager: Locks distribuídos para coordenação de recursos
  • Saga Orchestrator: Transações distribuídas com compensação
  • Event Store: Armazenamento de eventos para auditoria
  • Cache Manager: Cache distribuído para performance
  • APIs Múltiplas: REST, gRPC, WebSocket e GraphQL

Cenário: Sistema de Inventário Distribuído

Vamos construir um sistema onde múltiplas aplicações (microserviços) precisam atualizar o inventário de produtos simultaneamente, evitando condições de corrida e garantindo consistência dos dados.

Problema

  • Múltiplas aplicações atualizando a mesma tabela inventory
  • Necessidade de evitar overselling (vender mais do que tem em estoque)
  • Garantir consistência em operações concorrentes

Solução

Usar locks distribuídos do Syros para coordenar o acesso ao inventário.

Passo a Passo: Implementação

1. Configuração do Projeto

Primeiro, vamos criar um novo projeto Rust:

cargo new inventory-system
cd inventory-system
Enter fullscreen mode Exit fullscreen mode

Adicione as dependências no Cargo.toml:

[package]
name = "inventory-system"
version = "0.1.0"
edition = "2021"

[dependencies]
syros = "1.0.0"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.0", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
anyhow = "1.0"
Enter fullscreen mode Exit fullscreen mode

2. Estrutura do Sistema

// src/main.rs
use syros::core::lock_manager::{LockManager, LockRequest, ReleaseLockRequest};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use chrono::{DateTime, Utc};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Product {
    pub id: String,
    pub name: String,
    pub price: f64,
    pub stock: i32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InventoryUpdate {
    pub product_id: String,
    pub quantity: i32,
    pub operation: String, // "add", "subtract", "set"
    pub timestamp: DateTime<Utc>,
    pub app_name: String,
}

pub struct InventoryService {
    products: Arc<RwLock<HashMap<String, Product>>>,
    lock_manager: Arc<LockManager>,
}

impl InventoryService {
    pub fn new(lock_manager: Arc<LockManager>) -> Self {
        let mut products = HashMap::new();

        // Inicializar alguns produtos de exemplo
        products.insert("prod_001".to_string(), Product {
            id: "prod_001".to_string(),
            name: "Smartphone".to_string(),
            price: 899.99,
            stock: 100,
        });

        products.insert("prod_002".to_string(), Product {
            id: "prod_002".to_string(),
            name: "Notebook".to_string(),
            price: 1299.99,
            stock: 50,
        });

        Self {
            products: Arc::new(RwLock::new(products)),
            lock_manager,
        }
    }

    pub async fn update_inventory(
        &self,
        update: InventoryUpdate,
    ) -> Result<String, anyhow::Error> {
        let lock_key = format!("inventory_{}", update.product_id);
        let app_name = update.app_name.clone();

        // 1. Adquirir lock para o produto específico
        let lock_request = LockRequest {
            key: lock_key.clone(),
            owner: app_name.clone(),
            ttl: std::time::Duration::from_secs(30),
            metadata: Some(format!("Atualizando inventário do produto {}", update.product_id)),
            wait_timeout: Some(std::time::Duration::from_secs(10)),
        };

        let lock_response = self.lock_manager.acquire_lock(lock_request).await?;

        if !lock_response.success {
            return Err(anyhow::anyhow!("Falha ao adquirir lock: {}", lock_response.message));
        }

        let lock_id = lock_response.lock_id;

        // 2. Executar operação crítica
        let result = self.execute_inventory_update(update).await;

        // 3. Liberar lock
        let release_request = ReleaseLockRequest {
            key: lock_key,
            lock_id,
            owner: app_name,
        };

        self.lock_manager.release_lock(release_request).await?;

        result
    }

    async fn execute_inventory_update(
        &self,
        update: InventoryUpdate,
    ) -> Result<String, anyhow::Error> {
        let mut products = self.products.write().await;

        if let Some(product) = products.get_mut(&update.product_id) {
            match update.operation.as_str() {
                "add" => {
                    product.stock += update.quantity;
                }
                "subtract" => {
                    if product.stock < update.quantity {
                        return Err(anyhow::anyhow!(
                            "Estoque insuficiente. Disponível: {}, Solicitado: {}",
                            product.stock,
                            update.quantity
                        ));
                    }
                    product.stock -= update.quantity;
                }
                "set" => {
                    product.stock = update.quantity;
                }
                _ => {
                    return Err(anyhow::anyhow!("Operação inválida: {}", update.operation));
                }
            }

            println!(
                "{}: Produto {} - Estoque atualizado para {} unidades",
                update.app_name, product.name, product.stock
            );

            Ok(format!(
                "Inventário atualizado: {} unidades de {}",
                product.stock,
                product.name
            ))
        } else {
            Err(anyhow::anyhow!("Produto não encontrado: {}", update.product_id))
        }
    }

    pub async fn get_product(&self, product_id: &str) -> Option<Product> {
        let products = self.products.read().await;
        products.get(product_id).cloned()
    }

    pub async fn list_products(&self) -> Vec<Product> {
        let products = self.products.read().await;
        products.values().cloned().collect()
    }
}
Enter fullscreen mode Exit fullscreen mode

3. Simulador de Aplicações Concorrentes

// src/simulator.rs
use crate::{InventoryService, InventoryUpdate};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use chrono::Utc;
use uuid::Uuid;

pub struct AppSimulator {
    inventory_service: Arc<InventoryService>,
    app_name: String,
}

impl AppSimulator {
    pub fn new(inventory_service: Arc<InventoryService>, app_name: String) -> Self {
        Self {
            inventory_service,
            app_name,
        }
    }

    pub async fn simulate_workload(&self, product_id: &str, operations: Vec<i32>) {
        println!("{} iniciando operações no produto {}", self.app_name, product_id);

        for (i, quantity) in operations.iter().enumerate() {
            let update = InventoryUpdate {
                product_id: product_id.to_string(),
                quantity: *quantity,
                operation: "subtract".to_string(),
                timestamp: Utc::now(),
                app_name: self.app_name.clone(),
            };

            match self.inventory_service.update_inventory(update).await {
                Ok(result) => {
                    println!("{} - Operação {}: {}", self.app_name, i + 1, result);
                }
                Err(e) => {
                    println!("{} - Operação {} falhou: {}", self.app_name, i + 1, e);
                }
            }

            // Simular tempo de processamento
            sleep(Duration::from_millis(100)).await;
        }

        println!("{} finalizou operações", self.app_name);
    }
}
Enter fullscreen mode Exit fullscreen mode

4. Função Principal

// src/main.rs (continuação)
mod simulator;

use simulator::AppSimulator;
use syros::core::lock_manager::LockManager;
use std::sync::Arc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // Inicializar logging
    tracing_subscriber::fmt::init();

    println!("Sistema de Inventário Distribuído com Syros");
    println!("================================================");

    // 1. Inicializar o Lock Manager do Syros
    let lock_manager = Arc::new(LockManager::new());

    // 2. Criar o serviço de inventário
    let inventory_service = Arc::new(InventoryService::new(lock_manager));

    // 3. Mostrar estado inicial
    println!("\nEstado inicial do inventário:");
    for product in inventory_service.list_products().await {
        println!("  - {}: {} unidades (R$ {:.2})", 
                product.name, product.stock, product.price);
    }

    // 4. Simular múltiplas aplicações trabalhando simultaneamente
    println!("\nSimulando operações concorrentes...");

    let product_id = "prod_001"; // Smartphone

    // Criar simuladores para diferentes aplicações
    let order_service = AppSimulator::new(
        inventory_service.clone(),
        "OrderService".to_string(),
    );

    let return_service = AppSimulator::new(
        inventory_service.clone(),
        "ReturnService".to_string(),
    );

    let stock_service = AppSimulator::new(
        inventory_service.clone(),
        "StockService".to_string(),
    );

    // Executar operações concorrentes
    let tasks = vec![
        tokio::spawn(async move {
            order_service.simulate_workload(product_id, vec![5, 3, 2, 1]).await;
        }),
        tokio::spawn(async move {
            sleep(Duration::from_millis(50)).await; // Delay para simular concorrência
            return_service.simulate_workload(product_id, vec![2, 1]).await;
        }),
        tokio::spawn(async move {
            sleep(Duration::from_millis(100)).await;
            stock_service.simulate_workload(product_id, vec![10, 5]).await;
        }),
    ];

    // Aguardar todas as operações
    for task in tasks {
        task.await?;
    }

    // 5. Mostrar estado final
    println!("\nEstado final do inventário:");
    for product in inventory_service.list_products().await {
        println!("  - {}: {} unidades (R$ {:.2})", 
                product.name, product.stock, product.price);
    }

    println!("\nSimulação concluída com sucesso!");
    println!("💡 Observe como o Syros coordenou as operações concorrentes");

    Ok(())
}
Enter fullscreen mode Exit fullscreen mode

5. Executando a Aplicação

# Compilar e executar
cargo run

# Ou em modo release para melhor performance
cargo run --release
Enter fullscreen mode Exit fullscreen mode

Saída Esperada

Sistema de Inventário Distribuído com Syros
================================================

Estado inicial do inventário:
  - Smartphone: 100 unidades (R$ 899.99)
  - Notebook: 50 unidades (R$ 1299.99)

Simulando operações concorrentes...
OrderService iniciando operações no produto prod_001
  OrderService - Operação 1: Inventário atualizado: 95 unidades de Smartphone
  OrderService - Operação 2: Inventário atualizado: 92 unidades de Smartphone
  OrderService - Operação 3: Inventário atualizado: 90 unidades de Smartphone
  OrderService - Operação 4: Inventário atualizado: 89 unidades de Smartphone
OrderService finalizou operações

ReturnService iniciando operações no produto prod_001
  ReturnService - Operação 1: Inventário atualizado: 91 unidades de Smartphone
  ReturnService - Operação 2: Inventário atualizado: 92 unidades de Smartphone
ReturnService finalizou operações

StockService iniciando operações no produto prod_001
  StockService - Operação 1: Inventário atualizado: 82 unidades de Smartphone
  StockService - Operação 2: Inventário atualizado: 77 unidades de Smartphone
StockService finalizou operações

Estado final do inventário:
  - Smartphone: 77 unidades (R$ 899.99)
  - Notebook: 50 unidades (R$ 1299.99)

Simulação concluída com sucesso!
Observe como o Syros coordenou as operações concorrentes
Enter fullscreen mode Exit fullscreen mode

Funcionalidades Avançadas do Syros

1. Usando a API REST

use reqwest;
use serde_json::json;

async fn update_inventory_via_rest(product_id: &str, quantity: i32) -> Result<(), anyhow::Error> {
    let client = reqwest::Client::new();

    // Adquirir lock via REST API
    let lock_request = json!({
        "key": format!("inventory_{}", product_id),
        "owner": "rest-client",
        "ttl_seconds": 30,
        "metadata": "Atualização via REST API"
    });

    let response = client
        .post("http://localhost:8080/api/v1/locks")
        .json(&lock_request)
        .send()
        .await?;

    if response.status().is_success() {
        let lock_data: serde_json::Value = response.json().await?;
        let lock_id = lock_data["lock_id"].as_str().unwrap();

        // Fazer operação crítica
        println!("Executando operação crítica...");

        // Liberar lock
        client
            .delete(&format!("http://localhost:8080/api/v1/locks/inventory_{}", product_id))
            .json(&json!({
                "lock_id": lock_id,
                "owner": "rest-client"
            }))
            .send()
            .await?;
    }

    Ok(())
}
Enter fullscreen mode Exit fullscreen mode

2. Monitoramento e Métricas

use syros::api::metrics::MetricsCollector;

async fn monitor_system() {
    let metrics = MetricsCollector::new();

    // Verificar métricas de locks
    let lock_metrics = metrics.get_lock_metrics().await;
    println!("Locks ativos: {}", lock_metrics.active_locks);
    println!("Locks adquiridos: {}", lock_metrics.total_acquired);
    println!("Locks liberados: {}", lock_metrics.total_released);
}
Enter fullscreen mode Exit fullscreen mode

3. Configuração Avançada

use syros::config::Config;

async fn setup_advanced_config() -> Result<(), anyhow::Error> {
    let config = Config::builder()
        .lock_manager()
            .default_ttl(Duration::from_secs(60))
            .max_wait_time(Duration::from_secs(30))
            .build()
        .build();

    let lock_manager = LockManager::with_config(config.lock_manager);

    // Usar o lock manager configurado...

    Ok(())
}
Enter fullscreen mode Exit fullscreen mode

Benefícios do Syros

1. Prevenção de Race Conditions

  • Garante que apenas uma aplicação modifique um recurso por vez
  • Evita condições de corrida em operações críticas

2. Consistência de Dados

  • Operações atômicas garantem integridade dos dados
  • Prevenção de overselling e inconsistências

3. Escalabilidade

  • Funciona com múltiplas instâncias de aplicação
  • Coordenação distribuída sem ponto único de falha

4. Observabilidade

  • Métricas detalhadas para monitoramento
  • Logs estruturados para debugging

5. Flexibilidade

  • Múltiplas APIs (REST, gRPC, WebSocket)
  • SDKs para diferentes linguagens
  • Configuração flexível

Próximos Passos

  1. Explore os SDKs: O Syros oferece SDKs para Python, Node.js, Java, C# e Go
  2. Implemente Sagas: Use o Saga Orchestrator para transações distribuídas complexas
  3. Configure Observabilidade: Integre com Prometheus e Grafana
  4. Deploy em Produção: Use Docker e Kubernetes para deployment

Conclusão

O Syros oferece uma solução robusta e eficiente para coordenação em sistemas distribuídos. Através deste exemplo prático, vimos como implementar locks distribuídos para garantir consistência em operações concorrentes, evitando problemas comuns como condições de corrida e inconsistências de dados.

A plataforma é especialmente útil em cenários de microserviços onde múltiplas aplicações precisam coordenar o acesso a recursos compartilhados, garantindo a integridade e consistência dos dados em todo o sistema.

Para mais informações, consulte:

Top comments (0)