Part IV — The Builder’s Stack

In this part, I’ll show:

  1. Hardware + local dev stack
  2. Cloud layout + Python infra scripts (VPC → EC2 → S3 → SSM)
  3. Server maintenance automations (health, patches, logs)
  4. Network setup (boto3)
  5. Pattern generator (Phase Sync Maker)
  6. Leverage counter (BINFLOW time events)
  7. A minimal dashboard (Streamlit)
  8. Multi-platform posting bots (Dev.to, Hashnode, Medium)
  9. A daemonic scheduler to run it all (APScheduler)

Everything stays Python-first. Swap credentials/placeholders with your own.


🌐 Part IV — The Builder’s Stack

Python as the Control Plane for Temporal DevOps, Patterns, and Proof of Leverage

by Peace Thabiwa — Botswana 🇧🇼
Founder SAGEWORKS_AI | Creator BINFLOW


0) One-person hardware kit (realistic)

  • Laptop: 16–32GB RAM, 8+ cores (Mac/Linux)
  • Local services: Docker Desktop / Podman
  • Python: 3.11+ (venv/uv/poetry)
  • GPU (optional): 8–16GB VRAM (for local models)
  • Storage: 1TB NVMe (fast local vector stores, logs)

Philosophy: the laptop is the thinking node; the cloud is the execution field.


1) Repository layout (monorepo, Python at the center)

flow/
  binflow/                     # core time+phase library
    __init__.py
    events.py
    leverage.py
  psm/                         # Phase Sync Maker (pattern generator)
    __init__.py
    cli.py
    templates/
      rest-service/
        python/
          app.py.j2
          tests_test_app.py.j2
  cloud/                       # infra & ops (all Python)
    vpc.py
    ec2.py
    s3.py
    ssm.py
    health.py
    logs.py
  bots/                        # posting automations
    devto.py
    hashnode.py
    medium.py
  ui/                          # simple dashboard
    dashboard.py               # Streamlit
  scheduler/
    run.py                     # APScheduler entrypoint
  .env.example
  requirements.txt
  README.md

2) BINFLOW: time-labeled events + leverage

binflow/events.py

from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
import sqlite3
from pathlib import Path
import json

DB = Path(".binflow.db")

class Phase(str, Enum):
    FOCUS = "Focus"
    LOOP = "Loop"
    TRANSITION = "Transition"
    PAUSE = "Pause"
    EMERGENCE = "Emergence"

@dataclass
class FlowEvent:
    pattern: str
    action: str       # e.g. "create" | "reuse" | "modify" | "publish" | "healthcheck"
    phase: Phase
    payload: dict

def _conn():
    # CREATE TABLE IF NOT EXISTS: safe even when another process (e.g. the
    # dashboard) has already created the file without the table
    con = sqlite3.connect(DB)
    con.execute("""CREATE TABLE IF NOT EXISTS events(
        id INTEGER PRIMARY KEY,
        ts TEXT NOT NULL,
        pattern TEXT NOT NULL,
        action TEXT NOT NULL,
        phase TEXT NOT NULL,
        payload TEXT NOT NULL
    );""")
    return con

def record_event(e: FlowEvent):
    ts = datetime.now(timezone.utc).isoformat()
    con = _conn()
    con.execute(
        "INSERT INTO events(ts,pattern,action,phase,payload) VALUES (?,?,?,?,?)",
        (ts, e.pattern, e.action, e.phase.value, json.dumps(e.payload))
    )
    con.commit()
    con.close()
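To sanity-check the schema, here's a self-contained round-trip against a throwaway database — same table definition as above, but under a temp directory instead of `.binflow.db`:

```python
import json, sqlite3, tempfile
from datetime import datetime, timezone
from pathlib import Path

# throwaway DB so the demo never touches the real .binflow.db
db = Path(tempfile.mkdtemp()) / "binflow-demo.db"
con = sqlite3.connect(db)
con.execute("""CREATE TABLE IF NOT EXISTS events(
    id INTEGER PRIMARY KEY,
    ts TEXT NOT NULL,
    pattern TEXT NOT NULL,
    action TEXT NOT NULL,
    phase TEXT NOT NULL,
    payload TEXT NOT NULL
);""")

ts = datetime.now(timezone.utc).isoformat()
con.execute(
    "INSERT INTO events(ts,pattern,action,phase,payload) VALUES (?,?,?,?,?)",
    (ts, "rest-service:demo", "create", "Focus", json.dumps({"name": "demo"})),
)
con.commit()

row = con.execute("SELECT pattern, action, phase FROM events").fetchone()
print(row)  # ('rest-service:demo', 'create', 'Focus')
con.close()
```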

binflow/leverage.py

import sqlite3
from pathlib import Path
from collections import defaultdict

DB = Path(".binflow.db")

PHASE_VALUE = {
    "Focus": 1.0,
    "Loop": 1.2,
    "Transition": 1.4,
    "Pause": 0.6,
    "Emergence": 1.8,   # inventions get a higher weight
}

TRUST_START = 0.8  # naive baseline; could be learned from tests/uptime

def leverage_index(pattern: str) -> float:
    if not DB.exists(): return 0.0
    con = sqlite3.connect(DB)
    cur = con.cursor()
    cur.execute("SELECT action, phase FROM events WHERE pattern=?",(pattern,))
    rows = cur.fetchall()
    con.close()

    counts = defaultdict(int)
    score = 0.0
    for action, phase in rows:
        counts[(action, phase)] += 1
        score += 1.0 * PHASE_VALUE.get(phase, 1.0)

    # simple trust heuristic: more modifies passing CI -> higher trust
    # (you can wire this to your CI status table later)
    trust_factor = min(1.5, TRUST_START + 0.02 * counts.get(("modify","Loop"), 0))
    return round(score * trust_factor, 3)
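To make the weighting concrete, here's the same scoring logic as a pure function over `(action, phase)` rows, detached from SQLite (the sample rows are illustrative):

```python
from collections import defaultdict

PHASE_VALUE = {"Focus": 1.0, "Loop": 1.2, "Transition": 1.4,
               "Pause": 0.6, "Emergence": 1.8}
TRUST_START = 0.8

def score_rows(rows):
    # rows: list of (action, phase) tuples, as the SELECT above returns them
    counts = defaultdict(int)
    score = 0.0
    for action, phase in rows:
        counts[(action, phase)] += 1
        score += PHASE_VALUE.get(phase, 1.0)
    trust = min(1.5, TRUST_START + 0.02 * counts[("modify", "Loop")])
    return round(score * trust, 3)

rows = [("create", "Focus"), ("modify", "Loop"), ("publish", "Emergence")]
print(score_rows(rows))  # (1.0 + 1.2 + 1.8) * 0.82 = 3.28
```

Each event contributes its phase weight, and "modify while in Loop" events nudge the trust factor up — the Emergence weight is what makes publishing a pattern move the needle fastest.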

3) Phase Sync Maker (PSM): generate patterns (Python service)

psm/cli.py

from datetime import datetime, timezone
from pathlib import Path
import click
from jinja2 import Environment, FileSystemLoader
from binflow.events import record_event, FlowEvent, Phase

ROOT = Path(__file__).resolve().parents[1]
TPL = ROOT / "psm" / "templates"

def render(pattern: str, stack: str, ctx: dict):
    env = Environment(loader=FileSystemLoader(TPL / pattern / stack))
    outdir = ROOT / "services" / f"{ctx['name']}-{stack}"
    outdir.mkdir(parents=True, exist_ok=True)
    for tname in env.list_templates():
        if not tname.endswith(".j2"): continue
        template = env.get_template(tname)
        content = template.render(**ctx, binflow=dict(
            phase=ctx["phase"], ts=datetime.now(timezone.utc).isoformat()))
        dest = outdir / tname[:-3]
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_text(content)

@click.group()
def cli(): ...

@cli.command("gen")
@click.option("--pattern", required=True, type=str)
@click.option("--name", required=True, type=str)
@click.option("--stack", default="python", type=click.Choice(["python"]))
@click.option("--phase", default="Focus", type=click.Choice([p.value for p in Phase]))
def gen(pattern, name, stack, phase):
    ctx = dict(name=name, phase=phase)
    render(pattern, stack, ctx)
    record_event(FlowEvent(
        pattern=f"{pattern}:{name}", action="create", phase=Phase(phase), payload=ctx
    ))
    click.echo(f"Generated {pattern}/{name} in phase={phase}")

if __name__ == "__main__":
    cli()

psm/templates/rest-service/python/app.py.j2

# @binflow phase={{ binflow.phase }} ts={{ binflow.ts }}
from fastapi import FastAPI
app = FastAPI(title="{{ name }}")

@app.get("/ping")
def ping():
    return {"ok": True, "phase": "{{ binflow.phase }}"}

4) Cloud: VPC → EC2 → S3 → SSM with Python (boto3)

Create a minimal network + instance fleet controlled entirely by Python.

cloud/vpc.py

import boto3, os
ec2 = boto3.client("ec2", region_name=os.getenv("AWS_REGION","us-east-1"))

def ensure_vpc(name="flow-vpc"):
    vpcs = ec2.describe_vpcs(Filters=[{"Name":"tag:Name","Values":[name]}])["Vpcs"]
    if vpcs: return vpcs[0]["VpcId"]
    v = ec2.create_vpc(CidrBlock="10.42.0.0/16")["Vpc"]
    ec2.create_tags(Resources=[v["VpcId"]], Tags=[{"Key":"Name","Value":name}])
    ec2.modify_vpc_attribute(VpcId=v["VpcId"], EnableDnsSupport={"Value":True})
    ec2.modify_vpc_attribute(VpcId=v["VpcId"], EnableDnsHostnames={"Value":True})
    return v["VpcId"]

cloud/ec2.py

import boto3, os
ec2 = boto3.resource("ec2", region_name=os.getenv("AWS_REGION","us-east-1"))

def launch_runner(vpc_id: str, subnet_id: str, name="flow-runner"):
    sg = list(ec2.security_groups.filter(Filters=[
        {"Name":"group-name","Values":[f"{name}-sg"]},
        {"Name":"vpc-id","Values":[vpc_id]}   # don't match a same-named SG in the default VPC
    ]))
    if not sg:
        sg = ec2.create_security_group(
            GroupName=f"{name}-sg", Description="Flow runner", VpcId=vpc_id
        )
        sg.authorize_ingress(IpPermissions=[{
            "IpProtocol":"tcp","FromPort":22,"ToPort":22,
            "IpRanges":[{"CidrIp":"0.0.0.0/0"}]  # lock this down to your own IP in practice
        }])
    else:
        sg = sg[0]

    instances = list(ec2.instances.filter(Filters=[
        {"Name":"tag:Name","Values":[name]}, {"Name":"instance-state-name","Values":["running","pending"]}
    ]))
    if instances: return instances[0].id

    i = ec2.create_instances(
        ImageId=os.getenv("AMI","ami-0c94855ba95c71c99"),  # update to your region
        MinCount=1, MaxCount=1, InstanceType="t3.medium",
        SubnetId=subnet_id,  # the SG lives in vpc_id, so launch into one of its subnets
        SecurityGroupIds=[sg.id], TagSpecifications=[{
            "ResourceType":"instance","Tags":[{"Key":"Name","Value":name}]
        }]
    )[0]
    return i.id

cloud/s3.py

import boto3, os
REGION = os.getenv("AWS_REGION","us-east-1")
s3 = boto3.client("s3", region_name=REGION)

def ensure_bucket(name="flow-artifacts-xyz"):
    buckets = [b["Name"] for b in s3.list_buckets()["Buckets"]]
    if name in buckets: return name
    if REGION == "us-east-1":
        s3.create_bucket(Bucket=name)  # us-east-1 rejects a LocationConstraint
    else:
        s3.create_bucket(Bucket=name,
                         CreateBucketConfiguration={"LocationConstraint": REGION})
    return name

cloud/ssm.py

import boto3, os, time
ssm = boto3.client("ssm", region_name=os.getenv("AWS_REGION","us-east-1"))

def run_patch(instance_id: str):
    resp = ssm.send_command(
        InstanceIds=[instance_id],
        DocumentName="AWS-RunPatchBaseline",
        Parameters={"Operation":["Install"]}
    )
    cid = resp["Command"]["CommandId"]
    time.sleep(5)  # brief pause so SSM registers the command; poll get_command_invocation for real status
    return cid

5) Ops automations: health, logs, patch windows

cloud/health.py

import requests, time

def check_http(url: str, retries=3, backoff=2):
    for i in range(retries):
        try:
            r = requests.get(url, timeout=3)
            return {"status": r.status_code, "ok": r.ok}
        except requests.RequestException:
            time.sleep(backoff * (i+1))
    return {"status": 0, "ok": False}

cloud/logs.py

import gzip, shutil
from pathlib import Path
from datetime import datetime

def rotate(path="logs/app.log"):
    p = Path(path)
    if not p.exists(): return
    ts = datetime.now().strftime("%Y%m%d_%H%M")
    out = Path("logs/archive") / f"{p.stem}-{ts}.log.gz"
    out.parent.mkdir(parents=True, exist_ok=True)
    with open(p, "rb") as f_in, gzip.open(out, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    p.write_text("")  # truncate
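Before wiring rotation into the scheduler, you can exercise the same compress-then-truncate steps in isolation — a self-contained sketch that writes under a temp directory instead of `logs/`:

```python
import gzip, shutil, tempfile
from datetime import datetime
from pathlib import Path

root = Path(tempfile.mkdtemp())
log = root / "app.log"
log.write_text("hello\n" * 3)

# same steps as rotate(): gzip into an archive dir, then truncate the live log
ts = datetime.now().strftime("%Y%m%d_%H%M")
out = root / "archive" / f"{log.stem}-{ts}.log.gz"
out.parent.mkdir(parents=True, exist_ok=True)
with open(log, "rb") as f_in, gzip.open(out, "wb") as f_out:
    shutil.copyfileobj(f_in, f_out)
log.write_text("")  # truncate

print(out.exists(), log.stat().st_size)  # True 0
```

Note that truncating with `write_text("")` only works cleanly if the app reopens the log file per write; a long-lived file handle will keep appending at the old offset.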

6) UI: a tiny Streamlit dashboard (flows + leverage)

ui/dashboard.py

import sqlite3
from pathlib import Path

import streamlit as st

from binflow.leverage import leverage_index

st.set_page_config(page_title="BINFLOW Dashboard", layout="wide")
st.title("🌀 BINFLOW — Flows & Leverage")

# guard against a first run where no events have been recorded yet
DB = Path(".binflow.db")
rows = []
if DB.exists():
    con = sqlite3.connect(DB)
    rows = con.execute(
        "SELECT ts, pattern, action, phase FROM events ORDER BY ts DESC LIMIT 200"
    ).fetchall()
    con.close()

patterns = sorted({r[1] for r in rows})
sel = st.selectbox("Pattern", patterns) if patterns else None

col1, col2 = st.columns(2)
with col1:
    st.subheader("Recent Events")
    st.table([{"ts":r[0], "pattern":r[1], "action":r[2], "phase":r[3]} for r in rows])

with col2:
    if sel:
        st.subheader("Leverage")
        st.metric(label=f"{sel}", value=leverage_index(sel))
        st.caption("Leverage = Σ phase_value(event) × trust_factor")

Run:

streamlit run ui/dashboard.py

7) Cross-posting bots (Dev.to, Hashnode, Medium)

Note: keep API keys in environment variables; handle rate limits.

bots/devto.py

import os, requests
API = "https://dev.to/api/articles"

def post_devto(title: str, markdown_body: str, tags=None, published=False):
    headers = {"api-key": os.getenv("DEVTO_API_KEY")}
    payload = {
        "article": {
            "title": title,
            "published": published,
            "body_markdown": markdown_body,
            "tags": tags or ["binflow","python","web4"]
        }
    }
    r = requests.post(API, json=payload, headers=headers, timeout=15)
    r.raise_for_status()
    return r.json()

bots/hashnode.py (GraphQL)

import os, requests

def post_hashnode(title: str, markdown_body: str, publication_id: str):
    url = "https://gql.hashnode.com"
    headers = {"Content-Type":"application/json","Authorization": os.getenv("HASHNODE_TOKEN")}
    query = """
    mutation($input: PublishPostInput!){
      publishPost(input: $input){ post{ id, slug, title } }
    }"""
    variables = {"input":{
        "title": title,
        "contentMarkdown": markdown_body,
        "publicationId": publication_id
    }}
    r = requests.post(url, json={"query":query, "variables":variables}, headers=headers, timeout=20)
    r.raise_for_status()
    return r.json()

bots/medium.py

import os, requests

def post_medium(user_id: str, title: str, content_markdown: str, tags=None, publish_status="draft"):
    headers = {
        "Authorization": f"Bearer {os.getenv('MEDIUM_TOKEN')}",
        "Content-Type": "application/json"
    }
    url = f"https://api.medium.com/v1/users/{user_id}/posts"
    payload = {
        "title": title,
        "contentFormat": "markdown",
        "content": content_markdown,
        "publishStatus": publish_status,
        "tags": tags or ["BINFLOW","TemporalWeb","Python"]
    }
    r = requests.post(url, json=payload, headers=headers, timeout=20)
    r.raise_for_status()
    return r.json()

When a post is published, record a BINFLOW event to increase leverage for the pattern being discussed:

from binflow.events import record_event, FlowEvent, Phase

def record_publish(pattern: str, platform: str, url: str):
    record_event(FlowEvent(
        pattern=pattern,
        action="publish",
        phase=Phase.EMERGENCE,     # publication = emergence by default
        payload={"platform": platform, "url": url}
    ))

8) Scheduler: wire everything together (APScheduler)

scheduler/run.py

import os, logging
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
from cloud.logs import rotate
from cloud.health import check_http
from cloud.ssm import run_patch
from binflow.events import record_event, FlowEvent, Phase
from binflow.leverage import leverage_index
from bots.devto import post_devto

logging.basicConfig(level=logging.INFO)

def job_healthcheck():
    r = check_http(os.getenv("HEALTH_URL","http://localhost:8000/ping"))
    record_event(FlowEvent(pattern="rest-service:analytics-api",
                           action="healthcheck",
                           phase=Phase.LOOP,
                           payload=r))
    logging.info(f"Health: {r}")

def job_rotate_logs():
    rotate("logs/app.log")
    record_event(FlowEvent(pattern="ops:logs",
                           action="rotate",
                           phase=Phase.TRANSITION,
                           payload={"path":"logs/app.log"}))
    logging.info("Log rotation done")

def job_patch_instance():
    iid = os.getenv("INSTANCE_ID")
    if not iid: return
    cid = run_patch(iid)
    record_event(FlowEvent(pattern="ops:ssm",
                           action="patch",
                           phase=Phase.TRANSITION,
                           payload={"command_id": cid}))
    logging.info(f"Patch run {cid}")

def job_share_devto():
    # example: publish leverage report
    pat = "rest-service:analytics-api"
    li = leverage_index(pat)
    body = f"# Leverage Report\n\nPattern: **{pat}**\n\nLeverage: **{li}**\n\n*Generated {datetime.utcnow().isoformat()}Z*"
    try:
        res = post_devto(title="BINFLOW Leverage Report", markdown_body=body, published=False)
        record_event(FlowEvent(pattern=pat, action="publish", phase=Phase.EMERGENCE, payload={"platform":"dev.to","id":res.get("id")}))
    except Exception as e:
        logging.exception(e)

if __name__ == "__main__":
    s = BackgroundScheduler(timezone="UTC")
    s.add_job(job_healthcheck, "interval", minutes=5)
    s.add_job(job_rotate_logs, "cron", minute=0)           # hourly
    s.add_job(job_patch_instance, "cron", hour=3, minute=0) # nightly
    s.add_job(job_share_devto, "cron", hour=9, minute=30)   # daily draft
    s.start()
    logging.info("Scheduler started. Press Ctrl+C to exit.")
    try:
        import time
        while True: time.sleep(3600)
    except KeyboardInterrupt:
        s.shutdown()

9) First-run script (local demo)

# create venv + deps
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt

# generate a pattern (Python REST)
python -m psm.cli gen --pattern rest-service --name analytics-api --stack python --phase Focus

# run the dashboard
streamlit run ui/dashboard.py

# start scheduler
python scheduler/run.py
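The `.env.example` from the repo layout isn't shown above; a plausible version simply collects the names the scripts read via `os.getenv` (all values here are placeholders):

```
AWS_REGION=us-east-1
AMI=ami-0c94855ba95c71c99      # update to an AMI in your region
INSTANCE_ID=                   # fill in after launch_runner()
HEALTH_URL=http://localhost:8000/ping
DEVTO_API_KEY=
HASHNODE_TOKEN=
MEDIUM_TOKEN=
```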

requirements.txt

boto3
requests
streamlit
jinja2
click
apscheduler
fastapi        # used by the generated rest-service pattern
uvicorn        # to serve the generated FastAPI app locally

10) How this feels in real life (flow)

  • You generate a pattern in Focus → event recorded
  • You deploy via Python infra scripts → Transition events
  • Your service runs Loop checks (health, logs) on schedule
  • You write a post; bots publish & record Emergence
  • The dashboard shows Leverage rising as others reuse your pattern or you post more artifacts
  • All of this is Python driving the machine

Why this matters

We’re not just automating DevOps — we’re writing time into the stack.
Every script, deployment, post, and reuse becomes proof-of-leverage for your work.

Your influence is a function of your patterns in motion.


Join me (Botswana → the world)

I’m Peace Thabiwa. I have a working base, the architecture, and a mountain of ideas.
I need Python pros, cloud engineers, UI designers, and Web3 builders who feel this rhythm and want to turn it into reality.

No funding yet — just proof.
We’ll ship public demos, track leverage openly, and make the case with working software.

If that resonates, you’re already part of the flow.
