Forem

Cover image for Cómo MiroFish Crea Mundos Paralelos Digitales
Roobia
Roobia

Posted on • Originally published at apidog.com

Cómo MiroFish Crea Mundos Paralelos Digitales

Introducción

Las redes sociales se mueven rápido. Una sola publicación puede desencadenar cascadas de reacciones, remodelaciones y contramovimientos que nadie predijo. ¿Qué pasaría si pudieras ver cómo se desarrolla un escenario antes de que suceda en el mundo real?

Prueba Apidog hoy

MiroFish hace exactamente eso. Es un motor de inteligencia de enjambre que crea mundos paralelos digitales donde miles de agentes de IA con personalidades, recuerdos y patrones de comportamiento distintos interactúan libremente. Subes material de origen —un artículo de noticias, un borrador de política, incluso una novela— y MiroFish construye una simulación de alta fidelidad de cómo podrían desarrollarse los eventos.

💡 La construcción de MiroFish requirió una base sólida para las pruebas de API. El equipo utilizó Apidog para diseñar, depurar y documentar todas las API de backend antes de escribir la lógica de simulación. Esto permitió detectar problemas en los endpoints tempranamente y mantener sincronizados el backend de Python y el frontend de Vue durante todo el desarrollo.

Esta publicación desglosa la arquitectura técnica detrás de MiroFish. Aprenderás cómo el sistema transforma documentos brutos en simulaciones vivas, cómo los agentes toman decisiones y cómo el flujo de trabajo de cinco pasos orquesta todo, desde la construcción del grafo de conocimiento hasta la monitorización en tiempo real.

Diagrama de flujo de trabajo de MiroFish

Visión General del Sistema: El Flujo de Trabajo de Cinco Pasos

MiroFish procesa simulaciones a través de cinco fases distintas:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Paso 1    │ ──► │   Paso 2    │ ──► │   Paso 3    │ ──► │   Paso 4    │ ──► │   Paso 5    │
│  Generación │     │ Construcción│     │ Config. del │     │ Ejecución de│     │  Generación │
│  de Ontología│     │  de GraphRAG│     │  Entorno    │     │ Simulación  │     │ de Informe  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
Enter fullscreen mode Exit fullscreen mode

Paso 1: Generación de Ontología

El sistema analiza tus documentos de entrada y requisitos de simulación, y utiliza un LLM para generar una ontología personalizada. Esto define:

  • 10 tipos de entidades (ej: Estudiante, Profesor, Universidad, Medio de Comunicación, Agencia Gubernamental)
  • 10 tipos de relaciones (ej: TRABAJA_PARA, COMENTA_SOBRE, RESPONDE_A)
  • Atributos para cada tipo (evitando palabras reservadas como name, uuid, created_at)

La ontología aplica una estructura de dos niveles: 8 tipos específicos basados en tu contenido, más 2 tipos de respaldo (Persona y Organización) para capturar cualquier caso fuera de lo habitual.

Paso 2: Construcción de GraphRAG

Los documentos se dividen en fragmentos de 500 caracteres (con 50 de superposición) y se envían a Zep Cloud por lotes. El flujo es:

  1. Crear un grafo independiente con ID único
  2. Establecer la ontología personalizada
  3. Enviar lotes de texto para extracción de entidades y relaciones
  4. Esperar procesamiento de Zep por episodio
  5. Recuperar el grafo final con nodos y aristas

Paso 3: Configuración del Entorno

El generador de configuración analiza el grafo de conocimiento y produce parámetros detallados para los agentes:

  • Configuración de tiempo (zonas horarias, horas pico/muertas)
  • Eventos (publicaciones iniciales, temas candentes)
  • Actividad del agente (posts/hora, delays, pesos de influencia)
  • Configuración de plataforma (umbral viral para Twitter/Reddit)

Paso 4: Ejecución de la Simulación

Los agentes se activan según su horario y comienzan a publicar, comentar y reaccionar. La simulación corre en paralelo para Twitter y Reddit, registrando cada acción en archivos JSONL en tiempo real.

Paso 5: Generación de Informes

El agente de reportes utiliza tres herramientas de recuperación:

  • InsightForge: búsqueda profunda, descompone preguntas en subconsultas
  • PanoramaSearch: vista completa, incluye hechos históricos caducados/inválidos
  • InterviewAgents: entrevistas en tiempo real a agentes activos vía IPC

Análisis Técnico Profundo: Generación de Ontología

El generador de ontología (backend/app/services/ontology_generator.py) usa un system prompt detallado para definir qué es una entidad válida (personas, organizaciones, medios) y qué no (conceptos abstractos, temas).

Después de la generación, _validate_and_process aplica restricciones para no superar los límites de la API de Zep:

def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Límites de la API de Zep: máximo 10 tipos de entidades, máximo 10 tipos de aristas
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    # Asegurarse de que existan los tipos de respaldo
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Recortar si añadir los tipos de respaldo excedería el límite
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result
Enter fullscreen mode Exit fullscreen mode

Esto garantiza compatibilidad con la API y mantiene la estructura de dos niveles.

Construcción del Grafo de Conocimiento: Integración de Zep

El constructor de grafos (backend/app/services/graph_builder.py) orquesta el procesamiento asíncrono:

def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Crear grafo
    graph_id = self.create_graph(graph_name)

    # 2. Establecer ontología
    self.set_ontology(graph_id, ontology)

    # 3. Dividir texto en fragmentos
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)

    # 4. Enviar lotes
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)

    # 5. Esperar procesamiento de Zep
    self._wait_for_episodes(episode_uuids, progress_callback)

    # 6. Recuperar grafo final
    graph_info = self._get_graph_info(graph_id)
Enter fullscreen mode Exit fullscreen mode

Generación Dinámica de Modelos Pydantic

Se crean modelos Pydantic dinámicamente según la ontología:

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class
Enter fullscreen mode Exit fullscreen mode

Esto permite a Zep validar los datos de entidades sin modelos predefinidos.

Paginación en Grafos Grandes

Para recuperar todos los nodos de un grafo paginado:

def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes
Enter fullscreen mode Exit fullscreen mode

Simulación de Actividad de Agentes Basada en el Tiempo

El generador de configuración (backend/app/services/simulation_config_generator.py) define patrones realistas basados en la zona horaria china:

CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],           # Horas de poca actividad (casi nadie)
    "morning_hours": [6, 7, 8],                  # Mañana (gradualmente activo)
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],              # Pico de la tarde
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}
Enter fullscreen mode Exit fullscreen mode

Diferentes tipos de agentes tienen distintos patrones:

Tipo de Agente Nivel de Actividad Horas Activas Retraso de Respuesta Influencia
Universidad 0.2 9-17 60-240 min 3.0
Medio de Comunicación 0.5 7-23 5-30 min 2.5
Estudiante 0.8 8-12, 18-23 1-15 min 0.8
Profesor 0.4 8-21 15-90 min 2.0

La configuración puede personalizarse por LLM y recurre a valores por defecto si es necesario.

Seguimiento de Acciones en Tiempo Real

El ejecutor de simulación (backend/app/services/simulation_runner.py) monitoriza las acciones de los agentes con logs JSONL:

def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)

            # Manejar eventos
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True  # o reddit
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            # Parsear acciones de agente
            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)

        return f.tell()
Enter fullscreen mode Exit fullscreen mode

Esto corre en un hilo de fondo y el frontend consulta el estado para mostrar progreso en tiempo real.

Gestión de Procesos Multiplataforma

Para detener simulaciones de forma segura en Windows y Unix:

def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        # Windows: usar taskkill para detener el árbol de procesos
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        # Unix: detener grupo de procesos (creado con start_new_session=True)
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
Enter fullscreen mode Exit fullscreen mode

El manejador de limpieza registra manejadores de señales para SIGINT, SIGTERM y SIGHUP:

def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Luego llamar al manejador original

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)

    atexit.register(cls.cleanup_all_simulations)
Enter fullscreen mode Exit fullscreen mode

Esto asegura que las simulaciones se detengan de forma controlada al apagar el servidor.

Generación de Informes: Recuperación de Tres Niveles

El servicio de herramientas de Zep (backend/app/services/zep_tools.py) implementa tres funciones de recuperación:

InsightForge (Análisis Profundo)

Descompone consultas complejas en subconsultas, realiza búsquedas y agrega resultados:

def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    # 1. Generar subconsultas usando LLM
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    # 2. Buscar por cada subconsulta
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    # 3. Extraer UUIDs de entidad de los bordes
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    # 4. Obtener información detallada de la entidad
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    # 5. Construir cadenas de relaciones
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)
Enter fullscreen mode Exit fullscreen mode

PanoramaSearch (Alcance Completo)

Recupera todos los hechos, incluyendo históricos y caducados:

def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)

    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)
Enter fullscreen mode Exit fullscreen mode

InterviewAgents (Tiempo Real)

Permite entrevistas automáticas con agentes activos vía API OASIS:

def interview_agents(self, simulation_id: str, interview_requirement: str):
    # 1. Cargar perfiles de agente desde CSV/JSON
    profiles = self._load_agent_profiles(simulation_id)

    # 2. Usar LLM para seleccionar agentes relevantes
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    # 3. Generar preguntas de entrevista
    questions = self._generate_interview_questions(...)

    # 4. Llamar a la API de entrevista real (doble plataforma)
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,  # Entrevistar a ambos Twitter y Reddit
        timeout=180.0
    )

    # 5. Parsear y formatear resultados
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Enter fullscreen mode Exit fullscreen mode

Decisiones Clave de Ingeniería

1. Gestión de Tareas Asíncronas

Las operaciones largas usan tareas asíncronas y seguimiento de progreso:

def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})

    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()

    return task_id
Enter fullscreen mode Exit fullscreen mode

El frontend consulta el estado en /api/graph/task/{task_id}.

2. Llamadas a LLM en Lotes con Reintento

La configuración de agentes se divide en lotes de 15 y se reintenta en caso de errores JSON:

num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)
Enter fullscreen mode Exit fullscreen mode

Lógica de reparación de JSON truncado:

def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')

    if content and content[-1] not in '",}]':
        content += '"'

    content += ']' * open_brackets
    content += '}' * open_braces
    return content
Enter fullscreen mode Exit fullscreen mode

3. Simulación Paralela en Plataforma Doble

Twitter y Reddit se simulan en paralelo, con bases de datos y logs separados:

uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
Enter fullscreen mode Exit fullscreen mode

El ejecutor detecta finalización por eventos simulation_end.

Consideraciones de Rendimiento

Gestión de Memoria

  • Los documentos se truncan a 50k caracteres en el contexto del LLM.
  • Los resúmenes de entidades se limitan a 300 caracteres.
  • Solo las últimas 50 acciones se mantienen en memoria (el historial completo queda en archivos JSONL).

Aislamiento de la Base de Datos

Cada plataforma utiliza su propia base de datos SQLite para evitar bloqueos en escrituras paralelas.

Degradación Elegante

Si la API de búsqueda de Zep falla, se recurre a búsqueda local por palabras clave:

try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"La API de búsqueda de Zep falló, recurriendo a la búsqueda local: {e}")
    return self._local_search(graph_id, query, limit, scope)
Enter fullscreen mode Exit fullscreen mode

Conclusión

MiroFish es una referencia práctica para construir un sistema de simulación multiagente de extremo a extremo. El flujo de cinco pasos convierte documentos en mundos digitales donde miles de agentes interactúan bajo patrones realistas.

Puntos clave:

  1. El diseño de la ontología es esencial: La estructura de dos niveles (8 específicos + 2 de respaldo) cumple los límites de API y cubre todos los casos.
  2. Los flujos asíncronos facilitan operaciones prolongadas: El seguimiento de tareas y progreso mantiene informados a los usuarios.
  3. La actividad temporal añade realismo: Los patrones horarios y roles específicos producen simulaciones creíbles.
  4. Simulación en plataformas paralelas permite comparación real: Observa cómo Twitter y Reddit generan resultados diferentes.
  5. La recuperación de tres niveles cubre todas las necesidades: InsightForge para profundidad, PanoramaSearch para amplitud, InterviewAgents para insights directos de los agentes.

El código fuente completo está disponible en github.com/666ghj/MiroFish.

¿Quieres probar MiroFish? Visita la demostración en vivo para ver una simulación de evento de punto caliente en acción.

Top comments (0)