Detecting Anomalies in OpenTelemetry Logs Using Vector Embeddings and Redis
Finding Unknown Unknowns in Your Logs
Picture this: It's 3 AM, and your payment service starts failing. The errors don't match any of your alert rules. The log messages are technically distinct from anything you've seen before, but semantically, they're similar to a database connection issue that occurred six months ago. Your rule-based system missed it because the error message format changed. Your keyword searches failed because the new error uses different terminology. By the time your team discovers the issue through customer complaints, you have already lost thousands in revenue.
This scenario plays out daily across engineering teams. Modern applications generate millions of logs, and buried within them are critical anomalies, such as security breaches, performance degradations, and system failures. The challenge isn't just the volume; it's that we don't know what we're looking for. New failure modes emerge constantly, attackers develop novel techniques, and dependencies fail in unexpected ways. Meanwhile, traditional monitoring relies on flawed assumptions, such as anticipating all failure modes (rule-based detection) and assuming that anomalies will use predictable keywords (search-based detection). But what if, instead of looking for specific patterns, we could teach our system to understand what "normal" looks like and flag anything that deviates from that understanding?
This is where semantic anomaly detection comes in. By converting logs into vector embeddings that capture their meaning, we can identify anomalies based on how different they are from normal behavior, even if we've never seen that specific error before. It's like teaching your monitoring system to understand context and meaning, not just pattern matching.
Understanding the Solution: Embeddings and Vector Search
Before diving into code, let's understand what makes this approach powerful. An embedding represents data as an array of numbers (a vector) that captures its semantic meaning. When we convert the log message "User authentication failed for admin account" into an embedding, we obtain a vector of the form [0.23, -0.45, 0.67, ...] in a 384-dimensional space (the output size of the all-MiniLM-L6-v2 model used below), where similar meanings result in similar vectors. Storing these vectors in a vector store, such as Redis, lets us run a semantic search over them.
Here's the key insight: logs that describe similar events will have similar embeddings, even if they use different words. "Authentication failed" and "Login unsuccessful" will produce vectors that are close together in the 384-dimensional space. Meanwhile, "Authentication failed" and "Payment processed successfully" will be far apart. This distance becomes our anomaly score.
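To make "close together" and "far apart" concrete, here is a minimal sketch of cosine distance in plain Python. The three 4-dimensional vectors are made-up stand-ins for real embeddings, chosen only to illustrate the idea; real vectors come from the embedding model.

```python
import math

def cosine_distance(a, b):
    """Return 1 - cosine similarity: 0 = same direction, 2 = opposite."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

# Hypothetical 4-dimensional stand-ins for real embeddings (made-up values).
auth_failed = [0.9, 0.1, 0.0, 0.2]
login_unsuccessful = [0.8, 0.2, 0.1, 0.3]
payment_ok = [0.0, 0.9, 0.8, 0.1]

near = cosine_distance(auth_failed, login_unsuccessful)
far = cosine_distance(auth_failed, payment_ok)
print(f"auth failed vs login unsuccessful: {near:.3f}")
print(f"auth failed vs payment processed:  {far:.3f}")
```

The first pair points in nearly the same direction, so its distance is close to 0; the second pair shares almost nothing, so its distance is close to 1.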
This is powerful, but it can be further enhanced. When we combine this with OpenTelemetry's structured logging, we can add additional context to the semantic search. OpenTelemetry (or OTEL for short) provides standardized fields for observability data:
- service.name tells us which service generated the log
- http.status_code indicates success or failure
- net.peer.ip shows who's connecting
- trace.id links related logs together
These are only a few of the fields that OTEL structured logging can provide. In 2023, Elastic donated its structured approach to logs, known as Elastic Common Schema (ECS), to the OTEL project so that it could be merged with OTEL's semantic conventions.
By incorporating this context into our embeddings, we create rich representations that not only understand what happened, but also where, when, and under what circumstances. A "connection timeout" from your database service at 3 AM is very different from the same message from a third-party API during business hours, and our embeddings will reflect that.
Building the Anomaly Detection System
Let's build a production-ready anomaly detector using Redis Open Source, which includes support for Redis Query Engine (RQE). This will allow us to easily store vector embeddings and implement semantic search, which is required for this use case.
Setting Up Dependencies
The Docker Compose file below provides you with an easy way to spin up Redis, as well as Redis Insight, which you can use to browse and inspect data.
services:
  redis-database:
    container_name: redis-database
    hostname: redis-database
    image: redis:8.4.0
    volumes:
      - ./data:/data
    environment:
      REDIS_ARGS: --save 30 1
    ports:
      - "6379:6379"
    healthcheck:
      test: [ "CMD-SHELL", "redis-cli ping | grep PONG" ]
      interval: 10s
      retries: 5
      start_period: 5s
      timeout: 5s
  redis-insight:
    container_name: redis-insight
    hostname: redis-insight
    image: redis/redisinsight:2.70.1
    depends_on:
      - redis-database
    environment:
      RI_REDIS_HOST: "redis-database"
      RI_REDIS_PORT: "6379"
    ports:
      - "5540:5540"
    healthcheck:
      test: ["CMD", "sh", "-c", "wget -q -O- http://redis-insight:5540/api/health | grep -q '\"status\":\"up\"'"]
      interval: 10s
      retries: 5
      start_period: 5s
      timeout: 5s
When both services are up, you can access your Redis database using a browser. Navigate to http://localhost:5540, and you should see the Redis Insight home page.
To interact with RQE, we will use RedisVL (Redis Vector Library), which provides a clean Python interface for vector operations in Redis. We'll start with the core components and gradually build up to a fully functional system.
Setting Up RedisVL and the Vector Index
First, we need to create a vector index that can store our log embeddings and enable fast similarity search. RedisVL makes this easy for you while handling the complexity of index management behind the scenes:
from redisvl.schema import IndexSchema
from redisvl.index import SearchIndex
from sentence_transformers import SentenceTransformer
import numpy as np
from datetime import datetime
from typing import Dict, List, Tuple


class OTELAnomalyDetector:
    def __init__(self):
        """
        Initialize our anomaly detection system with RedisVL.

        We're using sentence-transformers for embeddings because:
        1. They're specifically trained for semantic similarity
        2. They run efficiently on CPU (no GPU required)
        3. They produce fixed-size vectors perfect for similarity search
        """
        # Initialize the embedding model
        # all-MiniLM-L6-v2 gives us 384-dimensional embeddings
        # It's small and fast enough to run on CPU
        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')

        # Define our Redis schema
        # This tells RedisVL how to store and index our data
        schema = {
            "index": {
                "name": "otel-logs",
                "prefix": "log:",  # All keys will start with "log:"
                "storage_type": "hash"  # Redis hash for flexible field storage
            },
            "fields": [
                # Vector field for embeddings - this is where the magic happens
                {
                    "name": "embedding",
                    "type": "vector",
                    "attrs": {
                        "dims": 384,  # Must match the model's output size
                        "distance_metric": "cosine",  # Cosine distance for semantic matching
                        "algorithm": "hnsw",  # Hierarchical Navigable Small World for fast search
                        "datatype": "float32"
                    }
                },
                # OTEL semantic convention fields for context
                {"name": "service_name", "type": "tag"},  # Which service?
                {"name": "severity", "type": "tag"},  # ERROR, WARN, INFO
                {"name": "http_status_code", "type": "numeric"},  # 200, 404, 500
                {"name": "timestamp", "type": "numeric"},  # When did this happen?
                {"name": "message", "type": "text"},  # Original log message
                {"name": "trace_id", "type": "tag"},  # For tracing correlation
            ]
        }

        # Create the RedisVL index
        # The URL assumes the Redis container from the Docker Compose file above
        self.index = SearchIndex(
            IndexSchema.from_dict(schema),
            redis_url="redis://localhost:6379"
        )
        self.index.create(overwrite=False)  # Don't overwrite if exists

        # Anomaly detection threshold
        # Cosine distance > 0.7 indicates an anomaly
        self.anomaly_threshold = 0.7
The schema design is crucial. We're not just storing vectors; we're creating a searchable space where we can filter by service, time, or severity before applying vector similarity. This contextual search is what makes our anomaly detection intelligent rather than just mathematical.
Processing OTEL Logs into Embeddings
Now let's transform OpenTelemetry logs into embeddings. The key is creating a text representation that preserves the semantic meaning and context:
# Inside class OTELAnomalyDetector:
    def process_otel_log(self, log_record: Dict) -> Tuple[bool, float, str]:
        """
        Process an OpenTelemetry log record through our anomaly detection pipeline.

        This method is the heart of our system. It takes structured OTEL data,
        converts it to a semantic representation, and determines if it's anomalous.

        Returns:
        - is_anomaly: Boolean indicating if this log is anomalous
        - anomaly_score: Float, usually between 0 and 1 (higher = more anomalous);
          it is built from cosine distances, so it can never exceed 2
        - explanation: Human-readable explanation of the decision
        """
        # Step 1: Create a semantic representation of the log
        # We're not just concatenating fields - we're creating a narrative
        # that captures the meaning and context
        log_text = self._create_semantic_text(log_record)

        # Step 2: Generate the embedding
        # This converts our text into a 384-dimensional vector
        # The model captures semantic relationships, so similar events
        # produce similar vectors even with different wording
        embedding = self.encoder.encode(log_text, convert_to_numpy=True)

        # Step 3: Search for similar historical logs
        # This is where we determine if this log is normal or anomalous
        is_anomaly, score, similar_logs = self._check_anomaly(embedding, log_record)

        # Step 4: Store this log for future comparisons
        # Every log helps improve our understanding of "normal"
        self._store_log(log_record, embedding, score)

        # Step 5: Generate explanation
        # This helps operators understand why something was flagged
        explanation = self._generate_explanation(is_anomaly, score, similar_logs, log_record)

        return is_anomaly, score, explanation

    def _create_semantic_text(self, log_record: Dict) -> str:
        """
        Convert OTEL structured data into semantic text for embedding.

        The order and format matter! We're creating a consistent narrative
        that helps the embedding model understand context. Think of this as
        writing a one-sentence story about what happened.
        """
        parts = []

        # Start with the service context
        service = log_record.get('resource', {}).get('service.name', 'unknown')
        parts.append(f"In service {service}")

        # Add severity context
        severity = log_record.get('severity_text', 'INFO')
        if severity == 'ERROR':
            parts.append("an error occurred")
        elif severity == 'WARN':
            parts.append("a warning was raised")
        else:
            parts.append("an event happened")

        # Add HTTP context if present
        attributes = log_record.get('attributes', {})
        if 'http.method' in attributes:
            method = attributes['http.method']
            path = attributes.get('http.route', 'unknown')
            status = attributes.get('http.status_code', 'unknown')
            parts.append(f"during {method} {path} with status {status}")

        # Add network context
        if 'net.peer.ip' in attributes:
            parts.append(f"from IP {attributes['net.peer.ip']}")

        # Add the actual message
        message = log_record.get('body', 'no message')
        parts.append(f": {message}")

        # Add trace context for distributed tracing correlation
        if 'trace_id' in log_record:
            parts.append(f"[trace:{log_record['trace_id'][:8]}]")

        return " ".join(parts)
This semantic text creation is more art than science. We're crafting a narrative that preserves the important context while being consistent enough for the embedding model to find patterns. The model was fine-tuned on more than a billion sentence pairs, so it places "error occurred during POST /api/login with status 401" close to other descriptions of a failed authentication attempt.
Detecting Anomalies with Semantic Search
Here's where Redis really shines. We will implement a fast similarity search to determine if a log is anomalous. We will develop a method that performs anomaly detection while feeding Redis with additional logs for future analysis.
from redisvl.query import VectorQuery
from redisvl.query.filter import Tag, Num
import json

# Inside class OTELAnomalyDetector:
    def _check_anomaly(self, embedding: np.ndarray, log_record: Dict) -> Tuple[bool, float, List]:
        """
        Determine if a log is anomalous by comparing it to historical patterns.

        The core insight: normal logs cluster together in vector space,
        while anomalies are outliers. We use k-NN (k-Nearest Neighbors)
        to find similar logs and calculate how different this one is.
        """
        # Build a vector similarity query with context filters
        # We're not searching all logs - we're searching logs from the same service
        # in a recent time window for more accurate anomaly detection
        service_name = log_record.get('resource', {}).get('service.name', 'unknown')

        # Anchor the window on the log's own timestamp, so replayed or delayed
        # logs are compared against the hour that preceded them
        current_time = log_record.get('timestamp', datetime.now().timestamp())
        one_hour_ago = current_time - 3600

        # Filter objects escape special characters for us,
        # such as the hyphen in "auth-service"
        context_filter = (
            (Tag("service_name") == service_name)
            & (Num("timestamp") >= one_hour_ago)
            & (Num("timestamp") <= current_time)
        )

        # Create RedisVL vector query
        # This combines semantic similarity with metadata filtering
        query = VectorQuery(
            vector=embedding.tolist(),
            vector_field_name="embedding",
            num_results=20,  # Find 20 most similar logs
            return_fields=["message", "severity", "timestamp", "service_name"],
            filter_expression=context_filter
        )

        # Execute the search; index.query() returns a list of dicts,
        # ordered from the nearest neighbor to the farthest
        results = self.index.query(query)

        if len(results) < 5:
            # Not enough historical data - can't determine if anomalous
            # Default to not anomalous to avoid false positives
            return False, 0.0, []

        # Calculate anomaly score based on distances to nearest neighbors
        distances = []
        similar_logs = []
        for result in results[:10]:  # Use top 10 for scoring
            # RedisVL returns cosine distance (0 = identical, 2 = opposite)
            distance = float(result['vector_distance'])
            distances.append(distance)
            similar_logs.append({
                'message': result['message'],
                'severity': result['severity'],
                'distance': distance,
                'similarity': 1 - (distance / 2)  # Convert to similarity score
            })

        # Calculate anomaly score using statistics
        # We use both mean and minimum distance for robustness
        mean_distance = np.mean(distances)
        min_distance = np.min(distances)

        # Weighted combination - if even the closest log is far, it's very anomalous
        anomaly_score = float(0.7 * mean_distance + 0.3 * min_distance)

        # Determine if anomalous based on threshold
        is_anomaly = anomaly_score > self.anomaly_threshold

        return is_anomaly, anomaly_score, similar_logs[:3]  # Return top 3 for explanation

    def _store_log(self, log_record: Dict, embedding: np.ndarray, anomaly_score: float):
        """
        Store the processed log with its embedding in RedisVL.

        This builds our historical baseline. Every normal log makes our
        anomaly detection more accurate by better defining "normal".
        """
        # Generate unique ID using timestamp and trace ID
        timestamp = log_record.get('timestamp', datetime.now().timestamp())
        trace_id = log_record.get('trace_id', 'notrace')
        log_id = f"log:{trace_id}:{int(timestamp * 1000)}"

        # Prepare document for RedisVL
        doc = {
            # Hash storage expects the vector as raw float32 bytes
            'embedding': embedding.astype(np.float32).tobytes(),
            'service_name': log_record.get('resource', {}).get('service.name', 'unknown'),
            'severity': log_record.get('severity_text', 'INFO'),
            'http_status_code': log_record.get('attributes', {}).get('http.status_code', 0),
            'timestamp': timestamp,
            'message': log_record.get('body', ''),
            'trace_id': trace_id,
            'anomaly_score': anomaly_score,  # Stored, but not indexed by the schema
            'raw_log': json.dumps(log_record)  # Store original for debugging
        }

        # Store in Redis via RedisVL
        self.index.load([doc], keys=[log_id])
The beauty of this approach is its simplicity. We're not training complex models or maintaining elaborate rule sets. We're just asking: "Have we seen something like this before?" If the answer is no (high distance to all historical logs), it's an anomaly worth investigating.
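The scoring step in `_check_anomaly` can be tried in isolation. The sketch below reproduces the weighted blend of mean and minimum distance with the standard library only; the two distance lists are made-up values for illustration, not measurements from a real index.

```python
from statistics import mean

ANOMALY_THRESHOLD = 0.7  # same value as the detector's anomaly_threshold

def score_from_distances(distances, top_n=10):
    """Blend the mean and minimum cosine distance of the nearest neighbors."""
    nearest = sorted(distances)[:top_n]
    return 0.7 * mean(nearest) + 0.3 * min(nearest)

# Made-up neighbor distances, for illustration only.
familiar = [0.05, 0.08, 0.10, 0.12, 0.15]  # neighbors are all close
novel = [0.82, 0.85, 0.88, 0.90, 0.95]     # even the closest neighbor is far

for label, distances in (("familiar", familiar), ("novel", novel)):
    score = score_from_distances(distances)
    print(f"{label}: score={score:.3f} anomalous={score > ANOMALY_THRESHOLD}")
```

Because the minimum distance carries 30% of the weight, a single very close neighbor pulls the score down noticeably, which makes a log that has been seen before harder to flag.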
Making Results Actionable
An anomaly detection system is only useful if it provides actionable insights. Let's add explanation generation to this code to help operators understand why something was flagged:
# Inside class OTELAnomalyDetector:
    def _generate_explanation(self, is_anomaly: bool, score: float,
                              similar_logs: List, log_record: Dict) -> str:
        """
        Generate human-readable explanations for anomaly decisions.

        This is crucial for building trust in the system. Operators need
        to understand not just that something is anomalous, but why.
        """
        if not is_anomaly:
            if similar_logs:
                return (f"Normal behavior. Similar to {len(similar_logs)} recent logs "
                        f"from the same service. Closest match: '{similar_logs[0]['message'][:50]}...' "
                        f"with {similar_logs[0]['similarity']:.1%} similarity.")
            else:
                return "Normal behavior. Insufficient historical data for detailed comparison."

        # For anomalies, provide detailed explanation
        severity_assessment = "High" if score > 0.9 else "Medium" if score > 0.8 else "Low"
        explanation_parts = [
            f"{severity_assessment} severity anomaly detected (score: {score:.2f}).",
            f"This log is significantly different from recent patterns in {log_record.get('resource', {}).get('service.name', 'the service')}."
        ]

        if similar_logs:
            # Show what normal looks like for comparison
            explanation_parts.append(
                f"Most similar normal log: '{similar_logs[0]['message'][:50]}...' "
                f"but with only {similar_logs[0]['similarity']:.1%} similarity."
            )
        else:
            explanation_parts.append("No similar logs found in recent history.")

        # Add specific indicators if present
        attributes = log_record.get('attributes', {})
        if attributes.get('http.status_code', 0) >= 500:
            explanation_parts.append("Server error status code detected.")
        if 'error' in log_record.get('body', '').lower():
            explanation_parts.append("Error keyword present in message.")

        return " ".join(explanation_parts)
A variation of this implementation could use an LLM to turn these fragments into a more natural, human-like narrative. We decided to keep things simple here, but it's worth knowing the option exists.
Testing Everything with an Example
Let's see our anomaly detector in action with a few sample OpenTelemetry logs:
# Initialize the detector
detector = OTELAnomalyDetector()

# Example OTEL logs - some normal, some anomalous
test_logs = [
    # Normal authentication log
    {
        "timestamp": 1699564800.0,
        "severity_text": "INFO",
        "body": "User login successful for user123",
        "resource": {"service.name": "auth-service"},
        "attributes": {
            "http.method": "POST",
            "http.route": "/api/login",
            "http.status_code": 200,
            "net.peer.ip": "192.168.1.100"
        },
        "trace_id": "abc123"
    },
    # Another normal log - similar pattern
    {
        "timestamp": 1699564860.0,
        "severity_text": "INFO",
        "body": "Authentication completed for user456",
        "resource": {"service.name": "auth-service"},
        "attributes": {
            "http.method": "POST",
            "http.route": "/api/login",
            "http.status_code": 200,
            "net.peer.ip": "192.168.1.101"
        },
        "trace_id": "def456"
    },
    # Suspicious log - SQL injection attempt
    {
        "timestamp": 1699564920.0,
        "severity_text": "ERROR",
        "body": "Invalid input detected: '; DROP TABLE users; --",
        "resource": {"service.name": "auth-service"},
        "attributes": {
            "http.method": "POST",
            "http.route": "/api/login",
            "http.status_code": 400,
            "net.peer.ip": "185.220.101.45"  # Known malicious IP range
        },
        "trace_id": "ghi789"
    },
    # Another anomaly - unusual error pattern
    {
        "timestamp": 1699564980.0,
        "severity_text": "ERROR",
        "body": "Database connection pool exhausted - unable to serve requests",
        "resource": {"service.name": "auth-service"},
        "attributes": {
            "http.method": "POST",
            "http.route": "/api/login",
            "http.status_code": 503,
            "net.peer.ip": "192.168.1.102"
        },
        "trace_id": "jkl012"
    }
]

# Process logs and detect anomalies
print("=" * 80)
print("ANOMALY DETECTION RESULTS")
print("=" * 80)

for log in test_logs:
    is_anomaly, score, explanation = detector.process_otel_log(log)
    print(f"\nLog: {log['body'][:60]}...")
    print(f"Timestamp: {datetime.fromtimestamp(log['timestamp'])}")
    print(f"Service: {log['resource']['service.name']}")
    print(f"Anomaly: {'🔴 YES' if is_anomaly else '🟢 NO'}")
    print(f"Score: {score:.3f}")
    print(f"Explanation: {explanation}")
    print("-" * 40)
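When you run this against an empty index, all four logs are reported as normal with a score of 0.0, because _check_anomaly refuses to score a log until it finds at least five earlier logs from the same service inside its one-hour window. Once the index holds a baseline of normal traffic for the service (for example, a few dozen successful logins), the SQL injection attempt and the database error should score noticeably higher than the normal authentication logs. Whether they cross the 0.7 threshold depends on the embedding model and on how much of the generated text the logs share, so treat the threshold as a starting point to tune. The explanations help you understand why each decision was made. Our anomaly detection system works because embeddings capture semantic meaning, not just text patterns.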
When we process "Invalid input detected: '; DROP TABLE users; --", the embedding lands away from the normal authentication logs. The model has no explicit notion of SQL injection, but a SQL command such as DROP TABLE, together with the quote, semicolon, and comment markers, has little in common with the language of successful logins.
The anomaly scores tell us how unusual each log is:
- 0.0 - 0.3: Very normal, closely matches historical patterns
- 0.3 - 0.7: Somewhat unusual but likely benign
- 0.7 - 0.9: Anomalous, worth investigating
- 0.9+: Highly anomalous, immediate attention needed
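The bands above translate into a small lookup helper. This is a sketch: the function name `score_band` is hypothetical, and the handling of boundary values is an assumption (a score exactly on a boundary falls into the lower band, mirroring the detector's strict `> 0.7` check).

```python
def score_band(score: float) -> str:
    """Map an anomaly score to the bands described above."""
    if score > 0.9:
        return "highly anomalous"
    if score > 0.7:
        return "anomalous"
    if score > 0.3:
        return "somewhat unusual"
    return "very normal"

# Made-up scores, for illustration only.
for sample in (0.12, 0.45, 0.83, 0.96):
    print(f"{sample:.2f} -> {score_band(sample)}")
```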
The contextual filtering (by service and time) is crucial. A "database connection timeout" might be normal for a batch processing service, but anomalous for your authentication service. By comparing logs within the same context, we avoid false positives from cross-service differences.
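The effect of contextual filtering can be illustrated without Redis. The sketch below is a brute-force, in-memory stand-in for the filtered vector query: it first narrows the candidates by service and time window, then ranks what is left by cosine distance. The records, the 3-dimensional vectors, and the function name `nearest_in_context` are all hypothetical.

```python
import math

def cosine_distance(a, b):
    """Return 1 - cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return 1.0 - dot / (math.hypot(*a) * math.hypot(*b))

def nearest_in_context(records, query_vec, service, start, end, k=3):
    """Filter by service and time window first, then rank by distance."""
    candidates = [
        r for r in records
        if r["service"] == service and start <= r["timestamp"] <= end
    ]
    ranked = sorted(candidates, key=lambda r: cosine_distance(query_vec, r["vector"]))
    return ranked[:k]

# Hypothetical records with made-up 3-dimensional vectors.
records = [
    {"service": "batch-service", "timestamp": 100,
     "message": "database connection timeout", "vector": [0.1, 0.9, 0.2]},
    {"service": "auth-service", "timestamp": 110,
     "message": "login successful", "vector": [0.9, 0.1, 0.1]},
    {"service": "auth-service", "timestamp": 120,
     "message": "authentication completed", "vector": [0.8, 0.2, 0.1]},
]

# A new "connection timeout" log arriving from auth-service.
timeout_vec = [0.1, 0.9, 0.2]
neighbors = nearest_in_context(records, timeout_vec, "auth-service", start=60, end=130)
for n in neighbors:
    print(n["message"], round(cosine_distance(timeout_vec, n["vector"]), 3))
```

Without the service filter, the identical log from batch-service would be the nearest neighbor at distance 0 and the timeout would look normal. With the filter, the nearest neighbors are the login messages, which are much farther away.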
Monitoring the Monitor
As with any system, you must make sure the detector itself is up and running so you can rely on it when you need it most. As a best practice, track key metrics to confirm that your anomaly detection system is working correctly:
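# Inside class OTELAnomalyDetector:
    def get_system_metrics(self) -> Dict:
        """
        Get metrics about the anomaly detection system itself.

        The three helper methods called below (_measure_embedding_speed,
        _measure_search_speed, _calculate_recent_anomaly_rate) are not shown
        in this post and need to be implemented for your environment.
        """
        # index.info() wraps FT.INFO; the available keys depend on the Redis
        # version, so every lookup falls back to 0 when a key is missing
        info = self.index.info()
        return {
            'total_logs': info.get('num_docs', 0),
            'index_memory_mb': info.get('vector_index_sz_mb', 0),
            'avg_embedding_time_ms': self._measure_embedding_speed(),
            'avg_search_time_ms': self._measure_search_speed(),
            'anomaly_rate': self._calculate_recent_anomaly_rate(),
            # FT.INFO may not expose this key; .get() then returns 0
            'index_fragmentation': info.get('fragmentation_ratio', 0)
        }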
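The post does not show how the recent anomaly rate is computed. One possible approach, sketched here under that assumption, is a fixed-size rolling window over the most recent decisions. The class name `AnomalyRateTracker` and its methods are hypothetical and not part of RedisVL.

```python
from collections import deque

class AnomalyRateTracker:
    """Track the share of anomalous logs among the most recent decisions."""

    def __init__(self, window: int = 1000):
        # deque with maxlen drops the oldest decision automatically
        self._flags = deque(maxlen=window)

    def record(self, is_anomaly: bool) -> None:
        self._flags.append(bool(is_anomaly))

    def rate(self) -> float:
        # 0.0 when nothing has been recorded yet
        return sum(self._flags) / len(self._flags) if self._flags else 0.0

tracker = AnomalyRateTracker(window=4)
for flag in (True, False, False, False):
    tracker.record(flag)
print(f"anomaly rate over last 4 logs: {tracker.rate():.2f}")
```

A detector could call `record()` at the end of `process_otel_log` and return `rate()` from `_calculate_recent_anomaly_rate`. A sudden jump in this rate is often a sign that the baseline has shifted (for example, after a deployment changed the log format) rather than that everything is broken.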
Why This Approach Works
The combination of OpenTelemetry structure, semantic embeddings, and Redis's fast vector search capabilities creates a powerful anomaly detection system that:
Understands Context: Unlike regex patterns, embeddings understand that "authentication failed" and "login unsuccessful" mean the same thing. This is powerful. Anomalies don't have to be elaborate IP-masking schemes like the ones we see in movies. Often, the trouble is simply a familiar failure worded differently.
Learns Continuously: Every log processed improves the baseline, making detection more accurate over time. In this case, more data literally means more value.
Scales Efficiently: RedisVL's HNSW index maintains fast search times even with millions of logs stored. Redis's in-memory design enables queries to execute with extremely low latency, while providing the tools to scale out as needed, both vertically and horizontally.
Requires No Training: Pre-trained embedding models work out of the box, with no ML expertise required. Needing ML expertise from day one just to get things working is often what hurts engineering teams the most.
Provides Explanations: Operators understand why something was flagged, building trust in the system. They may not be experts, but they are able to capture the essence of what went wrong and take the required actions.
Conclusion
In this blog post, we've built an anomaly detection system that understands the semantic meaning of logs, not just their text patterns. By combining OpenTelemetry's structured observability with embedding-based semantic search in Redis, we can detect novel anomalies that rule-based systems would likely miss.
The beauty of this approach is its simplicity. We're not training complex models or maintaining hundreds of rules. We're just asking, "Is this log semantically similar to what we've seen before?" When the answer is no, we've found an anomaly worth investigating. You can verify this by deploying this implementation for one critical service and allowing it to learn for a week. You'll be surprised how quickly it starts catching issues your existing monitoring misses. As you gain confidence, expand to more services, each maintaining its own baseline of normal behavior.
The future of observability isn't about writing more alerts or hiring more analysts. It's about systems that understand meaning, learn patterns, and surface what truly matters. By using OpenTelemetry for structure, embeddings for understanding, and RedisVL for scale, that future is accessible to any engineering team today.
