Have you ever sat there staring at a tangle of wires, wondering if you just invented a robot… or a toaster? That was me last year. I’d bought a second-hand pressotherapy machine for a rehab project—don’t ask why—and thought, “How hard can it be to make my own control system?” Spoiler: harder than I expected. But if you’re the type who enjoys tinkering (or just wants to save a ton of money), you’re in for a ride.
The real challenge nobody tells you about
I thought the tough part would be the coding. Nope. It was understanding how the machine actually thinks. Every pressotherapy setup—whether you’re running it in your basement or at a high-end Pressotherapy Chicago IL wellness spa—has this logic baked in: sequence control, timing, and safety failsafes. Miss one tiny thing, and you’re suddenly inflating cuffs like a balloon animal. Not cool.
Quick recap: what even is pressotherapy?
Imagine a massage, but instead of hands, it’s air chambers gently squeezing your limbs in a programmed sequence. It’s used for lymphatic drainage, muscle recovery, and sometimes even post-surgery rehab. You’ll see it in wellness centers offering Chicago Pressotherapy sessions for athletes and anyone chasing better circulation. The machine is basically a compressor plus smart valves, all directed by—you guessed it—a control system.
The five core ideas before you code (my “oh duh” list)
- Pressure mapping – deciding how much air goes where and when.
- Sequence timing – not too fast, not too slow.
- Safety interrupts – your code needs “panic stops.”
- User interface – even if it’s just buttons, make them foolproof.
- Error logging – because things will go wrong.
How to build your control system without losing your mind
I started with a Raspberry Pi and Python because, well, it’s easy to find code examples. Here’s my rough process (messy notes included):
- Step 1: Map your valves and sensors. I literally taped labels on everything. When you’re testing, you’ll thank yourself.
- Step 2: Write a basic “inflate/deflate” loop. Keep it stupid-simple at first. I tested with just one chamber until I got the rhythm right.
- Step 3: Add sequence logic. Think: calf → thigh → hip. Test slowly, watch the pressure readings.
- Step 4: Build in a stop button. Seriously, this saved me when a cuff overinflated once.
- Step 5: Layer in presets. So your code can switch from “gentle” to “intense” with just a menu choice.
I’m not gonna lie—seeing it work for the first time felt better than my first coffee that morning.
Long complex Python control system example
Below is a fairly comprehensive Python example that demonstrates a control system design pattern for a pressotherapy machine. It's intentionally verbose and includes multiple components: hardware abstraction, asynchronous sequencing, PID-like pressure control, safety monitors, event logging, and a simple command-line UI. This is for educational/demo purposes only.
'''
pressotherapy_control.py
Complex example control system for an educational pressotherapy prototype.
NOTE: This code is a simulation/blueprint and shouldn't be used to control real medical equipment without proper validation, safety approval, and hardware-specific adaptations.
'''
import asyncio
import math
import random
import time
from collections import deque
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Dict, List, Optional, Callable, Any
# -----------------------
# Low-level hardware simulation / abstraction
# -----------------------
class ValveState(Enum):
CLOSED = 0
OPEN = 1
@dataclass
class Valve:
id: str
state: ValveState = ValveState.CLOSED
async def open(self):
# simulate physical delay
await asyncio.sleep(0.02)
self.state = ValveState.OPEN
async def close(self):
await asyncio.sleep(0.02)
self.state = ValveState.CLOSED
@dataclass
class Compressor:
pressure_pa: float = 0.0 # simulated pressure in Pascals
rate_pa_per_s: float = 2000.0 # how fast pressure increases when running
running: bool = False
async def start(self):
self.running = True
async def stop(self):
self.running = False
async def step(self, dt: float):
# simple physics-ish: pressure rises when running, decays otherwise
if self.running:
self.pressure_pa += self.rate_pa_per_s * dt
else:
# natural leak or release
self.pressure_pa -= self.rate_pa_per_s * 0.5 * dt
# clamp
self.pressure_pa = max(0.0, min(self.pressure_pa, 200000.0))
# -----------------------
# Sensors and pressure channels
# -----------------------
@dataclass
class PressureSensor:
id: str
# we simulate reading by sampling compressor pressure + local leakage/noise
leak_coefficient: float = 0.95
noise_std: float = 25.0
last_value: float = 0.0
def read(self, base_pressure: float, chamber_inflation: float) -> float:
# chamber_inflation is between 0..1 (fraction of full volume filled)
ideal = base_pressure * chamber_inflation * self.leak_coefficient
noise = random.gauss(0.0, self.noise_std)
value = max(0.0, ideal + noise)
self.last_value = value
return value
# -----------------------
# Safety and logging
# -----------------------
@dataclass
class EventLog:
buffer: deque = field(default_factory=lambda: deque(maxlen=2000))
def info(self, message: str):
ts = time.time()
entry = (ts, "INFO", message)
self.buffer.append(entry)
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))}] INFO: {message}")
def warn(self, message: str):
ts = time.time()
entry = (ts, "WARN", message)
self.buffer.append(entry)
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))}] WARN: {message}")
def error(self, message: str):
ts = time.time()
entry = (ts, "ERROR", message)
self.buffer.append(entry)
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))}] ERROR: {message}")
# -----------------------
# Control primitives (PID-ish controller)
# -----------------------
@dataclass
class PIDController:
kp: float
ki: float
kd: float
setpoint: float = 0.0
integrator: float = 0.0
last_error: Optional[float] = None
integrator_limits: tuple = (-1e6, 1e6)
def update(self, measurement: float, dt: float) -> float:
error = self.setpoint - measurement
self.integrator += error * dt
# anti-windup
self.integrator = max(self.integrator_limits[0], min(self.integrator, self.integrator_limits[1]))
deriv = 0.0
if self.last_error is not None:
deriv = (error - self.last_error) / dt if dt > 0 else 0.0
self.last_error = error
# linear combination
out = (self.kp * error) + (self.ki * self.integrator) + (self.kd * deriv)
return out
# -----------------------
# High-level machine model
# -----------------------
@dataclass
class Chamber:
id: str
valve: Valve
sensor: PressureSensor
inflation_fraction: float = 0.0 # 0..1
target_fraction: float = 0.0
max_fraction: float = 1.0
leak_per_s: float = 0.02 # passive leakage
async def step(self, dt: float, compressor_pressure: float):
# If valve is open and compressor has pressure, inflate towards max
if self.valve.state == ValveState.OPEN and compressor_pressure > 100.0:
# simplified inflation dynamics, nonlinear
delta = (compressor_pressure / 200000.0) * dt * 0.8
self.inflation_fraction += delta
else:
# leak
self.inflation_fraction -= self.leak_per_s * dt
# clamp & approach target
# softly approach target if below
if self.inflation_fraction < self.target_fraction:
self.inflation_fraction += min(0.05 * dt, self.target_fraction - self.inflation_fraction)
self.inflation_fraction = max(0.0, min(self.inflation_fraction, self.max_fraction))
# -----------------------
# Sequence and scheduler
# -----------------------
class SequenceStep:
def __init__(self, chambers: List[Chamber], target_fraction: float, duration_s: float, name: str = ""):
self.chambers = chambers
self.target_fraction = target_fraction
self.duration_s = duration_s
self.name = name or f"step-{id(self)}"
@dataclass
class SequenceProgram:
name: str
steps: List[SequenceStep]
repeat: int = 1
# -----------------------
# Main controller
# -----------------------
class PressotherapyController:
def __init__(self, chambers: List[Chamber], compressor: Compressor, logger: EventLog):
self.chambers = {c.id: c for c in chambers}
self.compressor = compressor
self.logger = logger
self.valves = {c.id: c.valve for c in chambers}
# safety params
self.max_pressure_pa = 120000.0
self.pressure_safety_timeout = 2.0 # seconds threshold for stuck high pressure
self._high_pressure_since = None
# controllers (per-chamber simple P controller for target fraction)
self.controllers: Dict[str, PIDController] = {}
# system state
self.running_program: Optional[SequenceProgram] = None
self._stop_requested = False
self._pause_requested = False
def attach_controller(self, chamber_id: str, controller: PIDController):
self.controllers[chamber_id] = controller
async def apply_presets(self, step: SequenceStep):
# set targets and open/close valves accordingly
self.logger.info(f"Applying step '{step.name}' target {step.target_fraction} for {step.duration_s}s")
for c in self.chambers.values():
# a simple mapping: included chambers get target, others 0
if c in step.chambers:
c.target_fraction = step.target_fraction
await c.valve.open()
else:
c.target_fraction = 0.0
await c.valve.close()
async def monitor_safety(self, dt: float):
# check compressor pressure vs max and track how long it's high
if self.compressor.pressure_pa > self.max_pressure_pa:
if self._high_pressure_since is None:
self._high_pressure_since = time.time()
elif time.time() - self._high_pressure_since > self.pressure_safety_timeout:
self.logger.error("Overpressure persistent: initiating emergency stop")
await self.emergency_stop()
else:
self._high_pressure_since = None
# monitor chamber sensors for anomalies (e.g., unexpected high reading)
for c in self.chambers.values():
val = c.sensor.last_value
if val > self.max_pressure_pa * 0.9:
self.logger.warn(f"Sensor {c.sensor.id} high reading: {val:.1f} pa")
async def emergency_stop(self):
self._stop_requested = True
await self.compressor.stop()
# close all valves
for v in self.valves.values():
await v.close()
self.logger.error("Emergency stop executed: compressor stopped and valves closed")
async def run_program(self, program: SequenceProgram):
self.running_program = program
self._stop_requested = False
self._pause_requested = False
# run repeat cycles
for cycle in range(program.repeat):
if self._stop_requested:
break
for step in program.steps:
if self._stop_requested:
break
# apply presets then run for duration
await self.apply_presets(step)
step_start = time.time()
elapsed = 0.0
while elapsed < step.duration_s:
if self._stop_requested:
break
if self._pause_requested:
await asyncio.sleep(0.1)
continue
await asyncio.sleep(0.05) # small tick
elapsed = time.time() - step_start
# end of step
# ensure safe stop at end
await self.compressor.stop()
for v in self.valves.values():
await v.close()
self.logger.info("Program finished or stopped cleanly")
self.running_program = None
async def control_loop(self, tick_s: float = 0.05):
last = time.time()
while True:
now = time.time()
dt = now - last
last = now
# update compressor physics
await self.compressor.step(dt)
# for each chamber, step dynamics and read sensors
for c in self.chambers.values():
await c.step(dt, self.compressor.pressure_pa)
val = c.sensor.read(self.compressor.pressure_pa, c.inflation_fraction)
# store measurement for controller
ctrl = self.controllers.get(c.id)
if ctrl:
# assume controller setpoint is fraction * scaling to "pressure" domain; keep simple
ctrl.setpoint = c.target_fraction * self.compressor.pressure_pa
control_signal = ctrl.update(val, dt)
# control_signal > 0 -> open compressor or valve
# we map this to compressor on/off heuristic:
if control_signal > 50.0:
await self.compressor.start()
elif control_signal < -50.0:
await self.compressor.stop()
# small random fault detection
if val > self.max_pressure_pa * 1.05:
self.logger.warn(f"Chamber {c.id} sensor spike: {val:.1f}")
await self.monitor_safety(dt)
await asyncio.sleep(tick_s)
# -----------------------
# CLI & demo program builder
# -----------------------
def build_demo_system(logger: EventLog):
# create valves, sensors, chambers
chambers = []
for i in range(4):
v = Valve(id=f"V{i}")
s = PressureSensor(id=f"S{i}")
ch = Chamber(id=f"C{i}", valve=v, sensor=s)
chambers.append(ch)
compressor = Compressor(pressure_pa=0.0, rate_pa_per_s=2500.0)
# attach controllers with different tuning to simulate realism
logger.info("Building demo system with 4 chambers")
controller = PressotherapyController(chambers=chambers, compressor=compressor, logger=logger)
# attach PID controllers with varied params
for c in chambers:
pid = PIDController(kp=0.8 + random.random()*0.4, ki=0.05, kd=0.01, integrator_limits=(-10000, 10000))
controller.attach_controller(c.id, pid)
return controller
async def demo_run():
logger = EventLog()
controller = build_demo_system(logger)
# compose a gentle program
program = SequenceProgram(name="gentle-flow", repeat=2, steps=[
SequenceStep(chambers=list(controller.chambers.values())[:1], target_fraction=0.4, duration_s=6.0, name="calf-1"),
SequenceStep(chambers=list(controller.chambers.values())[1:2], target_fraction=0.5, duration_s=6.0, name="calf-2"),
SequenceStep(chambers=list(controller.chambers.values())[2:3], target_fraction=0.6, duration_s=6.0, name="thigh"),
SequenceStep(chambers=list(controller.chambers.values())[3:4], target_fraction=0.5, duration_s=6.0, name="hip"),
SequenceStep(chambers=list(controller.chambers.values()), target_fraction=0.3, duration_s=8.0, name="full-relax"),
])
# run control loop concurrently
ctl_task = asyncio.create_task(controller.control_loop(tick_s=0.02))
# run program
prog_task = asyncio.create_task(controller.run_program(program))
# run for limited time or until program completes
try:
await prog_task
finally:
ctl_task.cancel()
try:
await ctl_task
except asyncio.CancelledError:
logger.info("Control loop cancelled")
logger.info("Demo run complete.")
# ============================
# If executed as script, run demo
# ============================
if __name__ == "__main__":
try:
asyncio.run(demo_run())
except KeyboardInterrupt:
print("Interrupted by user - exiting.")
Why this code is "complex" (short note)
This sample mixes asynchronous loops, simulated physics, PID controllers, safety checks, and a scheduled sequence runner. It should give you a solid starting point to adapt to real hardware libraries (GPIO, I2C, ADC) on a Raspberry Pi or microcontroller.
Benefits (casual bullets)
- You'll actually understand how your machine works—no mystery.
- You can tweak it for special needs, like sports recovery.
- You’ll save big compared to buying a brand-new setup.
- You’ll be able to fix it if it breaks (and it will).
- It's fun. Actually. I know by Chicago in Pressotherapy
Give it a try this week—you’ll see! [sic]
Top comments (0)