Mary Lu Camargo

How to program a control system for a pressotherapy machine in Python

Have you ever sat there staring at a tangle of wires, wondering if you just invented a robot… or a toaster? That was me last year. I’d bought a second-hand pressotherapy machine for a rehab project—don’t ask why—and thought, “How hard can it be to make my own control system?” Spoiler: harder than I expected. But if you’re the type who enjoys tinkering (or just wants to save a ton of money), you’re in for a ride.


The real challenge nobody tells you about

I thought the tough part would be the coding. Nope. It was understanding how the machine actually thinks. Every pressotherapy setup, whether you’re running it in your basement or at a high-end wellness spa, has the same logic baked in: sequence control, timing, and safety failsafes. Miss one tiny thing, and you’re suddenly inflating cuffs like a balloon animal. Not cool.


Quick recap: what even is pressotherapy?

Imagine a massage, but instead of hands, it’s air chambers gently squeezing your limbs in a programmed sequence. It’s used for lymphatic drainage, muscle recovery, and sometimes even post-surgery rehab. You’ll see it in wellness centers offering sessions for athletes and anyone chasing better circulation. The machine is basically a compressor plus smart valves, all directed by (you guessed it) a control system.


The five core ideas before you code (my “oh duh” list)

  1. Pressure mapping – deciding how much air goes where and when.
  2. Sequence timing – not too fast, not too slow.
  3. Safety interrupts – your code needs “panic stops.”
  4. User interface – even if it’s just buttons, make them foolproof.
  5. Error logging – because things will go wrong.
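To make ideas 1 and 2 concrete, here is a minimal sketch of a pressure map kept as plain data, with a sanity check that runs before anything touches a valve. The names (`ChamberTarget`, `PRESSURE_MAP`, `validate_map`, `MAX_SAFE_KPA`) and every number are placeholders invented for illustration, not clinical values or settings from a real machine.

```python
from dataclasses import dataclass
from typing import Dict, List

# Assumed ceiling for illustration only; a real limit must come from the
# device documentation and a qualified professional.
MAX_SAFE_KPA = 8.0


@dataclass(frozen=True)
class ChamberTarget:
    pressure_kpa: float  # how much air goes where
    hold_s: float        # how long the chamber stays inflated


# Hypothetical map: chamber name -> target (placeholder numbers).
PRESSURE_MAP: Dict[str, ChamberTarget] = {
    "calf": ChamberTarget(pressure_kpa=6.0, hold_s=5.0),
    "thigh": ChamberTarget(pressure_kpa=5.0, hold_s=5.0),
    "hip": ChamberTarget(pressure_kpa=4.0, hold_s=4.0),
}


def validate_map(pressure_map: Dict[str, ChamberTarget],
                 max_kpa: float = MAX_SAFE_KPA) -> List[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems: List[str] = []
    for name, target in pressure_map.items():
        if not 0 < target.pressure_kpa <= max_kpa:
            problems.append(
                f"{name}: pressure {target.pressure_kpa} kPa outside (0, {max_kpa}]"
            )
        if target.hold_s <= 0:
            problems.append(f"{name}: hold time must be positive")
    return problems
```

Keeping the map as data means a typo in a preset shows up as a message from `validate_map` rather than as an overinflated cuff.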

How to build your control system without losing your mind

I started with a Raspberry Pi and Python because, well, it’s easy to find code examples. Here’s my rough process (messy notes included):

  • Step 1: Map your valves and sensors. I literally taped labels on everything. When you’re testing, you’ll thank yourself.
  • Step 2: Write a basic “inflate/deflate” loop. Keep it stupid-simple at first. I tested with just one chamber until I got the rhythm right.
  • Step 3: Add sequence logic. Think: calf → thigh → hip. Test slowly, watch the pressure readings.
  • Step 4: Build in a stop button. Seriously, this saved me when a cuff overinflated once.
  • Step 5: Layer in presets. So your code can switch from “gentle” to “intense” with just a menu choice.
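Steps 2 to 5 can be sketched in one short loop. This is a rough illustration under stated assumptions: `FakeValves` stands in for whatever actually drives your valves, the `PRESETS` timings are invented, and the stop button is modeled as a `threading.Event` that some other part of your program sets.

```python
import threading
import time
from typing import Callable, Dict, List, Tuple

# Hypothetical presets: (chamber name, hold seconds) in firing order.
PRESETS: Dict[str, List[Tuple[str, float]]] = {
    "gentle": [("calf", 4.0), ("thigh", 4.0), ("hip", 4.0)],
    "intense": [("calf", 8.0), ("thigh", 8.0), ("hip", 6.0)],
}


class FakeValves:
    """Stand-in for real valve drivers; it only records what was requested."""

    def __init__(self) -> None:
        self.log: List[str] = []

    def inflate(self, chamber: str) -> None:
        self.log.append(f"inflate {chamber}")

    def deflate(self, chamber: str) -> None:
        self.log.append(f"deflate {chamber}")


def run_preset(name: str, valves: FakeValves, stop: threading.Event,
               sleep: Callable[[float], None] = time.sleep,
               tick_s: float = 0.1) -> bool:
    """Run one pass of a preset. Returns True if it finished, False if stopped."""
    for chamber, hold_s in PRESETS[name]:
        if stop.is_set():
            return False
        valves.inflate(chamber)
        try:
            waited = 0.0
            while waited < hold_s:
                if stop.is_set():      # the "panic stop" is checked every tick
                    return False
                sleep(tick_s)
                waited += tick_s
        finally:
            valves.deflate(chamber)    # always release, even on stop or error
    return True
```

On real hardware you would pass an object that switches actual valves instead of `FakeValves`, and set `stop` from your button handler. The `try`/`finally` is the part worth copying: the chamber is released no matter how the hold loop exits.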

I’m not gonna lie—seeing it work for the first time felt better than my first coffee that morning.


Long complex Python control system example

Below is a fairly comprehensive Python example that demonstrates a control system design pattern for a pressotherapy machine. It’s intentionally verbose and covers several components: hardware abstraction, asynchronous sequencing, PID-like pressure control, safety monitors, event logging, and a scripted demo run. Everything is simulated, so it runs on any machine with Python 3.7+ and no hardware attached. This is for educational/demo purposes only.

'''
pressotherapy_control.py
Complex example control system for an educational pressotherapy prototype.
NOTE: This code is a simulation/blueprint and shouldn't be used to control real medical equipment without proper validation, safety approval, and hardware-specific adaptations.
'''
import asyncio
import random
import time
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional

# -----------------------
# Low-level hardware simulation / abstraction
# -----------------------

class ValveState(Enum):
    CLOSED = 0
    OPEN = 1

@dataclass
class Valve:
    id: str
    state: ValveState = ValveState.CLOSED

    async def open(self):
        # simulate physical delay
        await asyncio.sleep(0.02)
        self.state = ValveState.OPEN

    async def close(self):
        await asyncio.sleep(0.02)
        self.state = ValveState.CLOSED

@dataclass
class Compressor:
    pressure_pa: float = 0.0  # simulated pressure in Pascals
    rate_pa_per_s: float = 2000.0  # how fast pressure increases when running
    running: bool = False

    async def start(self):
        self.running = True

    async def stop(self):
        self.running = False

    async def step(self, dt: float):
        # simple physics-ish: pressure rises when running, decays otherwise
        if self.running:
            self.pressure_pa += self.rate_pa_per_s * dt
        else:
            # natural leak or release
            self.pressure_pa -= self.rate_pa_per_s * 0.5 * dt
        # clamp
        self.pressure_pa = max(0.0, min(self.pressure_pa, 200000.0))

# -----------------------
# Sensors and pressure channels
# -----------------------

@dataclass
class PressureSensor:
    id: str
    # we simulate reading by sampling compressor pressure + local leakage/noise
    leak_coefficient: float = 0.95
    noise_std: float = 25.0
    last_value: float = 0.0

    def read(self, base_pressure: float, chamber_inflation: float) -> float:
        # chamber_inflation is between 0..1 (fraction of full volume filled)
        ideal = base_pressure * chamber_inflation * self.leak_coefficient
        noise = random.gauss(0.0, self.noise_std)
        value = max(0.0, ideal + noise)
        self.last_value = value
        return value

# -----------------------
# Safety and logging
# -----------------------

@dataclass
class EventLog:
    buffer: deque = field(default_factory=lambda: deque(maxlen=2000))

    def _log(self, level: str, message: str):
        # shared path: keep the entry in the ring buffer and echo it to stdout
        ts = time.time()
        self.buffer.append((ts, level, message))
        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))}] {level}: {message}")

    def info(self, message: str):
        self._log("INFO", message)

    def warn(self, message: str):
        self._log("WARN", message)

    def error(self, message: str):
        self._log("ERROR", message)

# -----------------------
# Control primitives (PID-ish controller)
# -----------------------

@dataclass
class PIDController:
    kp: float
    ki: float
    kd: float
    setpoint: float = 0.0
    integrator: float = 0.0
    last_error: Optional[float] = None
    integrator_limits: tuple = (-1e6, 1e6)

    def update(self, measurement: float, dt: float) -> float:
        error = self.setpoint - measurement
        self.integrator += error * dt
        # anti-windup
        self.integrator = max(self.integrator_limits[0], min(self.integrator, self.integrator_limits[1]))

        deriv = 0.0
        if self.last_error is not None:
            deriv = (error - self.last_error) / dt if dt > 0 else 0.0

        self.last_error = error
        # linear combination
        out = (self.kp * error) + (self.ki * self.integrator) + (self.kd * deriv)
        return out

# -----------------------
# High-level machine model
# -----------------------

@dataclass
class Chamber:
    id: str
    valve: Valve
    sensor: PressureSensor
    inflation_fraction: float = 0.0  # 0..1
    target_fraction: float = 0.0
    max_fraction: float = 1.0
    leak_per_s: float = 0.02  # passive leakage

    async def step(self, dt: float, compressor_pressure: float):
        # If valve is open and compressor has pressure, inflate towards max
        if self.valve.state == ValveState.OPEN and compressor_pressure > 100.0:
            # simplified linear fill model: rate scales with compressor pressure
            delta = (compressor_pressure / 200000.0) * dt * 0.8
            self.inflation_fraction += delta
        else:
            # leak
            self.inflation_fraction -= self.leak_per_s * dt

        # when under target, creep toward it at up to 0.05/s (simulated top-up)
        if self.inflation_fraction < self.target_fraction:
            self.inflation_fraction += min(0.05 * dt, self.target_fraction - self.inflation_fraction)

        self.inflation_fraction = max(0.0, min(self.inflation_fraction, self.max_fraction))

# -----------------------
# Sequence and scheduler
# -----------------------

class SequenceStep:
    def __init__(self, chambers: List[Chamber], target_fraction: float, duration_s: float, name: str = ""):
        self.chambers = chambers
        self.target_fraction = target_fraction
        self.duration_s = duration_s
        self.name = name or f"step-{id(self)}"

@dataclass
class SequenceProgram:
    name: str
    steps: List[SequenceStep]
    repeat: int = 1

# -----------------------
# Main controller
# -----------------------

class PressotherapyController:
    def __init__(self, chambers: List[Chamber], compressor: Compressor, logger: EventLog):
        self.chambers = {c.id: c for c in chambers}
        self.compressor = compressor
        self.logger = logger
        self.valves = {c.id: c.valve for c in chambers}
        # safety params
        self.max_pressure_pa = 120000.0
        self.pressure_safety_timeout = 2.0  # seconds threshold for stuck high pressure
        self._high_pressure_since = None
        # controllers (one PID controller per chamber, attached via attach_controller)
        self.controllers: Dict[str, PIDController] = {}
        # system state
        self.running_program: Optional[SequenceProgram] = None
        self._stop_requested = False
        self._pause_requested = False

    def attach_controller(self, chamber_id: str, controller: PIDController):
        self.controllers[chamber_id] = controller

    async def apply_presets(self, step: SequenceStep):
        # set targets and open/close valves accordingly
        self.logger.info(f"Applying step '{step.name}' target {step.target_fraction} for {step.duration_s}s")
        for c in self.chambers.values():
            # a simple mapping: included chambers get target, others 0
            if c in step.chambers:
                c.target_fraction = step.target_fraction
                await c.valve.open()
            else:
                c.target_fraction = 0.0
                await c.valve.close()

    async def monitor_safety(self, dt: float):
        # check compressor pressure vs max and track how long it's high
        if self.compressor.pressure_pa > self.max_pressure_pa:
            if self._high_pressure_since is None:
                self._high_pressure_since = time.monotonic()
            elif time.monotonic() - self._high_pressure_since > self.pressure_safety_timeout:
                self.logger.error("Overpressure persistent: initiating emergency stop")
                await self.emergency_stop()
        else:
            self._high_pressure_since = None

        # monitor chamber sensors for anomalies (e.g., unexpected high reading)
        for c in self.chambers.values():
            val = c.sensor.last_value
            if val > self.max_pressure_pa * 0.9:
                self.logger.warn(f"Sensor {c.sensor.id} high reading: {val:.1f} Pa")

    async def emergency_stop(self):
        self._stop_requested = True
        await self.compressor.stop()
        # close all valves
        for v in self.valves.values():
            await v.close()
        self.logger.error("Emergency stop executed: compressor stopped and valves closed")

    async def run_program(self, program: SequenceProgram):
        self.running_program = program
        self._stop_requested = False
        self._pause_requested = False

        # run repeat cycles
        for cycle in range(program.repeat):
            if self._stop_requested:
                break
            for step in program.steps:
                if self._stop_requested:
                    break
                # apply presets then run for duration
                await self.apply_presets(step)
                last_tick = time.monotonic()
                elapsed = 0.0
                while elapsed < step.duration_s:
                    if self._stop_requested:
                        break
                    await asyncio.sleep(0.05)  # small tick
                    now = time.monotonic()
                    if not self._pause_requested:
                        # only un-paused time counts toward the step duration
                        elapsed += now - last_tick
                    last_tick = now
                # end of step
        # ensure safe stop at end
        await self.compressor.stop()
        for v in self.valves.values():
            await v.close()
        self.logger.info("Program finished or stopped cleanly")
        self.running_program = None

    async def control_loop(self, tick_s: float = 0.05):
        last = time.monotonic()  # monotonic clock: dt is immune to wall-clock jumps
        while True:
            now = time.monotonic()
            dt = now - last
            last = now
            # update compressor physics
            await self.compressor.step(dt)
            # for each chamber, step dynamics and read sensors
            for c in self.chambers.values():
                await c.step(dt, self.compressor.pressure_pa)
                val = c.sensor.read(self.compressor.pressure_pa, c.inflation_fraction)
                # store measurement for controller
                ctrl = self.controllers.get(c.id)
                if ctrl:
                    # setpoint: target fraction scaled into the pressure domain using the
                    # fixed safety ceiling (scaling by the live compressor pressure would
                    # pin the setpoint at 0 while the compressor is off, so it never starts)
                    ctrl.setpoint = c.target_fraction * self.max_pressure_pa
                    control_signal = ctrl.update(val, dt)
                    # on/off mapping with a dead band: positive demand starts the
                    # compressor (never after a stop request), negative demand stops it
                    if control_signal > 50.0 and not self._stop_requested:
                        await self.compressor.start()
                    elif control_signal < -50.0:
                        await self.compressor.stop()
                # flag readings above the safety ceiling (with a 5% margin)
                if val > self.max_pressure_pa * 1.05:
                    self.logger.warn(f"Chamber {c.id} sensor spike: {val:.1f}")
            await self.monitor_safety(dt)
            await asyncio.sleep(tick_s)

# -----------------------
# Demo system and program builder
# -----------------------

def build_demo_system(logger: EventLog):
    # create valves, sensors, chambers
    chambers = []
    for i in range(4):
        v = Valve(id=f"V{i}")
        s = PressureSensor(id=f"S{i}")
        ch = Chamber(id=f"C{i}", valve=v, sensor=s)
        chambers.append(ch)

    compressor = Compressor(pressure_pa=0.0, rate_pa_per_s=2500.0)
    # attach controllers with different tuning to simulate realism
    logger.info("Building demo system with 4 chambers")
    controller = PressotherapyController(chambers=chambers, compressor=compressor, logger=logger)
    # attach PID controllers with varied params
    for c in chambers:
        pid = PIDController(kp=0.8 + random.random()*0.4, ki=0.05, kd=0.01, integrator_limits=(-10000, 10000))
        controller.attach_controller(c.id, pid)
    return controller

async def demo_run():
    logger = EventLog()
    controller = build_demo_system(logger)
    # compose a gentle program
    program = SequenceProgram(name="gentle-flow", repeat=2, steps=[
        SequenceStep(chambers=list(controller.chambers.values())[:1], target_fraction=0.4, duration_s=6.0, name="calf-1"),
        SequenceStep(chambers=list(controller.chambers.values())[1:2], target_fraction=0.5, duration_s=6.0, name="calf-2"),
        SequenceStep(chambers=list(controller.chambers.values())[2:3], target_fraction=0.6, duration_s=6.0, name="thigh"),
        SequenceStep(chambers=list(controller.chambers.values())[3:4], target_fraction=0.5, duration_s=6.0, name="hip"),
        SequenceStep(chambers=list(controller.chambers.values()), target_fraction=0.3, duration_s=8.0, name="full-relax"),
    ])

    # run control loop concurrently
    ctl_task = asyncio.create_task(controller.control_loop(tick_s=0.02))
    # run program
    prog_task = asyncio.create_task(controller.run_program(program))
    # run for limited time or until program completes
    try:
        await prog_task
    finally:
        ctl_task.cancel()
        try:
            await ctl_task
        except asyncio.CancelledError:
            logger.info("Control loop cancelled")
    logger.info("Demo run complete.")

# ============================
# If executed as script, run demo
# ============================
if __name__ == "__main__":
    try:
        asyncio.run(demo_run())
    except KeyboardInterrupt:
        print("Interrupted by user - exiting.")
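The anti-windup clamp in `PIDController` is easy to miss in a listing this long, so here it is in isolation. This is a sketch with a hypothetical `ClampedPID` class and made-up gains: a sensor stuck at zero keeps the error constant, and the clamp keeps the integral term from growing without bound.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class ClampedPID:
    kp: float
    ki: float
    kd: float
    setpoint: float = 0.0
    integrator: float = 0.0
    last_error: Optional[float] = None
    integrator_limits: Tuple[float, float] = (-10.0, 10.0)  # assumed limits

    def update(self, measurement: float, dt: float) -> float:
        error = self.setpoint - measurement
        lo, hi = self.integrator_limits
        # anti-windup: accumulate, then clamp the integral to its limits
        self.integrator = max(lo, min(self.integrator + error * dt, hi))
        deriv = 0.0
        if self.last_error is not None and dt > 0:
            deriv = (error - self.last_error) / dt
        self.last_error = error
        return self.kp * error + self.ki * self.integrator + self.kd * deriv


def simulate_stuck_sensor(pid: ClampedPID, reading: float,
                          steps: int, dt: float = 0.1) -> List[float]:
    """Feed the same reading repeatedly and collect the controller outputs."""
    return [pid.update(reading, dt) for _ in range(steps)]


pid = ClampedPID(kp=1.0, ki=0.5, kd=0.0, setpoint=10.0)
outputs = simulate_stuck_sensor(pid, reading=0.0, steps=50, dt=1.0)
print(outputs[-1], pid.integrator)
```

Without the clamp, the integrator here would reach 500 after 50 steps and keep the output saturated long after the sensor recovered. With it, the integral contribution stops at its limit.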

Why this code is "complex" (short note)

This sample mixes asynchronous loops, simulated physics, PID controllers, safety checks, and a scheduled sequence runner. It should give you a solid starting point to adapt to real hardware libraries (GPIO, I2C, ADC) on a Raspberry Pi or microcontroller.
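As one possible shape for that adaptation, here is a hedged sketch of a valve wrapper that keeps the async `open()`/`close()` interface of `Valve` but delegates to a real switch. `HardwareValve`, `Switch`, and `FakeSwitch` are hypothetical names. The only assumption about the driver is that it has `on()` and `off()` methods, which is the shape gpiozero's `OutputDevice` exposes, and that "off" means the valve is closed. `ValveState` is repeated so the snippet runs on its own.

```python
import asyncio
from enum import Enum
from typing import List, Protocol


class ValveState(Enum):  # repeated from the listing so this runs standalone
    CLOSED = 0
    OPEN = 1


class Switch(Protocol):
    """Assumed driver shape: any object with on() and off() methods."""

    def on(self) -> None: ...
    def off(self) -> None: ...


class HardwareValve:
    """Hypothetical replacement for the simulated Valve."""

    def __init__(self, id: str, switch: Switch, settle_s: float = 0.02):
        self.id = id
        self._switch = switch
        self._settle_s = settle_s  # assumed mechanical settling time
        self._switch.off()         # start from a known state (assumed: off = closed)
        self.state = ValveState.CLOSED

    async def open(self) -> None:
        self._switch.on()
        await asyncio.sleep(self._settle_s)
        self.state = ValveState.OPEN

    async def close(self) -> None:
        self._switch.off()
        await asyncio.sleep(self._settle_s)
        self.state = ValveState.CLOSED


class FakeSwitch:
    """Test double that records calls instead of toggling a pin."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def on(self) -> None:
        self.calls.append("on")

    def off(self) -> None:
        self.calls.append("off")


async def demo() -> List[str]:
    switch = FakeSwitch()
    valve = HardwareValve("V0", switch, settle_s=0.0)
    await valve.open()
    await valve.close()
    return switch.calls


print(asyncio.run(demo()))
```

Because the rest of the controller only looks at `valve.state` and awaits `open()`/`close()`, swapping the simulated class for a wrapper like this should leave the sequencing code untouched. Testing against `FakeSwitch` first means the wiring is the only new variable when you move to the bench.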


Benefits (casual bullets)

  • You'll actually understand how your machine works—no mystery.
  • You can tweak it for special needs, like sports recovery.
  • You’ll save big compared to buying a brand-new setup.
  • You’ll be able to fix it if it breaks (and it will).
  • It’s fun. Actually.

Give it a try this week. You’ll see!
