Giới thiệu
Mạng xã hội phát triển nhanh chóng, nơi mỗi bài đăng có thể kích hoạt hàng loạt phản ứng và tạo ra các phong trào khó lường. Nhưng nếu bạn có thể mô phỏng trước các kịch bản này trước khi chúng xảy ra?
MiroFish là công cụ mô phỏng xã hội đa tác nhân, cho phép bạn tải lên tài liệu (báo, chính sách, tiểu thuyết) và xây dựng mô phỏng số hóa với hàng ngàn tác nhân AI có tính cách, ký ức và hành vi riêng biệt.
💡 Việc phát triển MiroFish đòi hỏi nền tảng kiểm thử API ổn định. Nhóm đã sử dụng Apidog để thiết kế, debug và tạo tài liệu cho toàn bộ API backend trước khi xây dựng logic mô phỏng, giúp phát hiện sớm lỗi endpoint và duy trì đồng bộ giữa backend Python và frontend Vue.
Bài viết này hướng dẫn cách triển khai kiến trúc kỹ thuật của MiroFish: từ chuyển đổi tài liệu thành mô phỏng, quy trình quyết định của tác nhân, đến workflow năm bước gồm xây dựng ontology, tri thức, mô phỏng và giám sát real-time.
Tổng quan hệ thống: Quy trình 5 bước
MiroFish tổ chức mô phỏng thành 5 giai đoạn:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Step 1 │ ──► │ Step 2 │ ──► │ Step 3 │ ──► │ Step 4 │ ──► │ Step 5 │
│ Ontology │ │ GraphRAG │ │ Env │ │ Simulation │ │ Report │
│ Generation │ │ Build │ │ Setup │ │ Run │ │ Generation │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
Bước 1: Tạo Ontology
- Phân tích tài liệu đầu vào + yêu cầu mô phỏng.
- Sử dụng LLM để sinh ontology tùy chỉnh:
- 10 loại thực thể (VD: Student, Professor, University, MediaOutlet, GovernmentAgency).
- 10 loại mối quan hệ (VD: WORKS_FOR, COMMENTS_ON, RESPONDS_TO).
-
Các thuộc tính cho từng loại (loại trừ các reserved word như
name,uuid,created_at).
- Ontology luôn gồm 8 loại tùy chỉnh + 2 loại fallback (
Person,Organization).
Bước 2: Xây dựng GraphRAG
- Chia tài liệu thành đoạn 500 ký tự (overlap 50), gửi batch lên Zep Cloud.
- Quy trình:
- Tạo graph mới (unique ID).
- Thiết lập ontology.
- Gửi các đoạn text để trích xuất entity & relation.
- Đợi Zep xử lý.
- Lấy graph hoàn chỉnh gồm node & edge.
Bước 3: Thiết lập môi trường
- Phân tích graph tri thức để tạo thông số tác nhân:
- Cấu hình thời gian (theo mẫu múi giờ Trung Quốc: peak 19-22h, dead 0-5h).
- Cấu hình sự kiện (bài đăng khởi tạo, chủ đề hot).
- Cấu hình agent (số bài post/giờ, độ trễ phản hồi, trọng số ảnh hưởng).
- Cấu hình nền tảng (Twitter, Reddit - mỗi nền tảng ngưỡng lan truyền riêng).
Bước 4: Chạy mô phỏng
- Agent hoạt động theo lịch trình, thực hiện post, comment, react.
- Mô phỏng chạy song song trên Twitter/Reddit.
- Ghi lại toàn bộ action vào file JSONL real-time.
Bước 5: Tạo báo cáo
- Tác nhân Báo cáo sử dụng 3 công cụ:
- InsightForge: Truy vấn sâu, chia nhỏ câu hỏi phức tạp.
- PanoramaSearch: Truy xuất toàn bộ sự kiện, bao gồm cả expired/invalid.
- InterviewAgents: Giao tiếp real-time với agent qua IPC.
Đi sâu kỹ thuật: Tạo Ontology
File chính: backend/app/services/ontology_generator.py
- Prompt hệ thống quy định rõ tiêu chuẩn entity hợp lệ (person, organization, media outlet) và loại trừ các khái niệm trừu tượng.
- Sau khi LLM sinh ontology, hàm
_validate_and_processenforce giới hạn 10 loại entity/edge, bổ sung fallback nếu thiếu, đảm bảo tương thích Zep.
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
Xây dựng biểu đồ tri thức: Tích hợp Zep
Dịch vụ backend/app/services/graph_builder.py xử lý quy trình không đồng bộ tạo graph:
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
graph_id = self.create_graph(graph_name)
self.set_ontology(graph_id, ontology)
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
self._wait_for_episodes(episode_uuids, progress_callback)
graph_info = self._get_graph_info(graph_id)
Sinh mô hình Pydantic động
Hệ thống tự động generate Pydantic model cho mỗi entity type khi runtime:
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
Phân trang graph lớn
Khi lấy node/edge từ Zep, kết quả trả về dạng phân trang. Dùng helper để fetch toàn bộ:
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
Mô phỏng hoạt động agent theo thời gian
File: backend/app/services/simulation_config_generator.py
Cấu hình hành vi agent dựa trên múi giờ Trung Quốc:
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5],
"morning_hours": [6, 7, 8],
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22],
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
| Loại tác nhân | Mức độ hoạt động | Giờ hoạt động | Độ trễ phản hồi | Ảnh hưởng |
|---|---|---|---|---|
| Đại học | 0.2 | 9-17 | 60-240 min | 3.0 |
| Cơ quan truyền thông | 0.5 | 7-23 | 5-30 min | 2.5 |
| Sinh viên | 0.8 | 8-12, 18-23 | 1-15 min | 0.8 |
| Giáo sư | 0.4 | 8-21 | 15-90 min | 2.0 |
Các thông số được LLM tuỳ chỉnh theo kịch bản, fallback về giá trị mặc định nếu LLM lỗi.
Theo dõi hành động real-time
backend/app/services/simulation_runner.py stream action log JSONL:
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True # or reddit
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
Luồng này chạy nền, cập nhật trạng thái mô phỏng 2s/lần. Frontend polling để hiện tiến độ real-time.
Quản lý tiến trình đa nền tảng
Dừng mô phỏng cần xử lý tiến trình trên Windows/Unix:
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
Đăng ký cleanup handler với SIGINT, SIGTERM, SIGHUP để đảm bảo dọn dẹp tiến trình khi server tắt:
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
Tạo báo cáo: Truy xuất ba cấp
backend/app/services/zep_tools.py gồm ba chức năng:
InsightForge (Tìm hiểu chuyên sâu)
Truy vấn chia nhỏ, tổng hợp nhiều kết quả:
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
sub_queries = self._generate_sub_queries(query, simulation_requirement)
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (Phạm vi đầy đủ)
Trả về toàn bộ node/edge, kể cả historical:
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (Phỏng vấn real-time)
Tương tác với agent đang hoạt động qua API:
def interview_agents(self, simulation_id: str, interview_requirement: str):
profiles = self._load_agent_profiles(simulation_id)
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
questions = self._generate_interview_questions(...)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None,
timeout=180.0
)
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Các quyết định kỹ thuật quan trọng
1. Quản lý task bất đồng bộ
Các thao tác dài (build graph, chạy simulation) đều chạy async, track status:
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
Frontend polling /api/graph/task/{task_id} để lấy status.
2. Gọi LLM theo lô + sửa lỗi JSON
Chia danh sách agent lớn thành batch 15, sửa lỗi JSON bị cắt cụt:
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. Mô phỏng song song trên hai nền tảng
Twitter và Reddit chạy độc lập, lưu action/database riêng biệt:
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
Các cân nhắc về hiệu suất
Quản lý bộ nhớ
- Tài liệu cắt ngắn 50.000 ký tự khi gửi LLM.
- Tóm tắt entity giới hạn 300 ký tự.
- Lưu tối đa 50 action gần nhất trong RAM, full history lưu file JSONL.
Cách ly database
- Mỗi nền tảng một file SQLite riêng, tránh deadlock khi ghi song song.
Fallback tìm kiếm local
- Nếu API Zep search lỗi, fallback sang local keyword search:
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search API failed, falling back to local search: {e}")
return self._local_search(graph_id, query, limit, scope)
Kết luận
MiroFish là ví dụ thực tế về xây dựng hệ thống mô phỏng đa tác nhân, biến tài liệu thành thế giới số sống động với quy trình 5 bước.
Điểm quan trọng để triển khai:
- Thiết kế ontology 2 tầng giúp bao phủ đủ entity mà không vượt quá limit API.
- Workflow bất đồng bộ + cập nhật tiến trình giúp UX tốt cho tác vụ dài.
- Hoạt động agent dựa trên thời gian tăng tính chân thực.
- So sánh song song Twitter/Reddit làm rõ sự khác biệt nền tảng.
- Truy xuất báo cáo 3 cấp đáp ứng nhu cầu depth, breadth, real-time.
Mã nguồn đầy đủ: github.com/666ghj/MiroFish
Bạn muốn thử MiroFish? Truy cập bản demo trực tiếp để xem mô phỏng sự kiện điểm nóng.

Top comments (0)