Wazuh is a robust open-source security platform, but it doesn't include native support for Telegram alerts. This guide walks you through a simple method to send alerts, like SSH login attempts, to Telegram using a custom integration script.
π οΈ Prerequisites
- A Telegram bot and your chat ID (official guide, or use the quick setup below)
- Wazuh 4.12 installed
- Basic Linux shell and Python knowledge
How to Create a Telegram Bot
Creating a Telegram bot is quick and easy with Telegramβs official BotFather:
- Open Telegram and search for @botfather.
- Start a conversation and send
/start
to begin. - Create your bot with the command
/newbot
. - Follow the instructions:
- Provide a display name for your bot.
- Choose a unique username that ends with
bot
(e.g.,WazuhAlertBot
).
- BotFather will then return your bot token, which looks like this:
123456789:ABCdefGhIJKlmNoPQRstUVwxyz1234567890
. - Keep this token safe, it's required to send messages from your bot.
You can now find your bot on Telegram using its username and send it a message to start.
How to Get Your Telegram Chat ID
To send alerts to Telegram, you'll need your chat ID. The script below fetches your chat ID by reading the latest message sent to your bot.
Steps
- Replace the
bot_token
in the script with the token you got from BotFather. - Send any message to your bot via Telegram (e.g., "hello").
- Run the script below using Python.
- Your chat ID will be printed in the terminal.
Python Script to Retrieve Chat ID
import requests
bot_token = "" # Replace with your bot token
def get_chat_id():
url = f"https://api.telegram.org/bot{bot_token}/getUpdates"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
if "result" in data and len(data["result"]) > 0:
# Get chat ID from the last message
chat_id = data["result"][-1]["message"]["chat"]["id"]
print(f"Your Chat ID is: {chat_id}")
else:
print("No messages found. Please send a message to your bot first.")
else:
print(f"Failed to get updates: {response.status_code}")
if __name__ == "__main__":
get_chat_id()
π¦ Step 1: Set up Python Virtual Environment in Wazuh
Create a Python virtual environment inside the Wazuh directory (/var/ossec
) to avoid permission issues and package conflicts.
mkdir -p /var/ossec/venv
python3 -m venv /var/ossec/venv
source /var/ossec/venv/bin/activate
pip install requests # or any needed packages
π Step 2: Create Telegram Integration Script
Name the script with prefix custom-
.
Location: /var/ossec/integrations/custom-telegram.py
#!/var/ossec/venv/bin/python
import json
import re
import sys
from datetime import datetime
import requests
# Telegram Bot credentials
bot_token = "" # Replace with your Telegram bot token
chat_id = "" # Replace with your Telegram chat ID
# === Excluded Wazuh Rule IDs ===
excluded_rules: list = [] # Example: ["1002", "5715", "18107"]
def escape_markdown_v2(text):
if not isinstance(text, str):
text = str(text)
# Escape special MarkdownV2 characters
escape_chars = r"_*[]()~`>#+-=|{}.!"
return re.sub(r"([%s])" % re.escape(escape_chars), r"\\\1", text)
def main():
if len(sys.argv) < 2:
print("[ERROR] No alert file path provided.")
sys.exit(1)
alert_file = sys.argv[1]
try:
with open(alert_file, "r") as f:
alert = json.load(f)
except Exception as e:
print(f"[ERROR] Failed to read or parse alert JSON file: {e}")
sys.exit(1)
rule_id = alert.get("rule", {}).get("id", None)
if rule_id in excluded_rules:
print(f"[INFO] Skipping excluded rule ID: {rule_id}")
sys.exit(0)
# Extract fields
data = alert.get("data", {})
srcuser = data.get("srcuser") or data.get("dstuser") or "unknown"
srcport = data.get("srcport", "unknown")
srcip = alert.get("data", {}).get("srcip", "unknown")
agent_name = alert.get("agent", {}).get("name", "unknown")
alert_level = alert.get("rule", {}).get("level", "unknown")
rule_id = alert.get("rule", {}).get("id", "unknown")
description = alert.get("rule", {}).get("description", "No description")
full_log = alert.get("full_log", "No full log available")
timestamp_raw = alert.get("timestamp", "unknown")
# Convert timestamp to human readable format (date and time to seconds)
try:
# Try parsing ISO8601 format
dt = datetime.fromisoformat(timestamp_raw.replace("Z", "+00:00"))
timestamp = dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
timestamp = timestamp_raw # fallback if parsing fails
# Build message
message = (
"*π¨ Wazuh Alert Notification*\n\n"
f"π *Time:* `{escape_markdown_v2(timestamp)}`\n"
f"π€ *Username:* `{escape_markdown_v2(srcuser)}`\n"
f"π *Source IP:* `{escape_markdown_v2(srcip)}`\n"
f"πͺ *Source Port:* `{escape_markdown_v2(srcport)}`\n"
f"π» *Agent:* {escape_markdown_v2(agent_name)}\n\n"
f"π *Rule ID:* `{escape_markdown_v2(rule_id)}`\n"
f"π§± *Level:* *{escape_markdown_v2(alert_level)}*\n\n"
f"π *Description:*\n{escape_markdown_v2(description)}\n\n"
f"π *Full Log:*\n{escape_markdown_v2(full_log)}"
)
vuln = alert.get("vulnerability", {})
cve_id = vuln.get("cve", "")
cve_title = vuln.get("title", "")
cve_url = f"https://cti.wazuh.com/vulnerabilities/cves/{cve_id}" if cve_id else ""
if cve_id:
message += (
f"\n\n*π‘οΈ CVE:* `{escape_markdown_v2(cve_id)}`\n"
f"*π Details:* {escape_markdown_v2(cve_title)}\n"
f"[π View in CTI]({escape_markdown_v2(cve_url)})"
)
# Send to Telegram
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
payload = {"chat_id": chat_id, "text": message, "parse_mode": "MarkdownV2"}
response = requests.post(url, json=payload)
if response.status_code != 200:
print(f"[ERROR] Telegram response: {response.status_code} - {response.text}")
if __name__ == "__main__":
main()
π§ Step 3: Create Shell Wrapper Script
Wazuh calls shell scripts, so create a simple wrapper to call the Python script. It should have the same name of the python file without the .py
extension.
Location: /var/ossec/integrations/custom-telegram
#!/bin/bash
/var/ossec/venv/bin/python3 /var/ossec/integrations/custom-telegram.py "$@"
Make both scripts executable and set proper ownership:
chmod 750 /var/ossec/integrations/custom-telegram*
chown root:wazuh /var/ossec/integrations/custom-telegram*
Step 4: Configure Integration in ossec.conf
- Ensure that
logall_json
is enabled by setting it toyes
in yourossec.conf
file:
<ossec_config>
<global>
<jsonout_output>yes</jsonout_output>
<alerts_log>yes</alerts_log>
<logall>yes</logall>
<logall_json>yes</logall_json>
</global>
</ossec_config>
- Add the integration block in
/var/ossec/etc/ossec.conf
: - The
<name>
tag value (custom-telegram
) must exactly match the name of your wrapper script.
<ossec_config>
<integration>
<name>custom-telegram</name>
<level>7</level>
<alert_format>json</alert_format>
</integration>
</ossec_config>
Setting level to 7
means only alerts with level 7 or higher trigger the Telegram alert.
π Step 5: Elevate SSH Login Rule Level in Wazuh
By default, successful SSH login events (SID 5715
) have a low alert level and do not trigger integrations. To resolve this, create or update the file /var/ossec/etc/rules/local_rules.xml
with the following content:
<group name="local,syslog,sshd,">
<rule id="100011" level="7" overwrite="yes">
<if_sid>5715</if_sid>
<description>sshd: Successful SSH login with elevated level.</description>
<group>authentication_success,pci_dss_10.2.5,</group>
</rule>
</group>
This configuration overrides the level of rule ID 5715 and raises it to level 7, ensuring it triggers and sends alerts to Telegram.
After making these changes, restart the Wazuh manager to apply them:
systemctl restart wazuh-manager
β Step 6: Test the Integration
Test manually with an alert JSON file:
/var/ossec/integrations/custom-telegram /var/ossec/logs/alerts/alerts.json
Or wait for a real SSH login alert to be sent to Telegram.
Related Projects
- Wazuh Slack Integration β Send Wazuh alerts to Slack using a similar method.
π Result
Receive real-time Telegram alerts for SSH logins and other critical events with detailed, nicely formatted messages, perfect for quick incident response.
If you found this useful or have questions, feel free to comment. Happy monitoring! π¨βπ»π±
Created with β€οΈ by 0xdolan | GitHub | View this guide on GitHub
Top comments (0)