DEV Community

Ertugrul


Part 2 — Training model on Raspberry Pi Pico for Edge AI Trend Alarm

This part turns the dataset from Part 1 into a tiny edge‑ready model. This corrected version ensures consistency with the actual codebase (train.py, main_infer.cpp) and highlights the math, engineering trade‑offs, and validation you’ll use on hardware.

Pipeline overview

  1. compute a slope (°C/s) feature with a causal rolling OLS window,
  2. train a Logistic Regression classifier (idle vs load),
  3. export both JSON (sanity) and C++ header (firmware),
  4. parity‑check Python vs firmware numerics.

Corrections vs old draft

  • CSV_PATH in code points to ./data/pico_log_20min.csv (not just project root).
  • The dependency file may be named requirement.txt or requirements.txt; use whichever your repo contains.
  • Hold logic on firmware side is actually a debounce counter (N consecutive seconds above/below threshold) — not a “sticky decay timer” as initially described.
  • This post now matches the code (fast sigmoid, counter‑based hold, slope buffer API).

1) Environment & Data

  • Python ≥ 3.10
  • Install dependencies once:
pip install -r requirement.txt   # or: requirements.txt
  • Default CSV location: ./data/pico_log_20min.csv
uptime_ms,temp_c,load
5702,24.330,0
6702,23.861,0
8702,24.330,1
...

Tip: The load column is produced by firmware (toggle L0/L1). Ensure the CSV header includes all 3 columns.
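A quick header check before training can catch a log that is missing the load column. The sketch below uses only the standard library; the helper name load_log and the inline sample are illustrative assumptions, not part of train.py (which reads the file with pandas).

```python
import csv
import io

REQUIRED = ("uptime_ms", "temp_c", "load")  # columns shown in the sample above


def load_log(fileobj):
    """Read a Pico log, verify the header, and return (t_s, temp_c, load) lists.

    Hypothetical helper for illustration only.
    """
    reader = csv.DictReader(fileobj)
    missing = [c for c in REQUIRED if c not in (reader.fieldnames or [])]
    if missing:
        raise ValueError(f"CSV is missing columns: {missing}")
    t_s, temp_c, load = [], [], []
    for row in reader:
        t_s.append(int(row["uptime_ms"]) / 1000.0)  # milliseconds -> seconds
        temp_c.append(float(row["temp_c"]))
        load.append(int(row["load"]))
    return t_s, temp_c, load


# Inline copy of the three sample rows; a real run would open the CSV file.
sample = io.StringIO(
    "uptime_ms,temp_c,load\n"
    "5702,24.330,0\n"
    "6702,23.861,0\n"
    "8702,24.330,1\n"
)
t_s, temp_c, load = load_log(sample)
print(t_s, temp_c, load)
```

Failing early with a clear error is cheaper than discovering a missing label column after the slope feature has already been computed.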


2) Feature: Rolling OLS Slope (°C/s)

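We use a causal time window of the last WIN_S seconds to fit a line to T(t) and keep only the slope. The slope captures the heating rate under load and is insensitive to a constant calibration offset, because adding the same constant to every reading does not change the fitted slope.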

Derivation

For timestamps t_i and temperatures y_i inside the window:

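$$ m = \frac{\sum_i (t_i - \bar{t})\,(y_i - \bar{y})}{\sum_i (t_i - \bar{t})^2}, \qquad \bar{t} = \frac{1}{n}\sum_i t_i, \quad \bar{y} = \frac{1}{n}\sum_i y_i $$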

We re‑center by means (\bar{t}, \bar{y}) for stability.
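To make the centered formula concrete, here is a small pure-Python check on a hand-made ramp. The function name ols_slope and the four sample points are illustrative assumptions; the arithmetic is the same centered OLS slope used in the implementation that follows.

```python
def ols_slope(t, y):
    """Centered OLS slope: sum((t - t_mean) * (y - y_mean)) / sum((t - t_mean)^2)."""
    n = len(t)
    t_mean = sum(t) / n
    y_mean = sum(y) / n
    num = sum((ti - t_mean) * (yi - y_mean) for ti, yi in zip(t, y))
    den = sum((ti - t_mean) ** 2 for ti in t)
    return num / den


t = [0.0, 1.0, 2.0, 3.0]        # seconds
y = [24.0, 24.1, 24.2, 24.3]    # deg C, rising 0.1 deg C per second

slope = ols_slope(t, y)
# Same ramp from a sensor that reads 5 deg C too high: the slope should not move.
slope_offset = ols_slope(t, [yi + 5.0 for yi in y])
print(slope, slope_offset)
```

Both values come out at roughly 0.1 °C/s, which is why a constant calibration offset does not disturb the feature.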

Implementation (Python)

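import numpy as np

def rolling_slope(t, y, win=120, min_samples=20):
    """Causal rolling OLS slope of y over the last `win` seconds (t sorted ascending)."""
    t = np.asarray(t, float)
    y = np.asarray(y, float)
    out = np.full(len(t), np.nan)  # NaN until the window holds enough samples
    for i in range(len(t)):
        # First index whose timestamp is >= t[i] - win; the window is t[j0..i].
        j0 = np.searchsorted(t, t[i] - win)
        if i - j0 + 1 >= min_samples:
            tt, yy = t[j0:i + 1], y[j0:i + 1]
            # Center both series before forming the sums (numerical stability).
            dt, dy = tt - tt.mean(), yy - yy.mean()
            den = (dt * dt).sum()
            if den > 1e-9:  # skip degenerate windows (identical timestamps)
                out[i] = (dt * dy).sum() / den
    return out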

Design choices

  • Causal → deployable in real time.
  • min_samples gate → avoids unstable fits in early window.
  • Centering → improves numeric stability.
  • Firmware version (slope_buf.hpp) maintains running sums (O(1) work per update; the in‑window samples are still buffered so expired ones can be subtracted).
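The running-sum idea can be sketched in Python as follows. This is an illustration of the technique under stated assumptions, not a port of slope_buf.hpp: the class name RunningSlope, its methods, and the choice to return None before min_samples are all hypothetical.

```python
from collections import deque


class RunningSlope:
    """Sliding-window OLS slope from running sums (illustrative sketch only)."""

    def __init__(self, win_s, min_samples=20):
        self.win_s = win_s
        self.min_samples = min_samples
        self.buf = deque()  # (t, y) pairs currently inside the window
        self.st = self.sy = self.stt = self.sty = 0.0

    def push(self, t, y):
        self.buf.append((t, y))
        self.st += t
        self.sy += y
        self.stt += t * t
        self.sty += t * y
        # Evict samples older than the window and subtract their contribution.
        while self.buf and self.buf[0][0] < t - self.win_s:
            t0, y0 = self.buf.popleft()
            self.st -= t0
            self.sy -= y0
            self.stt -= t0 * t0
            self.sty -= t0 * y0

    def slope(self):
        n = len(self.buf)
        if n < self.min_samples:
            return None  # not enough samples for a stable fit
        den = n * self.stt - self.st * self.st
        if den <= 1e-9:
            return None  # degenerate window
        return (n * self.sty - self.st * self.sy) / den


# Feed a clean ramp of 0.01 deg C per second sampled at 1 Hz.
rs = RunningSlope(win_s=120)
for k in range(200):
    rs.push(float(k), 24.0 + 0.01 * k)
m = rs.slope()
print(len(rs.buf), m)
```

One general numerical caveat: in single precision the raw sums of t² grow quickly with uptime, so a float32 version would likely want timestamps measured relative to the window start. That is a remark about the technique, not a claim about how slope_buf.hpp handles it.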

3) Training Script (train.py)

Pipeline steps:

  1. Load CSV, convert uptime to seconds.
  2. Compute slope (WIN_S=120, MIN_SAMPLES=20).
  3. Keep valid rows, standardize slope.
  4. Train Logistic Regression.
  5. Export params to JSON + C++ header.
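
import json

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

CSV_PATH = "./data/pico_log_20min.csv"
WIN_S, MIN_SAMPLES = 120, 20
P_ON, P_OFF, HOLD_S = 0.60, 0.40, 8

# 1) Load the log and convert uptime to seconds.
df = pd.read_csv(CSV_PATH)
df["t_s"] = df["uptime_ms"] / 1000.0

# 2) Slope feature; rolling_slope() is the function from section 2.
df["slope"] = rolling_slope(df["t_s"], df["temp_c"], WIN_S, MIN_SAMPLES)

# 3) Drop rows where the window was not yet full, then standardize.
data = df.dropna(subset=["slope"]).copy()
X = data[["slope"]].to_numpy()
y = data["load"].astype(int).to_numpy()
mu, sigma = float(X.mean()), float(X.std(ddof=1) or 1.0)  # fall back to 1.0 if std is 0
Xz = (X - mu) / sigma

# 4) Fit the classifier on the single standardized feature.
clf = LogisticRegression()
clf.fit(Xz, y)
a, b = float(clf.coef_[0, 0]), float(clf.intercept_[0])
acc = clf.score(Xz, y)  # accuracy on the training data itself (no held-out split)

print(f"n={len(y)} idle={(y == 0).sum()} load={(y == 1).sum()}")
print(f"win={WIN_S}s mu={mu:.5f} sigma={sigma:.5f}")
print(f"a={a:.5f} b={b:.5f} acc={acc:.1%}")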

Console output:

n=1200 idle=600 load=600
win=120s mu=0.00166 sigma=0.00207
a=0.06447 b=-0.50907 acc=95.1%
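Step 5 of the pipeline (export) is not shown in the excerpt above. A minimal sketch of how it could look is given here, assuming the JSON key names and the C++ constant types, which are my guesses; only the constant names (MU, SIGMA, A_COEF, B_INT, P_ON, P_OFF, HOLD_S, WIN_S) are taken from the firmware loop later in this post. The numbers are copied from the console output above.

```python
import json


def render_json(params):
    """Human-readable snapshot of the trained parameters (key names are assumed)."""
    return json.dumps(params, indent=2, sort_keys=True)


def render_header(params):
    """Render firmware constants as a C++ header (constant types are assumed)."""
    return "\n".join([
        "#pragma once",
        "// Generated by the training script; do not edit by hand.",
        f"constexpr float MU = {float(params['mu'])!r}f;",
        f"constexpr float SIGMA = {float(params['sigma'])!r}f;",
        f"constexpr float A_COEF = {float(params['a'])!r}f;",
        f"constexpr float B_INT = {float(params['b'])!r}f;",
        f"constexpr float P_ON = {float(params['p_on'])!r}f;",
        f"constexpr float P_OFF = {float(params['p_off'])!r}f;",
        f"constexpr int WIN_S = {int(params['win_s'])};",
        f"constexpr int HOLD_S = {int(params['hold_s'])};",
        "",
    ])


params = {
    "mu": 0.00166, "sigma": 0.00207, "a": 0.06447, "b": -0.50907,
    "win_s": 120, "p_on": 0.60, "p_off": 0.40, "hold_s": 8,
}
snapshot = render_json(params)
header = render_header(params)
print(header)
```

In a real script the two strings would be written to kws_edge_params.json and model_params.hpp. Generating both from one dictionary keeps the readable snapshot and the firmware constants from drifting apart.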

4) Model Semantics

We model

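$$ p(\text{load} \mid z) = \frac{1}{1 + e^{-(a z + b)}}, \qquad z = \frac{m - \mu}{\sigma} $$

where m is the rolling slope, \mu and \sigma are the training mean and standard deviation of the slope, and a, b are the fitted coefficient and intercept.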

On device:

  1. causal slope
  2. z‑score
  3. probability via sigmoid
  4. hysteresis/hold FSM
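Because the model has a single feature, the probability thresholds can be translated back into slope thresholds in °C/s, which is often easier to reason about on hardware. The sketch below does that inversion. The function names are hypothetical, and the parameter values are made up for illustration; they are not the trained values.

```python
import math


def prob_from_slope(m, mu, sigma, a, b):
    """Slope (deg C/s) -> z-score -> logistic probability of 'load'."""
    z = (m - mu) / sigma
    return 1.0 / (1.0 + math.exp(-(a * z + b)))


def slope_at_prob(p, mu, sigma, a, b):
    """Invert the model: the slope at which the probability equals p (needs a != 0)."""
    logit = math.log(p / (1.0 - p))
    return mu + sigma * (logit - b) / a


# Illustrative parameters only (NOT the trained values).
MU, SIGMA, A, B = 0.002, 0.002, 3.0, 0.0

m_on = slope_at_prob(0.60, MU, SIGMA, A, B)   # slope where p reaches P_ON
m_off = slope_at_prob(0.40, MU, SIGMA, A, B)  # slope where p falls to P_OFF
print(m_on, m_off)
```

With a positive coefficient, m_off sits below m_on, and the gap between them is the hysteresis band expressed in slope units.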

Firmware differences

  • Sigmoid: fast_sigmoid() implementation in utils_rt.hpp (approximate).
  • Hold logic: a counter, not a “sticky decay”. The alarm state changes only after the threshold condition has held for HOLD_S consecutive loop iterations (one per second at the 1 Hz loop rate), which behaves like a debounce filter.
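An approximate sigmoid is worth a parity check, because it shifts where the P_ON and P_OFF thresholds land in logit space. The post does not show fast_sigmoid(), so the sketch below uses a common cheap approximation, 0.5·x/(1+|x|) + 0.5, purely as a stand-in; the real function in utils_rt.hpp may use a different formula, and the numbers would change accordingly.

```python
import math


def sigmoid(x):
    """Exact logistic function."""
    return 1.0 / (1.0 + math.exp(-x))


def fast_sigmoid_standin(x):
    """Stand-in approximation: 0.5 * x / (1 + |x|) + 0.5 (assumed, not from utils_rt.hpp)."""
    return 0.5 * x / (1.0 + abs(x)) + 0.5


def standin_logit(p):
    """Input x at which the stand-in returns probability p (analytic inverse)."""
    y = 2.0 * p - 1.0
    return y / (1.0 - abs(y))


# Worst-case probability gap on a grid of logits from -8 to 8.
grid = [i / 100.0 for i in range(-800, 801)]
max_err = max(abs(sigmoid(x) - fast_sigmoid_standin(x)) for x in grid)

# Logit needed to reach P_ON = 0.60 under each function.
exact_on = math.log(0.60 / 0.40)
approx_on = standin_logit(0.60)
print(max_err, exact_on, approx_on)
```

For this particular stand-in the worst-case gap is roughly 0.08 in probability, and P_ON = 0.60 is reached at a logit of 0.25 instead of about 0.405. If the firmware approximation behaves similarly, thresholds tuned in Python with the exact sigmoid will trigger earlier on device, so it is safer to tune them against the same function the firmware uses.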

5) Firmware Loop (excerpt)

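SlopeBuf buf(WIN_S);
bool alarm = false;
int hold = 0;  // consecutive samples on the far side of the active threshold

while (true) {
    float T = read_temp_oversampled(128, 1500);
    uint32_t ms = to_ms_since_boot(get_absolute_time());
    float ts = ms * 1e-3f;  // seconds since boot

    buf.push(ts, T);
    float m = buf.slope();                               // °C/s
    float z = (m - MU) / (SIGMA > 1e-9f ? SIGMA : 1.f);  // guard against sigma ≈ 0
    float prob = fast_sigmoid(A_COEF * z + B_INT);

    // Hysteresis + debounce: switch only after HOLD_S consecutive samples
    // beyond P_ON (to raise the alarm) or below P_OFF (to clear it).
    if (!alarm && prob >= P_ON) {
        if (++hold >= HOLD_S) { alarm = true; hold = 0; }
    } else if (alarm && prob <= P_OFF) {
        if (++hold >= HOLD_S) { alarm = false; hold = 0; }
    } else {
        hold = 0;  // condition broken: restart the count
    }

    // uint32_t is unsigned long on the Pico toolchain, so cast for printf.
    printf("%lu,%.3f,%.5f,%.2f,%s\n", (unsigned long)ms, T, m, prob, alarm ? "ALARM" : "OK");
    sleep_ms(1000);
}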
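The hold logic is easy to replay on the desktop before flashing. The Python sketch below mirrors the if/else chain of the loop above; the function name debounce_fsm and the probability sequence are illustrative assumptions, and hold_n is shortened to 3 so the trace stays readable.

```python
def debounce_fsm(probs, p_on=0.60, p_off=0.40, hold_n=8):
    """Replay the firmware's hysteresis + debounce logic over a probability trace."""
    alarm, hold, states = False, 0, []
    for p in probs:
        if not alarm and p >= p_on:
            hold += 1
            if hold >= hold_n:
                alarm, hold = True, 0
        elif alarm and p <= p_off:
            hold += 1
            if hold >= hold_n:
                alarm, hold = False, 0
        else:
            hold = 0  # condition broken: restart the count
        states.append(alarm)
    return states


# Two high samples, a dip that resets the counter, three high, then three low.
probs = [0.7, 0.7, 0.5, 0.7, 0.7, 0.7, 0.3, 0.3, 0.3]
states = debounce_fsm(probs, hold_n=3)
print(states)
# prints [False, False, False, False, False, True, True, True, False]
```

The dip at the third sample resets the counter, so the alarm is raised only after the next three consecutive high samples, and it clears only after three consecutive low ones.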

6) What to Tune Next

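  • Window length (WIN_S): a shorter window reacts faster but gives a noisier slope; a longer window is smoother but slower to respond.
  • Thresholds (P_ON/P_OFF): widen the gap between them if the alarm oscillates.
  • Hold counter (HOLD_S): raise it to require longer confirmation and reduce false toggles, at the cost of extra latency.
  • Sampling rate: 1 Hz is adequate because thermal changes are slow.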
  • Features: ratio of short vs long slope, or curvature, for earlier detection.
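The window-length trade-off can be quantified with the standard OLS result Var(slope) = σ² / Σ(t_i − t̄)². The sketch below evaluates it under two assumptions that may not hold exactly on hardware: evenly spaced samples and independent, equal-variance temperature noise. The function name and the 0.05 °C noise figure are illustrative.

```python
import math


def slope_noise_std(n, dt_s, temp_noise_std):
    """Std dev of an OLS slope over n evenly spaced samples with i.i.d. noise."""
    # Closed form of sum((t_i - t_mean)^2) for t_i = i * dt_s, i = 0..n-1.
    sxx = dt_s ** 2 * n * (n * n - 1) / 12.0
    return temp_noise_std / math.sqrt(sxx)


noise = 0.05  # deg C per reading, illustrative value
short = slope_noise_std(30, 1.0, noise)    # about a 30 s window at 1 Hz
long_ = slope_noise_std(120, 1.0, noise)   # about a 120 s window at 1 Hz
ratio = short / long_
print(short, long_, ratio)
```

Slope noise falls roughly with the window length to the power 1.5, so cutting the window to a quarter makes the slope about eight times noisier. That is the cost to weigh against the faster response.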

7) Deliverables Recap

  • train.py → trains LR, exports parameters.
  • kws_edge_params.json → readable deployment snapshot.
  • model_params.hpp → constants for firmware.
  • main_infer.cpp → real‑time inference loop (ADC, slope buffer, LR, hysteresis).

In Part 3, we wire these into the Pico firmware (efficient slope buffer, LR, hysteresis with debounce) and stream CSV (including prob + alarm state) for desktop visualization.

