DEV Community

Cover image for MiroFish デジタルパラレルワールドの作り方
Akira
Akira

Posted on • Originally published at apidog.com

MiroFish デジタルパラレルワールドの作り方

はじめに

ソーシャルメディアは変化が速いです。1つの投稿が、誰も予測しなかった反応、再形成、対抗運動の連鎖を引き起こすことがあります。現実世界でシナリオが起こる前に、それがどのように展開するかを見ることができたらどうでしょうか?

Apidogを今すぐ試してみよう

MiroFishはそのための群知能エンジンです。数千のAIエージェントが異なる個性・記憶・行動パターンを持ち、自由に相互作用するデジタル並行世界を構築します。ニュース記事や政策草案、小説などのシードマテリアルをアップロードすると、MiroFishはイベントがどのように展開するかを高精度でシミュレーションします。

💡 MiroFishのAPI基盤にはApidogを活用

信頼性の高いAPIテストのため、開発チームはApidogを用いてバックエンドAPIを設計・デバッグ・ドキュメント化。これによりPythonバックエンドとVueフロントエンドの同期や、エンドポイントの早期不具合発見を実現しています。

この記事ではMiroFishの技術アーキテクチャを、実装者がすぐに応用できる形で解説します。生データがどのようにシミュレーションに変換され、エージェントが意思決定し、5段階のワークフローでシステム全体が連携する仕組みを掘り下げます。

MiroFishワークフロー

システム概要: 5段階のワークフロー

MiroFishは、下記5フェーズでシミュレーションを構築します。

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Step 1    │ ──► │   Step 2    │ ──► │   Step 3    │ ──► │   Step 4    │ ──► │   Step 5    │
│  Ontology   │     │  GraphRAG   │     │   Env       │     │ Simulation  │     │   Report    │
│  Generation │     │   Build     │     │   Setup     │     │   Run       │     │ Generation  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
Enter fullscreen mode Exit fullscreen mode

ステップ1: オントロジー生成

  • 入力ドキュメントと要件をLLMで解析し、10種類のエンティティタイプ(例: 学生、教授、大学等)、10種類の関係タイプ、および属性セットを自動生成。
  • 2層構造(8つの特定タイプ+2つのフォールバック: Person, Organization)でAPI制約を満たします。

ステップ2: GraphRAG構築

  • ドキュメントを500文字/50文字重複でチャンク化、Zep Cloudにバッチ送信。
  • 各バッチからエンティティ・関係を抽出し、グラフを生成。

ステップ3: 環境設定

  • 知識グラフからエージェントパラメータを自動生成。
    • 中国タイムゾーンのピーク・閑散時間
    • 初期イベント設定/ホットトピック
    • エージェント活動(投稿頻度・応答遅延・影響度)
    • プラットフォーム別バイラル閾値

ステップ4: シミュレーション実行

  • エージェントがスケジュールに従い投稿・コメント・リアクションを実施。
  • Twitter/Reddit両プラットフォームで並列シミュレーション。
  • 全行動をリアルタイムでJSONLログ保存。

ステップ5: レポート生成

  • 3種の検索ツールで事象を分析。
    • InsightForge: サブクエリ分解による詳細検索
    • PanoramaSearch: 履歴的事実も含めた全範囲のビュー
    • InterviewAgents: IPC越しにエージェントへリアルタイムインタビュー

技術詳細: オントロジー生成

backend/app/services/ontology_generator.py

LLMシステムプロンプトで有効なエンティティ/関係のルールを厳格に定義し、以下のバリデーションでAPI制約を必ず満たします。

def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result
Enter fullscreen mode Exit fullscreen mode

この検証により2層構造とAPI制限を常に維持。


知識グラフ構築: Zep統合

backend/app/services/graph_builder.py

非同期で以下の処理を実装。

def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    graph_id = self.create_graph(graph_name)
    self.set_ontology(graph_id, ontology)
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
    self._wait_for_episodes(episode_uuids, progress_callback)
    graph_info = self._get_graph_info(graph_id)
Enter fullscreen mode Exit fullscreen mode

動的なPydanticモデル生成

エンティティ種別ごとに実行時にPydanticモデルを動的生成。

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class
Enter fullscreen mode Exit fullscreen mode

大規模グラフのページング

Zepからページングデータを全件取得:

def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes
Enter fullscreen mode Exit fullscreen mode

時間ベースのエージェント活動シミュレーション

backend/app/services/simulation_config_generator.py

中国の時間帯行動に応じた現実的なパターンを自動生成。

CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],
    "morning_hours": [6, 7, 8],
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}
Enter fullscreen mode Exit fullscreen mode

エージェントタイプごとに下記のようなパラメータを割り当て:

エージェントタイプ 活動レベル 活動時間 応答遅延 影響力
大学 0.2 9-17 60-240 分 3.0
メディア 0.5 7-23 5-30 分 2.5
学生 0.8 8-12, 18-23 1-15 分 0.8
教授 0.4 8-21 15-90 分 2.0

LLM失敗時はデフォルト値でフォールバック。


リアルタイム行動追跡

backend/app/services/simulation_runner.py

JSONLログをバックグラウンドでストリーミング監視。

def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True  # or reddit
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)
        return f.tell()
Enter fullscreen mode Exit fullscreen mode

この状態を2秒ごとに更新し、フロントエンドのリアルタイム表示に対応。


クロスプラットフォームのプロセス管理

Windows/Unix両対応の安全なシミュレーション停止ロジック。

def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
Enter fullscreen mode Exit fullscreen mode

クリーンアップハンドラーも信号(SIGINT/SIGTERM/SIGHUP)に対応:

def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)
    atexit.register(cls.cleanup_all_simulations)
Enter fullscreen mode Exit fullscreen mode

レポート生成: 3層検索

backend/app/services/zep_tools.py

各APIで異なる分析を実装。

InsightForge (詳細分析)

def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    sub_queries = self._generate_sub_queries(query, simulation_requirement)
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)
Enter fullscreen mode Exit fullscreen mode

PanoramaSearch (全範囲)

def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)
    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)
Enter fullscreen mode Exit fullscreen mode

InterviewAgents (リアルタイム)

def interview_agents(self, simulation_id: str, interview_requirement: str):
    profiles = self._load_agent_profiles(simulation_id)
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
    questions = self._generate_interview_questions(...)
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,
        timeout=180.0
    )
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Enter fullscreen mode Exit fullscreen mode

主要な設計上の決定事項

1. 非同期タスク管理

グラフ構築・シミュレーション実行は以下のように非同期スレッドで進捗追跡。

def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()
    return task_id
Enter fullscreen mode Exit fullscreen mode

状態は/api/graph/task/{task_id}でポーリング。

2. リトライ付きバッチLLM呼び出し

大量エージェントは15件ごとにバッチ化し、切り詰めJSONの自動修復も実装。

num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)
Enter fullscreen mode Exit fullscreen mode
def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')
    if content and content[-1] not in '",}]':
        content += '"'
    content += ']' * open_brackets
    content += '}' * open_braces
    return content
Enter fullscreen mode Exit fullscreen mode

3. デュアルプラットフォーム並列シミュレーション

Twitter/Redditは分離DB・アクションログで並列実行。

uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
Enter fullscreen mode Exit fullscreen mode

simulation_endイベントで個別完了を判定。


パフォーマンスに関する考慮事項

メモリ管理

  • LLM入力は最大5万文字にトリム
  • エンティティ要約は300文字制限
  • 最近のアクションは50件のみメモリ保持(全履歴はJSONL)

データベースの分離

  • 各プラットフォームで独立したSQLite DBを利用し、書き込みロック競合を回避

段階的な劣化 (Graceful Degradation)

  • Zep API失敗時はローカルキーワード検索へ自動フォールバック
try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"Zep Search APIが失敗しました。ローカル検索にフォールバックします: {e}")
    return self._local_search(graph_id, query, limit, scope)
Enter fullscreen mode Exit fullscreen mode

結論

MiroFishはゼロから完全なマルチエージェント・ソーシャルシミュレーションを構築するための具体的なワークフローを提供します。

この設計・実装を応用すれば、現実の複雑なネットワーク活動を高精度で再現可能です。

技術的ポイントまとめ:

  1. オントロジー設計を2層構造でAPI制約内に最適化
  2. 非同期ワークフロー+進捗追跡で長時間処理に対応
  3. タイムゾーン・エージェント種別ごとの活動パターンでリアリティ
  4. Twitter/Reddit並列シミュレーションでプラットフォーム差を可視化
  5. 3層検索で詳細・全体・リアルタイム分析を柔軟に展開

ソースコードはgithub.com/666ghj/MiroFishで公開中です。

ライブデモはこちら

Top comments (0)