Aman Sachan
Posted on • Originally published at github.com

I rebuilt Zo Computer's seven subsystems in 800 lines of Python — here's the architecture, the tradeoffs, and what I cut

I've been using Zo Computer as my primary AI workspace for a few months. The piece I kept coming back to wasn't the model — it was the substrate: the agent manager that spawns parallel sessions, the skills registry that auto-loads SKILL.md files, the memory engine that compresses old context, the rrule-based scheduler, the compute pool that turns idle machines into workers, the BYOK client that swaps between Groq/OpenAI/Anthropic, and the headless browser that actually clicks things.

So I asked the obvious question: how much of that is concept and how much is platform glue? Could a single Python package on a laptop give a developer 80% of the same shape?

ZoClone is my answer. Seven subsystems spread over ten files in src/, ~800 lines of dependency-light Python, and every subsystem above is wired up. No daemon process, no Docker, no Postgres: just ~/.zoclone/*.db and a ThreadPoolExecutor.

Here's the architecture, what I learned about which parts are easy to clone and which ones are doing real work, and the shortcuts I had to take to fit the whole thing in a single repo.

The file layout

ZoClone/
├── src/
│   ├── zo.py              # top-level orchestrator + ask() loop
│   ├── agent_manager.py   # parallel async agents via Zo /zo/ask
│   ├── skills.py          # SKILL.md auto-loader + handler dispatch
│   ├── memory.py          # TF-IDF fallback embeddings + context recall
│   ├── automation.py      # rrule scheduler with minute/hour/day cadences
│   ├── compute_pool.py    # node registry + priority FIFO dispatch
│   ├── browser.py         # Playwright headless + navigate/screenshot/eval
│   ├── byok.py            # key vault for Groq/OpenAI/Anthropic/Ollama
│   ├── zo_client.py       # OpenAI-compatible chat() abstraction
│   └── services.py        # process supervisor (start/stop/logs)

Total LoC: 775. No __init__.py magic, no metaclass tricks, no plugin discovery beyond a directory scan. The constraint forced every interface to be a plain function or a class with three methods.

The orchestrator: zo.py

Everything threads through a single ZoClone class that owns the DB connection, a thread pool, and an AIClient that's lazily constructed on the first call to ask().

class ZoClone:
    def __init__(self):
        self.db = init_db()
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.ai_client = None
        self.pool = pool        # module-level singleton
        self.hosting = hosting  # module-level singleton
        self.memory = memory
        self.scheduler = scheduler

    def ask(self, conv_id: str, message: str, provider: str = "groq",
            model: str = "", tools: list[dict] | None = None) -> dict:
        # Built once and cached: provider/model passed on later calls are ignored.
        if not self.ai_client:
            key = get_key(provider)
            m = model or PROVIDERS[provider]["models"][0]
            self.ai_client = AIClient(provider, m, key)

        messages = self.memory.get_context(conv_id)
        messages.append({"role": "user", "content": message})
        system = f"You are Sentience, an advanced AI running locally. Workspace: {os.getcwd()}."

        resp = self.ai_client.chat(
            [{"role": "system", "content": system}] + messages[-20:],
            tools or [],
        )
        # ... persist + return

The trick is AIClient. It's the only piece that has to be OpenAI-compatible, because so many providers (Groq, Together, OpenRouter, Ollama, LM Studio) have converged on the chat completions schema. Anthropic needed a tiny shim, but Groq works out of the box.
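
To make the "one schema, many providers" point concrete, here is a minimal sketch of what an OpenAI-compatible client boils down to. It is not ZoClone's AIClient: BASE_URLS, build_chat_request and chat are hypothetical names, and the base URLs are assumptions taken from each provider's OpenAI-compatibility docs, so check them before relying on this.

```python
import json
import urllib.request

# Assumed OpenAI-compatible base URLs; verify against each provider's docs.
BASE_URLS = {
    "openai": "https://api.openai.com/v1",
    "groq": "https://api.groq.com/openai/v1",
    "ollama": "http://localhost:11434/v1",
}


def build_chat_request(provider, model, api_key, messages, tools=None):
    """Build (but do not send) a POST to <base>/chat/completions."""
    payload = {"model": model, "messages": messages}
    if tools:  # leave the key out entirely when there are no tools
        payload["tools"] = tools
    return urllib.request.Request(
        f"{BASE_URLS[provider]}/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def chat(provider, model, api_key, messages, tools=None, timeout=60):
    """Send the request and return the first choice's message dict."""
    req = build_chat_request(provider, model, api_key, messages, tools)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        body = json.load(resp)
    return body["choices"][0]["message"]
```

Switching providers only changes the base URL, the key and the model name; the payload shape stays the same, which is why a single abstraction can cover all of them.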

The skills system: auto-loading SKILL.md

This is the part I'm proudest of. The directory scan is about ten lines:

def load_all_skills():
    global SKILLS
    SKILLS = {}
    if not SKILL_DIR.exists():
        return
    for item in SKILL_DIR.iterdir():
        if item.is_dir() and (item / "SKILL.md").exists():
            skill = load_skill(item.name, item / "SKILL.md")
            if skill:
                SKILLS[skill.name] = skill

The interesting bit is the SKILL.md parser. It accepts the same frontmatter shape as the Agent Skills spec — name, description, triggers (comma-separated) — and looks for scripts/<name>.py to find a run() or execute() callable. That's the entire plugin API. There's no registration, no decorator, no manifest; drop a folder in skills/ and the next import picks it up.

The price: there's no versioning, no dependency declaration, no per-skill sandbox. If you want a skill to be hermetic, you have to do that yourself. For a single-user laptop, that's fine. For a multi-tenant platform, it's not.
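
For reference, the frontmatter parsing and handler lookup described above can be sketched roughly like this. It is not the code in the repo: Skill, parse_frontmatter and find_handler are hypothetical names, and the parser assumes flat `key: value` lines between two `---` markers, nothing nested.

```python
import importlib.util
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional


@dataclass
class Skill:
    name: str
    description: str = ""
    triggers: list[str] = field(default_factory=list)
    handler: Optional[Callable] = None


def parse_frontmatter(text: str) -> dict[str, str]:
    """Read simple 'key: value' lines between a leading pair of '---' lines."""
    lines = text.splitlines()
    if not lines or lines[0].strip() != "---":
        return {}
    meta: dict[str, str] = {}
    for line in lines[1:]:
        if line.strip() == "---":
            return meta
        key, sep, value = line.partition(":")
        if sep:
            meta[key.strip().lower()] = value.strip()
    return {}  # no closing '---': treat the file as malformed


def find_handler(script: Path) -> Optional[Callable]:
    """Import scripts/<name>.py by path and return its run() or execute()."""
    if not script.is_file():
        return None
    spec = importlib.util.spec_from_file_location(f"skill_{script.stem}", script)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the skill's top-level code, unsandboxed
    for attr in ("run", "execute"):
        fn = getattr(module, attr, None)
        if callable(fn):
            return fn
    return None


def load_skill(dir_name: str, skill_md: Path) -> Optional[Skill]:
    meta = parse_frontmatter(skill_md.read_text(encoding="utf-8"))
    if not meta:
        return None
    name = meta.get("name") or dir_name
    triggers = [t.strip() for t in meta.get("triggers", "").split(",") if t.strip()]
    handler = find_handler(skill_md.parent / "scripts" / f"{name}.py")
    return Skill(name, meta.get("description", ""), triggers, handler)
```

The exec_module call is where the lack of a sandbox bites: importing a skill runs whatever is at the top of its script, with the full permissions of the host process.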

The agent manager: parallel aiohttp over /zo/ask

I cheated here, and I'm fine with it. The original "spawn a parallel agent" primitive is itself a remote call to a model, and Zo's /zo/ask endpoint is open to anyone with a token. So:

async def spawn(self, agent_id: str, prompt: str, callback=None):
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.zo.computer/zo/ask",
            headers={"authorization": self.api_token, "content-type": "application/json"},
            json={"input": prompt, "model_name": "vercel:minimax/minimax-m2.7"},
        ) as resp:
            resp.raise_for_status()  # surface HTTP errors instead of a KeyError on "output"
            return {"agent_id": agent_id, "output": (await resp.json())["output"]}

async def spawn_all(self, agents: list) -> list:
    return await asyncio.gather(*[self.spawn(a["id"], a["prompt"]) for a in agents])

spawn_all fires N concurrent requests, asyncio.gather waits for the slowest, and you get a list of outputs back. A ThreadPoolExecutor(max_workers=10) is the sync equivalent for callers that don't want to be async. In practice the bottleneck is the model, not the network — 10 parallel calls saturate the rate limiter long before they saturate asyncio.
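
Since the rate limiter is the real ceiling, one common way to stay under it is to cap how many calls are in flight at once. The following is a sketch under assumptions, not part of ZoClone: gather_bounded is a hypothetical helper, and `call` stands in for any coroutine function such as a bound spawn wrapper.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def gather_bounded(
    prompts: list[str],
    call: Callable[[str], Awaitable[Any]],
    limit: int = 4,
) -> list[Any]:
    """Run call(prompt) for every prompt, at most `limit` at a time.

    Results come back in input order. A failed call shows up as its
    exception object in the list rather than raising out of gather.
    """
    sem = asyncio.Semaphore(limit)

    async def one(prompt: str) -> Any:
        async with sem:  # wait here while `limit` calls are already running
            return await call(prompt)

    return await asyncio.gather(*(one(p) for p in prompts), return_exceptions=True)
```

From synchronous code this would be driven with `asyncio.run(gather_bounded(prompts, call, limit=4))`. Returning exceptions in place keeps one bad agent from discarding the outputs of the others.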

The memory engine: TF-IDF as a placeholder

I'll be honest: this is the weakest subsystem. embed_tfidf hashes tokens into a 512-dim vector, cosine does the math, and recall() returns the top-k nodes whose embedding has the highest similarity. It works for short prompts and small corpora, but it is not semantic — database and sql don't cluster the way they would with a real embedding model.

The reason I shipped it anyway: a real embedding model (sentence-transformers, or a remote call) is one swap away, and the interface doesn't change: memorize(content, meta) -> nid and recall(query, top_k) -> [{id, content, meta}]. When I get around to plugging in nomic-embed-text via Ollama, nothing in zo.py needs to move. The trick was defining the right shape first and being honest about which fields the placeholder is faking.

The scheduler: rrule in 30 lines

The rrule spec is a 50-page document. I needed three frequencies and a count. So:

import re

def parse_rrule(rrule: str) -> dict:
    # Only FREQ and COUNT are read; INTERVAL and every other RRULE part are ignored.
    result = {"interval": 86400, "count": 0}  # default daily
    if "FREQ=DAILY" in rrule:
        result["interval"] = 86400
    elif "FREQ=HOURLY" in rrule:
        result["interval"] = 3600
    elif "FREQ=MINUTELY" in rrule:
        result["interval"] = 60
    m = re.search(r"COUNT=(\d+)", rrule)
    if m:
        result["count"] = int(m.group(1))
    return result

A daemon thread wakes once a minute, asks SQLite for WHERE enabled=1 AND next_run <= now, fires each one's handler, and bumps next_run by the interval. That's the entire automation system. It's missing timezones, exceptions, and DST handling, but for "run this every hour" it is correct and reliable.

The compute pool: priority FIFO over a Python dict

ComputePool keeps self.jobs and self.nodes as in-memory dicts protected by a threading.Lock. Heartbeats update last_heartbeat; dispatch sorts pending jobs by -priority and assigns the top one to the next polling node. Python's sort is stable and dicts preserve insertion order, so jobs with equal priority go out first-in, first-out. No leader election, no Raft, no gossip protocol.

def assign_job(self, node_id: str) -> dict | None:
    with self.lock:
        pending = [j for j in self.jobs.values() if j["status"] == "pending"]
        if not pending: return None
        pending.sort(key=lambda x: -x["priority"])
        job = pending[0]
        job["status"] = "assigned"
        job["assigned_node"] = node_id
        if node_id in self.nodes:
            self.nodes[node_id]["status"] = "busy"
        return job

This is a real footgun: in-process state means a process restart loses every pending job. For a real grid you'd want this in Postgres with row-level locks. But for "let me run a job on my second laptop", pip install is the whole onboarding.

What I cut, and why

Three things are not in the package and probably never will be:

  1. The hosted UI — the chat sidebar, the file tree, the agent picker. ZoClone is a library, not an app. Import zo and call zo.ask(...) from a Flask route, a Tk window, a Discord bot, a cron job.
  2. Multi-tenant auth — there's exactly one user. whoami() returns the local username. If you want a team plan, fork the repo.
  3. A real vector store — TF-IDF is a placeholder. The next iteration swaps it for Ollama's nomic-embed-text (private, free, runs on the same box) and the interface stays the same.

Try it

git clone https://github.com/AmSach/ZoClone
cd ZoClone && pip install aiohttp playwright
python -m playwright install chromium
python -c "from src.zo import zo; print(zo.ask('test-conv', 'hi'))"

If you want a skill added, drop a folder in skills/ with a SKILL.md + scripts/foo.py and open a PR. I merge within 24 hours. If you find a real bug in one of the seven subsystems, open an issue with a minimal repro; there are only 775 lines to search.


Seven subsystems, one Python process, no cloud dependency. The shape matters more than the scale.

#Python #AI #OpenSource #BuildInPublic #PySide6 #LocalFirst

Top comments (1)

Mehmet Can Farsak

Impressive architecture breakdown. One subsystem I've been missing from these kinds of agent platforms is a mode controller — most agents don't have a way to switch between thinking and doing. I built Brainstorm-Mode (mehmetcanfarsak/Brainstorm-Mode on GitHub) as a lightweight add-on that slots into the skills/hook pattern exactly like this. Three modes (divergent, actionable, academic) via PreToolUse hooks to prevent execution drift. Would be curious to see it wired into something like ZoClone's agent_manager.