Introduction
Social media moves fast. A single post can trigger cascades of reactions, reshares, and counter-movements that nobody predicted. What if you could see how a scenario plays out before it happens in the real world?
MiroFish does exactly that. It's a swarm intelligence engine that creates digital parallel worlds where thousands of AI agents with distinct personalities, memories, and behavioral patterns interact freely. You upload seed material (a news article, a policy draft, even a novel) and MiroFish builds a high-fidelity simulation of how events might unfold.
💡 Pro tip: Building MiroFish required a reliable API testing foundation. The team used Apidog to design, debug, and document all backend APIs before writing simulation logic. This caught endpoint issues early and kept the Python backend and Vue frontend in sync throughout development.
This post breaks down the technical architecture behind MiroFish. You'll learn how to transform raw documents into living simulations, how agents make decisions, and how a five-step workflow orchestrates everything from knowledge graph construction to real-time monitoring.
System Overview: The Five-Step Workflow
MiroFish structures simulations into five practical phases:
┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│    Step 1    │ ──▶ │    Step 2    │ ──▶ │    Step 3    │ ──▶ │    Step 4    │ ──▶ │    Step 5    │
│   Ontology   │     │   GraphRAG   │     │     Env      │     │  Simulation  │     │    Report    │
│  Generation  │     │    Build     │     │    Setup     │     │     Run      │     │  Generation  │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘
Step 1: Ontology Generation
- Input documents and simulation requirements are analyzed.
- An LLM generates a custom ontology, defining:
  - 10 entity types (e.g., Student, Professor, University, MediaOutlet, GovernmentAgency)
  - 10 relationship types (e.g., WORKS_FOR, COMMENTS_ON, RESPONDS_TO)
  - Attributes for each type (avoiding reserved words like name, uuid, created_at)
- The ontology enforces a two-tier structure: 8 scenario-specific types and 2 fallback types (Person, Organization).
Step 2: GraphRAG Construction
- Documents are chunked (500 chars, 50 overlap) and sent in batches to Zep Cloud.
- Process:
- Create a new graph with a unique ID.
- Set the custom ontology.
- Send text batches for entity and relationship extraction.
- Await Zep's processing.
- Retrieve the final graph (nodes and edges).
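The chunking in the first bullet can be sketched as a simple sliding window. The following is a minimal illustration of the idea, not necessarily the repo's exact TextProcessor.split_text:

```python
def split_text(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> list[str]:
    """Split text into overlapping chunks with a sliding window."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks
```

Each chunk shares its first 50 characters with the tail of the previous chunk, which helps the extractor resolve entities that straddle a chunk boundary.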
Step 3: Environment Setup
- The config generator analyzes the knowledge graph and builds agent parameters:
- Time configuration (e.g., peak hours 19-22, dead hours 0-5, in China's UTC+8 timezone)
- Event configuration (initial posts, hot topics)
- Agent activity configs (posts/hour, delays, influence weights)
- Platform configs for Twitter and Reddit (different viral thresholds)
Step 4: Simulation Run
- Agents act according to schedules: posting, commenting, reacting.
- Simulations run in parallel on Twitter and Reddit.
- Every action is logged to JSONL files in real time.
Step 5: Report Generation
- The Report Agent uses three core retrieval tools:
- InsightForge: Breaks down complex questions into sub-queries.
- PanoramaSearch: Retrieves a full-scope view, including historical facts.
- InterviewAgents: Conducts real-time interviews with active agents.
Technical Deep Dive: Ontology Generation
The ontology generator is implemented in backend/app/services/ontology_generator.py using a strict system prompt to control outputs.
- The prompt enforces:
- Only valid entities (people, organizations, media outlets)
- No abstract concepts/themes
- Agents must be able to "act" on social media
After LLM output, the _validate_and_process() method enforces constraints and API limits:
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Zep API limits: max 10 entity types, max 10 edge types
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10
    result["edge_types"] = result.get("edge_types", [])[:MAX_EDGE_TYPES]

    # Ensure the two fallback types exist
    # (person_fallback / organization_fallback are defined elsewhere in the module)
    entity_names = {e["name"] for e in result["entity_types"]}
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Trim scenario-specific types if adding fallbacks would exceed the limit
    current_count = len(result["entity_types"])
    needed_slots = len(fallbacks_to_add)
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        to_remove = current_count + needed_slots - MAX_ENTITY_TYPES
        result["entity_types"] = result["entity_types"][:-to_remove]
    result["entity_types"].extend(fallbacks_to_add)
    return result
This ensures compatibility with Zep API constraints and maintains the two-tier ontology.
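To make the trimming behavior concrete, here is a standalone sketch of the fallback logic with simplified, hypothetical names (the real method operates on the full ontology dict):

```python
def ensure_fallback_types(entity_types: list[dict], max_types: int = 10) -> list[dict]:
    """Append Person/Organization fallbacks, trimming scenario types to stay under the cap."""
    names = {e["name"] for e in entity_types}
    fallbacks = [{"name": n, "attributes": []}
                 for n in ("Person", "Organization") if n not in names]
    overflow = len(entity_types) + len(fallbacks) - max_types
    if overflow > 0:
        # Drop the last scenario-specific types to make room
        entity_types = entity_types[:-overflow]
    return entity_types + fallbacks

# Ten scenario-specific types leave no room, so two are trimmed for the fallbacks
scenario = [{"name": f"Type{i}", "attributes": []} for i in range(10)]
result = ensure_fallback_types(scenario)
```

The fallbacks are appended last, so scenario-specific types keep priority when the extractor matches entities.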
Knowledge Graph Construction: Zep Integration
The graph builder (backend/app/services/graph_builder.py) manages async graph construction:
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. Create graph
graph_id = self.create_graph(graph_name)
# 2. Set ontology
self.set_ontology(graph_id, ontology)
# 3. Chunk text
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. Send batches
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Wait for Zep processing
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. Retrieve final graph
graph_info = self._get_graph_info(graph_id)
Dynamic Pydantic Model Generation
The system generates Pydantic models at runtime for each entity type:
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    # Attribute names that collide with Zep's built-in node fields
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    # EntityModel, EntityText, and Field come from the Zep SDK / Pydantic imports
    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": entity_def.get("description", "")}
        annotations = {}
        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_def.get("description", ""), default=None)
            annotations[attr_name] = Optional[EntityText]
        attrs["__annotations__"] = annotations
        # Build the Pydantic model class dynamically
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class
This allows Zep to validate data without pre-defined models.
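The dynamic type() construction can be exercised standalone. This sketch swaps Zep's EntityModel base for a plain placeholder class so the pattern is runnable on its own:

```python
from typing import Optional

class EntityModel:
    """Placeholder for Zep's Pydantic EntityModel base class."""

def make_entity_class(name: str, description: str, attr_names: list[str]) -> type:
    """Build an entity class at runtime, mirroring the set_ontology pattern."""
    attrs = {"__doc__": description, "__annotations__": {}}
    for attr in attr_names:
        attrs[attr] = None  # default value, analogous to Field(default=None)
        attrs["__annotations__"][attr] = Optional[str]
    return type(name, (EntityModel,), attrs)

Student = make_entity_class("Student", "A university student agent", ["major", "year"])
```

With a real Pydantic base, the `__annotations__` dict is what turns each attribute into a typed, validated model field.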
Paging Through Large Graphs
Paginated node retrieval is handled by a utility:
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
Time-Based Agent Activity Simulation
The simulation config generator (backend/app/services/simulation_config_generator.py) creates realistic activity profiles:
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5], # Dead hours
"morning_hours": [6, 7, 8], # Morning
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22], # Evening peak
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
Examples by agent type:
| Agent Type | Activity Level | Active Hours | Response Delay | Influence |
|---|---|---|---|---|
| University | 0.2 | 9-17 | 60-240 min | 3.0 |
| MediaOutlet | 0.5 | 7-23 | 5-30 min | 2.5 |
| Student | 0.8 | 8-12, 18-23 | 1-15 min | 0.8 |
| Professor | 0.4 | 8-21 | 15-90 min | 2.0 |
LLM calls customize these values per scenario, falling back to defaults if needed.
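Combining the timezone config with an agent's base rate, a lookup along these lines (a simplified sketch with hypothetical function names) turns the hour of day into an effective posting rate:

```python
CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],
    "morning_hours": [6, 7, 8],
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],
    "night_hours": [23],
    "activity_multipliers": {"dead": 0.05, "morning": 0.4, "work": 0.7, "peak": 1.5, "night": 0.5},
}

def activity_multiplier(hour: int, config: dict = CHINA_TIMEZONE_CONFIG) -> float:
    """Map an hour (0-23) to the activity multiplier of its time period."""
    for period in ("dead", "morning", "work", "peak", "night"):
        if hour in config[f"{period}_hours"]:
            return config["activity_multipliers"][period]
    raise ValueError(f"hour {hour} not covered by config")

def effective_posts_per_hour(base_rate: float, hour: int) -> float:
    """Scale an agent's base posting rate by the time-of-day multiplier."""
    return base_rate * activity_multiplier(hour)
```

A Student with a base rate of 0.8 posts/hour would thus post far more during the 19-22 evening peak than in the 0-5 dead hours.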
Real-Time Action Tracking
The simulation runner (backend/app/services/simulation_runner.py) streams JSONL logs to monitor agent actions:
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
# Handle events
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True  # or state.reddit_completed, depending on platform
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
# Parse agent actions
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
This runs in a background thread, updating state every 2 seconds. The frontend polls for real-time status.
Cross-Platform Process Management
To handle simulation lifecycle robustly on Windows and Unix, processes are terminated as follows:
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
# Windows: use taskkill to kill process tree
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
# Unix: kill process group (created with start_new_session=True)
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
Cleanup handlers are registered for graceful shutdown:
def register_cleanup(cls):
    # Capture original handlers so they can be chained after cleanup
    original_handlers = {}

    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Then call the original handler, if one was registered
        original = original_handlers.get(signum)
        if callable(original):
            original(signum, frame)

    original_handlers[signal.SIGTERM] = signal.signal(signal.SIGTERM, cleanup_handler)
    original_handlers[signal.SIGINT] = signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        original_handlers[signal.SIGHUP] = signal.signal(signal.SIGHUP, cleanup_handler)
    atexit.register(cls.cleanup_all_simulations)
Report Generation: Three-Tier Retrieval
The Zep tools service (backend/app/services/zep_tools.py) exposes three retrieval methods:
InsightForge (Deep Dive)
Breaks complex questions into sub-queries and aggregates results:
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
# 1. Generate sub-queries using LLM
sub_queries = self._generate_sub_queries(query, simulation_requirement)
# 2. Search for each sub-query
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
# 3. Extract entity UUIDs from edges
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
# 4. Fetch detailed entity info
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
# 5. Build relationship chains
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (Full Scope)
Retrieves all facts, including historical/expired data:
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (Real-Time)
Interviews active agents via the OASIS API:
def interview_agents(self, simulation_id: str, interview_requirement: str):
# 1. Load agent profiles from CSV/JSON
profiles = self._load_agent_profiles(simulation_id)
# 2. Use LLM to select relevant agents
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
# 3. Generate interview questions
questions = self._generate_interview_questions(...)
# 4. Call real interview API (dual-platform)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None, # Interview both Twitter and Reddit
timeout=180.0
)
# 5. Parse and format results
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Key Engineering Decisions
1. Async Task Management
Long-running tasks use async threading with progress tracking:
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
Frontend polls task status via /api/graph/task/{task_id}.
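On the client side, that polling loop can be sketched generically; `fetch_status` here is a placeholder for an HTTP GET against the task endpoint, not a function from the repo:

```python
import time
from typing import Callable

def poll_task(fetch_status: Callable[[], dict], interval: float = 2.0,
              max_attempts: int = 150) -> dict:
    """Poll a task endpoint until it reports a terminal state."""
    for _ in range(max_attempts):
        status = fetch_status()
        if status.get("state") in ("completed", "failed"):
            return status
        time.sleep(interval)
    raise TimeoutError("task did not reach a terminal state in time")
```

With a 2-second interval and 150 attempts, the client gives a graph build up to five minutes before timing out.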
2. Batch LLM Calls with Retry
Agent config generation splits large batches and repairs truncated LLM outputs:
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
def _fix_truncated_json(self, content: str) -> str:
    # Heuristic repair: count unbalanced delimiters
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')
    # If output was cut off mid-string, close the string first
    if content and content[-1] not in '",}]':
        content += '"'
    # Close arrays, then objects (assumes the unclosed array is innermost)
    content += ']' * open_brackets
    content += '}' * open_braces
    return content
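As a concrete example, here is a standalone version of the heuristic repairing output that was cut off inside an array (note the closing order assumes the array is the innermost open structure):

```python
import json

def fix_truncated_json(content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')
    if content and content[-1] not in '",}]':
        content += '"'  # close a string that was cut off mid-value
    content += ']' * open_brackets
    content += '}' * open_braces
    return content

truncated = '{"tags": ["a", "b'   # LLM response cut off inside the array
repaired = fix_truncated_json(truncated)
parsed = json.loads(repaired)     # {'tags': ['a', 'b']}
```

The repaired batch loses at most the final truncated entry, which is far cheaper than re-running the whole LLM call.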
3. Dual-Platform Parallel Simulation
Simulations run in parallel, storing data in platform-specific folders:
uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
Completion is detected per-platform via simulation_end events.
Performance Considerations
Memory Management
- Large documents truncated to 50k chars for LLM context
- Entity summaries capped at 300 chars
- Only 50 recent actions kept in memory (full logs in JSONL)
Database Isolation
- Each platform uses its own SQLite DB to avoid write lock contention
Graceful Degradation
- If Zep Search API fails, the system falls back to local keyword search:
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search API failed, falling back to local search: {e}")
return self._local_search(graph_id, query, limit, scope)
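The local fallback can be approximated as a term-overlap filter over cached facts; this is a sketch of the idea, not the repo's _local_search:

```python
def local_search(facts: list[str], query: str, limit: int = 10) -> list[str]:
    """Rank cached facts by how many query terms they contain."""
    terms = [t.lower() for t in query.split() if t]
    scored = []
    for fact in facts:
        fact_lower = fact.lower()
        score = sum(1 for term in terms if term in fact_lower)
        if score > 0:
            scored.append((score, fact))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [fact for _, fact in scored[:limit]]

facts = ["Alice WORKS_FOR Acme", "Bob COMMENTS_ON policy", "Acme RESPONDS_TO policy draft"]
res = local_search(facts, "Acme policy")
```

Keyword matching is much weaker than Zep's semantic search, but it keeps report generation running when the remote API is unavailable.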
Conclusion
MiroFish demonstrates a complete, actionable workflow for building multi-agent social simulations. The five-step architecture transforms your raw documents into realistic, interactive digital societies.
Key takeaways for implementation:
- Ontology design matters: Two-tier structure (8 specific + 2 fallback) ensures coverage and stays within API limits.
- Async workflows: Use threaded tasks and progress APIs for long operations.
- Time-based activity: Layer timezone and agent-type-specific schedules for realism.
- Dual-platform simulation: Parallel Twitter/Reddit runs reveal platform effects.
- Three-tier data retrieval: Use InsightForge for depth, PanoramaSearch for breadth, InterviewAgents for live insights.
Get the source code at github.com/666ghj/MiroFish.
Want to try MiroFish? Visit the live demo to see a hotspot event simulation in action.
