Introduction
Les médias sociaux évoluent rapidement. Un simple message peut déclencher des cascades de réactions, de reformulations et de contre-mouvements que personne n'aurait prédits. Et si vous pouviez voir comment un scénario se déroule avant qu'il ne se produise dans le monde réel ?
Essayez Apidog dès aujourd'hui
MiroFish fait exactement cela. C'est un moteur d'intelligence collective qui crée des mondes parallèles numériques où des milliers d'agents IA dotés de personnalités, de mémoires et de modèles comportementaux distincts interagissent librement. Vous téléchargez du matériel source — un article de presse, un projet de politique, même un roman — et MiroFish construit une simulation haute fidélité de la manière dont les événements pourraient se dérouler.
💡 La construction de MiroFish a nécessité une base fiable pour les tests d'API. L'équipe a utilisé Apidog pour concevoir, déboguer et documenter toutes les API backend avant d'écrire la logique de simulation. Cela a permis de détecter les problèmes de points d'extrémité tôt et de maintenir le backend Python et le frontend Vue synchronisés tout au long du développement.
Ce billet décompose l'architecture technique derrière MiroFish. Vous apprendrez comment le système transforme des documents bruts en simulations vivantes, comment les agents prennent des décisions, et comment le flux de travail en cinq étapes orchestre tout, de la construction du graphe de connaissances à la surveillance en temps réel.
Vue d'ensemble du système : Le flux de travail en cinq étapes
MiroFish traite les simulations à travers cinq phases distinctes :
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Étape 1 │ ──► │ Étape 2 │ ──► │ Étape 3 │ ──► │ Étape 4 │ ──► │ Étape 5 │
│ Génération │ │ Construction │ │ Configuration │ │ Exécution │ │ Génération │
│ d'ontologie│ │ GraphRAG │ │ Environnement│ │ Simulation │ │ de rapports │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
Étape 1 : Génération de l'ontologie
Le système analyse vos documents d'entrée et vos exigences de simulation, puis utilise un LLM pour générer une ontologie personnalisée. Cela définit :
- 10 types d'entités (ex : Étudiant, Professeur, Université, Organe de presse, Agence gouvernementale)
- 10 types de relations (ex : TRAVAILLE_POUR, COMMENTE_SUR, RÉPOND_À)
-
Attributs pour chaque type (en évitant les mots réservés comme
name,uuid,created_at)
La structure impose deux niveaux : 8 types spécifiques basés sur votre contenu, plus 2 types de repli (Personne et Organisation) pour couvrir tous les cas.
Étape 2 : Construction du GraphRAG
Les documents sont découpés en morceaux (500 caractères, 50 de chevauchement) et envoyés à Zep Cloud par lots. Le pipeline :
- Crée un graphe autonome avec un ID unique
- Définit l'ontologie personnalisée
- Envoie des lots de texte pour extraction d'entités et de relations
- Attend le traitement par Zep pour chaque lot
- Récupère le graphe final avec nœuds et arêtes
Étape 3 : Configuration de l'environnement
Le générateur de configuration de simulation analyse le graphe de connaissances et crée des paramètres d'agent détaillés :
- Configuration temporelle (modèles fuseau horaire chinois : pics 19–22h, creux 0–5h)
- Configuration des événements (publications initiales, sujets d'actualité)
- Configurations d'activité des agents (posts/heure, délais de réponse, poids d'influence)
- Configurations de plateforme (Twitter, Reddit, seuils viraux différents)
Étape 4 : Exécution de la simulation
Les agents se réveillent selon leurs horaires d'activité et commencent à publier, commenter et réagir. Simulations parallèles sur Twitter et Reddit, chaque action étant enregistrée dans des fichiers JSONL en temps réel.
Étape 5 : Génération de rapports
L'Agent de rapport utilise trois outils de récupération principaux pour analyser les sorties :
- InsightForge : Recherche approfondie avec sous-requêtes
- PanoramaSearch : Vue d’ensemble incluant faits historiques expirés/invalides
- InterviewAgents : Entretiens temps réel avec agents actifs via IPC
Approfondissement technique : Génération de l'ontologie
Le générateur d'ontologie est situé dans backend/app/services/ontology_generator.py. Il utilise une invite LLM détaillée et applique des règles strictes :
- Entités valides : personnes, organisations, médias
- Non valides : concepts abstraits, thèmes, points de vue
Après génération, la méthode _validate_and_process applique les contraintes pour rester dans les limites API et garantir la présence des types de repli :
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
# Zep API limits: max 10 entity types, max 10 edge types
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
# Ensure fallback types exist
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
# Trim if adding fallbacks would exceed limit
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
Construction du graphe de connaissances : Intégration Zep
Le service de construction (backend/app/services/graph_builder.py) orchestre le flux asynchrone :
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. Create graph
graph_id = self.create_graph(graph_name)
# 2. Set ontology
self.set_ontology(graph_id, ontology)
# 3. Chunk text
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. Send batches
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Wait for Zep processing
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. Retrieve final graph
graph_info = self._get_graph_info(graph_id)
Génération dynamique de modèles Pydantic
Pour chaque type d'entité, le système génère un modèle Pydantic à l’exécution, garantissant la validation côté Zep :
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
Pagination des grands graphes
Pour récupérer tous les nœuds paginés :
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
Simulation d'activité d'agent basée sur le temps
Le générateur de configuration (backend/app/services/simulation_config_generator.py) crée des modèles d’activité réalistes :
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5], # 凌晨几乎无人
"morning_hours": [6, 7, 8], # 早间逐渐活跃
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22], # 晚间高峰
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
Différents types d'agents :
| Type d'Agent | Niveau d'activité | Heures d'activité | Délai de réponse | Influence |
|---|---|---|---|---|
| Université | 0.2 | 9-17 | 60-240 min | 3.0 |
| Organe de presse | 0.5 | 7-23 | 5-30 min | 2.5 |
| Étudiant | 0.8 | 8-12, 18-23 | 1-15 min | 0.8 |
| Professeur | 0.4 | 8-21 | 15-90 min | 2.0 |
Le générateur adapte ces paramètres via LLM ou valeurs par défaut.
Suivi des actions en temps réel
L'exécuteur de simulation (backend/app/services/simulation_runner.py) lit et diffuse les actions :
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
# Handle events
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # or reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
# Parse agent actions
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
Exécution en thread arrière-plan, mise à jour toutes les 2 secondes, frontend en polling.
Gestion des processus multiplateforme
Pour arrêter proprement les simulations :
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
# Windows: use taskkill to kill process tree
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
# Unix: kill process group (created with start_new_session=True)
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
Enregistrement des gestionnaires de signaux pour nettoyage :
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
# Then call original handler
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
Génération de rapports : Récupération à trois niveaux
Le service Zep Tools (backend/app/services/zep_tools.py) :
InsightForge (Analyse approfondie)
Décomposition des requêtes complexes et agrégation :
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
# 1. Generate sub-queries using LLM
sub_queries = self._generate_sub_queries(query, simulation_requirement)
# 2. Search for each sub-query
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
# 3. Extract entity UUIDs from edges
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
# 4. Fetch detailed entity info
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
# 5. Build relationship chains
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (Portée complète)
Inclut les faits historiques :
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (Temps réel)
Dialogue direct avec les agents actifs :
def interview_agents(self, simulation_id: str, interview_requirement: str):
# 1. Load agent profiles from CSV/JSON
profiles = self._load_agent_profiles(simulation_id)
# 2. Use LLM to select relevant agents
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
# 3. Generate interview questions
questions = self._generate_interview_questions(...)
# 4. Call real interview API (dual-platform)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None, # Interview both Twitter and Reddit
timeout=180.0
)
# 5. Parse and format results
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Décisions d'ingénierie clés
1. Gestion des tâches asynchrones
Les traitements longs utilisent des threads de tâches asynchrones, identifiables via un ID :
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
Le frontend suit l'avancement via /api/graph/task/{task_id}.
2. Appels LLM par lots avec réessai
Pour la génération de configuration, division en lots de 15 :
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
Correction automatique du JSON tronqué :
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. Simulation parallèle multiplateforme
Chaque plateforme a sa base et son log :
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
L'exécuteur détecte la fin d'exécution par événement simulation_end.
Considérations de performance
Gestion de la mémoire
- Documents : tronqués à 50 000 caractères pour contexte LLM
- Résumés d’entités : max 300 caractères chacun
- Actions récentes : 50 en mémoire (historique complet dans JSONL)
Isolation de la base de données
Chaque plateforme utilise sa propre base SQLite pour éviter les conflits lors d'écritures parallèles.
Dégradation gracieuse
Si l’API Zep échoue, fallback sur la recherche locale :
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search API failed, falling back to local search: {e}")
return self._local_search(graph_id, query, limit, scope)
Conclusion
MiroFish montre comment construire un simulateur multi-agents complet. Le workflow en 5 étapes transforme des documents bruts en mondes numériques dynamiques où des milliers d’agents interagissent selon des modèles réalistes.
À retenir :
- Conception de l’ontologie cruciale : structure à 2 niveaux garantit la couverture sans dépasser les limites API.
- Travail asynchrone : suivi d’avancement sur les traitements longs.
- Activité temporelle réaliste : modèles horaires adaptés au contexte.
- Simulation multiplateforme : comparaison directe des dynamiques Twitter/Reddit.
- Récupération à trois niveaux : InsightForge (profondeur), PanoramaSearch (étendue), InterviewAgents (vue directe).
Le code source complet est disponible sur github.com/666ghj/MiroFish.
Vous voulez essayer MiroFish ? Visitez la démo en direct pour voir une simulation d'événement hotspot en action.

Top comments (0)