DEV Community

Sachin Singhal
Sachin Singhal

Posted on

Fampay Solution #1

  • AI-Powered Severity Analysis: Classifies logs using BERT.
  • Anomaly Detection: Identifies unusual patterns with KMeans clustering.
  • Self-Healing Automation: Executes scripts to mitigate critical issues.
  • Slack Integration: Sends real-time notifications for critical logs.
  • Prometheus Monitoring: Tracks detected-bug and root-cause-analysis (RCA) metrics.

import logging
import os
import json
import pandas as pd
import numpy as np
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch
from transformers import AutoTokenizer, TFAutoModel
from tensorflow.keras.models import Model
from sklearn.cluster import KMeans
from causalinference import CausalModel
from prometheus_client import start_http_server, Counter
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
import subprocess
import requests

# Root logging configuration: INFO and above, with timestamp and level.
# Runs at import time.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Prometheus counters. They are exposed once prometheus_client's
# start_http_server() is called.
bug_detected_counter = Counter("bug_detected", "Number of bugs detected in logs")
root_cause_counter = Counter("root_cause_analysis", "Number of root causes identified")

# Kafka connection settings, overridable through environment variables.
KAFKA_BROKER = os.getenv("KAFKA_BROKER", "localhost:9092")
TOPIC = os.getenv("KAFKA_TOPIC", "application_logs")

# Consumer for the log topic. Each record value is decoded as UTF-8 JSON, so
# message.value is the parsed record. Created (and connected) at import time.
# NOTE(review): no group_id is passed, so offsets are presumably not committed
# and a restart may re-read or skip records depending on client defaults — confirm.
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=[KAFKA_BROKER],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Elasticsearch client, used both to index processed logs and to read them
# back for reports.
ELASTIC_HOST = os.getenv("ELASTIC_HOST", "http://localhost:9200")
es = Elasticsearch([ELASTIC_HOST])

# Slack client and target channel for alerts.
# SLACK_TOKEN has no default. If it is unset, the client is built with
# token=None and posting will presumably fail at call time — confirm.
SLACK_TOKEN = os.getenv("SLACK_TOKEN")
SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "#bug-notifications")
slack_client = WebClient(token=SLACK_TOKEN)

# Pretrained BERT tokenizer and encoder, loaded (downloaded if not cached) at
# import time.
# NOTE(review): TFAutoModel yields the bare encoder (hidden states) without a
# classification head — its outputs are not class scores.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
bert_model = TFAutoModel.from_pretrained("bert-base-uncased")

def root_cause_analysis(data, outcome="severity_label", treatment="source_system"):
    """Estimate the causal effect of *treatment* on *outcome* via causalinference matching.

    Args:
        data: pandas DataFrame of processed logs.
        outcome: Column used as the outcome ``Y``. Defaults to ``"severity_label"``.
        treatment: Column used as the treatment indicator ``D``.
            causalinference expects a binary (0/1) treatment, so adjust this to
            your log schema. Logs indexed by ``process_log`` store their origin
            under ``"source"``, not ``"source_system"``.

    Returns:
        ``causal.estimates`` after ``CausalModel.est_via_matching()``.

    Raises:
        KeyError: *outcome* or *treatment* is not a column of *data*.
        ValueError: *data* has no rows, or no numeric covariate columns remain.
    """
    logging.info("Performing Root Cause Analysis...")
    missing = [col for col in (outcome, treatment) if col not in data.columns]
    if missing:
        raise KeyError(f"root_cause_analysis: missing column(s) {missing}")
    if data.empty:
        raise ValueError("root_cause_analysis: no rows to analyse")

    # CausalModel needs a numeric covariate matrix. Free-text columns such as
    # "message" or "severity" would otherwise be handed over as an object array.
    covariates = data.drop(columns=[outcome, treatment]).select_dtypes(include="number")
    if covariates.shape[1] == 0:
        raise ValueError("root_cause_analysis: no numeric covariate columns")

    causal = CausalModel(
        Y=data[outcome].values,
        D=data[treatment].values,
        X=covariates.values,
    )
    causal.est_via_matching()
    root_cause_counter.inc()
    return causal.estimates

def cluster_anomalies(logs, n_clusters=3, anomaly_cluster=2, random_state=42):
    """Cluster *logs* with KMeans and return the rows assigned to *anomaly_cluster*.

    Args:
        logs: A pandas DataFrame or a 2-D numeric NumPy array. For a
            DataFrame, only its numeric columns are used as features, but the
            full rows are returned.
        n_clusters: Number of KMeans clusters. Defaults to 3.
        anomaly_cluster: Cluster label treated as anomalous. Defaults to 2.
            KMeans labels are arbitrary, so this is a placeholder choice rather
            than a property of the data.
        random_state: Seed for reproducible clustering.

    Returns:
        The subset of *logs* (same container type) whose cluster label equals
        *anomaly_cluster*. Returns an empty slice when there are no numeric
        features or fewer samples than clusters.
    """
    logging.info("Clustering logs for anomalies...")
    if isinstance(logs, pd.DataFrame):
        # Text columns (message, severity, source) cannot be fed to KMeans.
        features = logs.select_dtypes(include="number")
        if features.shape[1] == 0:
            logging.warning("No numeric columns to cluster; returning no anomalies.")
            return logs[:0]
    else:
        features = logs

    if len(features) < n_clusters:
        # KMeans raises ValueError when n_samples < n_clusters.
        logging.warning(
            "Only %d samples for %d clusters; returning no anomalies.",
            len(features), n_clusters,
        )
        return logs[:0]

    kmeans = KMeans(n_clusters=n_clusters, random_state=random_state)
    clusters = kmeans.fit_predict(features)
    return logs[clusters == anomaly_cluster]

# Notification System

def notify_slack(message):
    """Post *message* to ``SLACK_CHANNEL`` using the shared Slack client.

    A ``SlackApiError`` is logged at ERROR level (with the API's error code)
    and swallowed. Any other exception propagates to the caller.
    """
    try:
        slack_client.chat_postMessage(text=message, channel=SLACK_CHANNEL)
    except SlackApiError as api_error:
        reason = api_error.response['error']
        logging.error(f"Slack API error: {reason}")

def run_self_healing_script(script_path, timeout=None):
    """Run *script_path* with bash as a best-effort remediation step.

    Failures are logged rather than raised, so a broken, missing or hung
    script does not interrupt log processing.

    Args:
        script_path: Path of the shell script to execute.
        timeout: Optional number of seconds after which the script is killed.
            ``None`` (the default) waits indefinitely, matching the previous
            behaviour.

    Returns:
        True if the script exited with status 0, otherwise False.
    """
    logging.info("Running self-healing script: %s", script_path)
    try:
        subprocess.run(["bash", script_path], check=True, timeout=timeout)
    except subprocess.CalledProcessError as e:
        # Non-zero exit, including 127 when the script file does not exist.
        logging.error("Error executing self-healing script: %s", e)
    except subprocess.TimeoutExpired as e:
        # subprocess.run has already killed the child at this point.
        logging.error("Self-healing script timed out: %s", e)
    except OSError as e:
        # For example, bash itself is not installed or not on PATH.
        logging.error("Could not start self-healing script %s: %s", script_path, e)
    else:
        return True
    return False

# Numeric scale stored as ``severity_label``: 0 = DEBUG/INFO, 1 = WARNING,
# 2 = ERROR, 3 = CRITICAL/FATAL.
# NOTE(review): assumes producers send textual level names — confirm.
_SEVERITY_LEVELS = {
    "DEBUG": 0,
    "INFO": 0,
    "WARN": 1,
    "WARNING": 1,
    "ERROR": 2,
    "CRITICAL": 3,
    "FATAL": 3,
}

# Logs at or above this level trigger a Slack alert and self-healing.
_ALERT_LEVEL = 2


def _severity_level(severity):
    """Map a reported severity name onto ``_SEVERITY_LEVELS``; unknown values map to 0."""
    return _SEVERITY_LEVELS.get(str(severity).strip().upper(), 0)


def process_log(log, healing_script="/path/to/healing_script.sh"):
    """Index one log record in Elasticsearch and react to ERROR/CRITICAL entries.

    The record must provide ``message``, ``severity`` and ``source`` keys. Its
    reported severity is mapped to a numeric ``severity_label``, and the
    record is indexed into ``bug_logs``. When the label is ERROR or higher,
    three things happen:

    * a Slack alert is sent;
    * *healing_script* is run;
    * the ``bug_detected`` counter is incremented.

    The severity comes from the record, not from the BERT model. ``TFAutoModel``
    is the bare encoder with no classification head, so an ``argmax`` over its
    output is an index into the hidden-state tensor, not a severity class. That
    index is almost always >= 2, so it would alert and run the healing script
    for virtually every log. Plug in a fine-tuned sequence classifier here if
    model-based severity is wanted.

    Any exception is logged with its traceback and swallowed, so one bad
    record does not stop the consumer loop.

    Args:
        log: Decoded log record (a mapping).
        healing_script: Remediation script run for ERROR/CRITICAL logs. The
            default is a placeholder path; customize it.
    """
    try:
        message = log['message']
        severity = log['severity']
        source = log['source']
        severity_label = _severity_level(severity)

        es.index(index="bug_logs", body={
            "message": message,
            "severity": severity,
            "severity_label": severity_label,
            "source": source,
        })

        if severity_label >= _ALERT_LEVEL:  # ERROR or CRITICAL
            # Notification is best-effort: a Slack outage (any error type)
            # must not prevent remediation.
            try:
                notify_slack(f"Bug Detected: {message}, Severity: {severity_label}")
            except Exception:
                logging.exception("Slack notification failed; continuing with self-healing")
            run_self_healing_script(healing_script)
            # Count only actual detections, per the counter's description.
            bug_detected_counter.inc()
    except Exception as e:
        logging.exception("Error processing log: %s", e)
Enter fullscreen mode Exit fullscreen mode

def process_logs_stream():
    """Consume records from the Kafka topic and process each one in turn.

    Iterates the module-level ``consumer``, whose deserializer has already
    decoded each value from JSON, and passes every value to ``process_log``.
    """
    logging.info("Starting log stream processing...")
    for record in consumer:
        process_log(record.value)

def generate_reports(output_path="anomaly_logs.csv", max_docs=10000):
    """Fetch indexed logs from Elasticsearch, cluster them and export anomalies to CSV.

    Only numeric columns (such as ``severity_label``) are used as clustering
    features. The exported rows keep every stored field, so the CSV still
    shows the offending messages.

    Args:
        output_path: Destination CSV file.
        max_docs: Maximum number of documents fetched in the single search
            request. Elasticsearch caps one request at
            ``index.max_result_window`` (10,000 by default), so larger indices
            are truncated, and a warning is logged when the cap is hit.

    Returns:
        The DataFrame of anomalous logs that was written (possibly empty).
    """
    logging.info("Generating advanced reports...")
    logs = es.search(index="bug_logs", body={"query": {"match_all": {}}}, size=max_docs)
    hits = logs['hits']['hits']
    if len(hits) >= max_docs:
        logging.warning("Fetched %d documents; remaining logs were not included.", max_docs)
    data = pd.DataFrame([hit['_source'] for hit in hits])

    # KMeans cannot handle the text fields, so cluster on numeric columns only
    # and map the anomalous rows back to the full records by index.
    features = data.select_dtypes(include="number")
    if features.empty:
        logging.warning("No numeric log data to cluster; writing an empty report.")
        anomaly_logs = data.iloc[:0]
    else:
        anomalies = cluster_anomalies(features)
        anomaly_logs = data.loc[anomalies.index]

    anomaly_logs.to_csv(output_path, index=False)
    return anomaly_logs

if __name__ == "__main__":
    # Start the Prometheus metrics endpoint before consuming, so the counters
    # can be scraped while the loop runs. The port defaults to 8000 and can
    # be overridden with METRICS_PORT.
    start_http_server(int(os.getenv("METRICS_PORT", "8000")))

    # Process logs from Kafka. No consumer_timeout_ms is configured, so this
    # presumably blocks until the process is interrupted.
    process_logs_stream()
Enter fullscreen mode Exit fullscreen mode

Top comments (0)