DEV Community

Hugo Marques
Hugo Marques

Posted on • Edited on

12 1 2 1 1

Estudo de caso: Thread pools e Out-of-memory

Em sistemas que utilizam processamento assíncrono, como chamadas gRPC não bloqueantes, é comum encontrar cenários onde o gerenciamento inadequado de pools de threads pode levar a problemas de desempenho e até mesmo a um OutOfMemoryError (OOM). Um caso típico ocorre quando o mesmo pool de threads é utilizado tanto para enviar requisições quanto para processar as respostas. Isso pode resultar em um acúmulo de tarefas pendentes que consomem toda a memória disponível, causando um OOM.

Esse artigo descreve o problema que eu passei a semana investigando e como podemos atacá-lo! Vamos lá 👊!

Simulando o problema

Vamos considerar um exemplo onde um FixedThreadPool é utilizado para enviar milhões de requisições a um "fake" cliente gRPC e, simultaneamente, processar as respostas dessas requisições. O código abaixo demonstra como esse cenário pode levar a um OOM:

public class ThreadPoolsOOMExample {
    private static final int THREAD_POOL_SIZE = 10;
    private static final int TOTAL_TASKS = 1_000_000;

    private final ExecutorService mainExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);

    public static void main(String[] args) {
        ThreadPoolsOOMExample example = new ThreadPoolsOOMExample();
        example.run();
    }

    public void run() {
        for (int i = 0; i < TOTAL_TASKS; i++) {
            mainExecutor.submit(this::submitRequest);
        }
    }

    private void submitRequest() {
        // Simula o envio de uma requisição para o cliente gRPC
        CompletableFuture<Response> future = asyncGrpcCall();

        // Processa a resposta usando o mesmo executor
        future.thenApplyAsync(this::processResponse, mainExecutor);
    }

    private CompletableFuture<Response> asyncGrpcCall() {
        // Simula uma chamada gRPC assíncrona
        CompletableFuture<Response> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(100); // Simula o atraso da rede
                future.complete(new Response(512));
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        }).start();
        int queueSize = ((ThreadPoolExecutor) mainExecutor).getQueue().size();
        System.out.println("Current queue size: " + queueSize);
        printHeapSize();
        return future;
    }

    private Response processResponse(Response response) {
        // Processa a resposta do cliente gRPC
        System.out.println("Processando resposta");
        return response;
    }

    private void printHeapSize() {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        long maxMemory = runtime.maxMemory();

        System.out.println("Heap size (total): " + totalMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (used): " + usedMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (max): " + maxMemory / (1024 * 1024) + " MB");
    }

    public static class Response {
        private byte[] data;

        public Response(int sizeInKB) {
            this.data = new byte[sizeInKB * 1024]; // 1 KB = 1024 bytes
        }

        public byte[] getData() {
            return data;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Neste exemplo, o mainExecutor é utilizado tanto para enviar requisições quanto para processar as respostas. Como o número de tarefas é muito grande (TOTAL_TASKS = 1000000), as respostas ficam acumuladas no final da fila do mainExecutor, esperando que todas as requisições sejam enviadas primeiro. Isso leva a um acúmulo de respostas pendentes, consumindo muita memória e eventualmente causando um OutOfMemoryError.

Possíveis soluções

1. Use Pools de Threads Separados

Uma solução eficaz é utilizar pools de threads separados para a produção de requisições e o processamento de respostas. Isso garante que as respostas possam ser processadas independentemente do envio de novas requisições, evitando o acúmulo de tarefas na fila.

package com.hugodesmarques.threads;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class SeparateThreadPoolsExample {
    private static final int PRODUCER_THREAD_POOL_SIZE = 10;
    private static final int CONSUMER_THREAD_POOL_SIZE = 10;
    private static final int TOTAL_TASKS = 1_000_000;

    private final ExecutorService producerThreadPool = Executors.newFixedThreadPool(PRODUCER_THREAD_POOL_SIZE);
    private final ExecutorService consumerThreadPool = Executors.newFixedThreadPool(CONSUMER_THREAD_POOL_SIZE);

    public static void main(String[] args) {
        SeparateThreadPoolsExample example = new SeparateThreadPoolsExample();
        example.run();
    }

    public void run() {
        for (int i = 0; i < TOTAL_TASKS; i++) {
            producerThreadPool.submit(this::submitRequest);
            int producerQueueSize = ((ThreadPoolExecutor) producerThreadPool).getQueue().size();
            System.out.println("Producer queue size: " + producerQueueSize);
        }
    }

    private void submitRequest() {
        // Simula o envio de uma requisição para o cliente gRPC
        CompletableFuture<Response> future = asyncGrpcCall();

        // Processa a resposta usando o mesmo executor
        future.thenApplyAsync(this::processResponse, consumerThreadPool);
    }

    private CompletableFuture<Response> asyncGrpcCall() {
        // Simula uma chamada gRPC assíncrona
        CompletableFuture<Response> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(100); // Simula o atraso da rede
                future.complete(new Response(512));
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        }).start();
        int consumerQueueSize = ((ThreadPoolExecutor) consumerThreadPool).getQueue().size();
        System.out.println("Consumer queue size: " + consumerQueueSize);
        printHeapSize();
        return future;
    }

    private Response processResponse(Response response) {
        // Processa a resposta do cliente gRPC
        System.out.println("Processando resposta");
        return response;
    }

    private void printHeapSize() {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        long maxMemory = runtime.maxMemory();

        System.out.println("Heap size (total): " + totalMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (used): " + usedMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (max): " + maxMemory / (1024 * 1024) + " MB");
    }

    public static class Response {
        private byte[] data;

        public Response(int sizeInKB) {
            this.data = new byte[sizeInKB * 1024]; // 1 KB = 1024 bytes
        }

        public byte[] getData() {
            return data;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

2. Use um Executor Com Capacidade Limitada

Outra abordagem é limitar a capacidade do ThreadPoolExecutor para evitar que ele aceite mais tarefas do que pode processar. Isso pode ser feito usando um BlockingQueue com capacidade limitada e uma política de rejeição adequada.

package com.hugodesmarques.threads;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolWithLimitsExample {

    private static final int TOTAL_TASKS = 1_000_000;
    private static final int THREAD_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;

    private final ExecutorService mainExecutor = new ThreadPoolExecutor(
            THREAD_POOL_SIZE,
            THREAD_POOL_SIZE,
            0L,
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(QUEUE_CAPACITY),
            new NamedThreadFactory("Producer"), // ThreadFactory com expressão lambda)
            new RejectionLoggingPolicy(new ThreadPoolExecutor.CallerRunsPolicy()) // Política de rejeição com log
    );

    public static void main(String[] args) {
        ThreadPoolWithLimitsExample example = new ThreadPoolWithLimitsExample();
        example.run();
    }

    public void run() {
        for (int i = 0; i < TOTAL_TASKS; i++) {
            mainExecutor.submit(this::submitRequest);
        }
    }

    private void submitRequest() {
        // Simula o envio de uma requisição para o cliente gRPC
        CompletableFuture<Response> future = asyncGrpcCall();

        // Processa a resposta usando o mesmo executor
        future.thenApplyAsync(this::processResponse, mainExecutor);
    }

    private CompletableFuture<Response> asyncGrpcCall() {
        // Simula uma chamada gRPC assíncrona
        CompletableFuture<Response> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(100); // Simula o atraso da rede
                future.complete(new Response(512));
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        }).start();
        int queueSize = ((ThreadPoolExecutor) mainExecutor).getQueue().size();
        System.out.println("Current queue size: " + queueSize);
        printHeapSize();
        return future;
    }

    private Response processResponse(Response response) {
        // Processa a resposta do cliente gRPC
        System.out.println("Processando resposta... thread: " + Thread.currentThread().getName());
        return response;
    }

    private void printHeapSize() {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        long maxMemory = runtime.maxMemory();

        System.out.println("Heap size (total): " + totalMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (used): " + usedMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (max): " + maxMemory / (1024 * 1024) + " MB");
    }

    // Política de rejeição personalizada que registra um log quando uma tarefa é rejeitada
    static class RejectionLoggingPolicy implements RejectedExecutionHandler {
        private final RejectedExecutionHandler handler;

        public RejectionLoggingPolicy(RejectedExecutionHandler handler) {
            this.handler = handler;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("Tarefa rejeitada: " + r.toString() + " thread: " + Thread.currentThread().getName());
            handler.rejectedExecution(r, executor);
        }
    }

    // ThreadFactory personalizado para nomear threads
    static class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        public NamedThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, namePrefix + "-thread-" + threadNumber.getAndIncrement());
        }
    }

    public static class Response {
        private byte[] data;

        public Response(int sizeInKB) {
            this.data = new byte[sizeInKB * 1024]; // 1 KB = 1024 bytes
        }

        public byte[] getData() {
            return data;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

3. Controle o fluxo de produção com o uso de semáforos

Implementar controle de fluxo é uma abordagem eficaz para garantir que a produção de novas requisições não ultrapasse a capacidade do sistema de processar respostas. Isso pode ser feito usando semáforos (Semaphore) ou outras técnicas de controle de fluxo. A ideia é limitar o número de requisições simultâneas para evitar sobrecarregar o sistema.

package com.hugodesmarques.threads;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class ThreadPoolsWithSemaphores {
    private static final int THREAD_POOL_SIZE = 10;
    private static final int TOTAL_TASKS = 1_000_000;
    private static final int MAX_CONCURRENT_REQUESTS = 100;
    private final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS);

    private final ExecutorService mainExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);

    public static void main(String[] args) {
        ThreadPoolsWithSemaphores example = new ThreadPoolsWithSemaphores();
        example.run();
    }

    public void run() {
        for (int i = 0; i < TOTAL_TASKS; i++) {
            try {
                System.out.println("Submetendo request: " + i);
                semaphore.acquire();
                mainExecutor.submit(this::submitRequest);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void submitRequest() {
        try {
            // Simula o envio de uma requisição para o cliente gRPC
            CompletableFuture<Response> future = asyncGrpcCall();

            // Processa a resposta usando o mesmo executor
            future.thenApplyAsync(this::processResponse, mainExecutor)
                    .whenComplete((result, ex) -> semaphore.release());
        } catch (Exception e) {
            semaphore.release();
        }
    }

    private CompletableFuture<Response> asyncGrpcCall() {
        // Simula uma chamada gRPC assíncrona
        CompletableFuture<Response> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(100); // Simula o atraso da rede
                future.complete(new Response(512));
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        }).start();
        printHeapSize();
        return future;
    }

    private Response processResponse(Response response) {
        // Processa a resposta do cliente gRPC
        System.out.println("Processando resposta");
        return response;
    }

    private void printHeapSize() {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        long maxMemory = runtime.maxMemory();

        System.out.println("Heap size (total): " + totalMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (used): " + usedMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (max): " + maxMemory / (1024 * 1024) + " MB");
    }

    public static class Response {
        private byte[] data;

        public Response(int sizeInKB) {
            this.data = new byte[sizeInKB * 1024]; // 1 KB = 1024 bytes
        }

        public byte[] getData() {
            return data;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Conclusão

O gerenciamento adequado de pools de threads em sistemas assíncronos é crucial para evitar problemas de desempenho e OutOfMemoryError (OOM). Utilizar pools de threads separados, limitar a capacidade do ThreadPoolExecutor e implementar controle de fluxo são abordagens eficazes para garantir que a produção de novas requisições não ultrapasse a capacidade do sistema de processar respostas.

Ao aplicar essas soluções, você pode garantir que seu sistema permaneça eficiente e responsivo, mesmo sob alta carga, evitando o acúmulo excessivo de tarefas na fila e prevenindo OOM.

Todos os exemplos deste post estão disponíveis no meu repositório:

GitHub logo hugomarques / sandbox

Repo to store POC, experiments, tutorials.

sandbox

Repo to store POC, experiments, tutorials.

  • com.hugodesmarques.parallel - Experimento para o artigo "Paralelismo e Concorrência 102: Java parallel streams na prática".
  • com.hugodesmarques.parallel.benchmark - Código de benchmark com a ferramenta jmh pra verificar os experimentos com parallel vs sequential.
  • com.hugodesmarques.threads - Experimento para o artigo "Estudo de caso: Thread pools e OOM".





Fica a dica, pra você não passar pela mesma dor de cabeça que eu passei 😅.

AWS Security LIVE!

Join us for AWS Security LIVE!

Discover the future of cloud security. Tune in live for trends, tips, and solutions from AWS and AWS Partners.

Learn More

Top comments (1)

Collapse
 
rponte profile image
Rafael Ponte

Excelente artigo, Hugo! Valeu por compartlihar! 👏🏻👏🏻

Retry later