Introdução
As redes sociais evoluem rapidamente. Uma única publicação pode gerar uma cadeia de reações, reformulações e até contra-movimentos inesperados. E se fosse possível prever como um cenário se desenrola antes de ocorrer no mundo real?
MiroFish faz exatamente isso: um motor de inteligência de enxame que cria mundos digitais paralelos onde milhares de agentes de IA com personalidades, memórias e comportamentos distintos interagem livremente. Basta fazer upload de materiais-base — artigo, policy draft, romance — e o MiroFish monta uma simulação detalhada de como os eventos podem se desenrolar.
💡 Dica: Para garantir APIs confiáveis desde o início, a equipe usou o Apidog para projetar, depurar e documentar todo o backend antes de implementar a lógica de simulação. Isso antecipou problemas de endpoint e manteve o backend Python e o frontend Vue sincronizados.
Este artigo detalha a arquitetura técnica do MiroFish, mostrando como transformar documentos brutos em simulações vivas, como os agentes tomam decisões e como o fluxo de trabalho de cinco etapas automatiza desde a criação do grafo de conhecimento até o monitoramento em tempo real.
Visão Geral do Sistema: O Fluxo de Trabalho de Cinco Etapas
O MiroFish processa simulações em cinco etapas práticas:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Etapa 1 │ ──► │ Etapa 2 │ ──► │ Etapa 3 │ ──► │ Etapa 4 │ ──► │ Etapa 5 │
│ Geração de │ │ Construção │ │ Configuração│ │ Execução │ │ Geração de │
│ Ontologia │ │ de GraphRAG │ │ do Ambiente │ │ da Simulação│ │ Relatório │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
Etapa 1: Geração de Ontologia
- Analise os documentos de entrada e requisitos de simulação.
- Use um LLM para gerar uma ontologia personalizada, definindo:
- 10 tipos de entidade (ex: Estudante, Professor, Universidade, etc.)
- 10 tipos de relacionamento (ex: TRABALHA_PARA, RESPONDE_A)
-
Atributos para cada tipo (evitando palavras reservadas como
name,uuid,created_at).
- Estrutura: 8 tipos específicos + 2 tipos de fallback (
PessoaeOrganização).
Etapa 2: Construção de GraphRAG
- Divida os documentos em "chunks" (500 caracteres com 50 de sobreposição).
- Envie para Zep Cloud em lotes.
- Passos:
- Crie o grafo com ID único.
- Defina a ontologia.
- Extraia entidades e relacionamentos.
- Aguarde processamento da Zep.
- Recupere o grafo final com nós e arestas.
Etapa 3: Configuração do Ambiente
- Analise o grafo e gere parâmetros detalhados:
- Tempo: padrões com base no fuso horário chinês (picos 19-22h, mortos 0-5h).
- Eventos: publicações iniciais, tópicos quentes.
- Atividades dos agentes: publicações/hora, atraso de resposta, influência.
- Plataformas: Twitter e Reddit com limites de viralidade diferenciados.
Etapa 4: Execução da Simulação
- Agentes ativam conforme cronogramas.
- Publicação, comentários e reações em tempo real.
- Execução paralela para Twitter e Reddit, registrando ações em JSONL.
Etapa 5: Geração de Relatórios
- O agente de relatórios usa três ferramentas:
- InsightForge: busca aprofundada com subconsultas.
- PanoramaSearch: visão total, incluindo fatos históricos.
- InterviewAgents: entrevistas em tempo real via IPC.
Aprofundamento Técnico: Geração de Ontologia
O gerador de ontologia (backend/app/services/ontology_generator.py) usa prompts bem definidos para o LLM. Ele impõe as seguintes regras:
- Só entidades "atuantes" são válidas (pessoas, organizações, mídias).
- Conceitos abstratos/temas são descartados.
- Pós-processamento rigoroso:
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
# Zep API limits: max 10 entity types, max 10 edge types
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
# Ensure fallback types exist
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
# Trim if adding fallbacks would exceed limit
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
Assim, a ontologia gerada se mantém dentro dos limites da API e cobre todos os casos.
Construção do Grafo de Conhecimento: Integração Zep
O serviço de construção do grafo (backend/app/services/graph_builder.py) executa um fluxo assíncrono eficiente:
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. Create graph
graph_id = self.create_graph(graph_name)
# 2. Set ontology
self.set_ontology(graph_id, ontology)
# 3. Chunk text
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. Send batches
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Wait for Zep processing
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. Retrieve final graph
graph_info = self._get_graph_info(graph_id)
Geração Dinâmica de Modelos Pydantic
Os modelos de entidade são gerados dinamicamente, conforme a ontologia:
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
Isso permite validação automática na Zep.
Paginação de Grafos Grandes
Para resultados paginados, utilize o utilitário:
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
Simulação de Atividade de Agentes Baseada em Tempo
O gerador de configuração (backend/app/services/simulation_config_generator.py) define padrões realistas:
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5],
"morning_hours": [6, 7, 8],
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22],
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
Diferentes tipos de agentes recebem padrões distintos:
| Tipo de Agente | Nível de Atividade | Horas Ativas | Atraso de Resposta | Influência |
|---|---|---|---|---|
| Universidade | 0.2 | 9-17 | 60-240 min | 3.0 |
| Meio de Comunicação | 0.5 | 7-23 | 5-30 min | 2.5 |
| Estudante | 0.8 | 8-12, 18-23 | 1-15 min | 0.8 |
| Professor | 0.4 | 8-21 | 15-90 min | 2.0 |
Se o LLM não retornar valores adequados, usa-se fallback baseado em regras.
Rastreamento de Ações em Tempo Real
O executor (backend/app/services/simulation_runner.py) transmite logs em JSONL:
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
# Handle events
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # ou reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
# Parse agent actions
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
O monitoramento ocorre em thread de background, com frontend consultando o status a cada 2 segundos.
Gerenciamento de Processos Multiplataforma
Encerrar simulações exige controle de processos cross-platform:
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
# Windows: use taskkill para matar árvore de processos
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
# Unix: kill process group
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
Registro de handlers de sinal para desligamento seguro:
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
# Then call original handler
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
Geração de Relatórios: Recuperação de Três Níveis
O serviço de ferramentas da Zep (backend/app/services/zep_tools.py) oferece:
InsightForge (Análise Aprofundada)
Decomponha perguntas em subconsultas e agregue os resultados:
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
# 1. Gerar subconsultas via LLM
sub_queries = self._generate_sub_queries(query, simulation_requirement)
# 2. Buscar cada subconsulta
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
# 3. Extrair UUIDs de entidades
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
# 4. Buscar detalhes das entidades
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
# 5. Montar cadeias de relacionamento
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (Escopo Total)
Recupere todos os fatos, ativos e históricos:
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (Tempo Real)
Converse com agentes ativos usando API de entrevista:
def interview_agents(self, simulation_id: str, interview_requirement: str):
# 1. Carregar perfis de agentes
profiles = self._load_agent_profiles(simulation_id)
# 2. Selecionar agentes relevantes via LLM
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
# 3. Gerar perguntas de entrevista
questions = self._generate_interview_questions(...)
# 4. Chamar API real de entrevista
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None, # Entrevista Twitter e Reddit
timeout=180.0
)
# 5. Formatar resultados
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Principais Decisões de Engenharia
1. Gerenciamento de Tarefas Assíncronas
Operações longas (grafo, simulação) usam tarefas assíncronas com status consultável:
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
O frontend acompanha via /api/graph/task/{task_id}.
2. Chamadas em Lote de LLM com Retry
Divida grandes listas de agentes em lotes de 15 e trate JSON truncado:
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
Reparo de JSON:
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. Simulação Paralela em Duas Plataformas
Twitter e Reddit rodam em paralelo com bancos de dados e logs separados:
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
A conclusão é detectada via eventos simulation_end por plataforma.
Considerações de Desempenho
Gerenciamento de Memória
- Documentos são truncados para 50k caracteres para o LLM.
- Resumos de entidades: até 300 caracteres.
- Apenas 50 ações recentes em memória (histórico completo em JSONL).
Isolamento de Banco de Dados
Cada plataforma usa seu SQLite próprio para evitar contenção em operações paralelas.
Degradação Graciosa
Fallback automático para busca local se a API da Zep falhar:
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search API failed, falling back to local search: {e}")
return self._local_search(graph_id, query, limit, scope)
Conclusão
O MiroFish mostra como implementar um sistema de simulação multiagente completo, transformando documentos em mundos digitais vivos com agentes realistas.
Principais aprendizados para implementação:
- Design de Ontologia: Estrutura de dois níveis (8 tipos específicos + 2 fallback) garante abrangência sem exceder limites de API.
- Fluxos Assíncronos: Tarefas longas são rastreadas com progresso em tempo real.
- Atividade Baseada em Tempo: Uso de padrões reais de comportamento cria resultados críveis.
- Simulação Paralela: Executar Twitter e Reddit em paralelo permite comparações de dinâmica de plataforma.
- Recuperação de Três Níveis: InsightForge para profundidade, PanoramaSearch para amplitude, InterviewAgents para perspectiva direta.
Código-fonte completo disponível em github.com/666ghj/MiroFish.
Quer experimentar o MiroFish? Visite a demonstração ao vivo para ver uma simulação em ação.

Top comments (0)