DEV Community

Cover image for MiroFish สร้างโลกคู่ขนานดิจิทัล ได้อย่างไร
Thanawat Wongchai
Thanawat Wongchai

Posted on • Originally published at apidog.com

MiroFish สร้างโลกคู่ขนานดิจิทัล ได้อย่างไร

บทนำ

โซเชียลมีเดียเคลื่อนไหวรวดเร็ว โพสต์เดียวสามารถกระตุ้นปฏิกิริยา การปรับเปลี่ยน และการเคลื่อนไหวตอบโต้ที่ไม่มีใครคาดคิดได้ จะเป็นอย่างไรถ้าคุณสามารถเห็นว่าสถานการณ์จะคลี่คลายลงอย่างไรก่อนที่จะเกิดขึ้นจริงในโลก?

ทดลองใช้ Apidog วันนี้

MiroFish ทำเช่นนั้นได้อย่างแม่นยำ มันคือเอ็นจิ้นปัญญาแบบฝูงที่สร้างโลกดิจิทัลคู่ขนานที่ซึ่งเอเจนต์ AI นับพันตัวซึ่งมีบุคลิก ความทรงจำ และรูปแบบพฤติกรรมที่แตกต่างกันมีปฏิสัมพันธ์กันอย่างอิสระ คุณอัปโหลดข้อมูลเริ่มต้น—เช่น ข่าว บทความ ร่างนโยบาย หรือแม้แต่นวนิยาย—แล้ว MiroFish จะสร้างการจำลองสถานการณ์ที่มีความแม่นยำสูงว่าเหตุการณ์ต่างๆ อาจคลี่คลายลงอย่างไร

💡 การสร้าง MiroFish ต้องอาศัยรากฐานการทดสอบ API ที่เชื่อถือได้

ทีมงานใช้ Apidog ในการออกแบบ ดีบั๊ก และจัดทำเอกสาร API แบ็กเอนด์ทั้งหมดก่อนที่จะเขียนตรรกะการจำลอง ซึ่งช่วยให้ตรวจพบปัญหาของเอนด์พอยต์ได้ตั้งแต่เนิ่นๆ และทำให้แบ็กเอนด์ Python และฟรอนต์เอนด์ Vue ทำงานสอดคล้องกันตลอดการพัฒนา

โพสต์นี้จะเจาะลึกสถาปัตยกรรมทางเทคนิคเบื้องหลัง MiroFish คุณจะได้เรียนรู้ว่าระบบเปลี่ยนเอกสารดิบให้เป็นการจำลองที่มีชีวิตได้อย่างไร เอเจนต์ตัดสินใจได้อย่างไร และเวิร์กโฟลว์ห้าขั้นตอนจัดระเบียบทุกอย่างตั้งแต่การสร้างกราฟความรู้ไปจนถึงการตรวจสอบแบบเรียลไทม์ได้อย่างไร

workflow overview

ภาพรวมระบบ: เวิร์กโฟลว์ห้าขั้นตอน

MiroFish ประมวลผลการจำลองผ่านห้าขั้นตอนที่แตกต่างกัน:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Step 1    │ ──► │   Step 2    │ ──► │   Step 3    │ ──► │   Step 4    │ ──► │   Step 5    │
│  Ontology   │     │  GraphRAG   │     │   Env       │     │ Simulation  │     │   Report    │
│  Generation │     │   Build     │     │   Setup     │     │   Run       │     │ Generation  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
Enter fullscreen mode Exit fullscreen mode

ขั้นตอนที่ 1: การสร้าง Ontology

  • วิเคราะห์เอกสารอินพุตและข้อกำหนดการจำลอง
  • ใช้ LLM เพื่อสร้าง Ontology ที่กำหนดสิ่งต่อไปนี้:
    • 10 ประเภทเอนทิตี (เช่น นักเรียน, ศาสตราจารย์, มหาวิทยาลัย ฯลฯ)
    • 10 ประเภทความสัมพันธ์ (เช่น ทำงานให้, แสดงความคิดเห็นเกี่ยวกับ ฯลฯ)
    • คุณสมบัติ (Attributes) สำหรับแต่ละประเภท (หลีกเลี่ยงคำสงวน เช่น name, uuid, created_at)
  • โครงสร้างสองชั้น: 8 ประเภทเฉพาะ + 2 ประเภทสำรอง (Person, Organization)

ขั้นตอนที่ 2: การสร้าง GraphRAG

  • แบ่งเอกสารเป็นชิ้นย่อย (500 อักขระ, ซ้อนทับ 50)
  • ส่งไปยัง Zep Cloud แบบ batch
  • สร้างกราฟอิสระ, ตั้งค่า ontology, ส่งชุดข้อความเพื่อแยก entity/relationship
  • รอ Zep ประมวลผลแต่ละตอน
  • รับกราฟที่ได้ (nodes + edges)

ขั้นตอนที่ 3: การตั้งค่าสภาพแวดล้อม

  • วิเคราะห์กราฟเพื่อกำหนดพารามิเตอร์เอเจนต์:
    • เวลา (อิงเขตเวลาจีน)
    • เหตุการณ์ (โพสต์เริ่มต้น/หัวข้อ)
    • กิจกรรมของเอเจนต์ (จำนวนโพสต์/ชั่วโมง, ความล่าช้า, น้ำหนักอิทธิพล)
    • แพลตฟอร์ม (Twitter, Reddit)

ขั้นตอนที่ 4: การเรียกใช้การจำลอง

  • เอเจนต์ทำงานตามตารางกิจกรรม, โพสต์ แสดงความคิดเห็น ตอบสนอง
  • การจำลองแบบคู่ขนานทั้ง Twitter และ Reddit
  • บันทึกการกระทำทุกอย่างลง JSONL แบบเรียลไทม์

ขั้นตอนที่ 5: การสร้างรายงาน

  • ใช้ 3 เครื่องมือหลัก:
    • InsightForge: แยกและค้นหาคำถามเชิงลึก
    • PanoramaSearch: ดึงข้อมูลทั้งหมด (รวม expired/invalid)
    • InterviewAgents: สัมภาษณ์เอเจนต์ที่ใช้งานอยู่แบบ IPC

เจาะลึกทางเทคนิค: การสร้าง Ontology

ตัวสร้าง Ontology (backend/app/services/ontology_generator.py) ใช้ prompt ระบบที่บังคับใช้กฎอย่างเข้มงวด เช่น แยกสิ่งที่เป็น entity จริงกับสิ่งที่เป็นนามธรรม เพื่อให้การจำลองเกิดขึ้นได้จริงบนโซเชียลมีเดีย

หลังจาก LLM สร้าง ontology แล้ว ใช้เมธอด _validate_and_process บังคับข้อจำกัด (เช่น จำนวน entity/edge type ไม่เกิน 10):

def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Zep API limits: max 10 entity types, max 10 edge types
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    # Ensure fallback types exist
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Trim if adding fallbacks would exceed limit
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result
Enter fullscreen mode Exit fullscreen mode

การสร้างกราฟความรู้: การผสานรวม Zep

บริการสร้างกราฟ (backend/app/services/graph_builder.py) ดูแลเวิร์กโฟลว์แบบ async:

def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Create graph
    graph_id = self.create_graph(graph_name)

    # 2. Set ontology
    self.set_ontology(graph_id, ontology)

    # 3. Chunk text
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)

    # 4. Send batches
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)

    # 5. Wait for Zep processing
    self._wait_for_episodes(episode_uuids, progress_callback)

    # 6. Retrieve final graph
    graph_info = self._get_graph_info(graph_id)
Enter fullscreen mode Exit fullscreen mode

การสร้างโมเดล Pydantic แบบไดนามิก

สร้างโมเดล Pydantic สำหรับแต่ละ entity type ขณะรันไทม์:

def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class
Enter fullscreen mode Exit fullscreen mode

การแบ่งหน้าผ่านกราฟขนาดใหญ่

ดึงข้อมูลทั้งกราฟจาก Zep ด้วยการแบ่งหน้า:

def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes
Enter fullscreen mode Exit fullscreen mode

การจำลองกิจกรรมเอเจนต์ตามเวลา

ตัวสร้างการกำหนดค่าการจำลอง (backend/app/services/simulation_config_generator.py) สร้างกิจกรรมสมจริงตามช่วงเวลาต่างๆ (เขตเวลาจีน):

CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],           # 凌晨几乎无人
    "morning_hours": [6, 7, 8],                  # 早间逐渐活跃
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],              # 晚间高峰
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}
Enter fullscreen mode Exit fullscreen mode

ประเภทเอเจนต์แต่ละแบบมีรูปแบบกิจกรรมต่างกัน:

ประเภทเอเจนต์ ระดับกิจกรรม ชั่วโมงที่ใช้งาน ความล่าช้าในการตอบสนอง อิทธิพล
มหาวิทยาลัย 0.2 9-17 60-240 นาที 3.0
สำนักข่าว 0.5 7-23 5-30 นาที 2.5
นักเรียน 0.8 8-12, 18-23 1-15 นาที 0.8
ศาสตราจารย์ 0.4 8-21 15-90 นาที 2.0

ค่าต่างๆ จะถูกปรับแต่งตามสถานการณ์ด้วย LLM เรียกใช้ค่า default หาก LLM ล้มเหลว


การติดตามการกระทำแบบเรียลไทม์

ตัวเรียกใช้การจำลอง (backend/app/services/simulation_runner.py) สตรีม log JSONL เพื่ออัปเดตสถานะ:

def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)

            # Handle events
            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True  # or reddit
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            # Parse agent actions
            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)

        return f.tell()
Enter fullscreen mode Exit fullscreen mode

อัปเดตสถานะจำลองทุก 2 วินาที ฟรอนต์เอนด์ดึงเพื่อแสดงความคืบหน้าแบบ real-time


การจัดการกระบวนการข้ามแพลตฟอร์ม

หยุดการจำลองอย่างปลอดภัย ทั้งบน Windows/Unix:

def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        # Windows: use taskkill to kill process tree
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        # Unix: kill process group (created with start_new_session=True)
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
Enter fullscreen mode Exit fullscreen mode

ลงทะเบียน signal handler สำหรับ SIGINT, SIGTERM, SIGHUP:

def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Then call original handler

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)

    atexit.register(cls.cleanup_all_simulations)
Enter fullscreen mode Exit fullscreen mode

การสร้างรายงาน: การเรียกค้นแบบสามระดับ

บริการเครื่องมือ Zep (backend/app/services/zep_tools.py) มี 3 ฟังก์ชันหลัก:

InsightForge (การเจาะลึก)

แยก query เป็น sub-query ค้นหาทีละส่วน:

def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    # 1. Generate sub-queries using LLM
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    # 2. Search for each sub-query
    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    # 3. Extract entity UUIDs from edges
    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    # 4. Fetch detailed entity info
    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    # 5. Build relationship chains
    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)
Enter fullscreen mode Exit fullscreen mode

PanoramaSearch (ครอบคลุมทั้งหมด)

ดึงข้อมูลทุกอย่างรวม expired/invalid:

def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)

    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)
Enter fullscreen mode Exit fullscreen mode

InterviewAgents (เรียลไทม์)

เรียก API สัมภาษณ์ agent จริง:

def interview_agents(self, simulation_id: str, interview_requirement: str):
    # 1. Load agent profiles from CSV/JSON
    profiles = self._load_agent_profiles(simulation_id)

    # 2. Use LLM to select relevant agents
    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    # 3. Generate interview questions
    questions = self._generate_interview_questions(...)

    # 4. Call real interview API (dual-platform)
    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,  # Interview both Twitter and Reddit
        timeout=180.0
    )

    # 5. Parse and format results
    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Enter fullscreen mode Exit fullscreen mode

การตัดสินใจทางวิศวกรรมที่สำคัญ

1. การจัดการงานแบบ Async

งานที่ใช้เวลานาน (เช่น การสร้างกราฟ, การจำลอง) ใช้ async/thread:

def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})

    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()

    return task_id
Enter fullscreen mode Exit fullscreen mode

สอบถามความคืบหน้าผ่าน /api/graph/task/{task_id}

2. การเรียกใช้ LLM แบบแบทช์พร้อมการลองใหม่

แบ่งรายการ agent ขนาดใหญ่เป็น batch ละ 15:

num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)
Enter fullscreen mode Exit fullscreen mode

ซ่อมแซม JSON ที่ถูกตัดขาด:

def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')

    if content and content[-1] not in '",}]':
        content += '"'

    content += ']' * open_brackets
    content += '}' * open_braces
    return content
Enter fullscreen mode Exit fullscreen mode

3. การจำลองแบบคู่ขนานหลายแพลตฟอร์ม

Twitter/Reddit ทำงานแยกฐานข้อมูลและ log:

uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
Enter fullscreen mode Exit fullscreen mode

ตรวจจับการเสร็จสิ้นด้วย event simulation_end


ข้อควรพิจารณาด้านประสิทธิภาพ

การจัดการหน่วยความจำ

  • จำกัดเอกสาร 50k ตัวอักษรสำหรับ LLM context
  • สรุป entity จำกัด 300 ตัวอักษร/รายการ
  • การกระทำล่าสุดจำกัด 50 (ทั้งหมดล็อกใน JSONL)

การแยกฐานข้อมูล

แต่ละแพลตฟอร์มใช้ SQLite แยกกัน ลดปัญหา lock

การลดทอนประสิทธิภาพอย่างนุ่มนวล

ถ้า Zep Search API ล้มเหลว fallback ไป local search:

try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"Zep Search API failed, falling back to local search: {e}")
    return self._local_search(graph_id, query, limit, scope)
Enter fullscreen mode Exit fullscreen mode

บทสรุป

MiroFish แสดงให้เห็นการสร้างระบบจำลองหลายเอเจนต์ตั้งแต่ต้นจนจบ เวิร์กโฟลว์ห้าขั้นตอนเปลี่ยนเอกสารดิบให้เป็นโลกดิจิทัลที่มีชีวิตซึ่งเอเจนต์นับพันมีปฏิสัมพันธ์ตามรูปแบบพฤติกรรมที่สมจริง

สิ่งที่ควรเน้นสำหรับการใช้งานจริง:

  1. การออกแบบ Ontology มีความสำคัญ: โครงสร้างสองชั้น (8 เฉพาะ + 2 สำรอง) ครอบคลุมครบ ไม่เกินขีดจำกัด API
  2. เวิร์กโฟลว์แบบ Async: ช่วยให้การดำเนินการที่ใช้เวลานานยังตอบสนองผู้ใช้ได้
  3. กิจกรรมตามเวลาสร้างความสมจริง: พฤติกรรมเอเจนต์อิงช่วงเวลาจริง
  4. การจำลองสองแพลตฟอร์ม: เปรียบเทียบผลกระทบของแต่ละแพลตฟอร์มได้ชัดเจน
  5. การเรียกค้นแบบสามระดับ: ตอบโจทย์ทั้งลึก-กว้าง-มุมมองตรงจากเอเจนต์

ดูซอร์สโค้ดเต็มที่ github.com/666ghj/MiroFish

ต้องการลองใช้ MiroFish ไหม? เยี่ยมชม เดโมสด เพื่อดูการจำลองเหตุการณ์ฮอตสปอตในการทำงาน

Top comments (0)