はじめに
ソーシャルメディアは変化が速いです。1つの投稿が、誰も予測しなかった反応、再形成、対抗運動の連鎖を引き起こすことがあります。現実世界でシナリオが起こる前に、それがどのように展開するかを見ることができたらどうでしょうか?
MiroFishはそのための群知能エンジンです。数千のAIエージェントが異なる個性・記憶・行動パターンを持ち、自由に相互作用するデジタル並行世界を構築します。ニュース記事や政策草案、小説などのシードマテリアルをアップロードすると、MiroFishはイベントがどのように展開するかを高精度でシミュレーションします。
💡 MiroFishのAPI基盤にはApidogを活用
信頼性の高いAPIテストのため、開発チームはApidogを用いてバックエンドAPIを設計・デバッグ・ドキュメント化。これによりPythonバックエンドとVueフロントエンドの同期や、エンドポイントの早期不具合発見を実現しています。
この記事ではMiroFishの技術アーキテクチャを、実装者がすぐに応用できる形で解説します。生データがどのようにシミュレーションに変換され、エージェントが意思決定し、5段階のワークフローでシステム全体が連携する仕組みを掘り下げます。
システム概要: 5段階のワークフロー
MiroFishは、下記5フェーズでシミュレーションを構築します。
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Step 1 │ ──► │ Step 2 │ ──► │ Step 3 │ ──► │ Step 4 │ ──► │ Step 5 │
│ Ontology │ │ GraphRAG │ │ Env │ │ Simulation │ │ Report │
│ Generation │ │ Build │ │ Setup │ │ Run │ │ Generation │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
ステップ1: オントロジー生成
- 入力ドキュメントと要件をLLMで解析し、10種類のエンティティタイプ(例: 学生、教授、大学等)、10種類の関係タイプ、および属性セットを自動生成。
- 2層構造(8つの特定タイプ+2つのフォールバック:
Person,Organization)でAPI制約を満たします。
ステップ2: GraphRAG構築
- ドキュメントを500文字/50文字重複でチャンク化、Zep Cloudにバッチ送信。
- 各バッチからエンティティ・関係を抽出し、グラフを生成。
ステップ3: 環境設定
- 知識グラフからエージェントパラメータを自動生成。
- 中国タイムゾーンのピーク・閑散時間
- 初期イベント設定/ホットトピック
- エージェント活動(投稿頻度・応答遅延・影響度)
- プラットフォーム別バイラル閾値
ステップ4: シミュレーション実行
- エージェントがスケジュールに従い投稿・コメント・リアクションを実施。
- Twitter/Reddit両プラットフォームで並列シミュレーション。
- 全行動をリアルタイムでJSONLログ保存。
ステップ5: レポート生成
- 3種の検索ツールで事象を分析。
- InsightForge: サブクエリ分解による詳細検索
- PanoramaSearch: 履歴的事実も含めた全範囲のビュー
- InterviewAgents: IPC越しにエージェントへリアルタイムインタビュー
技術詳細: オントロジー生成
backend/app/services/ontology_generator.py
LLMシステムプロンプトで有効なエンティティ/関係のルールを厳格に定義し、以下のバリデーションでAPI制約を必ず満たします。
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
この検証により2層構造とAPI制限を常に維持。
知識グラフ構築: Zep統合
backend/app/services/graph_builder.py
非同期で以下の処理を実装。
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
graph_id = self.create_graph(graph_name)
self.set_ontology(graph_id, ontology)
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
self._wait_for_episodes(episode_uuids, progress_callback)
graph_info = self._get_graph_info(graph_id)
動的なPydanticモデル生成
エンティティ種別ごとに実行時にPydanticモデルを動的生成。
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
大規模グラフのページング
Zepからページングデータを全件取得:
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
時間ベースのエージェント活動シミュレーション
backend/app/services/simulation_config_generator.py
中国の時間帯行動に応じた現実的なパターンを自動生成。
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5],
"morning_hours": [6, 7, 8],
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22],
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
エージェントタイプごとに下記のようなパラメータを割り当て:
| エージェントタイプ | 活動レベル | 活動時間 | 応答遅延 | 影響力 |
|---|---|---|---|---|
| 大学 | 0.2 | 9-17 | 60-240 分 | 3.0 |
| メディア | 0.5 | 7-23 | 5-30 分 | 2.5 |
| 学生 | 0.8 | 8-12, 18-23 | 1-15 分 | 0.8 |
| 教授 | 0.4 | 8-21 | 15-90 分 | 2.0 |
LLM失敗時はデフォルト値でフォールバック。
リアルタイム行動追跡
backend/app/services/simulation_runner.py
JSONLログをバックグラウンドでストリーミング監視。
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # or reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
この状態を2秒ごとに更新し、フロントエンドのリアルタイム表示に対応。
クロスプラットフォームのプロセス管理
Windows/Unix両対応の安全なシミュレーション停止ロジック。
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
クリーンアップハンドラーも信号(SIGINT/SIGTERM/SIGHUP)に対応:
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
レポート生成: 3層検索
backend/app/services/zep_tools.py
各APIで異なる分析を実装。
InsightForge (詳細分析)
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
sub_queries = self._generate_sub_queries(query, simulation_requirement)
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (全範囲)
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (リアルタイム)
def interview_agents(self, simulation_id: str, interview_requirement: str):
profiles = self._load_agent_profiles(simulation_id)
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
questions = self._generate_interview_questions(...)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None,
timeout=180.0
)
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
主要な設計上の決定事項
1. 非同期タスク管理
グラフ構築・シミュレーション実行は以下のように非同期スレッドで進捗追跡。
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
状態は/api/graph/task/{task_id}でポーリング。
2. リトライ付きバッチLLM呼び出し
大量エージェントは15件ごとにバッチ化し、切り詰めJSONの自動修復も実装。
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. デュアルプラットフォーム並列シミュレーション
Twitter/Redditは分離DB・アクションログで並列実行。
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
simulation_endイベントで個別完了を判定。
パフォーマンスに関する考慮事項
メモリ管理
- LLM入力は最大5万文字にトリム
- エンティティ要約は300文字制限
- 最近のアクションは50件のみメモリ保持(全履歴はJSONL)
データベースの分離
- 各プラットフォームで独立したSQLite DBを利用し、書き込みロック競合を回避
段階的な劣化 (Graceful Degradation)
- Zep API失敗時はローカルキーワード検索へ自動フォールバック
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search APIが失敗しました。ローカル検索にフォールバックします: {e}")
return self._local_search(graph_id, query, limit, scope)
結論
MiroFishはゼロから完全なマルチエージェント・ソーシャルシミュレーションを構築するための具体的なワークフローを提供します。
この設計・実装を応用すれば、現実の複雑なネットワーク活動を高精度で再現可能です。
技術的ポイントまとめ:
- オントロジー設計を2層構造でAPI制約内に最適化
- 非同期ワークフロー+進捗追跡で長時間処理に対応
- タイムゾーン・エージェント種別ごとの活動パターンでリアリティ
- Twitter/Reddit並列シミュレーションでプラットフォーム差を可視化
- 3層検索で詳細・全体・リアルタイム分析を柔軟に展開
ソースコードはgithub.com/666ghj/MiroFishで公開中です。
ライブデモはこちら

Top comments (0)