This is my submission for the Redis AI Challenge. I built the Real-Time Anomaly Sentinel, a Python application that uses Redis 8's vector search capabilities to detect equipment failures by processing a real-world predictive maintenance dataset.
The system works by first learning a "baseline" of normal machine operations from the dataset. Then, it processes a continuous stream of sensor readings and uses Redis's vector search to instantly check how much each new reading deviates from the established norm. By using actual failure data, the Sentinel proves its ability to flag real-world anomalies, making it a credible solution for industrial monitoring.
Demo
Here is a screenshot of the live dashboard built with Streamlit. It shows the application processing the real-world sensor data, with the status correctly showing "SYSTEM NORMAL." When the app processes a data point that is a recorded failure, the status changes to "FAILURE DETECTED."
The complete code for the Streamlit application is below.
import streamlit as st
import redis
import numpy as np
from redis.commands.search.field import VectorField
from redis.commands.search.query import Query
import time
import uuid
import pandas as pd
import itertools
# --- 1. Load Real-World Dataset ---
try:
    df = pd.read_csv('ai4i2020.csv')
except FileNotFoundError:
    st.error("Error: 'ai4i2020.csv' not found. Please download the dataset and place it in the same directory as app.py.")
    st.stop()

# Map dataset columns to our app's variables
df = df.rename(columns={
    'Air temperature [K]': 'temp',
    'Rotational speed [rpm]': 'vibration',
    'Target': 'failure'
})

# We only need these columns for the demo
df_sensors = df[['temp', 'vibration', 'failure']]

# Create an iterator to loop through the dataset
data_iterator = itertools.cycle(df_sensors.iterrows())

# --- 2. Configuration and Redis Connection ---
REDIS_HOST = "localhost"
REDIS_PORT = 6379
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=False)

def get_embedding(data_point):
    vector = np.array([data_point['temp'], data_point['vibration']]).astype(np.float32)
    return vector.tobytes()

# --- 3. Redis Index Setup ---
INDEX_NAME = "real_data_vectors"
try:
    # Reset the index for the new dataset
    r.ft(INDEX_NAME).dropindex(delete_documents=True)
    print("Existing index dropped.")
except redis.exceptions.ResponseError:
    pass  # Index doesn't exist, which is fine

# Create a new index
SCHEMA = [VectorField("vector", "FLAT", {"TYPE": "FLOAT32", "DIM": 2, "DISTANCE_METRIC": "L2"})]
r.ft(INDEX_NAME).create_index(fields=SCHEMA)
print(f"Index '{INDEX_NAME}' created.")

# Populate with a baseline of NON-failure data
baseline_df = df_sensors[df_sensors['failure'] == 0].head(100)
for _, row in baseline_df.iterrows():
    key = f"sensor:normal:{uuid.uuid4().hex}"
    embedding = get_embedding(row)
    r.hset(key, mapping={"vector": embedding})
print("Baseline data from real dataset added.")

# --- 4. Streamlit App Layout ---
st.set_page_config(page_title="Real-Time Anomaly Sentinel", layout="wide")
st.title("🚨 Real-Time Anomaly Sentinel (with Real-World Data)")

col1, col2 = st.columns(2)
with col1:
    st.header("Live Sensor Status")
    status_placeholder = st.empty()
with col2:
    st.header("Failure Log")
    log_placeholder = st.empty()
chart_placeholder = st.empty()

# Initialize session state
if 'history' not in st.session_state:
    st.session_state.history = pd.DataFrame(columns=['time', 'temp', 'vibration', 'type'])
if 'failures' not in st.session_state:
    st.session_state.failures = []

# --- 5. Main Application Loop ---
while True:
    # Get the next data point from the real dataset
    _, data_row = next(data_iterator)
    data = data_row.to_dict()
    embedding = get_embedding(data)

    # In the real dataset, the "failure" column explicitly tells us if it's an anomaly.
    is_anomaly = data['failure'] == 1

    with status_placeholder.container():
        if is_anomaly:
            st.error("FAILURE DETECTED! (Source: Real Dataset)")
        else:
            st.success("SYSTEM NORMAL")
        st.write(f"**Current Temperature:** {data['temp']:.2f} K")
        st.write(f"**Current Rotational Speed:** {data['vibration']:.2f} rpm")

    new_data = pd.DataFrame([{
        'time': pd.Timestamp.now(),
        'temp': data['temp'],
        'vibration': data['vibration'],
        'type': 'Failure' if is_anomaly else 'Normal'
    }])
    st.session_state.history = pd.concat([st.session_state.history, new_data], ignore_index=True).tail(100)

    with chart_placeholder.container():
        st.subheader("Live Sensor Data")
        st.scatter_chart(st.session_state.history, x='time', y=['temp', 'vibration'], color='type')

    if is_anomaly:
        st.session_state.failures.insert(0, f"{pd.Timestamp.now().strftime('%H:%M:%S')} - Temp: {data['temp']:.2f}, Speed: {data['vibration']:.2f}")
        with log_placeholder.container():
            st.write(st.session_state.failures[:5])

    time.sleep(0.5)
How I Used Redis 8
I used Redis Stack as the core real-time data layer for this project, leveraging its integrated AI capabilities to perform instant analysis on a real-world predictive maintenance dataset.
Vector Search for Anomaly Analysis: This is the heart of the AI logic. Each sensor reading (e.g., [temperature, rotational_speed]) is converted into a vector. While the final demo uses the dataset's "failure" flag to label anomalies, the underlying detection principle relies on vector search: Redis's K-Nearest-Neighbor (KNN) search finds the historical data points most similar to any given reading, and a large distance between a new reading and its nearest "normal" neighbor indicates a likely anomaly. The same feature can also retrieve similar past failures, helping diagnose the root cause of a current problem.
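As an illustration, that KNN check could be wired up as follows. This is a sketch, not the code the demo runs: the helper names and the threshold are my own, while the index name and FLOAT32/DIM=2/L2 schema match the setup code above.

```python
import numpy as np

def to_vector_bytes(temp, rpm):
    # Pack a [temperature, rotational_speed] reading as a FLOAT32 blob,
    # matching the DIM=2, FLOAT32, L2 schema of the index above.
    return np.array([temp, rpm], dtype=np.float32).tobytes()

def distance_to_baseline(r, reading_bytes, index_name="real_data_vectors"):
    # KNN 1: find the closest baseline ("normal") reading and return
    # its L2 distance. A large distance suggests an anomaly.
    from redis.commands.search.query import Query  # local import so the helper above stays Redis-free
    q = (
        Query("*=>[KNN 1 @vector $vec AS score]")
        .sort_by("score")
        .return_fields("score")
        .dialect(2)
    )
    res = r.ft(index_name).search(q, query_params={"vec": reading_bytes})
    return float(res.docs[0].score) if res.docs else None

# Usage sketch (the threshold is illustrative, not tuned on the dataset):
# score = distance_to_baseline(r, to_vector_bytes(300.1, 1450))
# is_anomaly = score is not None and score > ANOMALY_THRESHOLD
```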
Real-Time Data Store: I used Redis Hashes to store the baseline of normal operational data and their corresponding vector embeddings. Redis's low-latency HSET operations ensure that the system can establish its baseline knowledge quickly and efficiently.
High-Speed Indexing: The project relies on the Search and Query module within Redis Stack to create a vector index. This index is what makes the KNN searches possible and performant enough for a real-time, streaming application.
By using Redis 8's native AI features, I was able to build a powerful anomaly detection system without needing external databases or complex machine learning frameworks.
Architecture and Future Work
Architecture Diagram 🏛️
The current architecture is designed for simplicity and real-time performance, with Redis Stack at its core.
[Real-World CSV Data] -> [Python/Streamlit App] -> [Redis Stack (Vector Index)] -> [Live Dashboard UI]
Future Work 🚀
This project serves as a strong foundation for a more robust, production-ready system.
Potential next steps:
Use Redis Streams for Ingestion: Instead of reading from a CSV file, I would use Redis Streams to create a true streaming pipeline. This would allow the system to ingest data from thousands of distributed IoT sensors simultaneously and reliably.
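A minimal sketch of that ingestion path, assuming a hypothetical stream key and field names (`r` is a `redis.Redis` client as in the app code):

```python
STREAM_KEY = "sensor:readings"  # hypothetical stream name

def publish_reading(r, temp, rpm):
    # Producer side: XADD appends one reading; "*" (the default)
    # lets Redis assign the entry ID.
    return r.xadd(STREAM_KEY, {"temp": temp, "vibration": rpm})

def parse_entry(fields):
    # Consumer side: with decode_responses=False, field names and
    # values come back as bytes; convert them for the detector.
    return {k.decode(): float(v) for k, v in fields.items()}

# A consumer loop would block on new entries instead of cycling a CSV:
# last_id = "$"
# while True:
#     for _, entries in r.xread({STREAM_KEY: last_id}, block=1000) or []:
#         for entry_id, fields in entries:
#             reading = parse_entry(fields)
#             last_id = entry_id
```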
Implement Real-Time Alerting: I would integrate Redis Pub/Sub to publish anomaly events to a dedicated channel. A separate service could subscribe to this channel to trigger real-time alerts via email, SMS, or a Slack notification.
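That pattern could look like this; the channel name, event fields, and `send_slack` are hypothetical:

```python
import json

ALERT_CHANNEL = "alerts:anomalies"  # hypothetical channel name

def publish_alert(r, temp, rpm, ts):
    # Publisher side: serialize the anomaly event and PUBLISH it.
    # Returns the number of subscribers that received the message.
    return r.publish(ALERT_CHANNEL, json.dumps({"temp": temp, "vibration": rpm, "ts": ts}))

def format_alert(raw):
    # Subscriber side: turn a received payload into a human-readable line.
    e = json.loads(raw)
    return f"[{e['ts']}] anomaly: temp={e['temp']:.1f} K, speed={e['vibration']:.0f} rpm"

# A separate alerting service would run:
# p = r.pubsub()
# p.subscribe(ALERT_CHANNEL)
# for msg in p.listen():
#     if msg["type"] == "message":
#         send_slack(format_alert(msg["data"]))  # send_slack is hypothetical
```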
Enhance Anomaly Diagnosis: When an anomaly is detected, I would use another Vector Search query to find the most similar past failures from the historical data. This would help operators quickly diagnose the root cause by comparing the current event to previous ones.
Build a More Advanced Embedding Model: For more complex scenarios with dozens of sensor inputs, I would train a small neural network (like an autoencoder) to create more meaningful vector embeddings, capturing intricate patterns beyond just temperature and vibration.