DEV Community

Aniket Hingane
Aniket Hingane

Posted on

Building an Autonomous Data Pipeline Sentinel with Hierarchical Memory

Title Image

Subtitle: How I Architected a Persistent PR Defense System Using FAISS, SQLite, and Automated Memory Consolidation

TL;DR

  1. In my recent experiments, I built DataPipeline-Sentinel, a persistent OS for autonomous data pipeline incident management.
  2. I utilized a 4-tier Hierarchical Memory System (Context, Semantic, Episodic, Declarative) to enable genuine machine learning from past incidents.
  3. By combining FAISS for vector retrieval and SQLite for immutable logging, the agent instantly recalls resolved pipeline errors.
  4. I created a nightly Memory Consolidation background job to distill hundreds of raw logs into hard-coded declarative rules.
  5. This architecture shifts AI agents from stateless script-kiddies into seasoned, senior-level operators. All code is available in my public repository here.

Architecture Overview

Introduction

I observed a recurring nightmare in modern data engineering: pipelines break, engineers diagnose the issue, they apply a fix (like tweaking a Spark schema inference), and then... everyone forgets. Three months later, the exact same upstream API changes its payload structure, a different pipeline shatters, and a new engineer wastes four hours diagnosing the exact same root cause.

From my experience, stateless autonomous agents using standard RAG (Retrieval-Augmented Generation) aren't enough to solve this. If you just feed an LLM a static playbook, it never learns from the nuances of daily operational chaos. I thought about how human senior engineers operate: they have an instinct derived from thousands of past, painful outages. They remember the episodic pain of a MongoDB schema drift causing an Airflow DAG to hang.

I wanted to replicate this. I put it this way because I realized we don't just need agents that can read docs; we need agents that can remember experiences. Thus, the idea for the DataPipeline-Sentinel was born—an experimental PoC of an EverMem-style persistent AI Agent OS that learns chronologically from production incidents and consolidates that knowledge into permanent operational wisdom.

What's This Article About?

This article breaks down how I developed a Persistent Memory Operating System for an autonomous agent. I am not focusing on the specific LLM prompts. Instead, in my opinion, the fascinating part is the Memetic Architecture.

I will walk you through building a system that features:

  1. Short-Term Context Buffers: For active incident triaging.
  2. Semantic Memory (FAISS): To instantly find mathematically similar past outages using high-dimensional vector embeddings.
  3. Episodic Memory (SQLite): An immutable, append-only ledger of everything the agent and human engineers have ever done.
  4. Declarative Memory (SQLite): Firm, hard-coded constraints logically deduced from episodic logs during an automated "sleep cycle."

This isn't about generic coding. It's about designing a cognitive architecture that allows an AI operator to organically accumulate seniority over time.

Tech Stack

To keep this experimental PoC lean, I avoided heavy vector databases or complex graph tools. My setup is purposefully brutalist and highly effective:

  1. Python 3.12: The core orchestrator.
  2. FAISS (CPU): Facebook's incredibly fast library for similarity search and clustering of dense vectors. I use this exclusively for Semantic Memory.
  3. SQLite: The unsung hero of persistent storage. I use SQLite to maintain both the Episodic event logs and the Declarative rule tables. It is lightweight, zero-configuration, and ACID compliant.
  4. Rich: For hyper-readable, beautiful terminal output simulating the agent's internal monologue.
  5. Pillow & Mermaid.js: For all the visual diagramming and UI mockups.

Why Read It?

In my opinion, the AI industry is overly obsessed with context windows. "Just shove 1 million tokens into context and it will figure it out!" No. From my experience, shoving endless logs into a prompt is computationally wasteful and mathematically noisy.

You should read this if you want to understand how to build Systems of Record for autonomous agents. If you are trying to build an agent that handles customer support, financial auditing, or infrastructure monitoring, you will eventually hit the "amnesia wall." Your agent will solve a complex edge case on Tuesday and completely forget how to do it by Thursday.

This article provides the exact architectural blueprint to break through that wall. By implementing an automated consolidation layer, I've proven (at least in this PoC) that we can programmatically convert chaotic daily experiences into rigid, institutional knowledge.

Let's Design

The 4-Tier Cognitive Hierarchy

When I designed this architecture, I thought deeply about human memory psychology and applied it directly to Python objects.

Sequence Diagram

  1. Working Context (RAM)

    • Analogy: What I'm thinking about right now.
    • System Implementation: A standard Python list queue (self.short_term_buffer) capped at 10 items. It holds the active error stack trace and the active pipeline name. Once the issue is resolved, this buffer is cleared.
  2. Semantic Memory (FAISS HNSW)

    • Analogy: My vague intuitive sense that "I've seen this error before."
    • System Implementation: Every time a pipeline error occurs, it is embedded into a 1536-dimensional vector and stored in faiss.IndexFlatL2. If a new error comes in, I do a cosine similarity search (self.index.search()) to pull the top 3 most similar historical errors. Over time, FAISS acts as the agent's intuition.
  3. Episodic Memory (SQLite episodic_memory)

    • Analogy: My chronological journal of every outage I've ever fought.
    • System Implementation: An append-only relational table. Columns include timestamp, event_type, content, and consolidated. Crucially, this table stores the Resolutions—what the human engineer ultimately did to fix the pipeline.
  4. Declarative Memory (SQLite declarative_memory)

    • Analogy: The hard-coded rules written in the employee handbook.
    • System Implementation: A curated table of strict facts (e.g., "Fact Type: pipeline_fix, Content: Use infer_schema=True for Mongo syncs"). The agent queries this table purely by SQL WHERE clauses, entirely bypassing fuzzy vector math.

The Engine of Evolution: Memory Consolidation

This is the secret sauce. In my experiments, I realized Episodic Memory grows infinitely and becomes garbage. You don't want the agent reading 10,000 raw logs of humans fixing pipelines.

I wrote a consolidation.py script—a cron job simulating human sleep. It runs at midnight, performs a SQL query for all logs where consolidated = 0, uses an LLM to extract a generalized rule from the specific incident, writes to Declarative memory, and updates the flag to consolidated = 1.

Flow Diagram

Let’s Get Cooking

I structured the project strictly around separation of concerns. The OS handles persistence, the Agent handles logic, and the Consolidator handles background distillation.

Establishing the Hierarchical Memory OS

Let's look at how I implemented the core HierarchicalMemoryOS class combining SQLite and FAISS.

import sqlite3
import json
import faiss
import numpy as np
from datetime import datetime

class HierarchicalMemoryOS:
    def __init__(self, db_path="memory_os.db", vector_dim=1536):
        self.db_path = db_path
        self.vector_dim = vector_dim
        self.short_term_buffer = []

        # Initialize SQLite (Episodic & Declarative)
        self.conn = sqlite3.connect(self.db_path)
        self._init_db()

        # Initialize FAISS (Semantic)
        self.index = faiss.IndexFlatL2(self.vector_dim)
        # Map FAISS vector IDs back to SQLite Episodic IDs
        self.id_mapping = {}  
        self.next_faiss_id = 0
Enter fullscreen mode Exit fullscreen mode

I put it this way because managing two completely different storage paradigms (Vectors in RAM/Disk and Relational rows) requires a tight unifying class. The id_mapping dict bridges the gap between the FAISS integer ID array and the SQLite Primary Keys.

The Episodic and Declarative Schema

I designed the SQLite tables to be extremely barebones but highly relational to the agent's temporal experience.

    def _init_db(self):
        cursor = self.conn.cursor()
        # Episodic Memory: Raw events/logs
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS episodic_memory (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME,
                event_type TEXT,
                content TEXT,
                metadata TEXT,
                consolidated BOOLEAN DEFAULT 0
            )
        ''')
        # Declarative Memory: Concrete rules/facts derived from episodes
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS declarative_memory (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                fact_type TEXT,
                fact_content TEXT,
                confidence REAL
            )
        ''')
        self.conn.commit()
Enter fullscreen mode Exit fullscreen mode

From my experience, boolean flags like consolidated are the safest way to implement async background processing. It allows the agent to constantly write to Episodic memory without locking out the background distillation job.

Embedding Semantic Vectors

This is where the magic of fuzzy recall happens.

    def retrieve_semantic(self, query, top_k=3, simulate_embedding=True):
        # Search FAISS for semantically similar past events.
        if self.index.ntotal == 0:
            return []

        if simulate_embedding:
            # Pseudo-random reproducible vector for PoC
            np.random.seed(hash(query) % (2**32))
            vector = np.random.rand(1, 1536).astype('float32')
        else:
            # Expected API integration here
            vector = np.zeros((1, 1536)).astype('float32')

        faiss.normalize_L2(vector)
        distances, indices = self.index.search(vector, top_k)

        results = []
        cursor = self.conn.cursor()
        for idx in indices[0]:
            if idx != -1 and idx in self.id_mapping:
                sql_id = self.id_mapping[idx]
                cursor.execute("SELECT content FROM episodic_memory WHERE id = ?", (sql_id,))
                row = cursor.fetchone()
                if row:
                    results.append(row[0])
        return results
Enter fullscreen mode Exit fullscreen mode

I observed that simply returning vector distances isn't enough. We must do a secondary lookup into SQLite using self.id_mapping to return the actual, human-readable log content that matches the vector. This is how the agent fundamentally "remembers" text based on semantic meaning.

The Sentinel Agent Logic

Here is the core orchestration loop that fires when a pipeline breaks.

from memory_os import HierarchicalMemoryOS

class SentinelAgent:
    def __init__(self, memory_os: HierarchicalMemoryOS):
        self.memory = memory_os

    def handle_incident(self, pipeline_name, error_log):
        # 1. Update Short-Term Context
        self.memory.add_short_term({"event": "incident", "pipeline": pipeline_name, "error": error_log})

        # 2. Retrieve Semantic Context (Has this happened before?)
        similar_past_errors = self.memory.retrieve_semantic(error_log)

        # 3. Retrieve Declarative Rules (Are there firm rules for this?)
        firm_rules = self.memory.get_declarative_rules("pipeline_fix")

        # 4. Formulate Diagnosis & Fix (Simulated LLM Call)
        diagnosis = self._analyze_with_llm(error_log, similar_past_errors, firm_rules)

        # 5. Store Incident in Episodic Memory
        episodic_id = self.memory.store_episodic(
            event_type="incident",
            content=f"[{pipeline_name}] failed: {error_log}. Diagnosis: {diagnosis}",
            metadata={"pipeline": pipeline_name, "status": "unresolved"}
        )

        # Embed for immediate searchability
        self.memory.embed_and_store_semantic(error_log, episodic_id)

        return diagnosis
Enter fullscreen mode Exit fullscreen mode

I wrote it this way to force the agent to query BOTH its intuition (FAISS Semantic) and its handbook (SQLite Declarative) before invoking the LLM synthesis logic. This drastically reduces hallucinations because the LLM prompt is heavily saturated with historical ground-truth.

The Memory Consolidator

The final piece of the puzzle. This runs completely out-of-band.

class MemoryConsolidator:
    def __init__(self, memory_os: HierarchicalMemoryOS):
        self.memory = memory_os

    def run_consolidation_cycle(self):
        # Scan unconsolidated episodic memory and distill to declarative rules.
        cursor = self.memory.conn.cursor()
        cursor.execute('''
            SELECT id, content FROM episodic_memory 
            WHERE consolidated = 0 AND event_type = 'resolution'
        ''')
        rows = cursor.fetchall()

        consolidated_count = 0
        for row in rows:
            record_id, content = row

            # Simulated LLM Extraction: Extract a firm rule from the resolution
            if "infer_schema=True" in content:
                rule = "Always use infer_schema=True when dealing with upstream MongoDB drift."
                self.memory.store_declarative("pipeline_fix", rule)

            # Mark as consolidated
            cursor.execute("UPDATE episodic_memory SET consolidated = 1 WHERE id = ?", (record_id,))
            consolidated_count += 1

        self.memory.conn.commit()
        return consolidated_count
Enter fullscreen mode Exit fullscreen mode

By pulling unresolved logs and formally marking them as consolidated = 1, we effectively maintain a high-signal-to-noise ratio in the declarative database while preserving the unstructured history forever.

Let's Setup

If you want to run this experimental environment on your own machine:

Step by step details can be found at: DataPipeline-Sentinel GitHub Repository.

  1. Clone the repo and install the light-weight dependencies (faiss-cpu, rich).
  2. Run python3 main.py to initiate the simulation.
  3. Observe how the agent handles a Day 1 novel incident, undergoes Nightly Consolidation, and brilliantly resolves a Day 2 recurrent incident without human intervention.
  4. You can explore the exact raw source code structure there and adapt it to your LLM API of choice.

Let's Run

When executing the agent in an environment, the simulation visually proves the memetic shift.

On Day 1, the agent encounters a Schema mismatch on 'user_metadata' array. Semantic lookup returns 0 results. Declarative lookup returns 0 results. The agent escalates to a human engineer. The engineer manually deploys a fix (infer_schema=True). The agent logs this.

At Midnight, the MemoryConsolidator process wakes up. It scans the episodic logs, notices the human resolution, and extracts a hard-coded constraint rule, storing it in SQLite.

On Day 2, the agent encounters a very similar error on a different pipeline: Schema mismatch on 'transaction_data' array.
Instantly, the system queries FAISS and recognizes semantic similarity. It queries SQLite and retrieves the newly consolidated rule. The agent autonomously suggests the exact fix without escalating to the engineer.

This proves that continuous, persistent learning is possible when you decouple the storage topology from the stochastic LLM generation!

Extensive Deep Dive on Architectural Trade-offs

To reach a comprehensive understanding, I must expand on why I think this specific stack is the ultimate sweet spot for edge AI agents.

Why not just use a massive Vector Database for everything?

Ah, the trap of the modern AI hype cycle. If you store everything in Pinecone or Milvus, you treat subjective opinions and objective firm-rules identically. A vector database retrieves data based on mathematically fuzzy distance. If a company policy states "Never restart a Production node during business hours," you do NOT want a fuzzy 0.82 cosine similarity match to decide if that rule applies. You want a deterministic SQL WHERE rules.type = 'security_constraint' to enforce it.
By splitting the data, I guarantee that the agent has both creative intuition and strict boundary compliance.

The Ethics of Autonomous Operational Agents

When allowing agents to manage production data pipelines, an ethical engineering dilemma arises: accountability.
Because everything the DataPipeline-Sentinel does is logged immutably into episodic_memory SQLite tables, an audit team can trace exactly why the agent executed a specific query. We can see the FAISS IDs retrieved, the Declarative Rules pulled, and the prompt fed to the LLM.
In my opinion, any agent performing write-operations on enterprise infrastructure MUST have an immutable SQLite-style episodic log. RAG without auditability is a liability.

Future Roadmap

While this PoC brilliantly handles incident logging, my future experiments will focus on:

  1. Memory Decay: Periodically downgrading the confidence score in the Declarative table over time if a rule isn't cited in X days.
  2. Conflict Resolution: What happens when Day 50 consolidation contradicts a rule learned on Day 10? The agent will need an active reasoning loop to determine truth.
  3. Multi-Agent Memory Sharing: Having a Sentinel Agent share its FAISS semantic index with a completely different Security Agent over the network.

Closing Thoughts

Building the DataPipeline-Sentinel experiment was a profound validation of cognitive software architecture. I realized that the intelligence of an agent isn't bound by its underlying model's parameter count—it's bounded by the architecture of its memory systems.

A $10,000 foundational model with no persistence is a genius amnesiac. But a relatively cheap model wrapped in a beautifully orchestrated Hierarchical Memory OS becomes a domain expert. FAISS and SQLite proved to be the absolute perfect, lightweight pairing to achieve this.

If we want autonomous agents to truly integrate into real-world business environments—whether it's monitoring infrastructure, handling corporate finance, or auditing compliance—we must give them the gift of permanent, structured memory.


Disclaimer

The views and opinions expressed here are solely my own and do not represent the views, positions, or opinions of my employer or any organization I am affiliated with. The content is based on my personal experience and experimentation and may be incomplete or incorrect. Any errors or misinterpretations are unintentional, and I apologize in advance if any statements are misunderstood or misrepresented.

Top comments (0)