บทนำ
โซเชียลมีเดียเคลื่อนไหวรวดเร็ว โพสต์เดียวสามารถกระตุ้นปฏิกิริยา การปรับเปลี่ยน และการเคลื่อนไหวตอบโต้ที่ไม่มีใครคาดคิดได้ จะเป็นอย่างไรถ้าคุณสามารถเห็นว่าสถานการณ์จะคลี่คลายลงอย่างไรก่อนที่จะเกิดขึ้นจริงในโลก?
MiroFish ทำเช่นนั้นได้อย่างแม่นยำ มันคือเอ็นจิ้นปัญญาแบบฝูงที่สร้างโลกดิจิทัลคู่ขนานที่ซึ่งเอเจนต์ AI นับพันตัวซึ่งมีบุคลิก ความทรงจำ และรูปแบบพฤติกรรมที่แตกต่างกันมีปฏิสัมพันธ์กันอย่างอิสระ คุณอัปโหลดข้อมูลเริ่มต้น—เช่น ข่าว บทความ ร่างนโยบาย หรือแม้แต่นวนิยาย—แล้ว MiroFish จะสร้างการจำลองสถานการณ์ที่มีความแม่นยำสูงว่าเหตุการณ์ต่างๆ อาจคลี่คลายลงอย่างไร
💡 การสร้าง MiroFish ต้องอาศัยรากฐานการทดสอบ API ที่เชื่อถือได้
ทีมงานใช้ Apidog ในการออกแบบ ดีบั๊ก และจัดทำเอกสาร API แบ็กเอนด์ทั้งหมดก่อนที่จะเขียนตรรกะการจำลอง ซึ่งช่วยให้ตรวจพบปัญหาของเอนด์พอยต์ได้ตั้งแต่เนิ่นๆ และทำให้แบ็กเอนด์ Python และฟรอนต์เอนด์ Vue ทำงานสอดคล้องกันตลอดการพัฒนา
โพสต์นี้จะเจาะลึกสถาปัตยกรรมทางเทคนิคเบื้องหลัง MiroFish คุณจะได้เรียนรู้ว่าระบบเปลี่ยนเอกสารดิบให้เป็นการจำลองที่มีชีวิตได้อย่างไร เอเจนต์ตัดสินใจได้อย่างไร และเวิร์กโฟลว์ห้าขั้นตอนจัดระเบียบทุกอย่างตั้งแต่การสร้างกราฟความรู้ไปจนถึงการตรวจสอบแบบเรียลไทม์ได้อย่างไร
ภาพรวมระบบ: เวิร์กโฟลว์ห้าขั้นตอน
MiroFish ประมวลผลการจำลองผ่านห้าขั้นตอนที่แตกต่างกัน:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Step 1 │ ──► │ Step 2 │ ──► │ Step 3 │ ──► │ Step 4 │ ──► │ Step 5 │
│ Ontology │ │ GraphRAG │ │ Env │ │ Simulation │ │ Report │
│ Generation │ │ Build │ │ Setup │ │ Run │ │ Generation │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
ขั้นตอนที่ 1: การสร้าง Ontology
- วิเคราะห์เอกสารอินพุตและข้อกำหนดการจำลอง
- ใช้ LLM เพื่อสร้าง Ontology ที่กำหนดสิ่งต่อไปนี้:
- 10 ประเภทเอนทิตี (เช่น นักเรียน, ศาสตราจารย์, มหาวิทยาลัย ฯลฯ)
- 10 ประเภทความสัมพันธ์ (เช่น ทำงานให้, แสดงความคิดเห็นเกี่ยวกับ ฯลฯ)
-
คุณสมบัติ (Attributes) สำหรับแต่ละประเภท (หลีกเลี่ยงคำสงวน เช่น
name,uuid,created_at)
- โครงสร้างสองชั้น: 8 ประเภทเฉพาะ + 2 ประเภทสำรอง (
Person,Organization)
ขั้นตอนที่ 2: การสร้าง GraphRAG
- แบ่งเอกสารเป็นชิ้นย่อย (500 อักขระ, ซ้อนทับ 50)
- ส่งไปยัง Zep Cloud แบบ batch
- สร้างกราฟอิสระ, ตั้งค่า ontology, ส่งชุดข้อความเพื่อแยก entity/relationship
- รอ Zep ประมวลผลแต่ละตอน
- รับกราฟที่ได้ (nodes + edges)
ขั้นตอนที่ 3: การตั้งค่าสภาพแวดล้อม
- วิเคราะห์กราฟเพื่อกำหนดพารามิเตอร์เอเจนต์:
- เวลา (อิงเขตเวลาจีน)
- เหตุการณ์ (โพสต์เริ่มต้น/หัวข้อ)
- กิจกรรมของเอเจนต์ (จำนวนโพสต์/ชั่วโมง, ความล่าช้า, น้ำหนักอิทธิพล)
- แพลตฟอร์ม (Twitter, Reddit)
ขั้นตอนที่ 4: การเรียกใช้การจำลอง
- เอเจนต์ทำงานตามตารางกิจกรรม, โพสต์ แสดงความคิดเห็น ตอบสนอง
- การจำลองแบบคู่ขนานทั้ง Twitter และ Reddit
- บันทึกการกระทำทุกอย่างลง JSONL แบบเรียลไทม์
ขั้นตอนที่ 5: การสร้างรายงาน
- ใช้ 3 เครื่องมือหลัก:
- InsightForge: แยกและค้นหาคำถามเชิงลึก
- PanoramaSearch: ดึงข้อมูลทั้งหมด (รวม expired/invalid)
- InterviewAgents: สัมภาษณ์เอเจนต์ที่ใช้งานอยู่แบบ IPC
เจาะลึกทางเทคนิค: การสร้าง Ontology
ตัวสร้าง Ontology (backend/app/services/ontology_generator.py) ใช้ prompt ระบบที่บังคับใช้กฎอย่างเข้มงวด เช่น แยกสิ่งที่เป็น entity จริงกับสิ่งที่เป็นนามธรรม เพื่อให้การจำลองเกิดขึ้นได้จริงบนโซเชียลมีเดีย
หลังจาก LLM สร้าง ontology แล้ว ใช้เมธอด _validate_and_process บังคับข้อจำกัด (เช่น จำนวน entity/edge type ไม่เกิน 10):
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
# Zep API limits: max 10 entity types, max 10 edge types
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
# Ensure fallback types exist
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
# Trim if adding fallbacks would exceed limit
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
การสร้างกราฟความรู้: การผสานรวม Zep
บริการสร้างกราฟ (backend/app/services/graph_builder.py) ดูแลเวิร์กโฟลว์แบบ async:
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. Create graph
graph_id = self.create_graph(graph_name)
# 2. Set ontology
self.set_ontology(graph_id, ontology)
# 3. Chunk text
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. Send batches
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Wait for Zep processing
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. Retrieve final graph
graph_info = self._get_graph_info(graph_id)
การสร้างโมเดล Pydantic แบบไดนามิก
สร้างโมเดล Pydantic สำหรับแต่ละ entity type ขณะรันไทม์:
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
การแบ่งหน้าผ่านกราฟขนาดใหญ่
ดึงข้อมูลทั้งกราฟจาก Zep ด้วยการแบ่งหน้า:
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
การจำลองกิจกรรมเอเจนต์ตามเวลา
ตัวสร้างการกำหนดค่าการจำลอง (backend/app/services/simulation_config_generator.py) สร้างกิจกรรมสมจริงตามช่วงเวลาต่างๆ (เขตเวลาจีน):
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5], # 凌晨几乎无人
"morning_hours": [6, 7, 8], # 早间逐渐活跃
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22], # 晚间高峰
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
ประเภทเอเจนต์แต่ละแบบมีรูปแบบกิจกรรมต่างกัน:
| ประเภทเอเจนต์ | ระดับกิจกรรม | ชั่วโมงที่ใช้งาน | ความล่าช้าในการตอบสนอง | อิทธิพล |
|---|---|---|---|---|
| มหาวิทยาลัย | 0.2 | 9-17 | 60-240 นาที | 3.0 |
| สำนักข่าว | 0.5 | 7-23 | 5-30 นาที | 2.5 |
| นักเรียน | 0.8 | 8-12, 18-23 | 1-15 นาที | 0.8 |
| ศาสตราจารย์ | 0.4 | 8-21 | 15-90 นาที | 2.0 |
ค่าต่างๆ จะถูกปรับแต่งตามสถานการณ์ด้วย LLM เรียกใช้ค่า default หาก LLM ล้มเหลว
การติดตามการกระทำแบบเรียลไทม์
ตัวเรียกใช้การจำลอง (backend/app/services/simulation_runner.py) สตรีม log JSONL เพื่ออัปเดตสถานะ:
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
# Handle events
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # or reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
# Parse agent actions
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
อัปเดตสถานะจำลองทุก 2 วินาที ฟรอนต์เอนด์ดึงเพื่อแสดงความคืบหน้าแบบ real-time
การจัดการกระบวนการข้ามแพลตฟอร์ม
หยุดการจำลองอย่างปลอดภัย ทั้งบน Windows/Unix:
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
# Windows: use taskkill to kill process tree
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
# Unix: kill process group (created with start_new_session=True)
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
ลงทะเบียน signal handler สำหรับ SIGINT, SIGTERM, SIGHUP:
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
# Then call original handler
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
การสร้างรายงาน: การเรียกค้นแบบสามระดับ
บริการเครื่องมือ Zep (backend/app/services/zep_tools.py) มี 3 ฟังก์ชันหลัก:
InsightForge (การเจาะลึก)
แยก query เป็น sub-query ค้นหาทีละส่วน:
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
# 1. Generate sub-queries using LLM
sub_queries = self._generate_sub_queries(query, simulation_requirement)
# 2. Search for each sub-query
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
# 3. Extract entity UUIDs from edges
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
# 4. Fetch detailed entity info
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
# 5. Build relationship chains
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (ครอบคลุมทั้งหมด)
ดึงข้อมูลทุกอย่างรวม expired/invalid:
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (เรียลไทม์)
เรียก API สัมภาษณ์ agent จริง:
def interview_agents(self, simulation_id: str, interview_requirement: str):
# 1. Load agent profiles from CSV/JSON
profiles = self._load_agent_profiles(simulation_id)
# 2. Use LLM to select relevant agents
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
# 3. Generate interview questions
questions = self._generate_interview_questions(...)
# 4. Call real interview API (dual-platform)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None, # Interview both Twitter and Reddit
timeout=180.0
)
# 5. Parse and format results
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
การตัดสินใจทางวิศวกรรมที่สำคัญ
1. การจัดการงานแบบ Async
งานที่ใช้เวลานาน (เช่น การสร้างกราฟ, การจำลอง) ใช้ async/thread:
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
สอบถามความคืบหน้าผ่าน /api/graph/task/{task_id}
2. การเรียกใช้ LLM แบบแบทช์พร้อมการลองใหม่
แบ่งรายการ agent ขนาดใหญ่เป็น batch ละ 15:
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
ซ่อมแซม JSON ที่ถูกตัดขาด:
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. การจำลองแบบคู่ขนานหลายแพลตฟอร์ม
Twitter/Reddit ทำงานแยกฐานข้อมูลและ log:
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
ตรวจจับการเสร็จสิ้นด้วย event simulation_end
ข้อควรพิจารณาด้านประสิทธิภาพ
การจัดการหน่วยความจำ
- จำกัดเอกสาร 50k ตัวอักษรสำหรับ LLM context
- สรุป entity จำกัด 300 ตัวอักษร/รายการ
- การกระทำล่าสุดจำกัด 50 (ทั้งหมดล็อกใน JSONL)
การแยกฐานข้อมูล
แต่ละแพลตฟอร์มใช้ SQLite แยกกัน ลดปัญหา lock
การลดทอนประสิทธิภาพอย่างนุ่มนวล
ถ้า Zep Search API ล้มเหลว fallback ไป local search:
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search API failed, falling back to local search: {e}")
return self._local_search(graph_id, query, limit, scope)
บทสรุป
MiroFish แสดงให้เห็นการสร้างระบบจำลองหลายเอเจนต์ตั้งแต่ต้นจนจบ เวิร์กโฟลว์ห้าขั้นตอนเปลี่ยนเอกสารดิบให้เป็นโลกดิจิทัลที่มีชีวิตซึ่งเอเจนต์นับพันมีปฏิสัมพันธ์ตามรูปแบบพฤติกรรมที่สมจริง
สิ่งที่ควรเน้นสำหรับการใช้งานจริง:
- การออกแบบ Ontology มีความสำคัญ: โครงสร้างสองชั้น (8 เฉพาะ + 2 สำรอง) ครอบคลุมครบ ไม่เกินขีดจำกัด API
- เวิร์กโฟลว์แบบ Async: ช่วยให้การดำเนินการที่ใช้เวลานานยังตอบสนองผู้ใช้ได้
- กิจกรรมตามเวลาสร้างความสมจริง: พฤติกรรมเอเจนต์อิงช่วงเวลาจริง
- การจำลองสองแพลตฟอร์ม: เปรียบเทียบผลกระทบของแต่ละแพลตฟอร์มได้ชัดเจน
- การเรียกค้นแบบสามระดับ: ตอบโจทย์ทั้งลึก-กว้าง-มุมมองตรงจากเอเจนต์
ดูซอร์สโค้ดเต็มที่ github.com/666ghj/MiroFish
ต้องการลองใช้ MiroFish ไหม? เยี่ยมชม เดโมสด เพื่อดูการจำลองเหตุการณ์ฮอตสปอตในการทำงาน

Top comments (0)