Most embedded Linux projects do not need a database server. They need something that stores data reliably when the network is down, survives a power cut, does not eat RAM, and requires zero administration. SQLite does all of that. It keeps everything in a single .db file on disk (plus two small sidecar files while WAL mode is active), it has been in production use for over twenty years, and it is available out of the box through Python's sqlite3 module and on most Linux distributions. If you are currently writing sensor data to a flat file or a custom binary format, this article will show you why SQLite is almost always the better choice, and exactly how to use it correctly.
See the project on GitHub: Resilient Edge MQTT client
Enable WAL Mode Before Anything Else
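The single most impactful thing you can do is switch from the default journal mode to Write-Ahead Logging (WAL). In the default rollback-journal mode, a writer takes an exclusive lock on the database file while it commits, so readers have to wait, and a long-running read can in turn hold up the writer. In WAL mode, readers keep working from a consistent snapshot while a writer appends to a separate log file, which matters enormously when you have one thread logging sensor data and another thread reading it for transmission. There is still only one writer at a time. Crash recovery is straightforward too: a transaction that was only partly written to the log when power was lost is simply ignored the next time the database is opened.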
import sqlite3
conn = sqlite3.connect('sensor_data.db')
# Do this immediately after opening — before any other operations
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL") # Safe with WAL, much faster than FULL
conn.commit()
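synchronous=NORMAL is a sensible trade-off in combination with WAL mode. The default FULL setting syncs to disk on every commit, which is slow on SD cards and eMMC storage. NORMAL syncs only at checkpoints, which is significantly faster on the kinds of storage you find in embedded devices. The database file itself stays consistent through a crash or power cut, but the most recent commits may be rolled back after a power loss, so stay on FULL if losing the last few seconds of readings is unacceptable. Note that journal_mode=WAL is stored in the database file and persists across connections, while synchronous is a per-connection setting that has to be applied every time you open the database.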
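A quick way to confirm that WAL mode is actually active is to read back the value the pragma returns. This is a minimal sketch, assuming a database on a local filesystem; open_wal is a hypothetical helper name, not part of the article's code. An in-memory database would report a different journal mode.

```python
import sqlite3

def open_wal(path):
    """Open a connection and report which journal mode is in effect.

    open_wal is a hypothetical helper name used for illustration.
    """
    conn = sqlite3.connect(path)
    # PRAGMA journal_mode returns the resulting mode as a single row
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    # synchronous is per-connection, so it is set on every open
    conn.execute("PRAGMA synchronous=NORMAL")
    return conn, mode.lower()
```

If the returned mode is not 'wal', the switch did not happen and it is worth logging a warning rather than carrying on silently.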
Schema Design: Keep It Flat, Add the Right Indexes
Resist the urge to normalise aggressively. On an edge device you are almost always inserting one kind of data repeatedly and querying it by time range. A flat table with a proper index on the timestamp column handles both operations efficiently and keeps queries simple.
conn.execute("""
    CREATE TABLE IF NOT EXISTS sensor_readings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp REAL NOT NULL,          -- Unix epoch, fractional seconds
        topic TEXT NOT NULL,              -- e.g. 'sensors/warehouse/temperature'
        payload TEXT NOT NULL,            -- JSON string
        synced INTEGER NOT NULL DEFAULT 0 -- 0 = pending upload, 1 = done
    )
""")
# This index makes time-range queries fast and powers the sync queue
conn.execute("""
    CREATE INDEX IF NOT EXISTS idx_timestamp_synced
    ON sensor_readings(timestamp, synced)
""")
The synced column is a pattern worth adopting from the start. Edge devices often need to buffer data locally and then upload it in batches when connectivity returns. Tracking sync state in the same table keeps everything in one place and makes the query for "give me everything not yet sent" trivially simple:
SELECT * FROM sensor_readings WHERE synced = 0 ORDER BY timestamp ASC LIMIT 100
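If the pending queue turns out to be the hot path, a partial index is one option worth considering. The sketch below is an assumption layered on the schema above, not part of the original design: idx_pending is a hypothetical index name, and partial indexes need SQLite 3.8.0 or later. It indexes only rows with synced = 0, so the index stays small once most rows have been uploaded.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE sensor_readings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp REAL NOT NULL,
        topic TEXT NOT NULL,
        payload TEXT NOT NULL,
        synced INTEGER NOT NULL DEFAULT 0
    )
""")
# Hypothetical partial index: covers only rows still waiting for upload
conn.execute("""
    CREATE INDEX idx_pending
    ON sensor_readings(timestamp) WHERE synced = 0
""")
# Ten demo rows; odd timestamps are already marked as synced
rows = [(float(t), "sensors/demo", "{}", t % 2) for t in range(10)]
conn.executemany(
    "INSERT INTO sensor_readings (timestamp, topic, payload, synced) "
    "VALUES (?, ?, ?, ?)",
    rows,
)
conn.commit()

pending = conn.execute(
    "SELECT timestamp FROM sensor_readings "
    "WHERE synced = 0 ORDER BY timestamp ASC LIMIT 100"
).fetchall()

# EXPLAIN QUERY PLAN shows which index the planner picks on your build
for plan_row in conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM sensor_readings "
    "WHERE synced = 0 ORDER BY timestamp ASC LIMIT 100"
):
    print(plan_row)
```

The plan output varies between SQLite versions, so treat it as a diagnostic to run on the target device rather than something to hard-code against.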
Always Use Batch Inserts
Never insert one row per commit in a logging loop. Each commit flushes a transaction to disk, and on embedded storage that is slow enough to miss readings at any meaningful sample rate. Batch your inserts inside a single transaction — the difference in throughput is dramatic.
import time
import json
from contextlib import contextmanager

@contextmanager
def batch_insert(conn, batch_size=100):
    """Context manager that buffers rows and commits them one batch at a time."""
    rows = []

    def add(topic, payload):
        rows.append((time.time(), topic, json.dumps(payload)))
        if len(rows) >= batch_size:
            _flush(conn, rows)
            rows.clear()

    try:
        yield add
    finally:
        if rows:  # Flush any remaining rows on exit, even if the loop raised
            _flush(conn, rows)

def _flush(conn, rows):
    """Insert all buffered rows inside a single transaction."""
    conn.executemany(
        "INSERT INTO sensor_readings (timestamp, topic, payload) VALUES (?, ?, ?)",
        rows
    )
    conn.commit()

# Usage in a sensor loop (sensor_stream() stands in for your own data source)
with batch_insert(conn, batch_size=50) as log:
    for reading in sensor_stream():
        log('sensors/warehouse/temperature', {
            'value': reading.temperature,
            'unit': 'celsius'
        })
executemany with a list of tuples is the correct tool here. It prepares the statement once and executes it repeatedly within a single transaction, which is both faster and safer than building SQL strings manually. Never concatenate user data or variable values into SQL strings directly; always use parameterised queries with ? placeholders. Keep in mind that rows waiting in the batch live only in RAM until the next flush, so batch_size also sets how many readings a power cut can cost you.
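To see the throughput difference on your own hardware, a rough timing comparison is enough. This is a sketch under stated assumptions: timed_insert and the throwaway table t are illustrative names, and the numbers depend entirely on the storage, so no expected output is given.

```python
import os
import sqlite3
import tempfile
import time

def timed_insert(path, n, per_row_commit):
    """Insert n rows and return the elapsed seconds (illustrative helper)."""
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("CREATE TABLE IF NOT EXISTS t (ts REAL, payload TEXT)")
    conn.commit()
    rows = [(time.time(), "x") for _ in range(n)]
    start = time.perf_counter()
    if per_row_commit:
        for row in rows:
            conn.execute("INSERT INTO t VALUES (?, ?)", row)
            conn.commit()  # one transaction per row
    else:
        conn.executemany("INSERT INTO t VALUES (?, ?)", rows)
        conn.commit()      # one transaction for the whole batch
    elapsed = time.perf_counter() - start
    conn.close()
    return elapsed

with tempfile.TemporaryDirectory() as tmp:
    slow = timed_insert(os.path.join(tmp, "a.db"), 500, per_row_commit=True)
    fast = timed_insert(os.path.join(tmp, "b.db"), 500, per_row_commit=False)
    print(f"per-row commits: {slow:.3f}s, single batch: {fast:.3f}s")
```

Run it on the actual SD card or eMMC rather than a development laptop, since the gap is usually much larger on slow flash storage.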
Thread Safety: One Connection Per Thread
If your application is multi-threaded, and most real embedded applications are, do not share a single SQLite connection across threads. By default, Python's sqlite3 module raises ProgrammingError when a connection is used from a thread other than the one that created it, and switching that check off without adding your own locking will give you intermittent errors that are hard to reproduce. The cleanest approach is to use a connection pool or simply open a dedicated connection per thread.
import threading

# Thread-local storage: each thread gets its own connection automatically
_local = threading.local()

def get_connection(db_path: str) -> sqlite3.Connection:
    """Return a per-thread SQLite connection, creating it if needed."""
    if not hasattr(_local, 'conn') or _local.conn is None:
        # check_same_thread stays at its default (True): each connection is
        # only ever used by the thread that created it, so the guard costs
        # nothing and catches accidental sharing.
        _local.conn = sqlite3.connect(db_path)
        _local.conn.row_factory = sqlite3.Row  # Access columns by name
        _local.conn.execute("PRAGMA journal_mode=WAL")
        _local.conn.execute("PRAGMA synchronous=NORMAL")
    return _local.conn
row_factory = sqlite3.Row is small but worth enabling. It makes query results accessible by column name (row['timestamp']) rather than index (row[0]), which makes your code substantially easier to read and maintain.
Manage Storage: Prevent the Database from Growing Forever
On a device with a 16 GB SD card, an unbounded database will eventually fill the storage and crash your application. Build a retention policy in from the start. The simplest approach is a periodic cleanup that deletes synced rows older than your retention window.
import time

def cleanup_old_records(conn, retain_days=7, vacuum=False):
    """
    Delete synced records older than retain_days.
    Only deletes synced rows. Unsynced data is preserved regardless of age,
    because it hasn't been uploaded yet and may still be needed.
    Pass vacuum=True only occasionally: VACUUM rewrites the entire file.
    """
    cutoff = time.time() - (retain_days * 86400)
    cursor = conn.execute(
        "DELETE FROM sensor_readings WHERE synced = 1 AND timestamp < ?",
        (cutoff,)
    )
    conn.commit()
    deleted = cursor.rowcount
    if vacuum and deleted > 0:
        # VACUUM reclaims the disk space freed by deletions.
        # It cannot run inside a transaction, hence the commit above.
        conn.execute("VACUUM")
    return deleted
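Run cleanup_old_records once per day from a background thread or a systemd timer. The VACUUM command rewrites the database to reclaim the disk space freed by deletions. Without it the file does not shrink, although SQLite does reuse the freed pages for new rows, so a database with a steady write rate tends to level off on its own. VACUUM is expensive and can temporarily need free disk space of up to about twice the size of the database, so do not run it on every cleanup cycle; once a week is usually enough, depending on your write volume.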
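One way to decide when VACUUM is worth the cost is to look at how much of the file is free pages. This is a minimal sketch, assuming the default auto_vacuum setting; maybe_vacuum and the 25% threshold are illustrative choices, not recommendations from the SQLite documentation.

```python
import os
import sqlite3
import tempfile

def maybe_vacuum(conn, min_free_fraction=0.25):
    """Run VACUUM only if enough of the file consists of free pages.

    maybe_vacuum and min_free_fraction are illustrative names and values.
    Returns True if VACUUM ran.
    """
    free_pages = conn.execute("PRAGMA freelist_count").fetchone()[0]
    total_pages = conn.execute("PRAGMA page_count").fetchone()[0]
    if total_pages == 0 or free_pages / total_pages < min_free_fraction:
        return False
    conn.execute("VACUUM")  # must not be called inside an open transaction
    return True

# Demo: fill a table, delete everything, then check whether VACUUM triggers
path = os.path.join(tempfile.mkdtemp(), "vac.db")
conn = sqlite3.connect(path)
conn.execute("CREATE TABLE t (payload TEXT)")
conn.executemany("INSERT INTO t VALUES (?)", [("x" * 500,) for _ in range(2000)])
conn.commit()
conn.execute("DELETE FROM t")
conn.commit()

ran = maybe_vacuum(conn)
print("VACUUM ran:", ran)
```

Calling this after each cleanup keeps the expensive rewrite for the occasions when it actually frees a meaningful amount of space.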
The Complete Pattern
Here is what the full setup looks like when these practices are combined:
import sqlite3
import threading
import time
import json

DB_PATH = '/var/lib/myapp/sensor_data.db'
_local = threading.local()

def get_db() -> sqlite3.Connection:
    """Return this thread's connection, creating it on first use."""
    if not hasattr(_local, 'conn') or _local.conn is None:
        # One connection per thread, so the default same-thread check stays on
        conn = sqlite3.connect(DB_PATH)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        _local.conn = conn
    return _local.conn

def init_schema():
    conn = get_db()
    conn.execute("""
        CREATE TABLE IF NOT EXISTS sensor_readings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp REAL NOT NULL,
            topic TEXT NOT NULL,
            payload TEXT NOT NULL,
            synced INTEGER NOT NULL DEFAULT 0
        )
    """)
    conn.execute("""
        CREATE INDEX IF NOT EXISTS idx_timestamp_synced
        ON sensor_readings(timestamp, synced)
    """)
    conn.commit()

def log_reading(topic: str, payload: dict):
    """Store one reading. This commits per row, which suits low sample rates;
    use the batch_insert helper shown earlier for high-frequency logging."""
    conn = get_db()
    conn.execute(
        "INSERT INTO sensor_readings (timestamp, topic, payload) VALUES (?, ?, ?)",
        (time.time(), topic, json.dumps(payload))
    )
    conn.commit()

def get_pending(limit: int = 100) -> list:
    """Return unsynced readings, oldest first."""
    conn = get_db()
    return conn.execute(
        "SELECT * FROM sensor_readings WHERE synced = 0 ORDER BY timestamp ASC LIMIT ?",
        (limit,)
    ).fetchall()

def mark_synced(row_ids: list[int]):
    """Mark a batch of rows as successfully uploaded."""
    conn = get_db()
    conn.executemany(
        "UPDATE sensor_readings SET synced = 1 WHERE id = ?",
        [(rid,) for rid in row_ids]
    )
    conn.commit()
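The piece that ties the pending query and the synced flag together is the upload loop. The following is a sketch, not the project's actual sync code: sync_once is a hypothetical function, and upload is an assumed callable that takes a topic and a decoded payload and raises on failure. It stops at the first failed upload and marks only the rows that were actually sent.

```python
import json
import sqlite3

def sync_once(conn, upload, limit=100):
    """Upload one batch of pending rows; return how many were marked synced.

    upload is a hypothetical callable (topic, payload_dict) that raises
    on failure. Rows after the first failure stay pending for the next pass.
    """
    rows = conn.execute(
        "SELECT id, topic, payload FROM sensor_readings "
        "WHERE synced = 0 ORDER BY timestamp ASC LIMIT ?",
        (limit,),
    ).fetchall()
    sent_ids = []
    for row_id, topic, payload in rows:
        try:
            upload(topic, json.loads(payload))
        except Exception:
            break  # treat any failure as "link is down" and retry later
        sent_ids.append((row_id,))
    conn.executemany(
        "UPDATE sensor_readings SET synced = 1 WHERE id = ?", sent_ids
    )
    conn.commit()
    return len(sent_ids)

# Demo with an in-memory database and a fake uploader that fails on one row
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE sensor_readings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp REAL NOT NULL,
        topic TEXT NOT NULL,
        payload TEXT NOT NULL,
        synced INTEGER NOT NULL DEFAULT 0
    )
""")
conn.executemany(
    "INSERT INTO sensor_readings (timestamp, topic, payload) VALUES (?, ?, ?)",
    [(float(i), "sensors/demo", json.dumps({"value": i})) for i in range(3)],
)
conn.commit()

def flaky_upload(topic, payload):
    if payload["value"] == 2:
        raise ConnectionError("simulated network drop")

first_pass = sync_once(conn, flaky_upload)
print("rows synced on first pass:", first_pass)
```

Because a row is marked only after its upload returns, a crash between the upload and the UPDATE means that row is sent again on the next pass, so the receiving side should tolerate duplicates.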
Quick Reference
Do these from the start. Enable WAL mode and synchronous=NORMAL immediately after opening each connection. Use parameterised queries — never string concatenation. Add a timestamp index and a synced column if you are buffering for later upload. Set a retention policy before your first deployment.
Avoid these. Do not commit per row in a high-frequency logging loop — batch your inserts. Do not share a connection across threads. Do not call VACUUM on every cleanup. Do not use the default journal mode if you have concurrent readers and writers.
Use when you need it. executemany for bulk inserts. row_factory = sqlite3.Row for readable query results. VACUUM once a day or week to reclaim space. PRAGMA integrity_check after a power loss if you want to verify the database is undamaged.
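The integrity check mentioned above can be wrapped in a small startup helper. This is a sketch, assuming you want a simple pass or fail answer; database_is_healthy is a hypothetical name, and what to do on failure (restore a backup, start a fresh file) is left to the application.

```python
import sqlite3

def database_is_healthy(path):
    """Return True if PRAGMA integrity_check reports no problems.

    database_is_healthy is a hypothetical helper name.
    """
    try:
        conn = sqlite3.connect(path)
        try:
            rows = conn.execute("PRAGMA integrity_check").fetchall()
        finally:
            conn.close()
    except sqlite3.DatabaseError:
        return False  # e.g. the file is not a readable SQLite database
    # A healthy database yields exactly one row containing the string 'ok'
    return rows == [("ok",)]
```

The check reads the whole database, so on a large file it is better suited to boot time after an unclean shutdown than to a periodic timer.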