Quando iniciei a construção de uma assistente para a minha startup, a Rota Nativa, comecei com uma abordagem simples: cada nova mensagem recebida do WhatsApp era tratada em um bloco central de lógica com diversas verificações condicionais — if, elif, e algumas chamadas diretas à API da OpenAI para gerar respostas. Esse formato funcionava, mas rapidamente se tornou difícil de manter. O código cresceu, e com ele vieram decisões condicionais complexas, problemas com idempotência, dificuldade em auditar decisões e falta de clareza sobre o estado da conversa.
Para modularizar melhor o comportamento da assistente, passei a utilizar o conceito de Agents da OpenAI. Isso me ajudou a encapsular papéis como “comunicador”, “classificador de intenção” e “refinador de mensagens”. A modularização melhorou, mas ainda assim era necessário coordenar as interações entre os agentes, o que voltou a trazer estruturas condicionais crescentes. Foi nesse ponto que adotei o modelo de Flows do CrewAI. A proposta de encapsular toda a lógica de controle em um fluxo orientado a eventos, com estados tipados e roteamento explícito, resolveu uma série de problemas técnicos de uma só vez. Abaixo compartilho alguns dos aprendizados aplicados.
Modelo de Estado
Para o fluxo da conversa, defini um modelo de estado usando BaseModel do Pydantic, que armazena os principais dados da mensagem recebida:
class WhatsappState(BaseModel):
zap_in_event: Optional[ZapInEvent] = None
has_message: bool = False
has_message_errors: bool = False
message_type: Optional[Literal["text", "reaction"]] = None
phone_number: str = ""
profile_name: str = ""
message_id: str = ""
message_content: str = ""
user: Optional[User] = None
first_time: bool = False
latest_messages: Optional[List[ZapEvent]] = None
Com esse modelo, consigo controlar o fluxo inteiro com base em um único snapshot da execução.
Fluxo inicial com validação de evento e roteamento
A entrada de uma nova mensagem aciona o fluxo:
class WhatsappFlow(Flow[WhatsappState]):
@start()
def initialize_state(self):
zap_in_event = self.state.zap_in_event
self.state.has_message = zap_in_event.has_message()
self.state.has_message_errors = zap_in_event.has_message_errors()
...
Após inicializar, faço o roteamento condicional para diferentes caminhos:
@router(initialize_state)
def validate_event(self):
if self.state.has_message and not self.state.has_message_errors:
return "supported_event"
elif self.state.has_message and self.state.has_message_errors:
return "event_with_errors"
return "unsupported_event"
A legibilidade e rastreabilidade do fluxo melhoraram significativamente com essa estrutura.
Controle de idempotência e carga de estado
@listen("supported_event")
async def load_or_create_user_state(self):
...
self.state.latest_messages = await self.get_latest_messages()
self.idempotency_validation()
def idempotency_validation(self):
for message in self.state.latest_messages:
if self.state.message_id == message.message_id:
raise MessageAlreadyProcessedException
Esse controle era difícil de manter em estruturas anteriores e agora faz parte nativa do fluxo.
Segmentação de contexto e classificação de intenção
Ao detectar que o usuário tem mensagens válidas, passo para um novo flow que gerencia a conversa e toma decisões com base nas últimas mensagens:
class ConversationFlow(Flow[ConversationState]):
@start()
async def classify_intention(self):
...
result = IntentCrew().crew().kickoff(
inputs={
"last_intent": dump_state["intent"],
"last_messages": self.format_messages(self.state.text_messages[:qtd_messages])
})
...
A quantidade de mensagens analisadas é decidida dinamicamente com um modelo que segmenta o contexto atual:
def segment_context(self) -> int:
llm = LLM(model="gpt-4.1-nano", temperature=0)
response = llm.call(...)
...
Roteamento baseado em primeira vez ou retorno
A resposta enviada varia dependendo se é a primeira vez do usuário:
@router(mark_as_read_and_display_typing)
def is_beginning_conversation(self):
if self.state.first_time:
return "start_conversation"
return "continue_conversation"
Isso permite personalizar o onboarding sem acoplar múltiplos ifs dentro do mesmo nó.
Modularização de resposta
A resposta final ao usuário é gerada por uma CommunicationCrew, e enviada respeitando as regras de segmentação e formatação da plataforma (WhatsApp):
@listen("other")
async def handle_other(self):
...
result = CommunicationCrew().crew().kickoff(inputs={...})
await self.send_messages(result.pydantic.messages)
Conclusão
Ao adotar o CrewAI Flow, consegui transformar uma lógica de atendimento inicialmente centralizada, com diversos condicionais difíceis de manter, em um fluxo modular, auditável e altamente extensível.
Cada etapa hoje está isolada, com controle claro de entrada, saída e estado persistido. As decisões são roteadas explicitamente, o que me permite adaptar o comportamento da assistente sem retrabalho.
Essa abordagem tem sido essencial para garantir que o produto evolua com estabilidade e agilidade. Se você está enfrentando desafios semelhantes com lógica condicional dispersa e crescimento da complexidade, vale explorar esse modelo.
Top comments (0)