
Thesius Code

Posted on • Originally published at datanest-stores.pages.dev

Security Monitoring Setup


A complete security monitoring stack that turns raw logs into actionable threat intelligence. It provides SIEM configurations, log correlation rules, threat detection queries, alerting pipelines, and pre-built dashboards, so a SOC can start from working detections instead of spending months on custom development.

Key Features

  • SIEM Configuration Templates — Ready-to-deploy configs for log ingestion, parsing, and indexing supporting syslog, JSON, and CEF formats.
  • Log Correlation Rules — 40+ detection rules covering brute force, lateral movement, privilege escalation, and data exfiltration.
  • Threat Detection Queries — Pre-written search queries mapped to the MITRE ATT&CK framework.
  • Alerting Pipeline — Severity-based routing with deduplication, suppression, and escalation chains.
  • Security Dashboards — JSON dashboard definitions for authentication events, network traffic, and incidents.
  • Log Source Onboarding — Guides for firewalls, web servers, databases, and cloud platforms.
  • Baseline Generator — Python scripts establishing normal behavior baselines for anomaly detection.

Quick Start

# Extract the monitoring setup
unzip security-monitoring-setup.zip
cd security-monitoring-setup/

# Deploy correlation rules
cp rules/*.yml /etc/siem/rules/
python3 scripts/validate_rules.py --rules-dir /etc/siem/rules/

# Generate behavioral baseline from existing logs
python3 scripts/baseline_generator.py --log-dir /var/log/ --output baseline.json

# Import dashboards
python3 scripts/import_dashboards.py --config configs/production.yaml

Correlation Rule Format

# rules/brute_force_detection.yml
rule:
  name: "Brute Force Authentication Attack"
  id: "DET-001"
  severity: "high"
  mitre_attack:
    tactic: "Credential Access"
    technique: "T1110.001"
    name: "Brute Force: Password Guessing"
  description: >
    Detects multiple failed authentication attempts from a single
    source IP within a short time window, indicating a brute force
    or credential stuffing attack.
  condition:
    event_type: "authentication"
    status: "failure"
    group_by: "source_ip"
    threshold: 10
    window: "5m"
  alert:
    channels: ["security_team", "soc_dashboard"]
    suppress_for: "15m"
    include_fields:
      - source_ip
      - target_user
      - failure_reason
      - geo_location
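To make the condition and alert fields concrete, here is a minimal sketch of how a threshold rule like DET-001 could be evaluated. The `ThresholdRule` class and its method names are hypothetical, not the kit's correlation engine, and the sketch assumes events arrive in timestamp order with ISO 8601 timestamps.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


class ThresholdRule:
    """Hypothetical evaluator for a threshold-style rule (illustration only)."""

    def __init__(self, threshold: int, window: timedelta, suppress_for: timedelta):
        self.threshold = threshold
        self.window = window
        self.suppress_for = suppress_for
        self._failures = defaultdict(deque)  # source_ip -> failure timestamps
        self._muted_until = {}               # source_ip -> end of suppression

    def observe(self, event: dict) -> bool:
        """Return True when this event should raise an alert."""
        if event.get("event_type") != "authentication":
            return False
        if event.get("status") != "failure":
            return False
        ip = event["source_ip"]              # group_by: source_ip
        ts = datetime.fromisoformat(event["timestamp"])
        hits = self._failures[ip]
        hits.append(ts)
        # Drop failures that have fallen out of the sliding window.
        while hits and ts - hits[0] > self.window:
            hits.popleft()
        if len(hits) < self.threshold:
            return False
        # Threshold reached: alert unless this source is still suppressed.
        muted_until = self._muted_until.get(ip)
        if muted_until is not None and ts < muted_until:
            return False
        self._muted_until[ip] = ts + self.suppress_for
        return True


# Values taken from the rule above: threshold 10, window 5m, suppress_for 15m.
rule = ThresholdRule(10, timedelta(minutes=5), timedelta(minutes=15))
```

Calling `rule.observe(event)` for each normalized event returns `True` once per source IP per suppression period, which is roughly the behavior that `suppress_for` describes.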

Architecture / How It Works

Log Sources ──► Collector ──► Parser ──► Index
(firewalls,     (syslog,     (normalize    │
 servers,        agent)       to schema)   │
 cloud)                                    │
         ┌─────────────────────────────────┤
         ▼                                 ▼
  Correlation Engine              Search / Hunt
  (rules + baselines)             (ad-hoc MITRE queries)
         │                               │
         ▼                               ▼
  Alert Pipeline ──► Incident     Threat Dashboard
  (route/suppress)   Response     (real-time viz)
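The Parser stage in the diagram normalizes events to a common schema. A minimal sketch of that idea follows. The alias table is an assumption made for illustration (real sources need their own mappings), and the target field names are borrowed from the examples in this article.

```python
from typing import Any

# Hypothetical aliases per schema field; extend for each onboarded source.
FIELD_ALIASES: dict[str, tuple[str, ...]] = {
    "source_ip": ("source_ip", "src_ip", "src", "client_ip"),
    "username": ("username", "user", "suser", "account"),
    "timestamp": ("timestamp", "@timestamp", "time"),
    "event_type": ("event_type", "category"),
    "status": ("status", "outcome", "result"),
}


def normalize(raw: dict[str, Any]) -> dict[str, Any]:
    """Copy the first matching alias for each schema field; gaps become None."""
    event: dict[str, Any] = {}
    for field, aliases in FIELD_ALIASES.items():
        event[field] = next((raw[a] for a in aliases if a in raw), None)
    return event
```

Keeping the mapping in data rather than code means a new log source only needs new aliases, and detection rules can keep referring to one set of field names.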

Usage Examples

Behavioral Baseline Generator

import json
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any

class BaselineGenerator:
    """Generate behavioral baselines for anomaly detection."""

    def __init__(self, lookback_days: int = 30):
        self.lookback_days = lookback_days
        self.user_activity: dict[str, dict[str, Any]] = defaultdict(
            lambda: {"login_hours": defaultdict(int), "source_ips": defaultdict(int)}
        )

    def process_auth_log(self, entry: dict[str, Any]) -> None:
        user = entry.get("username", "unknown")
        hour = datetime.fromisoformat(entry["timestamp"]).hour
        self.user_activity[user]["login_hours"][hour] += 1
        self.user_activity[user]["source_ips"][entry.get("source_ip", "unknown")] += 1

    def detect_anomaly(self, event: dict[str, Any]) -> dict[str, Any] | None:
        user = event.get("username", "unknown")
        if user not in self.user_activity:
            return {"type": "new_user", "user": user, "risk": "medium"}
        profile = self.user_activity[user]
        anomalies = []
        hour = datetime.fromisoformat(event["timestamp"]).hour
        if profile["login_hours"].get(hour, 0) == 0:
            anomalies.append(f"unusual_hour:{hour}")
        ip = event.get("source_ip", "")
        if ip and ip not in profile["source_ips"]:
            anomalies.append(f"new_ip:{ip}")
        return {"user": user, "anomalies": anomalies, "risk": "high"} if anomalies else None

    def export_baseline(self, output_path: str) -> None:
        data = {u: {k: dict(v) for k, v in p.items()} for u, p in self.user_activity.items()}
        Path(output_path).write_text(json.dumps(data, indent=2))

MITRE ATT&CK Detection Queries

queries:
  - name: "Credential Dumping (T1003)"
    query: 'process_name:("mimikatz" OR "procdump") OR command_line:("lsass" AND "dump")'
    severity: critical

  - name: "Scheduled Task Persistence (T1053)"
    query: 'process_name:"schtasks.exe" AND command_line:("/create" AND "/sc")'
    severity: high

  - name: "DNS Tunneling (T1071.004)"
    query: 'event_type:dns_query AND query_length:>50 AND query_type:"TXT"'
    severity: high
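When testing a query against injected events, it can help to restate it as a plain predicate. The sketch below mirrors the DNS tunneling query in Python. The function name is hypothetical, and it assumes events are dictionaries that use the same field names as the query.

```python
from typing import Any


def is_dns_tunneling_candidate(event: dict[str, Any], min_length: int = 50) -> bool:
    """Mirror of: event_type:dns_query AND query_length:>50 AND query_type:"TXT"."""
    return (
        event.get("event_type") == "dns_query"
        and int(event.get("query_length") or 0) > min_length
        and event.get("query_type") == "TXT"
    )
```

Feeding a known-bad event and a benign one through the predicate gives a quick check that the field names in your normalized events match what the query expects.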

Configuration

| Parameter | Default | Description |
|---|---|---|
| ingestion.buffer_size | 1000 | Number of events buffered before a flush |
| correlation.window_default | 5m | Default correlation time window |
| alerting.dedup_window | 15m | Period during which duplicate alerts are suppressed |
| alerting.escalation_delay | 30m | Time before an unacknowledged alert escalates |
| baseline.lookback_days | 30 | Days of history used to calculate the baseline |
| baseline.anomaly_threshold | 2.0 | Standard deviations from the baseline that count as an anomaly |
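The article does not spell out how baseline.anomaly_threshold is applied. One common reading is a z-score test, sketched below as an assumption. The function name and the choice of population standard deviation are illustrative, not taken from the kit.

```python
from statistics import mean, pstdev


def is_anomalous(history: list[float], observed: float, threshold: float = 2.0) -> bool:
    """Flag `observed` when it lies more than `threshold` std devs from the mean."""
    if len(history) < 2:
        return False  # not enough history to judge
    mu = mean(history)
    sigma = pstdev(history)
    if sigma == 0:
        # A perfectly flat history: any deviation at all is unusual.
        return observed != mu
    return abs(observed - mu) / sigma > threshold
```

Here `history` could be daily login counts for one user over the lookback period. Raising `threshold` trades missed anomalies for fewer false positives.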

Best Practices

  1. Normalize before you correlate — Map all sources to a common schema before writing detection rules.
  2. Start with high-fidelity rules — Deploy 10 rules with low false-positive rates before adding 100 noisy ones.
  3. Baseline before you alert — Run baselines for 30 days before enabling anomaly detection.
  4. Test with red team data — Inject known-bad events to verify rules fire correctly.
  5. Retain logs sufficiently — 90 days hot (searchable), 1 year cold. Many APTs dwell for months.

Troubleshooting

| Problem | Cause | Fix |
|---|---|---|
| Correlation rules not firing | Log timestamps in wrong timezone | Normalize all timestamps to UTC at ingestion; check ingestion.timezone |
| Dashboard shows no data | Index pattern doesn't match log index name | Verify index name in configs/production.yaml matches your SIEM setup |
| Too many false positive alerts | Detection thresholds too sensitive | Increase thresholds gradually; add allowlists for known-good patterns |
| Baseline generator OOM on large datasets | Loading all logs into memory | Use --sample-rate 0.1 to baseline on 10% of events |
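For the timezone fix, the sketch below shows one way to normalize timestamps to UTC at ingestion. The helper name and the `assumed_tz` parameter are hypothetical. It assumes ISO 8601 input and treats timestamps without an offset as belonging to `assumed_tz`.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def to_utc(timestamp: str, assumed_tz: tzinfo = timezone.utc) -> str:
    """Parse an ISO 8601 timestamp and return it as an ISO string in UTC.

    Naive timestamps (no offset) are interpreted in `assumed_tz`.
    Note: a trailing "Z" is only accepted by fromisoformat on Python 3.11+.
    """
    parsed = datetime.fromisoformat(timestamp)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=assumed_tz)
    return parsed.astimezone(timezone.utc).isoformat()


# A source that logs local time at UTC-5 without an offset:
print(to_utc("2024-01-01T12:00:00", timezone(timedelta(hours=-5))))
```

A fixed offset ignores daylight saving changes, so a source that logs local wall-clock time would be better served by a named zone from the standard `zoneinfo` module.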

This is 1 of 9 resources in the Security Engineer Pro toolkit. Get the complete Security Monitoring Setup with all files, templates, and documentation for $49.

Get the Full Kit →

Or grab the entire Security Engineer Pro bundle (9 products) for $119 — save 30%.

Get the Complete Bundle →

