DEV Community

Cover image for كيف تصنع MiroFish عوالم رقمية موازية؟
Yusuf Khalidd
Yusuf Khalidd

Posted on • Originally published at apidog.com

كيف تصنع MiroFish عوالم رقمية موازية؟

مقدمة

وسائل التواصل الاجتماعي سريعة الحركة. يمكن لمنشور واحد أن يطلق موجات متتالية من ردود الفعل، وإعادة التشكيل، والتحركات المضادة التي لم يتوقعها أحد. ماذا لو كان بإمكانك رؤية كيف يتطور سيناريو قبل حدوثه في العالم الحقيقي؟

جرّب Apidog اليوم

هذا بالضبط ما يفعله MiroFish. إنه محرك ذكاء أسراب ينشئ عوالم رقمية متوازية تتفاعل فيها آلاف من وكلاء الذكاء الاصطناعي ذوي شخصيات وذكريات وأنماط سلوكية مميزة بحرية. تقوم بتحميل مادة أولية—مقالة إخبارية، مسودة سياسة، أو حتى رواية—ويقوم MiroFish ببناء محاكاة عالية الدقة لكيفية تطور الأحداث.

💡 تطلب بناء MiroFish أساسًا موثوقًا لاختبار واجهات برمجة التطبيقات (API). استخدم الفريق Apidog لتصميم وتصحيح وتوثيق جميع واجهات برمجة التطبيقات الخلفية قبل كتابة منطق المحاكاة. وقد أدى ذلك إلى اكتشاف مشكلات نقاط النهاية مبكرًا والحفاظ على تزامن الواجهة الخلفية Python والواجهة الأمامية Vue طوال فترة التطوير.

يشرح هذا المنشور البنية التقنية وراء MiroFish، مع التركيز على خطوات التنفيذ العملية: تحويل المستندات إلى محاكاة حية، منطق اتخاذ القرار للوكلاء، وتنظيم سير العمل ذو الخطوات الخمس من بناء الرسم البياني المعرفي حتى مراقبة النتائج في الوقت الفعلي.

MiroFish System Overview

نظرة عامة على النظام: سير العمل ذو الخطوات الخمس

يعالج MiroFish المحاكاة عبر خمس مراحل عملية متسلسلة:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Step 1    │ ──► │   Step 2    │ ──► │   Step 3    │ ──► │   Step 4    │ ──► │   Step 5    │
│  Ontology   │     │  GraphRAG   │     │   Env       │     │ Simulation  │     │   Report    │
│  Generation │     │   Build     │     │   Setup     │     │   Run       │     │ Generation  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
Enter fullscreen mode Exit fullscreen mode

الخطوة 1: توليد الأنطولوجيا

  • حمّل مستنداتك أو متطلبات المحاكاة.
  • النظام يستخدم LLM لتوليد أنطولوجيا مخصصة:
    • 10 أنواع كيانات (مثال: طالب، أستاذ، جامعة، منفذ إعلامي، وكالة حكومية)
    • 10 أنواع علاقات (مثال: WORKS_FOR، COMMENTS_ON، RESPONDS_TO)
    • سمات لكل نوع (مع تجنب الكلمات المحجوزة كـ name, uuid, created_at)
  • يتم فرض بنية من مستويين: 8 أنواع من السيناريو، + نوعين احتياطيين (Person, Organization).

الخطوة 2: بناء GraphRAG

  • تقسيم المستندات لأجزاء (كل جزء 500 حرف مع تداخل 50 حرف).
  • إرسال الأجزاء على دفعات إلى Zep Cloud:
    1. إنشاء رسم بياني بمعرف فريد.
    2. تعيين الأنطولوجيا المولدة.
    3. إرسال دفعات نصية لاستخراج الكيانات والعلاقات.
    4. انتظار معالجة Zep.
    5. استرداد الرسم النهائي مع العقد والحواف.

الخطوة 3: إعداد البيئة

  • تحليل الرسم المعرفي وتوليد معلمات للوكلاء:
    • إعداد الوقت (ساعات الذروة والركود حسب المنطقة الزمنية الصينية)
    • الأحداث المبدئية (منشورات، مواضيع ساخنة)
    • نشاط الوكلاء (عدد المنشورات في الساعة، تأخير الرد، وزن التأثير)
    • تكوينات المنصات (تويتر، ريديت، عتبات انتشار مختلفة)

الخطوة 4: تشغيل المحاكاة

  • الوكلاء ينشطون حسب جداولهم الزمنية وينشرون ويعلقون ويتفاعلون.
  • تتم المحاكاة بشكل متوازي على تويتر وريديت.
  • جميع الأحداث تُسجل في ملفات JSONL في الوقت الفعلي.

الخطوة 5: توليد التقرير

  • وكيل التقارير يستخدم ثلاث أدوات استرجاع:
    • InsightForge: بحث معمق مع تحليل الأسئلة.
    • PanoramaSearch: استرجاع شامل لكل الوقائع (بما فيها المنتهية الصلاحية).
    • InterviewAgents: مقابلات حية مع الوكلاء النشطين عبر IPC.

تعمق تقني: توليد الأنطولوجيا

  • يوجد مولد الأنطولوجيا في backend/app/services/ontology_generator.py.
  • يستخدم موجه نظام صارم لتحديد الكيانات الصالحة (أشخاص، منظمات، منافذ إعلامية) مقابل غير الصالحة (مفاهيم مجردة...).
  • بعد توليد الأنطولوجيا، تطبق دالة _validate_and_process القيود لضمان التوافق مع حدود Zep وعدم تجاوز الحد الأقصى للأنواع.
def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
    # Zep API limits: max 10 entity types, max 10 edge types
    MAX_ENTITY_TYPES = 10
    MAX_EDGE_TYPES = 10

    # Ensure fallback types exist
    fallbacks_to_add = []
    if "Person" not in entity_names:
        fallbacks_to_add.append(person_fallback)
    if "Organization" not in entity_names:
        fallbacks_to_add.append(organization_fallback)

    # Trim if adding fallbacks would exceed limit
    if current_count + needed_slots > MAX_ENTITY_TYPES:
        result["entity_types"] = result["entity_types"][:-to_remove]

    result["entity_types"].extend(fallbacks_to_add)
    return result
Enter fullscreen mode Exit fullscreen mode

بناء الرسم البياني المعرفي: تكامل Zep

  • خدمة بناء الرسم البياني (backend/app/services/graph_builder.py) تعتمد سير عمل غير متزامن:
def _build_graph_worker(self, task_id: str, text: str, ontology: Dict, ...):
    # 1. Create graph
    graph_id = self.create_graph(graph_name)

    # 2. Set ontology
    self.set_ontology(graph_id, ontology)

    # 3. Chunk text
    chunks = TextProcessor.split_text(text, chunk_size, chunk_overlap)

    # 4. Send batches
    episode_uuids = self.add_text_batches(graph_id, chunks, batch_size)

    # 5. Wait for Zep processing
    self._wait_for_episodes(episode_uuids, progress_callback)

    # 6. Retrieve final graph
    graph_info = self._get_graph_info(graph_id)
Enter fullscreen mode Exit fullscreen mode

توليد نموذج Pydantic الديناميكي

  • النظام ينشئ نماذج Pydantic ديناميكيًا لكل نوع كيان:
def set_ontology(self, graph_id: str, ontology: Dict[str, Any]):
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'name_embedding', 'summary', 'created_at'}

    def safe_attr_name(attr_name: str) -> str:
        if attr_name.lower() in RESERVED_NAMES:
            return f"entity_{attr_name}"
        return attr_name

    entity_types = {}
    for entity_def in ontology.get("entity_types", []):
        name = entity_def["name"]
        attrs = {"__doc__": description}
        annotations = {}

        for attr_def in entity_def.get("attributes", []):
            attr_name = safe_attr_name(attr_def["name"])
            attrs[attr_name] = Field(description=attr_desc, default=None)
            annotations[attr_name] = Optional[EntityText]

        attrs["__annotations__"] = annotations
        entity_class = type(name, (EntityModel,), attrs)
        entity_types[name] = entity_class
Enter fullscreen mode Exit fullscreen mode

التنقل عبر الرسوم البيانية الكبيرة

  • لاسترداد كل العقد من Zep باستعمال الصفحات:
def fetch_all_nodes(client: Zep, graph_id: str) -> List[Node]:
    nodes = []
    cursor = None
    while True:
        result = client.graph.get_nodes(graph_id=graph_id, cursor=cursor, limit=100)
        nodes.extend(result.nodes)
        if not result.next_cursor:
            break
        cursor = result.next_cursor
    return nodes
Enter fullscreen mode Exit fullscreen mode

محاكاة نشاط الوكيل القائم على الوقت

  • مولد التكوين (backend/app/services/simulation_config_generator.py) يبني أنماط نشاط مستندة للمنطقة الزمنية الصينية:
CHINA_TIMEZONE_CONFIG = {
    "dead_hours": [0, 1, 2, 3, 4, 5],
    "morning_hours": [6, 7, 8],
    "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    "peak_hours": [19, 20, 21, 22],
    "night_hours": [23],
    "activity_multipliers": {
        "dead": 0.05,
        "morning": 0.4,
        "work": 0.7,
        "peak": 1.5,
        "night": 0.5
    }
}
Enter fullscreen mode Exit fullscreen mode
  • كل نوع وكيل له إعدادات مختلفة:
نوع الوكيل مستوى النشاط ساعات النشاط تأخير الاستجابة التأثير
University 0.2 9-17 60-240 min 3.0
MediaOutlet 0.5 7-23 5-30 min 2.5
Student 0.8 8-12, 18-23 1-15 min 0.8
Professor 0.4 8-21 15-90 min 2.0
  • يتم تخصيص هذه القيم عبر LLM، والرجوع للقيم الافتراضية عند الحاجة.

تتبع الإجراءات في الوقت الفعلي

  • مشغل المحاكاة (backend/app/services/simulation_runner.py) يبث أحداث JSONL:
def _read_action_log(self, log_path: str, position: int, state: SimulationRunState, platform: str):
    with open(log_path, 'r', encoding='utf-8') as f:
        f.seek(position)
        for line in f:
            action_data = json.loads(line)

            if "event_type" in action_data:
                if action_data["event_type"] == "simulation_end":
                    state.twitter_completed = True
                elif action_data["event_type"] == "round_end":
                    state.current_round = action_data["round"]
                continue

            action = AgentAction(
                round_num=action_data.get("round", 0),
                platform=platform,
                agent_id=action_data.get("agent_id", 0),
                action_type=action_data.get("action_type", ""),
                ...
            )
            state.add_action(action)

        return f.tell()
Enter fullscreen mode Exit fullscreen mode
  • التحديث كل ثانيتين، مع استعلام الواجهة الأمامية عن حالة التقدم.

إدارة العمليات عبر المنصات

  • إغلاق المحاكاة بشكل نظيف عبر منصات Windows وUnix:
def _terminate_process(cls, process: subprocess.Popen, simulation_id: str, timeout: int = 10):
    if IS_WINDOWS:
        subprocess.run(['taskkill', '/PID', str(process.pid), '/T'], ...)
    else:
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
Enter fullscreen mode Exit fullscreen mode
  • تسجيل معالجات الإشارات:
def register_cleanup(cls):
    def cleanup_handler(signum, frame):
        cls.cleanup_all_simulations()
        # Then call original handler

    signal.signal(signal.SIGTERM, cleanup_handler)
    signal.signal(signal.SIGINT, cleanup_handler)
    if has_sighup:
        signal.signal(signal.SIGHUP, cleanup_handler)

    atexit.register(cls.cleanup_all_simulations)
Enter fullscreen mode Exit fullscreen mode

توليد التقرير: استرجاع ثلاثي الطبقات

  • خدمة أدوات Zep (backend/app/services/zep_tools.py) تدعم:

InsightForge (بحث معمق)

  • تحليل الأسئلة المعقدة إلى استعلامات فرعية وجمع النتائج:
def insight_forge(self, graph_id: str, query: str, simulation_requirement: str):
    sub_queries = self._generate_sub_queries(query, simulation_requirement)

    for sub_query in sub_queries:
        search_result = self.search_graph(graph_id, query=sub_query)
        all_facts.extend(search_result.facts)

    entity_uuids = set(edge['source_node_uuid'] for edge in all_edges)

    for uuid in entity_uuids:
        node = self.get_node_detail(uuid)
        entity_insights.append({...})

    for edge in all_edges:
        chain = f"{source_name} --[{relation_name}]--> {target_name}"
        relationship_chains.append(chain)
Enter fullscreen mode Exit fullscreen mode

PanoramaSearch (نطاق كامل)

  • استرجاع كل الوقائع بما فيها المنتهية الصلاحية:
def panorama_search(self, graph_id: str, query: str, include_expired: bool = True):
    all_nodes = self.get_all_nodes(graph_id)
    all_edges = self.get_all_edges(graph_id, include_temporal=True)

    for edge in all_edges:
        is_historical = edge.is_expired or edge.is_invalid
        if is_historical:
            historical_facts.append(f"[{valid_at} - {invalid_at}] {edge.fact}")
        else:
            active_facts.append(edge.fact)
Enter fullscreen mode Exit fullscreen mode

InterviewAgents (في الوقت الفعلي)

  • مقابلات حية مع الوكلاء عبر واجهة OASIS:
def interview_agents(self, simulation_id: str, interview_requirement: str):
    profiles = self._load_agent_profiles(simulation_id)

    selected_agents, selected_indices, reasoning = self._select_agents_for_interview(...)

    questions = self._generate_interview_questions(...)

    api_result = SimulationRunner.interview_agents_batch(
        simulation_id=simulation_id,
        interviews=[{"agent_id": idx, "prompt": combined_prompt} for idx in selected_indices],
        platform=None,
        timeout=180.0
    )

    for i, agent_idx in enumerate(selected_indices):
        twitter_response = results_dict.get(f"twitter_{agent_idx}", {})
        reddit_response = results_dict.get(f"reddit_{agent_idx}", {})
        response_text = f"[Twitter]\n{twitter_response}\n\n[Reddit]\n{reddit_response}"
Enter fullscreen mode Exit fullscreen mode

قرارات هندسية رئيسية

1. إدارة المهام غير المتزامنة

  • استخدم المهام غير المتزامنة مع تتبع التقدم للعمليات الطويلة (بناء الرسم البياني، تشغيل المحاكاة):
def build_graph_async(self, text: str, ontology: Dict, ...) -> str:
    task_id = self.task_manager.create_task(task_type="graph_build", metadata={...})

    thread = threading.Thread(
        target=self._build_graph_worker,
        args=(task_id, text, ontology, ...)
    )
    thread.daemon = True
    thread.start()

    return task_id
Enter fullscreen mode Exit fullscreen mode
  • استعلام حالة المهمة عبر /api/graph/task/{task_id}.

2. استدعاءات LLM المجمعة مع إعادة المحاولة

  • تقسيم القوائم الكبيرة إلى دفعات (كل دفعة 15 وكيل):
num_batches = math.ceil(len(entities) / self.AGENTS_PER_BATCH)
for batch_idx in range(num_batches):
    batch_entities = entities[start_idx:end_idx]
    batch_configs = self._generate_agent_configs_batch(context, batch_entities)
    all_agent_configs.extend(batch_configs)
Enter fullscreen mode Exit fullscreen mode
  • إصلاح JSON المقطوع في كل دفعة:
def _fix_truncated_json(self, content: str) -> str:
    open_braces = content.count('{') - content.count('}')
    open_brackets = content.count('[') - content.count(']')

    if content and content[-1] not in '",}]':
        content += '"'

    content += ']' * open_brackets
    content += '}' * open_braces
    return content
Enter fullscreen mode Exit fullscreen mode

3. محاكاة متوازية لمنصتين

  • تويتر وريديت يعملان في مسارات منفصلة مع قواعد بيانات وسجلات خاصة:
uploads/simulations/{simulation_id}/
├── twitter/
│   ├── actions.jsonl
│   └── twitter_simulation.db
├── reddit/
│   ├── actions.jsonl
│   └── reddit_simulation.db
├── simulation_config.json
├── run_state.json
└── simulation.log
Enter fullscreen mode Exit fullscreen mode
  • يتم اكتشاف نهاية كل منصة عبر حدث simulation_end.

اعتبارات الأداء

إدارة الذاكرة

  • اقتصاص المستندات الكبيرة إلى 50 ألف حرف.
  • ملخصات الكيانات محددة بـ 300 حرف.
  • الاحتفاظ بـ 50 إجراء فقط في الذاكرة (والسجل الكامل في JSONL).

عزل قواعد البيانات

  • كل منصة تعتمد SQLite منفصلة لتجنب تضارب الأقفال.

التدهور السلس

  • عند فشل Zep Search API، يعود النظام إلى بحث الكلمات الرئيسية محليًا:
try:
    search_results = self.client.graph.search(...)
except Exception as e:
    logger.warning(f"Zep Search API failed, falling back to local search: {e}")
    return self._local_search(graph_id, query, limit, scope)
Enter fullscreen mode Exit fullscreen mode

الخاتمة

يوضح MiroFish كيفية بناء نظام محاكاة كامل متعدد الوكلاء من الصفر. سير العمل ذو الخطوات الخمس يحول المستندات الخام إلى عوالم رقمية حية تتفاعل فيها آلاف الوكلاء بأنماط واقعية.

النقاط العملية الأساسية:

  1. تصميم الأنطولوجيا مهم: البنية ذات المستويين (8 أنواع محددة + 2 احتياطيين) تضمن التغطية وعدم تجاوز حدود API.
  2. سير العمل غير المتزامن: تتبع المهام مع تحديثات التقدم يحافظ على تفاعل المستخدمين أثناء العمليات الطويلة.
  3. النشاط القائم على الوقت: أنماط المنطقة الزمنية الصينية وجداول كل وكيل تنتج سلوكًا مشابهًا للواقع.
  4. محاكاة ثنائية المنصة: تشغيل تويتر وريديت بالتوازي يظهر اختلافات ديناميكيات المنصات.
  5. استرجاع ثلاثي الطبقات: أدوات InsightForge وPanoramaSearch وInterviewAgents تغطي كل حالات التحليل.

الكود المصدري الكامل متاح على github.com/666ghj/MiroFish.

هل ترغب في تجربة MiroFish؟ قم بزيارة العرض التوضيحي المباشر لمشاهدة محاكاة حدث نقطة ساخنة في العمل.

Top comments (0)