DEV Community

Cover image for Como a MiroFish Cria Mundos Digitais Paralelos
Lucas
Lucas

Posted on • Originally published at apidog.com

Como a MiroFish Cria Mundos Digitais Paralelos

Introdução

As redes sociais evoluem rapidamente. Uma única publicação pode gerar uma cadeia de reações, reformulações e até contra-movimentos inesperados. E se fosse possível prever como um cenário se desenrola antes de ocorrer no mundo real?

Experimente o Apidog hoje

MiroFish faz exatamente isso: um motor de inteligência de enxame que cria mundos digitais paralelos onde milhares de agentes de IA com personalidades, memórias e comportamentos distintos interagem livremente. Basta fazer upload de materiais-base — artigo, policy draft, romance — e o MiroFish monta uma simulação detalhada de como os eventos podem se desenrolar.

💡 Dica: Para garantir APIs confiáveis desde o início, a equipe usou o Apidog para projetar, depurar e documentar todo o backend antes de implementar a lógica de simulação. Isso antecipou problemas de endpoint e manteve o backend Python e o frontend Vue sincronizados.

Este artigo detalha a arquitetura técnica do MiroFish, mostrando como transformar documentos brutos em simulações vivas, como os agentes tomam decisões e como o fluxo de trabalho de cinco etapas automatiza desde a criação do grafo de conhecimento até o monitoramento em tempo real.

Fluxo de trabalho

Visão Geral do Sistema: O Fluxo de Trabalho de Cinco Etapas

O MiroFish processa simulações em cinco etapas práticas:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Etapa 1   │ ──► │   Etapa 2   │ ──► │   Etapa 3   │ ──► │   Etapa 4   │ ──► │   Etapa 5   │
│  Geração de │     │ Construção  │     │ Configuração│     │  Execução   │     │  Geração de │
│  Ontologia  │     │ de GraphRAG │     │ do Ambiente │     │ da Simulação│     │   Relatório │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
Enter fullscreen mode Exit fullscreen mode

Etapa 1: Geração de Ontologia

  • Analise os documentos de entrada e requisitos de simulação.
  • Use um LLM para gerar uma ontologia personalizada, definindo:
    • 10 tipos de entidade (ex: Estudante, Professor, Universidade, etc.)
    • 10 tipos de relacionamento (ex: TRABALHA_PARA, RESPONDE_A)
    • Atributos para cada tipo (evitando palavras reservadas como name, uuid, created_at).
  • Estrutura: 8 tipos específicos + 2 tipos de fallback (Pessoa e Organização).

Etapa 2: Construção de GraphRAG

  • Divida os documentos em "chunks" (500 caracteres com 50 de sobreposição).
  • Envie para Zep Cloud em lotes.
  • Passos:
    1. Crie o grafo com ID único.
    2. Defina a ontologia.
    3. Extraia entidades e relacionamentos.
    4. Aguarde processamento da Zep.
    5. Recupere o grafo final com nós e arestas.

Etapa 3: Configuração do Ambiente

  • Analise o grafo e gere parâmetros detalhados:
    • Tempo: padrões com base no fuso horário chinês (picos 19-22h, mortos 0-5h).
    • Eventos: publicações iniciais, tópicos quentes.
    • Atividades dos agentes: publicações/hora, atraso de resposta, influência.
    • Plataformas: Twitter e Reddit com limites de viralidade diferenciados.

Etapa 4: Execução da Simulação

  • Agentes ativam conforme cronogramas.
  • Publicação, comentários e reações em tempo real.
  • Execução paralela para Twitter e Reddit, registrando ações em JSONL.

Etapa 5: Geração de Relatórios

  • O agente de relatórios usa três ferramentas:
    • InsightForge: busca aprofundada com subconsultas.
    • PanoramaSearch: visão total, incluindo fatos históricos.
    • InterviewAgents: entrevistas em tempo real via IPC.

Aprofundamento Técnico: Geração de Ontologia

O gerador de ontologia (backend/app/services/ontology_generator.py) usa prompts bem definidos para o LLM. Ele impõe as seguintes regras:

  • Só entidades "atuantes" são válidas (pessoas, organizações, mídias).
  • Conceitos abstratos/temas são descartados.
  • Pós-processamento rigoroso:
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Zep API limits: max 10 entity types, max 10 edge types
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    # Ensure fallback types exist
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Trim if adding fallbacks would exceed limit
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result
Enter fullscreen mode Exit fullscreen mode

Assim, a ontologia gerada se mantém dentro dos limites da API e cobre todos os casos.


Construção do Grafo de Conhecimento: Integração Zep

O serviço de construção do grafo (backend/app/services/graph_builder.py) executa um fluxo assíncrono eficiente:

def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Create graph
    graph_id = self.create_graph(graph_name)

    # 2. Set ontology
    self.set_ontology(graph_id, ontology)

    # 3. Chunk text
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)

    # 4. Send batches
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)

    # 5. Wait for Zep processing
    self._wait_for_episodes(episode_uuids, progress_callback)

    # 6. Retrieve final graph
    graph_info = self._get_graph_info(graph_id)
Enter fullscreen mode Exit fullscreen mode

Geração Dinâmica de Modelos Pydantic

Os modelos de entidade são gerados dinamicamente, conforme a ontologia:

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class
Enter fullscreen mode Exit fullscreen mode

Isso permite validação automática na Zep.

Paginação de Grafos Grandes

Para resultados paginados, utilize o utilitário:

def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes
Enter fullscreen mode Exit fullscreen mode

Simulação de Atividade de Agentes Baseada em Tempo

O gerador de configuração (backend/app/services/simulation_config_generator.py) define padrões realistas:

CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],
    "morning_hours": [6, 7, 8],
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}
Enter fullscreen mode Exit fullscreen mode

Diferentes tipos de agentes recebem padrões distintos:

Tipo de Agente Nível de Atividade Horas Ativas Atraso de Resposta Influência
Universidade 0.2 9-17 60-240 min 3.0
Meio de Comunicação 0.5 7-23 5-30 min 2.5
Estudante 0.8 8-12, 18-23 1-15 min 0.8
Professor 0.4 8-21 15-90 min 2.0

Se o LLM não retornar valores adequados, usa-se fallback baseado em regras.


Rastreamento de Ações em Tempo Real

O executor (backend/app/services/simulation_runner.py) transmite logs em JSONL:

def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)

            # Handle events
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True  # ou reddit
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            # Parse agent actions
            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)

        return f.tell()
Enter fullscreen mode Exit fullscreen mode

O monitoramento ocorre em thread de background, com frontend consultando o status a cada 2 segundos.


Gerenciamento de Processos Multiplataforma

Encerrar simulações exige controle de processos cross-platform:

def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        # Windows: use taskkill para matar árvore de processos
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        # Unix: kill process group
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
Enter fullscreen mode Exit fullscreen mode

Registro de handlers de sinal para desligamento seguro:

def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Then call original handler

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)

    atexit.register(cls.cleanup_all_simulations)
Enter fullscreen mode Exit fullscreen mode

Geração de Relatórios: Recuperação de Três Níveis

O serviço de ferramentas da Zep (backend/app/services/zep_tools.py) oferece:

InsightForge (Análise Aprofundada)

Decomponha perguntas em subconsultas e agregue os resultados:

def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    # 1. Gerar subconsultas via LLM
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    # 2. Buscar cada subconsulta
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    # 3. Extrair UUIDs de entidades
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    # 4. Buscar detalhes das entidades
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    # 5. Montar cadeias de relacionamento
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)
Enter fullscreen mode Exit fullscreen mode

PanoramaSearch (Escopo Total)

Recupere todos os fatos, ativos e históricos:

def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)

    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)
Enter fullscreen mode Exit fullscreen mode

InterviewAgents (Tempo Real)

Converse com agentes ativos usando API de entrevista:

def interview_agents(self, simulation_id: str, interview_requirement: str):
    # 1. Carregar perfis de agentes
    profiles = self._load_agent_profiles(simulation_id)

    # 2. Selecionar agentes relevantes via LLM
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    # 3. Gerar perguntas de entrevista
    questions = self._generate_interview_questions(...)

    # 4. Chamar API real de entrevista
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,  # Entrevista Twitter e Reddit
        timeout=180.0
    )

    # 5. Formatar resultados
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Enter fullscreen mode Exit fullscreen mode

Principais Decisões de Engenharia

1. Gerenciamento de Tarefas Assíncronas

Operações longas (grafo, simulação) usam tarefas assíncronas com status consultável:

def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})

    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()

    return task_id
Enter fullscreen mode Exit fullscreen mode

O frontend acompanha via /api/graph/task/{task_id}.

2. Chamadas em Lote de LLM com Retry

Divida grandes listas de agentes em lotes de 15 e trate JSON truncado:

num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)
Enter fullscreen mode Exit fullscreen mode

Reparo de JSON:

def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')

    if content and content[-1] not in '",}]':
        content += '"'

    content += ']' * open_brackets
    content += '}' * open_braces
    return content
Enter fullscreen mode Exit fullscreen mode

3. Simulação Paralela em Duas Plataformas

Twitter e Reddit rodam em paralelo com bancos de dados e logs separados:

uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
Enter fullscreen mode Exit fullscreen mode

A conclusão é detectada via eventos simulation_end por plataforma.


Considerações de Desempenho

Gerenciamento de Memória

  • Documentos são truncados para 50k caracteres para o LLM.
  • Resumos de entidades: até 300 caracteres.
  • Apenas 50 ações recentes em memória (histórico completo em JSONL).

Isolamento de Banco de Dados

Cada plataforma usa seu SQLite próprio para evitar contenção em operações paralelas.

Degradação Graciosa

Fallback automático para busca local se a API da Zep falhar:

try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"Zep Search API failed, falling back to local search: {e}")
    return self._local_search(graph_id, query, limit, scope)
Enter fullscreen mode Exit fullscreen mode

Conclusão

O MiroFish mostra como implementar um sistema de simulação multiagente completo, transformando documentos em mundos digitais vivos com agentes realistas.

Principais aprendizados para implementação:

  1. Design de Ontologia: Estrutura de dois níveis (8 tipos específicos + 2 fallback) garante abrangência sem exceder limites de API.
  2. Fluxos Assíncronos: Tarefas longas são rastreadas com progresso em tempo real.
  3. Atividade Baseada em Tempo: Uso de padrões reais de comportamento cria resultados críveis.
  4. Simulação Paralela: Executar Twitter e Reddit em paralelo permite comparações de dinâmica de plataforma.
  5. Recuperação de Três Níveis: InsightForge para profundidade, PanoramaSearch para amplitude, InterviewAgents para perspectiva direta.

Código-fonte completo disponível em github.com/666ghj/MiroFish.

Quer experimentar o MiroFish? Visite a demonstração ao vivo para ver uma simulação em ação.

Top comments (0)