مقدمة
وسائل التواصل الاجتماعي سريعة الحركة. يمكن لمنشور واحد أن يطلق موجات متتالية من ردود الفعل، وإعادة التشكيل، والتحركات المضادة التي لم يتوقعها أحد. ماذا لو كان بإمكانك رؤية كيف يتطور سيناريو قبل حدوثه في العالم الحقيقي؟
هذا بالضبط ما يفعله MiroFish. إنه محرك ذكاء أسراب ينشئ عوالم رقمية متوازية تتفاعل فيها آلاف من وكلاء الذكاء الاصطناعي ذوي شخصيات وذكريات وأنماط سلوكية مميزة بحرية. تقوم بتحميل مادة أولية—مقالة إخبارية، مسودة سياسة، أو حتى رواية—ويقوم MiroFish ببناء محاكاة عالية الدقة لكيفية تطور الأحداث.
💡 تطلب بناء MiroFish أساسًا موثوقًا لاختبار واجهات برمجة التطبيقات (API). استخدم الفريق Apidog لتصميم وتصحيح وتوثيق جميع واجهات برمجة التطبيقات الخلفية قبل كتابة منطق المحاكاة. وقد أدى ذلك إلى اكتشاف مشكلات نقاط النهاية مبكرًا والحفاظ على تزامن الواجهة الخلفية Python والواجهة الأمامية Vue طوال فترة التطوير.
يشرح هذا المنشور البنية التقنية وراء MiroFish، مع التركيز على خطوات التنفيذ العملية: تحويل المستندات إلى محاكاة حية، منطق اتخاذ القرار للوكلاء، وتنظيم سير العمل ذو الخطوات الخمس من بناء الرسم البياني المعرفي حتى مراقبة النتائج في الوقت الفعلي.
نظرة عامة على النظام: سير العمل ذو الخطوات الخمس
يعالج MiroFish المحاكاة عبر خمس مراحل عملية متسلسلة:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Step 1 │ ──► │ Step 2 │ ──► │ Step 3 │ ──► │ Step 4 │ ──► │ Step 5 │
│ Ontology │ │ GraphRAG │ │ Env │ │ Simulation │ │ Report │
│ Generation │ │ Build │ │ Setup │ │ Run │ │ Generation │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
الخطوة 1: توليد الأنطولوجيا
- حمّل مستنداتك أو متطلبات المحاكاة.
- النظام يستخدم LLM لتوليد أنطولوجيا مخصصة:
- 10 أنواع كيانات (مثال: طالب، أستاذ، جامعة، منفذ إعلامي، وكالة حكومية)
- 10 أنواع علاقات (مثال: WORKS_FOR، COMMENTS_ON، RESPONDS_TO)
-
سمات لكل نوع (مع تجنب الكلمات المحجوزة كـ
name,uuid,created_at)
- يتم فرض بنية من مستويين: 8 أنواع من السيناريو، + نوعين احتياطيين (
Person,Organization).
الخطوة 2: بناء GraphRAG
- تقسيم المستندات لأجزاء (كل جزء 500 حرف مع تداخل 50 حرف).
- إرسال الأجزاء على دفعات إلى Zep Cloud:
- إنشاء رسم بياني بمعرف فريد.
- تعيين الأنطولوجيا المولدة.
- إرسال دفعات نصية لاستخراج الكيانات والعلاقات.
- انتظار معالجة Zep.
- استرداد الرسم النهائي مع العقد والحواف.
الخطوة 3: إعداد البيئة
- تحليل الرسم المعرفي وتوليد معلمات للوكلاء:
- إعداد الوقت (ساعات الذروة والركود حسب المنطقة الزمنية الصينية)
- الأحداث المبدئية (منشورات، مواضيع ساخنة)
- نشاط الوكلاء (عدد المنشورات في الساعة، تأخير الرد، وزن التأثير)
- تكوينات المنصات (تويتر، ريديت، عتبات انتشار مختلفة)
الخطوة 4: تشغيل المحاكاة
- الوكلاء ينشطون حسب جداولهم الزمنية وينشرون ويعلقون ويتفاعلون.
- تتم المحاكاة بشكل متوازي على تويتر وريديت.
- جميع الأحداث تُسجل في ملفات JSONL في الوقت الفعلي.
الخطوة 5: توليد التقرير
- وكيل التقارير يستخدم ثلاث أدوات استرجاع:
- InsightForge: بحث معمق مع تحليل الأسئلة.
- PanoramaSearch: استرجاع شامل لكل الوقائع (بما فيها المنتهية الصلاحية).
- InterviewAgents: مقابلات حية مع الوكلاء النشطين عبر IPC.
تعمق تقني: توليد الأنطولوجيا
- يوجد مولد الأنطولوجيا في
backend/app/services/ontology_generator.py. - يستخدم موجه نظام صارم لتحديد الكيانات الصالحة (أشخاص، منظمات، منافذ إعلامية) مقابل غير الصالحة (مفاهيم مجردة...).
- بعد توليد الأنطولوجيا، تطبق دالة
_validate_and_processالقيود لضمان التوافق مع حدود Zep وعدم تجاوز الحد الأقصى للأنواع.
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
# Zep API limits: max 10 entity types, max 10 edge types
MAX_ENTITY_TYPES = 10
MAX_EDGE_TYPES = 10
# Ensure fallback types exist
fallbacks_to_add = []
if "Person" not in entity_names:
fallbacks_to_add.append(person_fallback)
if "Organization" not in entity_names:
fallbacks_to_add.append(organization_fallback)
# Trim if adding fallbacks would exceed limit
if current_count + needed_slots > MAX_ENTITY_TYPES:
result["entity_types"] = result["entity_types"][:-to_remove]
result["entity_types"].extend(fallbacks_to_add)
return result
بناء الرسم البياني المعرفي: تكامل Zep
- خدمة بناء الرسم البياني (
backend/app/services/graph_builder.py) تعتمد سير عمل غير متزامن:
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
# 1. Create graph
graph_id = self.create_graph(graph_name)
# 2. Set ontology
self.set_ontology(graph_id, ontology)
# 3. Chunk text
chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)
# 4. Send batches
episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)
# 5. Wait for Zep processing
self._wait_for_episodes(episode_uuids, progress_callback)
# 6. Retrieve final graph
graph_info = self._get_graph_info(graph_id)
توليد نموذج Pydantic الديناميكي
- النظام ينشئ نماذج Pydantic ديناميكيًا لكل نوع كيان:
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}
def safe_attr_name(attr_name: str) -> str:
if attr_name.lower() in RESERVED_NAMES:
return f"entity_{attr_name}"
return attr_name
entity_types = {}
for entity_def in ontology.get("entity_types", []):
name = entity_def["name"]
attrs = {"__doc__": description}
annotations = {}
for attr_def in entity_def.get("attributes", []):
attr_name = safe_attr_name(attr_def["name"])
attrs[attr_name] = Field(description=attr_desc, default=None)
annotations[attr_name] = Optional[EntityText]
attrs["__annotations__"] = annotations
entity_class = type(name, (EntityModel,), attrs)
entity_types[name] = entity_class
التنقل عبر الرسوم البيانية الكبيرة
- لاسترداد كل العقد من Zep باستعمال الصفحات:
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
nodes = []
cursor = None
while True:
result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
nodes.extend(result.nodes)
if not result.next_cursor:
break
cursor = result.next_cursor
return nodes
محاكاة نشاط الوكيل القائم على الوقت
- مولد التكوين (
backend/app/services/simulation_config_generator.py) يبني أنماط نشاط مستندة للمنطقة الزمنية الصينية:
CHINA_TIMEZONE_CONFIG = {
"dead_hours": [0, 1, 2, 3, 4, 5],
"morning_hours": [6, 7, 8],
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_hours": [19, 20, 21, 22],
"night_hours": [23],
"activity_multipliers": {
"dead": 0.05,
"morning": 0.4,
"work": 0.7,
"peak": 1.5,
"night": 0.5
}
}
- كل نوع وكيل له إعدادات مختلفة:
| نوع الوكيل | مستوى النشاط | ساعات النشاط | تأخير الاستجابة | التأثير |
|---|---|---|---|---|
| University | 0.2 | 9-17 | 60-240 min | 3.0 |
| MediaOutlet | 0.5 | 7-23 | 5-30 min | 2.5 |
| Student | 0.8 | 8-12, 18-23 | 1-15 min | 0.8 |
| Professor | 0.4 | 8-21 | 15-90 min | 2.0 |
- يتم تخصيص هذه القيم عبر LLM، والرجوع للقيم الافتراضية عند الحاجة.
تتبع الإجراءات في الوقت الفعلي
- مشغل المحاكاة (
backend/app/services/simulation_runner.py) يبث أحداث JSONL:
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
action_data = json.loads(line)
if "event_type" in action_data:
if action_data["event_type"] == "simulation_end":
state.twitter_completed = True
elif action_data["event_type"] == "round_end":
state.current_round = action_data["round"]
continue
action = AgentAction(
round_num=action_data.get("round", 0),
platform=platform,
agent_id=action_data.get("agent_id", 0),
action_type=action_data.get("action_type", ""),
...
)
state.add_action(action)
return f.tell()
- التحديث كل ثانيتين، مع استعلام الواجهة الأمامية عن حالة التقدم.
إدارة العمليات عبر المنصات
- إغلاق المحاكاة بشكل نظيف عبر منصات Windows وUnix:
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
if IS_WINDOWS:
subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
else:
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
- تسجيل معالجات الإشارات:
def register_cleanup(cls):
def cleanup_handler(signum, frame):
cls.cleanup_all_simulations()
# Then call original handler
signal.signal(signal.SIGTERM, cleanup_handler)
signal.signal(signal.SIGINT, cleanup_handler)
if has_sighup:
signal.signal(signal.SIGHUP, cleanup_handler)
atexit.register(cls.cleanup_all_simulations)
توليد التقرير: استرجاع ثلاثي الطبقات
- خدمة أدوات Zep (
backend/app/services/zep_tools.py) تدعم:
InsightForge (بحث معمق)
- تحليل الأسئلة المعقدة إلى استعلامات فرعية وجمع النتائج:
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
sub_queries = self._generate_sub_queries(query, simulation_requirement)
for sub_query in sub_queries:
search_result = self.search_graph(graph_id, query=sub_query)
all_facts.extend(search_result.facts)
entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)
for uuid in entity_uuids:
node = self.get_node_detail(uuid)
entity_insights.append({...})
for edge in all_edges:
chain = f"{source_name} --[{relation_name}]--> {target_name}"
relationship_chains.append(chain)
PanoramaSearch (نطاق كامل)
- استرجاع كل الوقائع بما فيها المنتهية الصلاحية:
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
all_nodes = self.get_all_nodes(graph_id)
all_edges = self.get_all_edges(graph_id, include_temporal=True)
for edge in all_edges:
is_historical = edge.is_expired or edge.is_invalid
if is_historical:
historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
else:
active_facts.append(edge.fact)
InterviewAgents (في الوقت الفعلي)
- مقابلات حية مع الوكلاء عبر واجهة OASIS:
def interview_agents(self, simulation_id: str, interview_requirement: str):
profiles = self._load_agent_profiles(simulation_id)
selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)
questions = self._generate_interview_questions(...)
api_result = SimulationRunner.interview_agents_batch(
simulation_id=simulation_id,
interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
platform=None,
timeout=180.0
)
for i, agent_idx in enumerate(selected_indices):
twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
قرارات هندسية رئيسية
1. إدارة المهام غير المتزامنة
- استخدم المهام غير المتزامنة مع تتبع التقدم للعمليات الطويلة (بناء الرسم البياني، تشغيل المحاكاة):
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})
thread = threading.Thread(
target=self._build_graph_worker,
args=(task_id, text, ontology, ...)
)
thread.daemon = True
thread.start()
return task_id
- استعلام حالة المهمة عبر
/api/graph/task/{task_id}.
2. استدعاءات LLM المجمعة مع إعادة المحاولة
- تقسيم القوائم الكبيرة إلى دفعات (كل دفعة 15 وكيل):
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
batch_entities = entities[start_idx:end_idx]
batch_configs = self._generate_agent_configs_batch(context, batch_entities)
all_agent_configs.extend(batch_configs)
- إصلاح JSON المقطوع في كل دفعة:
def _fix_truncated_json(self, content: str) -> str:
open_braces = content.count('{') - content.count('}')
open_brackets = content.count('[') - content.count(']')
if content and content[-1] not in '",}]':
content += '"'
content += ']' * open_brackets
content += '}' * open_braces
return content
3. محاكاة متوازية لمنصتين
- تويتر وريديت يعملان في مسارات منفصلة مع قواعد بيانات وسجلات خاصة:
uploads/simulations/{simulation_id}/
├── twitter/
│ ├── actions.jsonl
│ └── twitter_simulation.db
├── reddit/
│ ├── actions.jsonl
│ └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
- يتم اكتشاف نهاية كل منصة عبر حدث
simulation_end.
اعتبارات الأداء
إدارة الذاكرة
- اقتصاص المستندات الكبيرة إلى 50 ألف حرف.
- ملخصات الكيانات محددة بـ 300 حرف.
- الاحتفاظ بـ 50 إجراء فقط في الذاكرة (والسجل الكامل في JSONL).
عزل قواعد البيانات
- كل منصة تعتمد SQLite منفصلة لتجنب تضارب الأقفال.
التدهور السلس
- عند فشل Zep Search API، يعود النظام إلى بحث الكلمات الرئيسية محليًا:
try:
search_results = self.client.graph.search(...)
except Exception as e:
logger.warning(f"Zep Search API failed, falling back to local search: {e}")
return self._local_search(graph_id, query, limit, scope)
الخاتمة
يوضح MiroFish كيفية بناء نظام محاكاة كامل متعدد الوكلاء من الصفر. سير العمل ذو الخطوات الخمس يحول المستندات الخام إلى عوالم رقمية حية تتفاعل فيها آلاف الوكلاء بأنماط واقعية.
النقاط العملية الأساسية:
- تصميم الأنطولوجيا مهم: البنية ذات المستويين (8 أنواع محددة + 2 احتياطيين) تضمن التغطية وعدم تجاوز حدود API.
- سير العمل غير المتزامن: تتبع المهام مع تحديثات التقدم يحافظ على تفاعل المستخدمين أثناء العمليات الطويلة.
- النشاط القائم على الوقت: أنماط المنطقة الزمنية الصينية وجداول كل وكيل تنتج سلوكًا مشابهًا للواقع.
- محاكاة ثنائية المنصة: تشغيل تويتر وريديت بالتوازي يظهر اختلافات ديناميكيات المنصات.
- استرجاع ثلاثي الطبقات: أدوات InsightForge وPanoramaSearch وInterviewAgents تغطي كل حالات التحليل.
الكود المصدري الكامل متاح على github.com/666ghj/MiroFish.
هل ترغب في تجربة MiroFish؟ قم بزيارة العرض التوضيحي المباشر لمشاهدة محاكاة حدث نقطة ساخنة في العمل.

Top comments (0)