Advanced Python Script: AI-Powered Network Anomaly Detector with Real-Time Visualization
This script combines:
Real-time network traffic analysis using scapy.
Machine learning-based anomaly detection using scikit-learn.
Real-time visualization using matplotlib and plotly.
Automated reporting using pandas and email libraries.
The script monitors network traffic, detects anomalies (e.g., unusual traffic patterns), and generates real-time visualizations and email alerts.
import time
import pandas as pd
import numpy as np
from scapy.all import sniff, IP, TCP
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt
import plotly.express as px
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from threading import Thread
# Global variables
network_data = []
anomalies = []
model = IsolationForest(contamination=0.01) # Anomaly detection model
# Email configuration
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_PORT = 587
EMAIL_USER = 'your_email@gmail.com'
EMAIL_PASSWORD = 'your_password'
ALERT_EMAIL = 'recipient_email@example.com'
def capture_traffic(packet):
"""
Capture network traffic and extract features.
"""
if IP in packet:
src_ip = packet[IP].src
dst_ip = packet[IP].dst
protocol = packet[IP].proto
length = len(packet)
timestamp = time.time()
# Append to network data
network_data.append([timestamp, src_ip, dst_ip, protocol, length])
def detect_anomalies():
"""
Detect anomalies in network traffic using Isolation Forest.
"""
global network_data, anomalies
while True:
if len(network_data) > 100: # Wait for enough data
df = pd.DataFrame(network_data, columns=['timestamp', 'src_ip', 'dst_ip', 'protocol', 'length'])
X = df[['protocol', 'length']].values
# Train the model and predict anomalies
model.fit(X)
preds = model.predict(X)
df['anomaly'] = preds
# Extract anomalies
anomalies = df[df['anomaly'] == -1]
if not anomalies.empty:
print("Anomalies detected:")
print(anomalies)
send_alert_email(anomalies)
visualize_anomalies(anomalies)
# Clear old data
network_data = network_data[-100:] # Keep last 100 entries
time.sleep(10) # Check for anomalies every 10 seconds
def visualize_anomalies(anomalies):
"""
Visualize anomalies using Plotly.
"""
fig = px.scatter(anomalies, x='timestamp', y='length', color='protocol',
title='Network Anomalies Detected')
fig.show()
def send_alert_email(anomalies):
"""
Send an email alert with detected anomalies.
"""
msg = MIMEMultipart()
msg['From'] = EMAIL_USER
msg['To'] = ALERT_EMAIL
msg['Subject'] = 'Network Anomaly Alert'
body = "The following network anomalies were detected:\n\n"
body += anomalies.to_string()
msg.attach(MIMEText(body, 'plain'))
try:
server = smtplib.SMTP(EMAIL_HOST, EMAIL_PORT)
server.starttls()
server.login(EMAIL_USER, EMAIL_PASSWORD)
server.sendmail(EMAIL_USER, ALERT_EMAIL, msg.as_string())
server.quit()
print("Alert email sent.")
except Exception as e:
print(f"Failed to send email: {e}")
def start_capture():
"""
Start capturing network traffic.
"""
print("Starting network traffic capture...")
sniff(prn=capture_traffic, store=False)
if __name__ == "__main__":
# Start traffic capture in a separate thread
capture_thread = Thread(target=start_capture)
capture_thread.daemon = True
capture_thread.start()
# Start anomaly detection
detect_anomalies()
How It Works
Network Traffic Capture:
The script uses scapy to capture live network traffic and extract features like source IP, destination IP, protocol, and packet length.
Anomaly Detection:
It uses the Isolation Forest algorithm from scikit-learn to detect unusual patterns in network traffic.
Real-Time Visualization:
Detected anomalies are visualized in real-time using plotly.
Email Alerts:
If anomalies are detected, the script sends an email alert with details.
Multi-threading:
Traffic capture and anomaly detection run in separate threads for efficiency.
Top comments (0)