Carlos Armando Marcano Vargas
Originally published at carlosmv.hashnode.dev

How to Build an Application Monitoring System with FastAPI and RabbitMQ | Python

In this article, we will build a service monitoring system using FastAPI and RabbitMQ. The system will monitor the web traffic of various web services and microservices. This is just for educational purposes, and is not a replacement for more robust, reliable and industry-used software like OpenTelemetry, Prometheus, etc.

When a service is called, it publishes a message to a RabbitMQ queue. A consumer listens to that queue and stores the tracking information of the service in a database. We will build a FastAPI application to collect the monitoring data. This system will show information like IP address, request URL, port, path, method, browser, request time, and service name.

Requirements

  • Python installed

  • Basic Python knowledge

  • Pip installed

  • Postgres installed

  • RabbitMQ installed (Download from here)

Building the Visitor Tracker

First, we create a directory for this application and set up a virtual environment inside it.

mkdir visitor_tracker
cd visitor_tracker

#Windows users
py -m venv venv
venv\Scripts\activate

#Linux
python3 -m venv venv
source venv/bin/activate

We install all the dependencies we need.

pip install fastapi uvicorn psycopg2-binary python-dotenv aio-pika pika

Inside the visitor_tracker directory, we create a new file, init_db.py.

init_db.py

import os
import psycopg2
from dotenv import load_dotenv

# Load PASSWORD (and USER) from a .env file in the project directory.
load_dotenv()
USER = os.getenv('USER')
PASSWORD = os.getenv('PASSWORD')

def get_db_connection():
    # Connects as the "postgres" role; the logs_db database must already exist.
    conn = psycopg2.connect(
        dbname = "logs_db",
        user = "postgres",
        password = PASSWORD
    )
    return conn

def init_db():
    conn = get_db_connection()
    cur = conn.cursor()

    # Recreate the table from scratch, so any existing rows are lost.
    cur.execute('DROP TABLE IF EXISTS logs;')
    cur.execute('CREATE TABLE logs (id serial PRIMARY KEY,'
                'ip_address varchar (150) NOT NULL,'
                'request_url varchar (50) NOT NULL,'
                'request_port integer NOT NULL,'
                'request_path varchar (50) NOT NULL,'
                'request_method varchar (50) NOT NULL,'
                'browser_type varchar (150) NOT NULL,'
                'request_time timestamp NOT NULL,'
                'service_name varchar (150) NOT NULL,'
                'date_added date DEFAULT CURRENT_TIMESTAMP);'
                )

    # Insert one sample row so the table is not empty.
    cur.execute('INSERT INTO logs (ip_address,'
                'request_url,'
                'request_port,'
                'request_path,'
                'request_method,'
                'browser_type,'
                'request_time,'
                'service_name) '
                'VALUES (%s, %s, %s, %s, %s, %s, %s, %s)',
                ('127.0.0.1',
                 'http://localhost:8000',
                 8000,
                 "/",
                 "GET",
                 "Chrome",
                 "2023-06-25T16:03:24.722256",
                 "Test_data_service"
                 )
                )

    conn.commit()

    cur.close()
    conn.close()

# Reset the table only when this file is run directly (python init_db.py),
# not when controllers.py imports get_db_connection.
if __name__ == "__main__":
    init_db()

Here we set up the database connection used to store log data. First, we load the .env file with dotenv to read the database username and password. Then, we define a get_db_connection() function that connects to a PostgreSQL database named logs_db. The rest of the file calls that function to get a connection and a cursor.

The file also drops the logs table if it exists and recreates it with columns for the IP address, request URL, port, path, method, browser, request time, and service name. It then inserts one sample row, commits the changes, and closes the cursor and connection. Running python init_db.py once prepares the table.
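The sample row passes request_time as an ISO 8601 string. As a small illustration using only the standard library (the variable names are made up for this example), such a string can be parsed and produced again like this. The space-separated variant is the form that str() gives for a datetime object.

```python
from datetime import datetime

# Same value as the request_time of the sample row.
sample = "2023-06-25T16:03:24.722256"

# fromisoformat() turns the string into a datetime object.
parsed = datetime.fromisoformat(sample)

# isoformat() produces the same textual form again.
round_trip = parsed.isoformat()

# A space instead of "T" is accepted as the separator too.
spaced = datetime.fromisoformat("2023-06-25 16:03:24.722256")
```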

helpers.py

import collections

def to_dict(psycopg_tuple:tuple):
    tracker = collections.OrderedDict()
    tracker['id'] = psycopg_tuple[0]

    tracker["ip_address"] = psycopg_tuple[1]
    tracker["request_url"] = psycopg_tuple[2]
    tracker["request_port"] = psycopg_tuple[3]
    tracker["request_path"] = psycopg_tuple[4]
    tracker["request_method"] = psycopg_tuple[5]
    tracker["browser_type"] = psycopg_tuple[6]
    tracker["request_time"] = psycopg_tuple[7].strftime("%d-%m-%Y, %H:%M:%S")
    tracker["service_name"] = psycopg_tuple[8]
    return tracker

def list_dict(rows:list):
    row_list = []
    for row in rows:
        log_dict = to_dict(row)
        row_list.append(log_dict)

    return row_list

This file has two functions: to_dict() and list_dict(). The to_dict() function takes one row tuple returned by psycopg2 and returns a dictionary whose keys follow the column order of the logs table, with request_time formatted as a string. The list_dict() function takes a list of row tuples and returns a list of dictionaries, building each one with to_dict().
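As an alternative sketch of the same idea, the mapping can be driven by a list of column names instead of one assignment per index. COLUMNS and row_to_dict() are hypothetical names, and the column order is assumed to match the logs table from init_db.py.

```python
from datetime import datetime

# Column order assumed to match the logs table created in init_db.py.
COLUMNS = ["id", "ip_address", "request_url", "request_port", "request_path",
           "request_method", "browser_type", "request_time", "service_name"]

def row_to_dict(row: tuple) -> dict:
    """Pair each column name with the value at the same position in the row."""
    # zip() stops at the shorter input, so extra trailing values are ignored.
    record = dict(zip(COLUMNS, row))
    # Format the timestamp the same way to_dict() does.
    record["request_time"] = record["request_time"].strftime("%d-%m-%Y, %H:%M:%S")
    return record

sample_row = (1, "127.0.0.1", "http://localhost:8000", 8000, "/", "GET",
              "Chrome", datetime(2023, 6, 25, 16, 3, 24), "Test_data_service",
              None)  # the trailing value stands in for date_added
example = row_to_dict(sample_row)
```

The trade-off is that a change to the table only requires editing COLUMNS, at the cost of relying on the SELECT returning columns in that order.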

controllers.py

from init_db import get_db_connection
from helpers import to_dict,list_dict
import json

def all_logs():
    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute('SELECT * FROM logs;')
    logs = list_dict(cur.fetchall())
    cur.close()
    conn.close()

    return logs

def new_log(ip_address: str,
         request_url: str,
         request_port: int,
         request_path: str,
         request_method: str,
         browser_type: str,
         request_time: str,
         service_name:str,):

    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute('INSERT INTO logs (ip_address, request_url, request_port, request_path, request_method, browser_type, request_time, service_name)'
                    'VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING *;',(ip_address,
                                                    request_url,
                                                    request_port,
                                                    request_path,
                                                    request_method,
                                                    browser_type,
                                                    request_time,
                                                    service_name))

    log = cur.fetchone()[:]
    log_dict = to_dict(log)
    conn.commit()
    cur.close()
    conn.close()

    return json.dumps(log_dict)

The all_logs() function gets all logs from the database and returns a list of dictionaries. Each dictionary contains information about a single log.

The new_log() function inserts a new log into the database and returns the inserted row as a JSON string.
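Both functions close the connection by hand, so an exception raised in between would skip the cleanup. The following is a minimal sketch of one way to guard against that with contextlib.closing. It uses sqlite3 and a reduced two-column table purely as a stand-in so that it runs without a Postgres server; the function names mirror the ones above but are simplified. psycopg2 connections can be used in a with block in the same way to commit or roll back.

```python
import os
import sqlite3
import tempfile
from contextlib import closing

# sqlite3 is only a stand-in here; the article itself uses psycopg2.
db_path = os.path.join(tempfile.mkdtemp(), "logs_demo.db")

def get_connection():
    return sqlite3.connect(db_path)

def create_table():
    with closing(get_connection()) as conn, conn:
        conn.execute("CREATE TABLE logs (id INTEGER PRIMARY KEY, "
                     "ip_address TEXT, request_path TEXT)")

def new_log(ip_address: str, request_path: str) -> int:
    # closing() guarantees conn.close(); "with conn" commits on success
    # and rolls back if the block raises.
    with closing(get_connection()) as conn, conn:
        cur = conn.execute(
            "INSERT INTO logs (ip_address, request_path) VALUES (?, ?)",
            (ip_address, request_path),
        )
        return cur.lastrowid

def all_logs() -> list:
    with closing(get_connection()) as conn:
        return conn.execute(
            "SELECT id, ip_address, request_path FROM logs").fetchall()

create_table()
first_id = new_log("127.0.0.1", "/")
rows = all_logs()
```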

consumer.py


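import ast
import json

import aio_pika

from controllers import new_log

def parse_body(body: bytes) -> dict:
    text = body.decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back for publishers that send str(dict) instead of JSON.
        return ast.literal_eval(text)

async def on_message(message: aio_pika.IncomingMessage):
    # process() acknowledges the message when the block exits without an error.
    async with message.process():
        tracker = parse_body(message.body)

        new_log(tracker["ip_address"], tracker["request_url"], tracker["request_port"],
                tracker["request_path"], tracker["request_method"],
                tracker["browser_type"], tracker["request_time"], tracker["service_name"])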

The consumer.py file contains the code that receives messages from the RabbitMQ queue. The on_message() function is called each time a message arrives. It decodes the message body into a dictionary and calls the new_log() function to add the data to the database.
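Since a malformed message would only fail once it reaches the database, it can help to check the payload first. Here is a minimal sketch, assuming the publisher sends JSON; decode_tracker() and REQUIRED_FIELDS are hypothetical names that are not part of the project.

```python
import json

# Fields that new_log() expects, as listed in controllers.py.
REQUIRED_FIELDS = ("ip_address", "request_url", "request_port", "request_path",
                   "request_method", "browser_type", "request_time", "service_name")

def decode_tracker(body: bytes) -> dict:
    """Decode a JSON message body and check that every required field is present."""
    tracker = json.loads(body.decode("utf-8"))
    missing = [field for field in REQUIRED_FIELDS if field not in tracker]
    if missing:
        raise ValueError(f"message is missing fields: {missing}")
    return tracker

# Usage: a complete message decodes into a plain dictionary.
good_body = json.dumps({
    "ip_address": "127.0.0.1",
    "request_url": "http://localhost:8000",
    "request_port": 8000,
    "request_path": "/",
    "request_method": "GET",
    "browser_type": "Firefox",
    "request_time": "2023-06-25T16:03:24.722256",
    "service_name": "Fastapi_service",
}).encode("utf-8")
decoded = decode_tracker(good_body)
```

Extra keys are left in place, since on_message() only reads the fields it needs.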

app.py

from fastapi import FastAPI
from controllers import all_logs
from consumer import on_message
import asyncio
import aio_pika

app = FastAPI()

@app.on_event('startup')
async def startup():
    loop = asyncio.get_event_loop()
    connection = await aio_pika.connect("amqp://guest:guest@localhost/", loop = loop)
    channel = await connection.channel()
    queue = await channel.declare_queue("logs")
    await queue.consume(on_message)

@app.get("/logs")
async def receiver():
    logs = all_logs()
    return logs

@app.get("/")
async def hello():

    return "hello"

On startup, we create a connection to the AMQP broker, declare a queue named "logs", and consume the messages from that queue with the on_message() function.

Then, we create an endpoint with the path "/logs" to show all the logs stored in the database.

To test this service, we start it with uvicorn app:app and create a new file, sender.py.

import pika
import json

tracker = {
        "ip_address": '127.0.0.1',
        "request_url": 'http://localhost:8000',
        "request_port": 8000,
        "request_path": "/",
        "request_method": "GET",
        "request_time": "2023-06-25T16:03:24.722256",
        "browser_type": "Firefox",
        "operating_system": "Windows 11",
        "service_name": "Fastapi_service",
    }

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='logs')

channel.basic_publish(exchange='', routing_key='logs', body=json.dumps(tracker))
print(" [x] Sent 'Logs'")
connection.close()

We run this file with python3 sender.py while the visitor tracker is running. It prints [x] Sent 'Logs' in our command line, and the new record appears when we request the /logs endpoint of the visitor tracker.

Publisher

The publisher is the service whose activity we want to track.

sender.py

import pika

def sender(body: str):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='logs')

    channel.basic_publish(exchange='', routing_key='logs', body=body)
    print(" [x] Sent 'Logs'")
    connection.close()

In the sender.py file, we define the sender() function. It receives the tracking information, already serialized as a string, creates a connection to RabbitMQ, declares the queue, and publishes the body to that queue.

middleware.py

from fastapi import Request
from datetime import datetime

class Tracker:
    def __init__ (self, service_name: str):
        self.service_name = service_name

    def visitor_tracker(self, request: Request):
        ip_address = request.client.host
        request_url = str(request.url)
        request_port = request.url.port
        request_path = request.url.path
        request_method = request.method
        # Fall back to "unknown" when the client sends no User-Agent header.
        browser_type = request.headers.get("User-Agent", "unknown")
        request_time = str(datetime.now())
        service_name = self.service_name

        return {
            "ip_address": ip_address,
            "request_url": request_url,
            "request_port": request_port,
            "request_path": request_path,
            "request_method": request_method,
            "request_time": request_time,
            "browser_type": browser_type,
            "service_name": service_name,
        }

In the middleware.py file, we create the Tracker class that takes the service_name as an argument in its constructor. It has a visitor_tracker() method which takes a FastAPI Request object as an argument. It extracts various information from the request like IP address, URL, port, request path, method (GET, POST etc.), browser type from the User-Agent header, request time, and the service name passed in the constructor. This method returns a dictionary containing all this information which can be logged to track visitor requests.
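The browser_type field holds the whole User-Agent header, not a browser name. If a short name is preferred, a rough heuristic like the one below could be applied before publishing. guess_browser() is a hypothetical helper, and substring matching of this kind is only an approximation that will misclassify some clients.

```python
def guess_browser(user_agent: str) -> str:
    """Very rough browser guess from a User-Agent string (heuristic only)."""
    # Order matters: Edge user agents also contain "Chrome/" and "Safari/",
    # and Chrome user agents also contain "Safari/".
    checks = [
        ("Firefox/", "Firefox"),
        ("Edg/", "Edge"),
        ("Chrome/", "Chrome"),
        ("Safari/", "Safari"),
    ]
    for token, name in checks:
        if token in user_agent:
            return name
    return "Other"

chrome_ua = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
             "(KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36")
browser = guess_browser(chrome_ua)
```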

main.py

from fastapi import FastAPI, Request
from datetime import datetime
from middleware import Tracker
from sender import sender
import json
app = FastAPI()

@app.middleware("http")
async def tracker(request: Request, call_next):
    service_tracker = Tracker("service_one")
    # Serialize the tracking dictionary to JSON before publishing it.
    payload = json.dumps(service_tracker.visitor_tracker(request))

    sender(payload)
    response = await call_next(request)

    return response

@app.get("/")
def index():
    return "Hello, world"

@app.get("/json")
def some_func():
    return {
        "some_json": "Some Json"
    }

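if __name__ == "__main__":
    # FastAPI has no app.run(); uvicorn serves the app. Port 8001 is an arbitrary
    # choice that avoids clashing with the visitor tracker on port 8000.
    import uvicorn
    uvicorn.run(app, port=8001)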

In this file, we define a middleware function, tracker(), using the @app.middleware decorator. It runs for every request. The function creates a Tracker instance with "service_one" as the service name and calls its visitor_tracker() method with the FastAPI Request object to collect the tracking information. It then passes that information to sender(), which publishes it to the RabbitMQ queue. Finally, it calls call_next to hand the request to the matching route and returns that route's response.
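Because sender() relies on pika's blocking connection, calling it directly inside an async middleware holds up the event loop while the message is published. One option, sketched below under the assumption of Python 3.9 or newer, is to hand the call to a worker thread with asyncio.to_thread. blocking_sender() is a stand-in that only sleeps, so the example runs without RabbitMQ.

```python
import asyncio
import time

def blocking_sender(body: str) -> str:
    """Stand-in for sender(): sleeps briefly to imitate a blocking network call."""
    time.sleep(0.05)
    return body

async def publish_without_blocking(body: str) -> str:
    # asyncio.to_thread runs the blocking call in a worker thread,
    # so the event loop stays free while it waits.
    return await asyncio.to_thread(blocking_sender, body)

async def demo() -> list:
    # The three publishes overlap instead of running back to back.
    return await asyncio.gather(
        *(publish_without_blocking(f"msg-{i}") for i in range(3)))

results = asyncio.run(demo())
```

In the middleware this would amount to awaiting asyncio.to_thread(sender, payload); since sender() opens its own connection on every call, each worker thread gets a separate connection.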

We define two routes. If we make a request to these routes, its information will be collected by the middleware and sent to the RabbitMQ queue.

Now, we run this service at the same time as the visitor tracker, each on its own port.

We visit one of the endpoints with a browser or call it with an HTTP client.

The request then shows up in the visitor tracker's /logs response.

Building a UI

We will create a React app to show a table with all the tracking information.

Adding CORS

app.py

from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

origins = ["*"]
methods = ["GET", "POST", "PUT", "DELETE"]
headers = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_methods=methods,
    allow_headers=headers,
)
...

Installing Vite and React

In our command line, we create a Vite project with the React-TypeScript template. The command creates a folder named table for the project.

#npm
npm create vite@latest table -- --template react-ts

#yarn
yarn create vite table --template react-ts

#pnpm
pnpm create vite table --template react-ts

We move into the new folder, install the packages, and run Vite:

cd table
npm install
npm run dev

We go to localhost:5173 and should see the Vite and React homepage.

App.tsx

import React, { useState, useEffect } from "react";

const url = "http://localhost:8000/logs";

// Shape of one record returned by the /logs endpoint.
interface LogEntry {
  id: number,
  ip_address: string,
  request_url: string,
  request_port: number,
  request_path: string,
  request_method: string,
  request_time: string,
  browser_type: string,
  service_name: string,
}

const Table: React.FC = () => {
    const [data, setData] = useState<LogEntry[]>([]);

    useEffect(() => {
      fetch(url)
        .then(res => res.json())
        .then((logs: LogEntry[]) => {
          // Log the fetched records here, where they are available.
          console.log(logs);
          setData(logs);
        })
        .catch(err => console.error(err));
    }, []);

    return (
        <div>
          <h1>Logs</h1>
          <table>
            <thead>
                <tr>
                  <th>Id</th>
                  <th>IP Address</th>
                  <th>Request URL</th>
                  <th>Request Port</th>
                  <th>Request Path</th>
                  <th>Request Method</th>
                  <th>Request Time</th>
                  <th>Browser Type</th>
                  <th>Service Name</th>
                </tr>
              </thead>  

            <tbody>
            {data.map((item) => (
              <tr key={item.id}>
                <td>{item.id}</td>  
                <td>{item.ip_address}</td>     
                <td>{item.request_url}</td>        
                <td>{item.request_port}</td>  
                <td>{item.request_path}</td>
                <td>{item.request_method}</td>
                <td>{item.request_time}</td>
                <td>{item.browser_type}</td>  
                <td>{item.service_name}</td>   
              </tr>
            ))}
          </tbody>  

          </table>

        </div>

      );

};

export default Table;

Let's add a .css file to add style and see the data easily.

index.css

:root {
  font-family: Inter, system-ui, Avenir, Helvetica, Arial, sans-serif;
  line-height: 1.5;
  font-weight: 400;

  color-scheme: light dark;
  color: rgba(255, 255, 255, 0.87);
  background-color: #242424;

  font-synthesis: none;
  text-rendering: optimizeLegibility;
  -webkit-font-smoothing: antialiased;
  -moz-osx-font-smoothing: grayscale;
  -webkit-text-size-adjust: 100%;
}

table {
  border-collapse: collapse;
  margin-bottom: 1rem;
}

th,
td {
  padding: 0.5rem;
  border: 1px solid #ccc;
}

th {
  text-align: left;
}

td {
  text-align: left;
}

.column-gap-10 {
  column-gap: 10px;
}

Conclusion

In this article, we built a simple service monitoring system using FastAPI and RabbitMQ. We saw how to create a FastAPI project, declare a queue, publish and consume messages from a RabbitMQ queue in FastAPI, and store monitoring data in a Postgres database.

Note: I haven't tried this app with services built on frameworks other than FastAPI. If you want to, the same steps should apply: create a middleware that collects the tracking information and publishes it to the RabbitMQ queue you declared for this data.

Thank you for taking the time to read this article.

If you have any recommendations about other packages, architectures, how to improve my code, my English, or anything; please leave a comment or contact me through Twitter, or LinkedIn.

The visitor tracker's code is here.

The source code of the publisher is here.
