DEV Community

Cover image for Comment MiroFish Crée des Mondes Parallèles Numériques ?
Antoine Laurent
Antoine Laurent

Posted on • Originally published at apidog.com

Comment MiroFish Crée des Mondes Parallèles Numériques ?

Introduction

Les médias sociaux évoluent rapidement. Un simple message peut déclencher des cascades de réactions, de reformulations et de contre-mouvements que personne n'aurait prédits. Et si vous pouviez voir comment un scénario se déroule avant qu'il ne se produise dans le monde réel ?

Essayez Apidog dès aujourd'hui

MiroFish fait exactement cela. C'est un moteur d'intelligence collective qui crée des mondes parallèles numériques où des milliers d'agents IA dotés de personnalités, de mémoires et de modèles comportementaux distincts interagissent librement. Vous téléchargez du matériel source — un article de presse, un projet de politique, même un roman — et MiroFish construit une simulation haute fidélité de la manière dont les événements pourraient se dérouler.

💡 La construction de MiroFish a nécessité une base fiable pour les tests d'API. L'équipe a utilisé Apidog pour concevoir, déboguer et documenter toutes les API backend avant d'écrire la logique de simulation. Cela a permis de détecter les problèmes de points d'extrémité tôt et de maintenir le backend Python et le frontend Vue synchronisés tout au long du développement.

Ce billet décompose l'architecture technique derrière MiroFish. Vous apprendrez comment le système transforme des documents bruts en simulations vivantes, comment les agents prennent des décisions, et comment le flux de travail en cinq étapes orchestre tout, de la construction du graphe de connaissances à la surveillance en temps réel.

Architecture MiroFish

Vue d'ensemble du système : Le flux de travail en cinq étapes

MiroFish traite les simulations à travers cinq phases distinctes :

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Étape 1   │ ──► │   Étape 2   │ ──► │   Étape 3   │ ──► │   Étape 4   │ ──► │   Étape 5   │
│  Génération │     │  Construction │     │ Configuration │     │ Exécution   │     │ Génération  │
│  d'ontologie│     │   GraphRAG    │     │  Environnement│     │ Simulation  │     │ de rapports │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
Enter fullscreen mode Exit fullscreen mode

Étape 1 : Génération de l'ontologie

Le système analyse vos documents d'entrée et vos exigences de simulation, puis utilise un LLM pour générer une ontologie personnalisée. Cela définit :

  • 10 types d'entités (ex : Étudiant, Professeur, Université, Organe de presse, Agence gouvernementale)
  • 10 types de relations (ex : TRAVAILLE_POUR, COMMENTE_SUR, RÉPOND_À)
  • Attributs pour chaque type (en évitant les mots réservés comme name, uuid, created_at)

La structure impose deux niveaux : 8 types spécifiques basés sur votre contenu, plus 2 types de repli (Personne et Organisation) pour couvrir tous les cas.

Étape 2 : Construction du GraphRAG

Les documents sont découpés en morceaux (500 caractères, 50 de chevauchement) et envoyés à Zep Cloud par lots. Le pipeline :

  1. Crée un graphe autonome avec un ID unique
  2. Définit l'ontologie personnalisée
  3. Envoie des lots de texte pour extraction d'entités et de relations
  4. Attend le traitement par Zep pour chaque lot
  5. Récupère le graphe final avec nœuds et arêtes

Étape 3 : Configuration de l'environnement

Le générateur de configuration de simulation analyse le graphe de connaissances et crée des paramètres d'agent détaillés :

  • Configuration temporelle (modèles fuseau horaire chinois : pics 19–22h, creux 0–5h)
  • Configuration des événements (publications initiales, sujets d'actualité)
  • Configurations d'activité des agents (posts/heure, délais de réponse, poids d'influence)
  • Configurations de plateforme (Twitter, Reddit, seuils viraux différents)

Étape 4 : Exécution de la simulation

Les agents se réveillent selon leurs horaires d'activité et commencent à publier, commenter et réagir. Simulations parallèles sur Twitter et Reddit, chaque action étant enregistrée dans des fichiers JSONL en temps réel.

Étape 5 : Génération de rapports

L'Agent de rapport utilise trois outils de récupération principaux pour analyser les sorties :

  • InsightForge : Recherche approfondie avec sous-requêtes
  • PanoramaSearch : Vue d’ensemble incluant faits historiques expirés/invalides
  • InterviewAgents : Entretiens temps réel avec agents actifs via IPC

Approfondissement technique : Génération de l'ontologie

Le générateur d'ontologie est situé dans backend/app/services/ontology_generator.py. Il utilise une invite LLM détaillée et applique des règles strictes :

  • Entités valides : personnes, organisations, médias
  • Non valides : concepts abstraits, thèmes, points de vue

Après génération, la méthode _validate_and_process applique les contraintes pour rester dans les limites API et garantir la présence des types de repli :

def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Zep API limits: max 10 entity types, max 10 edge types
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    # Ensure fallback types exist
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Trim if adding fallbacks would exceed limit
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result
Enter fullscreen mode Exit fullscreen mode

Construction du graphe de connaissances : Intégration Zep

Le service de construction (backend/app/services/graph_builder.py) orchestre le flux asynchrone :

def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Create graph
    graph_id = self.create_graph(graph_name)

    # 2. Set ontology
    self.set_ontology(graph_id, ontology)

    # 3. Chunk text
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)

    # 4. Send batches
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)

    # 5. Wait for Zep processing
    self._wait_for_episodes(episode_uuids, progress_callback)

    # 6. Retrieve final graph
    graph_info = self._get_graph_info(graph_id)
Enter fullscreen mode Exit fullscreen mode

Génération dynamique de modèles Pydantic

Pour chaque type d'entité, le système génère un modèle Pydantic à l’exécution, garantissant la validation côté Zep :

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class
Enter fullscreen mode Exit fullscreen mode

Pagination des grands graphes

Pour récupérer tous les nœuds paginés :

def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes
Enter fullscreen mode Exit fullscreen mode

Simulation d'activité d'agent basée sur le temps

Le générateur de configuration (backend/app/services/simulation_config_generator.py) crée des modèles d’activité réalistes :

CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],           # 凌晨几乎无人
    "morning_hours": [6, 7, 8],                  # 早间逐渐活跃
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],              # 晚间高峰
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}
Enter fullscreen mode Exit fullscreen mode

Différents types d'agents :

Type d'Agent Niveau d'activité Heures d'activité Délai de réponse Influence
Université 0.2 9-17 60-240 min 3.0
Organe de presse 0.5 7-23 5-30 min 2.5
Étudiant 0.8 8-12, 18-23 1-15 min 0.8
Professeur 0.4 8-21 15-90 min 2.0

Le générateur adapte ces paramètres via LLM ou valeurs par défaut.

Suivi des actions en temps réel

L'exécuteur de simulation (backend/app/services/simulation_runner.py) lit et diffuse les actions :

def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)

            # Handle events
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True  # or reddit
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            # Parse agent actions
            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)

        return f.tell()
Enter fullscreen mode Exit fullscreen mode

Exécution en thread arrière-plan, mise à jour toutes les 2 secondes, frontend en polling.

Gestion des processus multiplateforme

Pour arrêter proprement les simulations :

def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        # Windows: use taskkill to kill process tree
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        # Unix: kill process group (created with start_new_session=True)
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
Enter fullscreen mode Exit fullscreen mode

Enregistrement des gestionnaires de signaux pour nettoyage :

def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Then call original handler

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)

    atexit.register(cls.cleanup_all_simulations)
Enter fullscreen mode Exit fullscreen mode

Génération de rapports : Récupération à trois niveaux

Le service Zep Tools (backend/app/services/zep_tools.py) :

InsightForge (Analyse approfondie)

Décomposition des requêtes complexes et agrégation :

def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    # 1. Generate sub-queries using LLM
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    # 2. Search for each sub-query
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    # 3. Extract entity UUIDs from edges
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    # 4. Fetch detailed entity info
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    # 5. Build relationship chains
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)
Enter fullscreen mode Exit fullscreen mode

PanoramaSearch (Portée complète)

Inclut les faits historiques :

def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)

    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)
Enter fullscreen mode Exit fullscreen mode

InterviewAgents (Temps réel)

Dialogue direct avec les agents actifs :

def interview_agents(self, simulation_id: str, interview_requirement: str):
    # 1. Load agent profiles from CSV/JSON
    profiles = self._load_agent_profiles(simulation_id)

    # 2. Use LLM to select relevant agents
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    # 3. Generate interview questions
    questions = self._generate_interview_questions(...)

    # 4. Call real interview API (dual-platform)
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,  # Interview both Twitter and Reddit
        timeout=180.0
    )

    # 5. Parse and format results
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Enter fullscreen mode Exit fullscreen mode

Décisions d'ingénierie clés

1. Gestion des tâches asynchrones

Les traitements longs utilisent des threads de tâches asynchrones, identifiables via un ID :

def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})

    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()

    return task_id
Enter fullscreen mode Exit fullscreen mode

Le frontend suit l'avancement via /api/graph/task/{task_id}.

2. Appels LLM par lots avec réessai

Pour la génération de configuration, division en lots de 15 :

num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)
Enter fullscreen mode Exit fullscreen mode

Correction automatique du JSON tronqué :

def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')

    if content and content[-1] not in '",}]':
        content += '"'

    content += ']' * open_brackets
    content += '}' * open_braces
    return content
Enter fullscreen mode Exit fullscreen mode

3. Simulation parallèle multiplateforme

Chaque plateforme a sa base et son log :

uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
Enter fullscreen mode Exit fullscreen mode

L'exécuteur détecte la fin d'exécution par événement simulation_end.

Considérations de performance

Gestion de la mémoire

  • Documents : tronqués à 50 000 caractères pour contexte LLM
  • Résumés d’entités : max 300 caractères chacun
  • Actions récentes : 50 en mémoire (historique complet dans JSONL)

Isolation de la base de données

Chaque plateforme utilise sa propre base SQLite pour éviter les conflits lors d'écritures parallèles.

Dégradation gracieuse

Si l’API Zep échoue, fallback sur la recherche locale :

try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"Zep Search API failed, falling back to local search: {e}")
    return self._local_search(graph_id, query, limit, scope)
Enter fullscreen mode Exit fullscreen mode

Conclusion

MiroFish montre comment construire un simulateur multi-agents complet. Le workflow en 5 étapes transforme des documents bruts en mondes numériques dynamiques où des milliers d’agents interagissent selon des modèles réalistes.

À retenir :

  1. Conception de l’ontologie cruciale : structure à 2 niveaux garantit la couverture sans dépasser les limites API.
  2. Travail asynchrone : suivi d’avancement sur les traitements longs.
  3. Activité temporelle réaliste : modèles horaires adaptés au contexte.
  4. Simulation multiplateforme : comparaison directe des dynamiques Twitter/Reddit.
  5. Récupération à trois niveaux : InsightForge (profondeur), PanoramaSearch (étendue), InterviewAgents (vue directe).

Le code source complet est disponible sur github.com/666ghj/MiroFish.

Vous voulez essayer MiroFish ? Visitez la démo en direct pour voir une simulation d'événement hotspot en action.

Top comments (0)