DEV Community

João Victor
João Victor

Posted on

Implementação de um Microserviço para Envio de Mensagens via WhatsApp Utilizando Docker, Kafka e Evolution API

Resumo

O uso de aplicações de mensagens instantâneas, como o WhatsApp, tornou-se amplamente difundido no contexto empresarial, principalmente devido à sua facilidade de uso e ampla aceitação por usuários com baixo nível de familiaridade tecnológica. Em muitos casos, clientes apresentam dificuldades em utilizar o e-mail ou outras plataformas digitais, tornando o WhatsApp um meio mais acessível e eficiente de comunicação.

Este trabalho apresenta a implementação de um microserviço para envio automatizado de mensagens via WhatsApp, utilizando uma arquitetura baseada em microserviços, conteinerização com Docker e Docker Compose, mensageria assíncrona por meio do Apache Kafka e integração com a Evolution API. A solução proposta busca oferecer escalabilidade, desacoplamento entre componentes e facilidade de implantação, sendo adequada para cenários que demandam envio de grandes volumes de mensagens.

Palavras-chave: Microserviços; Docker; Kafka; WhatsApp; Mensageria Assíncrona.


1. Introdução

Com o avanço da transformação digital, a comunicação entre sistemas e usuários tornou-se um elemento essencial para o sucesso de aplicações modernas. O WhatsApp destaca-se como uma das principais plataformas de comunicação no Brasil, sendo amplamente utilizado para troca de mensagens, envio de documentos e atendimento ao cliente.

Apesar disso, muitas aplicações ainda dependem exclusivamente do e-mail como meio de comunicação, o que pode dificultar o acesso para usuários menos familiarizados com tecnologia. Nesse contexto, torna-se relevante o desenvolvimento de soluções que possibilitem a integração de sistemas com o WhatsApp de forma automatizada, segura e escalável.

Este trabalho tem como objetivo apresentar a implementação de um microserviço para envio de mensagens via WhatsApp, utilizando tecnologias modernas amplamente empregadas no desenvolvimento de software, como Docker, Apache Kafka e FastAPI.


2. Arquitetura da Solução

A solução proposta adota o padrão de arquitetura de microserviços, onde cada componente possui responsabilidades bem definidas e independentes. A Figura conceitual da arquitetura é composta pelos seguintes elementos:

  • API REST: responsável por receber requisições externas para envio de mensagens.

  • Apache Kafka: atua como intermediário entre a API e o serviço de envio, garantindo processamento assíncrono.

  • Consumer Kafka: responsável por consumir as mensagens e efetuar o envio.

  • Evolution API: serviço externo que realiza a integração com o WhatsApp.

  • MongoDB: banco de dados utilizado para autenticação e persistência de dados.

  • Docker e Docker Compose: responsáveis pela conteinerização e orquestração dos serviços.

A utilização de mensageria assíncrona permite que o sistema continue respondendo às requisições mesmo sob alta carga, garantindo maior confiabilidade e escalabilidade.


3. Configuração do Ambiente

Para a orquestração dos serviços foi utilizado o Docker Compose, que permite definir e executar múltiplos contêineres de forma integrada.

3.1 Arquivo docker-compose.yml

services:
  mongo:
    image: mongo
    container_name: mongo_whatsappsenderserver
    restart: always
    volumes:
      - ./mongo_data:/data/db
    env_file: .env
    environment:
      MONGO_INITDB_ROOT_USERNAME: ${MONGO_USERNAME}
      MONGO_INITDB_ROOT_PASSWORD: ${MONGO_PASSWORD}

  mongo-express:
    image: mongo-express
    container_name: mongo_express_whatsappsenderserver
    restart: always
    ports:
      - 8082:8081
    env_file: .env
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: ${MONGO_USERNAME}
      ME_CONFIG_MONGODB_ADMINPASSWORD: ${MONGO_PASSWORD}
      ME_CONFIG_BASICAUTH_USERNAME: ${ME_CONFIG_BASICAUTH_USERNAME}
      ME_CONFIG_BASICAUTH_PASSWORD: ${ME_CONFIG_BASICAUTH_PASSWORD}
      ME_CONFIG_MONGODB_URL: mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@mongo:27017/
      ME_CONFIG_BASICAUTH: true

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    expose:
      - 9092
    restart: always
    environment:
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092,CONTROLLER://:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:9093'

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181


  evolution-api-db:
    image: postgres:16
    restart: unless-stopped
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: uma-senha-para-postgres
      POSTGRES_DB: evolutiondb
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7.4-alpine
    restart: always
    ports:
      - "6379:6379"
    command: redis-server --save 20 1 --loglevel warning 
    volumes:
      - redis-data:/data

  evolution-api:
    container_name: evolution_api
    image: atendai/evolution-api:latest
    restart: always
    depends_on:
      - kafka
      - evolution-api-db
      - redis
    ports:
      - "9001:9001"
    env_file:
      - .env
    environment:
      SERVER_PORT: 9001
      DATABASE_PROVIDER: postgresql
      DATABASE_CONNECTION_URI: 'postgresql://postgres:uma-senha-para-postgres@evolution-api-db:5432/evolutiondb?schema=public'
      CACHE_REDIS_ENABLED: true
      CACHE_REDIS_URI: redis://redis:6379
      CONFIG_SESSION_PHONE_VERSION: 2.3000.1031445238
      CONFIG_SESSION_PHONE_CLIENT: Evolution API
      CONFIG_SESSION_PHONE_NAME: Chrome
      AUTHENTICATION_API_KEY: ${AUTHENTICATION_API_KEY}
      AUTHENTICATION_TYPE: apikey

    volumes:
      - ./evolution_instances:/evolution/instances

  api_whatsappsenderserver:
    build: ./src
    container_name: api_whatsappsenderserver
    depends_on:
      - kafka
      - kafka-ui
      - mongo
      - evolution-api
    ports:
      - 8000:8000
    volumes:
      - ./src/:/app
    env_file: .env
    restart: always

  consumer:
    build: ./src/consumer
    container_name: consumer_whatsappsenderserver
    depends_on:
      - api_whatsappsenderserver
    env_file: .env
    environment:
      API_KEY: ${API_KEY}
    restart: always

volumes:
  postgres_data:
  redis-data:
    driver: local
Enter fullscreen mode Exit fullscreen mode

No diretório src teremos um Dockerfile para nossa API:

FROM tiangolo/uvicorn-gunicorn:python3.11-slim

WORKDIR /app

# Copia o conteúdo do diretório atual para o container em /app
COPY . /app/
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install python-dotenv

# Expõe a porta da API (porta padrão do FastAPI é 8000)
EXPOSE 8000

# Comando para executar oFastAPI com Uvicorn
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--timeout-keep-alive=65"]
Enter fullscreen mode Exit fullscreen mode

No diretório ./src/consumer/ teremos outro Dockerfile, dessa vez o do consumer:

FROM tiangolo/uvicorn-gunicorn:python3.11-slim

WORKDIR /app
# Copia o conteúdo do diretório atual para o container em /app
COPY . /app/
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install python-dotenv

CMD ["python", "consumer.py"]
Enter fullscreen mode Exit fullscreen mode

4. Implementação do Microserviço

4.1 Estrutura do Projeto

├── docker-compose.yml
├── evolution_instances
├── requirements.txt
└── src
    ├── Dockerfile
    ├── auth_handler.py
    ├── consumer
    │   ├── Dockerfile
    │   ├── __init__.py
    │   ├── consumer.py
    │   ├── requirements.txt
    │   └── whatsappsender.py
    ├── controller.py
    ├── main.py
    ├── models.py
    ├── producer.py
    ├── requirements.txt
    └── security.py
Enter fullscreen mode Exit fullscreen mode

4.2 API REST e Producer Kafka

main.py

import json
import random

# import qrcode
from fastapi import Body, Depends, FastAPI, HTTPException, status
from loguru import logger

from auth_handler import JWTBearer, decode_jwt, sign_jwt
from controller import (
    get_user,
    insert_user,
)
from models import UserLoginSchema, UserSchema, WhatsappMessage
from security import hash_password, verify_password
from producer import kafka_producer


app = FastAPI()


@app.post("/user/signup", tags=["user"])
async def create_user(user: UserSchema = Body(...)):
    """Cadastro nesta API. (Ao cadastrar a conta ainda estará desativada, algum admin
    do sistema precisará aprovar o cadastro.)"""
    logger.info("Creating user...")

    if get_user(user.email):
        logger.warning(f"User {user.email} already exists.")
        return {"error": "User already exists."}

    if len(user.password) < 6:
        return {"error": "Password must be at least 6 characters."}

    user.password = hash_password(user.password)
    insert_user({"allowed": False, **user.model_dump()})
    return {"message": "User created successfully."}


def check_user(data: UserLoginSchema):
    """Verifica as credenciais e se o usuário tem permissão pra utilizar esta API."""
    logger.info("Checking user credentials...")
    user = get_user(data.email)
    if not user:
        logger.error("Dados de login incorretos.")
        return False

    if (
        user.get("email") == data.email
        and verify_password(data.password, user.get("password"))
        and user.get("allowed")
    ):
        return True

    return False


@app.post("/user/login", tags=["user"])
async def user_login(user: UserLoginSchema = Body(...)):
    """Faz login nesta API."""
    if check_user(user):
        return sign_jwt(user.email)
    return {"error": "Wrong login details!"}

@app.post(
    "/whatsappsender/message",
    dependencies=[Depends(JWTBearer())],
    tags=["whatsappsender"],
)
async def send_message(message: WhatsappMessage = Body(...)):
    logger.info("Sending message...")
    kafka_producer.send(
        topic="messages",
        key=f"{random.randrange(999)}".encode(),
        value=json.dumps(
            {
                "instance": message.sender_instance,
                "recipient_number": message.receiver_number,
                "message": message.message,
            }
        ).encode("utf-8"),
    )
    return {"message": "Message sent successfully."}


@app.get("/", tags=["root"])
async def read_root() -> dict:
    return {"message": ""}


def get_current_user(token: str = Depends(JWTBearer())):
    """
    Obtém o identificador do usuário (email) a partir do token.
    """
    payload = decode_jwt(token)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
        )
    user_id = payload.get("user_id")
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
        )

    return user_id
Enter fullscreen mode Exit fullscreen mode

4.3 Modelos de Dados

models.py

from typing import Optional
from pydantic import BaseModel, EmailStr, Field


class UserSchema(BaseModel):
    fullname: str = Field(...)
    email: EmailStr = Field(...)
    password: str = Field(...)

    class Config:
        json_schema_extra = {
            "example": {
                "email": "joaovictorsp@x.com",
                "fullname": "João Victor",
                "password": "somepassword",
            }
        }


class UserLoginSchema(BaseModel):
    email: EmailStr = Field(...)
    password: str = Field(...)

    class Config:
        json_schema_extra = {
            "example": {"email": "joaovictorsp@x.com", "password": "somepassword"}
        }


class WhatsappLogin(BaseModel):
    phone_number: str = Field(...)
    instance_name: str = Field(...)


class WhatsappMessage(BaseModel):
    message: str = Field(...)
    is_message_base64: bool = Field(default=False)
    sender_number: Optional[str] = ""
    sender_instance: str = Field(...)
    sender_name: Optional[str] = ""
    receiver_number: str = Field(...)
    receiver_name: Optional[str] = ""
    timestamp: Optional[str] = ""
    attachment_path: Optional[str] = ""
    sent: bool = Field(default=False)

    class Config:
        json_schema_extra = {
            "example": {
                "message": "Mensagem de teste",
                "is_message_base64": False,
                "sender_number": "sender_number",
                "sender_instance": "sender_instance",
                "sender_name": "sender_name",
                "receiver_number": "receiver_number",
                "receiver_instance": "receiver_instance",
                "receiver_name": "receiver_name",
                "timestamp": "timestamp",
                "attachment_path": "path/to/attachment",
                "sent": False,
            }
        }
Enter fullscreen mode Exit fullscreen mode

4.4 Autenticação JWT

auth_handler.py

import time
from typing import Dict

import jwt
import os
from loguru import logger
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials


JWT_SECRET = os.environ.get("JWT_SECRET")
JWT_ALGORITHM = os.environ.get("ENCRYPTION_ALGORITHM", "HS256")


def token_response(token: str):
    return {"access_token": token}


def sign_jwt(user_id: str) -> Dict[str, str]:
    payload = {"user_id": user_id, "expires": time.time() + 60 * 60 * 24}
    token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
    return token_response(token)


def decode_jwt(token: str) -> dict:
    try:
        decoded_token = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        return decoded_token if decoded_token.get("expires") >= time.time() else None
    except Exception as e:
        logger.error(e)
        return {}


class JWTBearer(HTTPBearer):
    def __init__(self, auto_error: bool = True):
        super(JWTBearer, self).__init__(auto_error=auto_error)

    async def __call__(self, request: Request):
        credentials: HTTPAuthorizationCredentials = await super(
            JWTBearer, self
        ).__call__(request)
        if credentials:
            if not credentials.scheme == "Bearer":
                raise HTTPException(
                    status_code=403, detail="Invalid authentication scheme."
                )
            if not self.verify_jwt(credentials.credentials):
                raise HTTPException(
                    status_code=403, detail="Invalid token or expired token."
                )
            return credentials.credentials
        else:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")

    def verify_jwt(self, jwtoken: str) -> bool:
        is_token_valid: bool = False

        try:
            payload = decode_jwt(jwtoken)
        except Exception as e:
            logger.warning(e)
            payload = None
        if payload:
            is_token_valid = True

        return is_token_valid
Enter fullscreen mode Exit fullscreen mode

4.5 Producer Kafka

producer.py

import os
from kafka import KafkaProducer

KAFKA_HOST = os.environ.get("KAFKA_HOST", "kafka")
KAFKA_PORT = os.environ.get("KAFKA_PORT", "9092")

kafka_producer = KafkaProducer(bootstrap_servers=f"{KAFKA_HOST}:{KAFKA_PORT}")
Enter fullscreen mode Exit fullscreen mode

5. Implementação do Consumer

consumer.py

import os
import json
from kafka import KafkaConsumer
from loguru import logger
from whatsappsender import send_message
import threading

logger.add("consumer.log")

KAFKA_PORT = os.environ.get("KAFKA_PORT", "9092")

consumer = KafkaConsumer(
    "messages",
    auto_offset_reset="earliest",
    group_id="messages-group1",
    bootstrap_servers=[f"kafka:{KAFKA_PORT}"],
)

for message in consumer:
    logger.info(
        f"""
        topic     => {message.topic}
        partition => {message.partition}
        offset    => {message.offset}
        key={message.key} value={message.value}
    """
    )
    content = json.loads(message.value.decode("utf-8"))
    t = threading.Thread(
        target=send_message,
        args=(
            content.get("instance"),
            content.get("recipient_number"),
            content.get("message"),
        ),
    )
    t.start()

Enter fullscreen mode Exit fullscreen mode

whatsappsender.py

import os
import requests
from loguru import logger

logger.add("whatsappsender.log")

EVOLUTION_API_BASE_URL = os.environ.get("EVOLUTION_URL", "evolution-api")


def create_instance(phone_number: str):
    payload = {
        "instanceName": "minhainstancia",
        "token": "",
        "qrcode": True,
        "number": phone_number,
        "integration": "WHATSAPP-BAILEYS",
        "groups_ignore": True,
    }
    response = requests.post(EVOLUTION_API_BASE_URL + "/instance/create", json=payload)
    if response.status_code >= 200 and response.status_code < 300:
        return response.json()


def send_message(instance: str, phone_number: str, message: str):
    payload = {"number": phone_number, "text": message}
    headers = {"apikey": os.environ.get("AUTHENTICATION_API_KEY")}
    response = requests.post(
        f"{EVOLUTION_API_BASE_URL}/message/sendText/{instance}",
        json=payload,
        headers=headers,
    )
    if response.status_code >= 200 and response.status_code < 300:
        logger.info("Message sent successfully.")
        return response.json()

    logger.error("Failed to send message.")
    logger.error(response.content)
    return None

Enter fullscreen mode Exit fullscreen mode

O restante do código fonte pode ser acessado em:
https://github.com/joaovictor01/WhatsappSenderServer/tree/dev

6. Discussão dos Resultados

A utilização do Apache Kafka permitiu o desacoplamento entre a API e o serviço de envio de mensagens, garantindo maior escalabilidade e tolerância a falhas. A conteinerização com Docker simplificou a implantação e padronizou o ambiente de execução, reduzindo problemas de compatibilidade.

A arquitetura demonstrou-se adequada para cenários com múltiplos clientes e alto volume de mensagens, possibilitando futuras expansões, como monitoramento, filas de prioridade e reenvio automático.


7. Conclusão

Este trabalho apresentou a implementação de um microserviço para envio de mensagens via WhatsApp utilizando uma arquitetura moderna baseada em microserviços, mensageria assíncrona e conteinerização. A solução mostrou-se eficiente, escalável e adequada para aplicações que demandam comunicação rápida e acessível com usuários finais.

Como trabalhos futuros, podem ser implementadas funcionalidades adicionais, como envio de arquivos, métricas de desempenho e autenticação avançada.


Referências

  • Docker Documentation.

  • Apache Kafka Documentation.

  • FastAPI Documentation.

  • Evolution API Documentation.

Top comments (0)