Ertugrul
Building an Edge AI Sound Classifier (Part 2): Feature Extraction & Training

In Part 1, we prepared a balanced dataset of short audio snippets. In Part 2, we’ll turn those snippets into compact numerical features and train a tiny model that can run on the Raspberry Pi Pico.

We’ll cover:

  • 🎛️ The exact features (33‑dim vector) and why we chose them
  • 🧮 How we compute them efficiently
  • 🧪 Training a multinomial Logistic Regression with proper normalization
  • 📈 Real evaluation metrics & confusion matrix
  • 📤 Exporting weights to a C++ header for the Pico firmware

🎛️ Feature Set (33 dims)

For MCU‑friendly audio, we favor hand‑crafted, low‑cost features that still capture the essence of each class.

Per snippet (1.5–2.0 s, framed at 25 ms with 10 ms hop, Hann window):

  • 12 Goertzel band energies spanning ~300–4800 Hz → mean and std over time (24)
  • RMS mean, std (2)
  • Spectral centroid mean, std (2)
  • Spectral rolloff (0.85) mean, std (2)
  • ZCR mean, std (2)
  • Spectral flatness mean (1)

Total = 33.

Why Goertzel? Each tone needs only a short multiply‑accumulate recurrence per sample and no FFT buffer, so it’s lighter than a dense filterbank and maps well to embedded targets. The other statistics add robustness to different voices/rooms/devices.

Figure: spectrograms of three different classes, side by side.


⚙️ Pre‑processing & Framing

We normalize peaks for consistency and apply pre‑emphasis to boost high frequencies (helps alarms/doorbells):

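import numpy as np

# Peak-normalize to 0.707 of full scale (about -3 dBFS); the epsilon guards against silent clips
peak = np.max(np.abs(y)) + 1e-9
y = np.clip(y/peak*0.707, -1, 1)
# Pre-emphasis: y[n] - 0.95*y[n-1], with the first sample passed through unchanged
y = np.append(y[0], y[1:] - 0.95*y[:-1]).astype(np.float32)

frames = frame_stack(y, SR)               # 25 ms / 10 ms hop
win = np.hanning(frames.shape[1]).astype(np.float32)
frames_w = frames * win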
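The frame_stack helper is not shown in the snippet above. Here is a minimal sketch of what it might look like, assuming one frame per row, 25 ms frames with a 10 ms hop, and zero‑padding for clips shorter than one frame. The parameter names frame_ms and hop_ms are hypothetical.

```python
import numpy as np

def frame_stack(y, sr, frame_ms=25.0, hop_ms=10.0):
    """Slice a 1-D signal into overlapping frames (one frame per row)."""
    y = np.asarray(y)
    frame_len = int(round(sr * frame_ms / 1000.0))
    hop_len = int(round(sr * hop_ms / 1000.0))
    if len(y) < frame_len:
        # ASSUMPTION: zero-pad short clips so at least one full frame exists
        y = np.pad(y, (0, frame_len - len(y)))
    n_frames = 1 + (len(y) - frame_len) // hop_len
    # Index matrix: row i covers samples [i*hop_len, i*hop_len + frame_len)
    idx = np.arange(frame_len)[None, :] + hop_len * np.arange(n_frames)[:, None]
    return y[idx].astype(np.float32)
```

At 16 kHz this gives 400‑sample frames with a 160‑sample hop, so a 1.0 s clip yields 98 frames. Trailing samples that do not fill a whole frame are dropped.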

📐 Goertzel Bands (fast, MCU‑friendly)

We approximate band energies with a few Goertzel tones per band and average:

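import math
import numpy as np

def goertzel_pow(frame, sr, f):
    """Power of `frame` at frequency f (Hz) via the Goertzel recurrence."""
    w = 2*math.pi*f/sr
    c = math.cos(w)
    s0 = s1 = s2 = 0.0
    for x in frame:
        s0 = x + 2*c*s1 - s2   # second-order resonator update
        s2, s1 = s1, s0
    return s1*s1 + s2*s2 - 2*c*s1*s2

def band_energy(frame, sr, lo, hi, n=3):
    """Average Goertzel power of n tones spaced evenly from lo to hi (Hz)."""
    freqs = np.linspace(lo, hi, n)
    return float(np.mean([goertzel_pow(frame, sr, f) for f in freqs]))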

Then we pool across time to get mean and std for each of the 12 bands.

⚠️ Important: We do not globally standardize here. We only apply an internal z‑score within each 12‑vector (be_mean separately from be_std) to remove per‑band scale bias. The global standardization used by the model is fitted on the training set and applied later.
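The pooling step needs a per‑frame band energy matrix, which the snippets don’t show. Here is a rough sketch of how it could be built and pooled. The band edges are an assumption (12 log‑spaced bands from 300 to 4800 Hz, since the article only gives the overall range), and band_matrix and pool_bands are hypothetical helper names.

```python
import math
import numpy as np

def goertzel_pow(frame, sr, f):
    # Same recurrence as above, repeated so this sketch runs on its own
    c = math.cos(2 * math.pi * f / sr)
    s1 = s2 = 0.0
    for x in frame:
        s1, s2 = float(x) + 2 * c * s1 - s2, s1
    return s1 * s1 + s2 * s2 - 2 * c * s1 * s2

# ASSUMPTION: 12 log-spaced bands; the real edges may differ
BAND_EDGES = np.geomspace(300.0, 4800.0, 13)

def band_matrix(frames_w, sr, n_tones=3):
    """Return be with shape (n_frames, 12): Goertzel band energy per frame and band."""
    be = np.empty((frames_w.shape[0], len(BAND_EDGES) - 1), dtype=np.float64)
    for i, frame in enumerate(frames_w):
        for b, (lo, hi) in enumerate(zip(BAND_EDGES[:-1], BAND_EDGES[1:])):
            tones = np.linspace(lo, hi, n_tones)
            be[i, b] = np.mean([goertzel_pow(frame, sr, f) for f in tones])
    return be

def pool_bands(be):
    """Mean/std over time, then a z-score within each 12-vector (not a global standardization)."""
    be_mean = be.mean(axis=0)
    be_std = be.std(axis=0) + 1e-8
    be_mean = (be_mean - be_mean.mean()) / (be_mean.std() + 1e-8)
    be_std = (be_std - be_std.mean()) / (be_std.std() + 1e-8)
    return be_mean, be_std
```

The Python loops are slow but mirror what the MCU would do sample by sample. For offline extraction over a large dataset, a vectorized version would be worth considering.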


🧮 Putting Features Together

# FFT-based helpers (per frame)
F = np.fft.rfft(frames_w, axis=1)
mag = np.abs(F) + 1e-12
freqs = np.fft.rfftfreq(frames_w.shape[1], 1/SR)
centroid = (mag*freqs).sum(axis=1)/mag.sum(axis=1)

# 0.85 rolloff
csum = np.cumsum(mag, axis=1)
roll_idx = np.argmax(csum >= (0.85*mag.sum(axis=1))[:,None], axis=1)
rolloff = freqs[roll_idx]

# ZCR (sign changes)
sgn = np.sign(frames_w); sgn[sgn==0]=1
zc = (np.diff(sgn, axis=1)!=0).mean(axis=1).astype(np.float32)

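# Pool to 33-dim vector
# (be: per-frame band energies, shape (n_frames, 12); rms and flat: per-frame RMS
#  and spectral flatness. All three are computed earlier and are not shown here.)
be_mean = be.mean(axis=0); be_std = be.std(axis=0)+1e-8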
be_mean = (be_mean - be_mean.mean()) / (be_mean.std()+1e-8)
be_std  = (be_std  - be_std.mean())  / (be_std.std()+1e-8)

feats = []
feats.extend(be_mean.tolist()); feats.extend(be_std.tolist())
feats.extend([float(rms.mean()), float(rms.std()+1e-8)])
feats.extend([float(centroid.mean()), float(centroid.std()+1e-8)])
feats.extend([float(rolloff.mean()), float(rolloff.std()+1e-8)])
feats.extend([float(zc.mean()), float(zc.std()+1e-8)])
feats.append(float(flat.mean()))
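The rms and flat arrays are used above without being defined in the snippet. One plausible way to compute them is sketched below. It assumes flatness is the ratio of the geometric to the arithmetic mean of the magnitude spectrum (the power spectrum is an equally common choice). The function name rms_and_flatness is hypothetical.

```python
import numpy as np

def rms_and_flatness(frames_w):
    """Per-frame RMS and spectral flatness for windowed frames of shape (n_frames, frame_len)."""
    # RMS: root of the mean squared sample value in each frame
    rms = np.sqrt(np.mean(frames_w.astype(np.float64) ** 2, axis=1))
    # Flatness: geometric mean / arithmetic mean of the magnitude spectrum
    # (ASSUMPTION: magnitude rather than power; the epsilon avoids log(0))
    mag = np.abs(np.fft.rfft(frames_w, axis=1)) + 1e-12
    flat = np.exp(np.mean(np.log(mag), axis=1)) / np.mean(mag, axis=1)
    return rms, flat
```

Flatness lies in (0, 1]: noise‑like frames score close to 1, while tonal frames such as an alarm beep score close to 0.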

Finally we serialize features to a CSV (featuresv1.csv) with columns:

[path, label, source, feat]

where feat is a JSON list of 33 floats.
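As an illustration of that layout, here is a small sketch that writes such rows with the standard library. The helper name write_feature_rows and the example values are hypothetical; only the four column names come from the article.

```python
import csv
import io
import json

def write_feature_rows(rows, fh):
    """rows: iterable of (path, label, source, feats); fh: writable text file handle."""
    writer = csv.writer(fh)
    writer.writerow(["path", "label", "source", "feat"])
    for path, label, source, feats in rows:
        # JSON-encode the vector so it fits in one CSV cell; csv quotes the embedded commas
        writer.writerow([path, label, source, json.dumps([float(v) for v in feats])])

# Usage with an in-memory buffer (a real run would open the CSV file instead)
buf = io.StringIO()
write_feature_rows([("clips/a.wav", "baby", "src1", [0.5] * 33)], buf)
```

Storing the vector as JSON keeps the file at four columns regardless of the feature count, at the cost of a json.loads call per row when loading.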

Figure: a few rows of the feature CSV shown as a table.


🧪 Training (Multinomial Logistic Regression)

The training script (train_ml.py) follows a clear, MCU‑friendly flow:

# ===== CONFIG =====
FEAT_CSV  = "./features/featuresv1.csv"
HPP_OUT   = "./firmware/model_params.hpp"
TEST_PROP = 0.30
SEED      = 42

1) Load features & labels

import json
import numpy as np
import pandas as pd

df = pd.read_csv(FEAT_CSV)
# Each "feat" cell is a JSON list of 33 floats; stack them into an (N, 33) matrix
X = np.stack(df["feat"].map(lambda s: np.array(json.loads(s), dtype=np.float32)))
labels = sorted(df["label"].unique().tolist())
lab2id = {l:i for i,l in enumerate(labels)}
y = df["label"].map(lab2id).values.astype(int)
sources = df["source"].astype(str).values
  • Labels are sorted and mapped to IDs → this order is also exported to firmware via LABELS[].

2) Group‑aware split (fallback to stratified)

from sklearn.model_selection import train_test_split

# A group split only works if every class has at least two distinct sources
ok_group = all(len(set(sources[y==i])) >= 2 for i,_ in enumerate(labels))
if ok_group:
    tr_idx, te_idx = group_split_indices(y, sources, labels, test_prop=TEST_PROP, seed=SEED)
else:
    tr_idx, te_idx = train_test_split(
        np.arange(len(y)), test_size=TEST_PROP, random_state=SEED, stratify=y
    )
  • If each class has ≥2 distinct sources, we split by source to avoid leakage.
  • Otherwise, we stratify to preserve class balance.
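The group_split_indices helper isn’t shown in the article. A minimal sketch of one way it could work follows. It assumes whole sources are assigned to the test set within each class until roughly test_prop of that class is held out, always leaving at least one source for training. It does not handle a source that appears under two different classes.

```python
import numpy as np

def group_split_indices(y, sources, labels, test_prop=0.30, seed=42):
    """Per class, send whole sources to the test set until ~test_prop of that class is held out."""
    rng = np.random.default_rng(seed)
    y = np.asarray(y)
    sources = np.asarray(sources)
    test_mask = np.zeros(len(y), dtype=bool)
    for cls in range(len(labels)):
        cls_idx = np.flatnonzero(y == cls)
        groups = np.unique(sources[cls_idx])
        rng.shuffle(groups)
        target = test_prop * len(cls_idx)
        held = 0
        # groups[:-1] guarantees at least one source stays in the training set
        for g in groups[:-1]:
            if held >= target:
                break
            pick = cls_idx[sources[cls_idx] == g]
            test_mask[pick] = True
            held += len(pick)
    return np.flatnonzero(~test_mask), np.flatnonzero(test_mask)
```

Because whole sources move together, the realized test proportion is only approximately test_prop, and it can overshoot when a single source holds many clips.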

3) Fit normalization on train only, then apply

def zfit(X):
    mu = X.mean(axis=0); sigma = X.std(axis=0) + 1e-8
    return mu, sigma

def zapply(X, mu, sigma):
    return (X - mu) / sigma

mu, sigma = zfit(X[tr_idx])
Xtr = zapply(X[tr_idx], mu, sigma)
Xte = zapply(X[te_idx], mu, sigma)
  • The + 1e-8 epsilon mirrors what the firmware does to avoid divide‑by‑zero.

4) Train multinomial Logistic Regression

from sklearn.linear_model import LogisticRegression
clf = LogisticRegression(max_iter=2000, class_weight="balanced")
clf.fit(Xtr, y[tr_idx])
  • Balanced class weights help when the “other” class has more samples.

5) Evaluate

from sklearn.metrics import classification_report, confusion_matrix
ypr = clf.predict(Xte)
print(classification_report(y[te_idx], ypr, target_names=labels, zero_division=0))
print(confusion_matrix(y[te_idx], ypr, labels=np.arange(len(labels))))
  • Results from our run: accuracy 0.87, weighted F1 0.86. Best class: smoke_alarm (F1 ≈ 0.99). Hardest: baby (recall ≈ 0.69).

6) Export to firmware header

C = len(labels); F = X.shape[1]
W = np.zeros((C, F+1), dtype=np.float32)
W[:,0]  = clf.intercept_.astype(np.float32)
W[:,1:] = clf.coef_.astype(np.float32)
export_hpp(W, labels, mu, sigma, HPP_OUT)

This generates model_params.hpp with the following layout (C and F stand for the values computed above):

namespace model {
  static const int CLASSES = C;
  static const int FEATS   = F;
  static const float MU[FEATS];
  static const float SIGMA[FEATS];
  static const float W[CLASSES][FEATS+1]; // bias + weights
  static const char* LABELS[CLASSES];
}
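The export_hpp function itself is not listed. As a sketch under stated assumptions, a generator for that layout could look like the following. The float formatting, the #pragma once guard, and the _c_array helper are our choices for illustration, not necessarily what the project uses.

```python
import numpy as np

def _c_array(values):
    # Scientific notation always contains an exponent, so the "f" suffix is valid C
    return ", ".join(f"{float(v):.8e}f" for v in values)

def export_hpp(W, labels, mu, sigma, out_path):
    """Write MU, SIGMA, W (bias in column 0) and LABELS as a C++ header; return the text."""
    n_classes, n_cols = W.shape
    rows = ",\n".join("    { " + _c_array(r) + " }" for r in W)
    names = ", ".join(f'"{l}"' for l in labels)
    text = (
        "// model_params.hpp (auto-generated)\n"
        "#pragma once\n"
        "namespace model {\n"
        f"  static const int CLASSES = {n_classes};\n"
        f"  static const int FEATS   = {n_cols - 1};\n"
        f"  static const float MU[FEATS]    = {{ {_c_array(mu)} }};\n"
        f"  static const float SIGMA[FEATS] = {{ {_c_array(sigma)} }};\n"
        "  static const float W[CLASSES][FEATS+1] = {\n"
        f"{rows}\n"
        "  };\n"
        f"  static const char* LABELS[CLASSES] = {{ {names} }};\n"
        "}\n"
    )
    with open(out_path, "w", encoding="utf-8") as fh:
        fh.write(text)
    return text
```

Label strings are written verbatim, so this sketch assumes they contain no quotes or backslashes.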

Figure: confusion matrix heatmap.


📤 Exporting for the Pico

After training, we export the parameters to a C++ header used by the firmware:

// model_params.hpp (auto-generated)
namespace model {
  static const int CLASSES = 4;
  static const int FEATS   = 33;
  static const float MU[FEATS]    = { ... };
  static const float SIGMA[FEATS] = { ... };
  static const float W[CLASSES][FEATS+1] = { /* bias + weights */ };
  static const char* LABELS[CLASSES] = { "baby", "doorbell", "other", "smoke_alarm" };
}

This keeps the runtime code tiny: just z‑score → linear layer → softmax.
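Before flashing anything, it can help to reproduce that pipeline on the PC and compare it against scikit‑learn. Below is a small Python reference, assuming W stores the bias in column 0 as in the export step. The function name predict_proba_ref is hypothetical.

```python
import numpy as np

def predict_proba_ref(x, mu, sigma, W):
    """x: (FEATS,) raw features; W: (CLASSES, FEATS+1) with the bias in column 0."""
    z = (np.asarray(x, dtype=np.float32) - mu) / sigma   # global z-score
    logits = W[:, 0] + W[:, 1:] @ z                      # linear layer
    logits = logits - logits.max()                       # subtract max for numerical stability
    p = np.exp(logits)
    return p / p.sum()                                   # softmax
```

For a multinomial model with more than two classes, the output should agree with clf.predict_proba on the same z‑scored input up to float rounding, which makes it a convenient parity check for the exported header.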


🧠 Common Pitfalls & Tips

  • Double normalization: Don’t re‑apply global z‑score on the PC feeder if the Pico already does (x − MU)/SIGMA.
  • SR mismatch: Ensure the same sample rate in extraction and live streaming (e.g., 16 kHz).
  • Latency vs stability: Shorter snippets and hops reduce latency but make predictions less stable; the per‑class FSM thresholds are tuned in Part 3.

🔜 Next (Part 3): Firmware & Live Demo

We’ll deploy the model on the Pico:

  • C++ inference loop (z‑score → LR → softmax)
  • Hysteresis FSM per class (with thresholds / consecutive frames)
  • LED indication + serial logs, and a live microphone demo.
