<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Riottecboi</title>
    <description>The latest articles on DEV Community by Riottecboi (@riottecboi).</description>
    <link>https://dev.to/riottecboi</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1650910%2F9cbff6b4-c968-4392-861e-2314a28348f9.jpeg</url>
      <title>DEV Community: Riottecboi</title>
      <link>https://dev.to/riottecboi</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/riottecboi"/>
    <language>en</language>
    <item>
      <title>DNS Lookup Web Application</title>
      <dc:creator>Riottecboi</dc:creator>
      <pubDate>Wed, 19 Jun 2024 15:22:32 +0000</pubDate>
      <link>https://dev.to/riottecboi/dns-lookup-web-application-2llf</link>
      <guid>https://dev.to/riottecboi/dns-lookup-web-application-2llf</guid>
      <description>&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3pllr5w5dodrymhb2c0q.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3pllr5w5dodrymhb2c0q.png" alt="Cover" width="800" height="334"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;DNS records are crucial for translating human-friendly domain names, like medium.com, into IP addresses that computers use to identify each other on the network. This translation is fundamental for accessing websites, sending emails, and other internet functionalities. WHOIS information provides details about the domain registration, including the registrant's contact information, domain status, and key dates.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Project Structure
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;DNS-Lookup/
├── app/
│   ├── core/
│   │   ├── _config.py
│   │   └── settings.cfg
│   ├── routes/
│   │   └── _dnsLookup.py
│   ├── schemas/
│   │   ├── _dnsRequest.py
│   │   └── _whoisInfo.py
│   └── utils/
│       └── _dns.py
├── main.py
├── requirements.txt
├── static/
│   └── ... (static files)
└── templates/
    └── index.html
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Installation
&lt;/h2&gt;

&lt;p&gt;Clone the repository:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;git clone https://github.com/riottecboi/DNS-Lookup.git
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This project runs on Python 3.11, so please make sure Python 3.11 is installed on your machine.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update 
sudo apt install python3.11 -y
sudo apt-get install python3-pip -y
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Install the virtual environment library if it’s not already on your machine, then initialize and activate a virtual environment in your project.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;python3.11 -m pip install virtualenv
python3.11 -m venv &amp;lt;virtual-environment-name&amp;gt;
source &amp;lt;virtual-environment-name&amp;gt;/bin/activate
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Install all required libraries for this project from the file &lt;em&gt;requirements.txt&lt;/em&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;python3.11 -m pip install -r requirements.txt
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Usage
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;Start the FastAPI server:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;uvicorn main:app --reload
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;Open your web browser and navigate to &lt;code&gt;http://localhost:8000&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Enter a domain name in the provided input field.&lt;/li&gt;
&lt;li&gt;View the DNS records and WHOIS information displayed on the web page.&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Dockerize
&lt;/h2&gt;

&lt;p&gt;We can also run the application as a container. Please build an image from the &lt;em&gt;Dockerfile&lt;/em&gt; in the project.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker build -f docker/Dockerfile -t dns-lookup .
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, run a container from the above image with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker run -n DNS-Lookup -p 8000:8000 dns-lookup -d
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flr23km6m4u5hzzy4cxys.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flr23km6m4u5hzzy4cxys.png" alt="Docker Application" width="720" height="343"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Explanation
&lt;/h2&gt;

&lt;p&gt;This code defines two Pydantic models:&lt;/p&gt;

&lt;p&gt;_&lt;em&gt;dnsRequest.py&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from pydantic import BaseModel, model_validator
import validators
import socket

class DomainOrIP(BaseModel):
    domain_or_ip: str
    @model_validator(mode='after')
    def validate_domain_or_ip(cls, value):
        try:
            # Check if input is IP address
            socket.inet_pton(socket.AF_INET, value.domain_or_ip)
            return value
        except socket.error:
            pass

        try:
            # Check if input is IPv6 address
            socket.inet_pton(socket.AF_INET6, value.domain_or_ip)
            return value
        except socket.error:
            pass

        try:
            # Check if input is domain name
            validators.domain(value.domain_or_ip)
            return value
        except socket.error:
            pass

        raise ValueError(f"Invalid Domain or IP.")


class ErrorResponse(BaseModel):
    error: str
    message: str
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;DomainOrIP&lt;/em&gt;:&lt;/p&gt;

&lt;p&gt;This model has a single field called &lt;code&gt;domain_or_ip&lt;/code&gt; of type &lt;code&gt;str&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;It has a &lt;code&gt;model_validator&lt;/code&gt; decorator that applies the &lt;code&gt;validate_domain_or_ip&lt;/code&gt; function after the model has been initialized.&lt;br&gt;
The &lt;code&gt;validate_domain_or_ip&lt;/code&gt; function performs the following validations:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;First, it tries to validate if the input &lt;code&gt;domain_or_ip&lt;/code&gt; is a valid IPv4 address using the &lt;code&gt;socket.inet_pton&lt;/code&gt; function with &lt;code&gt;socket.AF_INET&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;If the input is not a valid IPv4 address, it tries to validate if it's a valid IPv6 address using &lt;code&gt;socket.inet_pton&lt;/code&gt; with &lt;code&gt;socket.AF_INET6&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;If the input is neither an IPv4 nor an IPv6 address, it tries to validate if it's a valid domain name using the &lt;code&gt;validators.domain&lt;/code&gt; function from the &lt;code&gt;validators&lt;/code&gt; library.&lt;/li&gt;
&lt;li&gt;If the input fails all three validations, it raises a &lt;code&gt;ValueError&lt;/code&gt; with the message "Invalid Domain or IP."&lt;/li&gt;
&lt;li&gt;If the input passes any of the validations, the function returns the original value without any modifications.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;em&gt;ErrorResponse&lt;/em&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;This model has two fields: &lt;code&gt;error&lt;/code&gt; (str) and &lt;code&gt;message&lt;/code&gt; (str).&lt;/li&gt;
&lt;li&gt;It is likely used to represent an error response payload when an error occurs in the application.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The purpose of this code is to provide a way to validate user input and ensure that it is either a valid IP address (IPv4 or IPv6) or a valid domain name. This validation is important because the application likely needs to handle both IP addresses and domain names correctly.&lt;/p&gt;

&lt;p&gt;The &lt;em&gt;ErrorResponse&lt;/em&gt; model is used to create a structured error response that can be returned to the client when an error occurs, such as when the input is invalid.&lt;/p&gt;

&lt;p&gt;_&lt;em&gt;dns.py&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import asyncio
import validators
import whois
import folium
from geopy import Nominatim
from schemas._whoisInfo import WhoisInfo

class DomainLocator:
    def __init__(self, domain: str):
        self.geolocator = Nominatim(user_agent="DNS_lookup")
        self.domain = domain
        self.domain_info = None
        self.return_info = None
        self.domain_map = None

    async def fetch_domain_info(self):
        try:
            loop = asyncio.get_event_loop()
            self.domain_info = await loop.run_in_executor(None, whois.whois, self.domain)
        except Exception as e:
            print(f"Error fetching WHOIS information for {self.domain}: {e}")

    async def get_coordinates(self, location):
        try:
            location = self.geolocator.geocode(location)
            if location:
                return location.latitude, location.longitude
            else:
                return None, None
        except Exception as e:
            print(f"Error fetching coordinates for {location}: {e}")
            return None, None

    async def plot_domain_location(self):
        if self.domain_info and self.domain_info.registrar:
            location = self.domain_info.address
            if self.domain_info.country and isinstance(self.domain_info.country, str):
                location = self.domain_info.country
            if self.domain_info.city and isinstance(self.domain_info.city, str):
                location = self.domain_info.city
            lat, lon = await self.get_coordinates(location)
            if lat and lon:
                map = folium.Map(location=[lat, lon], zoom_start=4)
                folium.Marker([lat, lon], popup=f"{self.domain}").add_to(map)
                self.domain_map = map.get_root()._repr_html_()
            else:
                print(f"Unable to find coordinates for location: {location}")
        else:
            print(f"No registrar information found for {self.domain}")
            self.domain_map = ''

    async def map_whois_data(self, data):
        if self.domain_info.domain_name and "Name or service not known" not in self.domain_info.text:
            whois_fields = list(WhoisInfo.model_fields.keys())
            self.return_info = {}
            for field in whois_fields:
                if field in data:
                    self.return_info[field] = data[field]
            return self.return_info
        else:
            return {}
    async def process_domain(self):
        if self.domain:
            print(f"Processing domain: {self.domain}")
            await self.fetch_domain_info()
            await self.map_whois_data(self.domain_info)
            await self.plot_domain_location()
            return self.return_info, self.domain_map
        else:
            print("No valid domain to process")
            return None, None
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This code defines a &lt;code&gt;DomainLocator&lt;/code&gt; class that is responsible for fetching WHOIS information for a given domain, mapping the WHOIS data to a structured model, and plotting the location of the domain on a map using the &lt;code&gt;folium&lt;/code&gt; library.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;p&gt;&lt;code&gt;__init__(self, domain: str)&lt;/code&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Initializes the &lt;code&gt;DomainLocator&lt;/code&gt; instance with a domain name.&lt;/li&gt;
&lt;li&gt;Creates a &lt;code&gt;Nominatim&lt;/code&gt; instance from the &lt;code&gt;geopy&lt;/code&gt; library for geocoding purposes.&lt;/li&gt;
&lt;li&gt;Sets the &lt;code&gt;domain_info&lt;/code&gt;, &lt;code&gt;return_info&lt;/code&gt;, and &lt;code&gt;domain_map&lt;/code&gt; attributes to &lt;code&gt;None&lt;/code&gt; initially.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;code&gt;async fetch_domain_info(self)&lt;/code&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Fetches the WHOIS information for the given domain using the &lt;code&gt;whois&lt;/code&gt; library.&lt;/li&gt;
&lt;li&gt;Runs the &lt;code&gt;whois.whois&lt;/code&gt; function in an executor to avoid blocking the event loop.&lt;/li&gt;
&lt;li&gt;Stores the WHOIS information in the &lt;code&gt;domain_info&lt;/code&gt; attribute.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;code&gt;async get_coordinates(self, location)&lt;/code&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Takes a location string as input and uses the &lt;code&gt;Nominatim&lt;/code&gt; geocoder to retrieve the latitude and longitude coordinates.&lt;/li&gt;
&lt;li&gt;Returns the coordinates as a tuple &lt;code&gt;(latitude, longitude)&lt;/code&gt; or &lt;code&gt;(None, None)&lt;/code&gt; if the location could not be geocoded.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;code&gt;async plot_domain_location(self)&lt;/code&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Attempts to determine the location of the domain based on the available WHOIS information (address, country, or city).&lt;/li&gt;
&lt;li&gt;Calls the &lt;code&gt;get_coordinates&lt;/code&gt; method to obtain the latitude and longitude coordinates for the location.&lt;/li&gt;
&lt;li&gt;If coordinates are found, creates a &lt;code&gt;folium&lt;/code&gt; map centered on those coordinates and adds a marker for the domain.&lt;/li&gt;
&lt;li&gt;Stores the HTML representation of the map in the &lt;code&gt;domain_map&lt;/code&gt; attribute.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;code&gt;async map_whois_data(self, data)&lt;/code&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Maps the WHOIS data to the &lt;code&gt;WhoisInfo&lt;/code&gt; model defined in the &lt;code&gt;schemas._whoisInfo&lt;/code&gt; module.&lt;/li&gt;
&lt;li&gt;Iterates over the fields in the &lt;code&gt;WhoisInfo&lt;/code&gt; model and populates the &lt;code&gt;return_info&lt;/code&gt; dictionary with the corresponding values from the WHOIS data.&lt;/li&gt;
&lt;li&gt;Returns the &lt;code&gt;return_info&lt;/code&gt; dictionary if the WHOIS data is valid, or an empty dictionary if the data is not available or invalid.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;code&gt;async process_domain(self)&lt;/code&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The main method that orchestrates the entire process of fetching WHOIS information, mapping the data, and plotting the domain location.&lt;/li&gt;
&lt;li&gt;Calls the &lt;code&gt;fetch_domain_info&lt;/code&gt;, &lt;code&gt;map_whois_data&lt;/code&gt;, and &lt;code&gt;plot_domain_location&lt;/code&gt; methods in sequence.&lt;/li&gt;
&lt;li&gt;Returns the &lt;code&gt;return_info&lt;/code&gt; dictionary and the &lt;code&gt;domain_map&lt;/code&gt; HTML content.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This class is designed to be used asynchronously, as indicated by the async keyword on the methods. It leverages the asyncio library and the &lt;em&gt;run_in_executor&lt;/em&gt; function to offload blocking operations (like fetching WHOIS data) to a separate executor, allowing the event loop to remain responsive.&lt;/p&gt;

&lt;p&gt;The &lt;em&gt;process_domain&lt;/em&gt; method can be called with a valid domain name, and it will return the structured WHOIS information and a map showing the location of the domain (if available).&lt;/p&gt;

&lt;p&gt;_&lt;em&gt;dnsLookup.py&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import json
import os
from fastapi import APIRouter, HTTPException, Response, Request, Form, status, Depends
from fastapi.responses import RedirectResponse
from utils._dns import DomainLocator
from schemas._dnsRequest import DomainOrIP, ErrorResponse
from pydantic_core._pydantic_core import ValidationError
from cryptography.fernet import Fernet
from fastapi.templating import Jinja2Templates

key = Fernet.generate_key()
cipher_suite = Fernet(key)

folder = os.getcwd()
os.chdir("..")
path = os.getcwd()
templates = Jinja2Templates(directory=path+"/templates")

router = APIRouter()

def serialize_datetime(dt_or_list):
    if isinstance(dt_or_list, list):
        return [dt.isoformat() for dt in dt_or_list]
    if dt_or_list is None:
        return ''
    else:
        return dt_or_list.isoformat()

async def process_and_encrypt_data(domain_or_ip: str):
    try:
        validated_input = DomainOrIP(domain_or_ip=domain_or_ip)
        domain_or_ip = validated_input.domain_or_ip
        locator = DomainLocator(domain_or_ip)
        domain_info, domain_map = await locator.process_domain()
        if domain_info:
            domain_info['updated_date'] = serialize_datetime(domain_info['updated_date'])
            domain_info['creation_date'] = serialize_datetime(domain_info['creation_date'])
            domain_info['expiration_date'] = serialize_datetime(domain_info['expiration_date'])

            encrypted_domain_info = cipher_suite.encrypt(json.dumps(domain_info).encode()).decode()
            encrypted_domain_map = cipher_suite.encrypt(domain_map.encode()).decode()

            return encrypted_domain_info, encrypted_domain_map
        else:
            return None, None
    except ValidationError as e:
        raise HTTPException(status_code=400, detail={"error": "Not processing Domain/IP",
                                                     "message": "The input cannot process. Please try again."})

@router.post("/lookup", responses={400: {"model": ErrorResponse}})
async def dns_lookup(domain_or_ip: str = Form(...)):
    try:
        encrypted_domain_info, encrypted_domain_map = await process_and_encrypt_data(domain_or_ip)
        return RedirectResponse(url=f"/result?domain={domain_or_ip}&amp;amp;domain_info={encrypted_domain_info}&amp;amp;domain_map={encrypted_domain_map}", status_code=302)

    except ValidationError as e:
        raise HTTPException(status_code=400, detail=ErrorResponse(error="Not processing Domain/IP",
                            message="The input cannot process. Please try again.").dict())

@router.get("/home")
async def get_template(request: Request):
    return templates.TemplateResponse("index.html", {"request": request, "domain_info": None, "domain_map": None, "found": True})

@router.get("/result")
async def get_template(request: Request):
    found = True
    search_domain = request.query_params.get('domain')
    domain_info = request.query_params.get('domain_info')
    domain_map = request.query_params.get('domain_map')

    if domain_info == 'None':
        domain_info = eval(domain_info)
        domain_map = eval(domain_map)
        found = False

    else:
        decrypted_domain_info_json = cipher_suite.decrypt(domain_info.encode()).decode() if domain_info else None
        domain_info = json.loads(decrypted_domain_info_json)

        domain_map = cipher_suite.decrypt(domain_map.encode()).decode() if domain_map else None

    return templates.TemplateResponse("index.html", {"request": request, "domain": search_domain, "domain_info": domain_info, "domain_map": domain_map, "found": found})
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Imports&lt;/strong&gt;: The code imports the necessary modules and classes, such as &lt;code&gt;json&lt;/code&gt; for working with JSON data, &lt;code&gt;os&lt;/code&gt; for interacting with the operating system, and various classes from the FastAPI framework.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Initialization&lt;/strong&gt;: The code generates a key using the &lt;code&gt;Fernet&lt;/code&gt; class from the &lt;code&gt;cryptography&lt;/code&gt; module, which is likely used for encryption and decryption purposes. It also sets up a Jinja2 template environment for rendering HTML templates and creates an instance of the &lt;code&gt;APIRouter&lt;/code&gt; class from FastAPI.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Helper Functions&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;serialize_datetime&lt;/code&gt;: This function converts datetime objects or a list of datetime objects to ISO format strings.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;process_and_encrypt_data&lt;/code&gt;: This async function takes a domain or IP address as input, validates it using the &lt;code&gt;DomainOrIP&lt;/code&gt; schema, retrieves domain information using the &lt;code&gt;DomainLocator&lt;/code&gt; class, and encrypts the domain information and domain map using the &lt;code&gt;Fernet&lt;/code&gt; cipher suite.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;strong&gt;API Routes&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;@router.post("/lookup")&lt;/code&gt;: This route accepts a domain or IP address as form data, calls the &lt;code&gt;process_and_encrypt_data&lt;/code&gt; function, and redirects the user to the &lt;code&gt;/result&lt;/code&gt; route with the encrypted domain information and domain map as query parameters.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;@router.get("/home")&lt;/code&gt;: This route renders the &lt;code&gt;index.html&lt;/code&gt; template without any domain information.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;@router.get("/result")&lt;/code&gt;: This route retrieves the domain, encrypted domain information, and encrypted domain map from the query parameters. If the domain information is &lt;code&gt;None&lt;/code&gt;, it sets a &lt;code&gt;found&lt;/code&gt; flag to &lt;code&gt;False&lt;/code&gt;. Otherwise, it decrypts the domain information and domain map using the &lt;code&gt;Fernet&lt;/code&gt; cipher suite and renders the &lt;code&gt;index.html&lt;/code&gt; template with the retrieved data.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The code is a part of a larger application that retrieves and processes DNS information for a given domain or IP address.&lt;/p&gt;

&lt;p&gt;It uses encryption and decryption techniques to securely transmit and store the domain information and map. The application provides a web interface (through the index.html template) where users can input a domain or IP address, and the application retrieves and displays the corresponding DNS information.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;main.py&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from fastapi import FastAPI
from routes import _dnsLookup
from fastapi.staticfiles import StaticFiles
import os

app = FastAPI(
    title="DNS Lookup API",
    description="API for getting whois information and location of domain or IP",
    version="1.0",
    docs_url="/docs",
    openapi_url="/openapi.json",
    contact={
        "name": "Tran Vinh Liem",
        "email": "riottecboi@gmail.com",
        "url": "https://about.riotteboi.com"
    }
)
folder = os.getcwd()

app.mount("/static", StaticFiles(directory=folder+"/static", html=True), name="static")
app.include_router(_dnsLookup.router)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Imports&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;FastAPI&lt;/code&gt; is imported from the &lt;code&gt;fastapi&lt;/code&gt; module to create the FastAPI application instance.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;_dnsLookup&lt;/code&gt; is imported from the &lt;code&gt;routes&lt;/code&gt; module, which likely contains the API routes for handling DNS lookup requests.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;StaticFiles&lt;/code&gt; is imported from &lt;code&gt;fastapi.staticfiles&lt;/code&gt; to serve static files.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;os&lt;/code&gt; is imported to interact with the operating system and get the current working directory.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;strong&gt;FastAPI Application Instance&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A new &lt;code&gt;FastAPI&lt;/code&gt; instance is created and assigned to the &lt;code&gt;app&lt;/code&gt; variable.&lt;/li&gt;
&lt;li&gt;The application is configured with metadata such as the title, description, version, documentation URLs, and contact information.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;strong&gt;Static Files&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The &lt;code&gt;StaticFiles&lt;/code&gt; class is used to mount a directory containing static files (e.g., CSS, JavaScript, images) at the &lt;code&gt;/static&lt;/code&gt; URL path.&lt;/li&gt;
&lt;li&gt;The &lt;code&gt;folder&lt;/code&gt; variable stores the current working directory, and the static files are located in the &lt;code&gt;static&lt;/code&gt; subdirectory.&lt;/li&gt;
&lt;li&gt;The &lt;code&gt;html=True&lt;/code&gt; parameter is set to allow serving HTML files from the static directory.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;

&lt;p&gt;&lt;strong&gt;Router Inclusion&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The &lt;code&gt;include_router&lt;/code&gt; method is called on the &lt;code&gt;app&lt;/code&gt; instance to include the routes defined in the &lt;code&gt;_dnsLookup&lt;/code&gt; module.&lt;/li&gt;
&lt;li&gt;This means that all the routes defined in &lt;code&gt;_dnsLookup.router&lt;/code&gt; will be added to the FastAPI application.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Final — Results
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3pdp3vsbft3q8kl66dlc.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3pdp3vsbft3q8kl66dlc.png" alt="Facebook WHOIS result" width="720" height="391"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Feey81dj9cfqampdkids2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Feey81dj9cfqampdkids2.png" alt="Google WHOIS result" width="720" height="421"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqttnueiz49uuqsrgw5e3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqttnueiz49uuqsrgw5e3.png" alt="No result for Example.domain" width="720" height="421"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Overall, this application provides a web-based interface and API endpoints for users to look up WHOIS information and geographical location details for domains or IP addresses. The combination of FastAPI, Pydantic, encryption, geocoding, and mapping libraries enables a comprehensive and secure solution for DNS lookup functionality.&lt;/p&gt;

</description>
      <category>fastapi</category>
      <category>python</category>
      <category>geopy</category>
      <category>folium</category>
    </item>
    <item>
      <title>Travel Recommended with FastAPI, Kafka, MongoDB and OpenAI</title>
      <dc:creator>Riottecboi</dc:creator>
      <pubDate>Wed, 19 Jun 2024 12:49:09 +0000</pubDate>
      <link>https://dev.to/riottecboi/travel-recommended-with-fastapi-kafka-mongodb-and-openai-j4g</link>
      <guid>https://dev.to/riottecboi/travel-recommended-with-fastapi-kafka-mongodb-and-openai-j4g</guid>
      <description>&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fdgn206vvnt62ggixyhde.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fdgn206vvnt62ggixyhde.png" alt="FastAPI" width="800" height="450"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;blockquote&gt;
&lt;p&gt;In the bustling realm of modern travel, personalized recommendations play a pivotal role in enhancing user experiences and fostering memorable journeys. Our project, the Smart Travel Recommender System, aims to revolutionize the way travelers explore new destinations by providing tailored recommendations for activities based on country and season. By integrating cutting-edge technologies such as FastAPI, Kafka, MongoDB, and OpenAI, we aspire to deliver a seamless and scalable solution that empowers users to discover the essence of every destination.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Structure of Project
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Travel Recommender/
├── app/
│   ├── background_worker.py
│   ├── main.py
│   ├── api/
│   │   ├── openai_api.py
│   │   └── routes/
│   │       ├── recommendations_route.py
│   │       └── status_route.py
│   ├── models/
│   │   ├── __init__.py
│   │   └── models.py
│   ├── schemas/
│   │   └── schema.py
│   ├── core/
│   │   └── config.py
│   ├── db/
│   │   └── mongodb.py
│   └── utils/
│       └── kafka.py
├── test/
│   └── sample_test.py
├── requirements.txt
├── README.md
├── Dockerfile
├── docker-compose.yml
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;More information from my &lt;a href="https://github.com/riottecboi/FastAPI-Kafka-MongoDB-OpenAI"&gt;Github&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Installation &amp;amp; Usage
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update 
sudo apt install python3.11 -y
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Use the package manager &lt;a href="https://pip.pypa.io/en/stable/"&gt;Pip&lt;/a&gt; to install driver, and any other required libraries.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sudo apt-get install python3-pip -y
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Initialize your virtual environment in your project and activate the virtual environment.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;python3.11 -m venv &amp;lt;virtual-environment-name&amp;gt;
source &amp;lt;virtual-environment-name&amp;gt;/bin/activate
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Install all required libraries for this project from the file &lt;em&gt;requirements.txt&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;python3.11 -m pip install -r requirements.txt
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Set Up All Services
&lt;/h2&gt;

&lt;p&gt;I have created a docker-compose file that makes it easy to install all of the elements we need for this project.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: bitnami/kafka:latest
    ports:
        - 9092:9092
        - 9093:9093
    environment:
        - KAFKA_BROKER_ID=1
        - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
        - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
        - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
        - ALLOW_PLAINTEXT_LISTENER=yes
        - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT
        - KAFKA_CFG_LISTENERS=CLIENT://:9092
        - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092
        - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
        - zookeeper

  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - 8080:8080
    depends_on:
      - kafka
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092

  mongodb:
    image: mongo:5.0
    ports:
      - 27017:27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: root
    healthcheck:
      test: echo 'db.runCommand("ping").ok' | mongo localhost:27017/test --quiet
      interval: 10s
      timeout: 5s
      retries: 5

  mongo-express:
    image: mongo-express:latest
    depends_on:
      mongodb:
        condition: service_healthy
    ports:
      - 8888:8081
    environment:
      ME_CONFIG_MONGODB_SERVER: mongodb

  fastapi-app:
    build:
      context: .
      args:
        NO_CACHE: "true"
    depends_on:
      - kafka
      - mongodb
    ports:
      - "8000:8000"
    environment:
      OPENAI_KEY: "xxxx"
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      KAFKA_TOPIC: recommendations_topic
      MONGODB_URI: mongodb://root:root@mongodb:27017
      MONGODB_DATABASE: recommendations
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The Dockerfile looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;FROM tiangolo/uvicorn-gunicorn-fastapi:python3.11

RUN apt-get update &amp;amp;&amp;amp; apt-get install -y \
    build-essential \
    libpq-dev \
    python3-pip \
    &amp;amp;&amp;amp; rm -rf /var/lib/apt/lists/*

COPY requirements.txt /app/requirements.txt
COPY app/api /app/api
COPY app/core /app/core
COPY app/db /app/db
COPY app/schemas /app/schemas
COPY app/utils /app/utils
COPY app/background-worker.py /app/background.py
COPY app/main.py /app/main.py

RUN pip3 install --no-cache-dir -r requirements.txt

EXPOSE 8000
# Run the FastAPI service and background process
CMD ["sh", "-c", "uvicorn main:app --host 0.0.0.0 --port 8000 &amp;amp; python background.py"]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1ftmbe12hx915bjark6n.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1ftmbe12hx915bjark6n.png" alt="Containers will be pulled and run by each process, connect with a default bridge network of project" width="720" height="213"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Each container is pulled and run as its own process, and all of them connect through the project's default bridge network.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Explanation
&lt;/h2&gt;

&lt;p&gt;We will have two routes in total (recommendation_route and status_route).&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;&lt;em&gt;recommendation_route&lt;/em&gt;&lt;/strong&gt; will accept two query parameters:&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;em&gt;country&lt;/em&gt;: The country for which the recommendations are to be fetched.&lt;/li&gt;
&lt;li&gt;
&lt;em&gt;season&lt;/em&gt;: The season in which the recommendations are desired (e.g., "summer", "winter").&lt;/li&gt;
&lt;li&gt;Both parameters are required, and the season is validated to be one of the four seasons.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class RecommendationsRequest(BaseModel):
    country: str = Field(..., description="The country for which recommendations are to be fetched.")
    season: str = Field(..., description="The season in which the recommendations are desired.")

    @model_validator(mode='after')
    def validate_season(cls, values):
        try:
            pycountry.countries.search_fuzzy(values.country)
        except LookupError:
            raise ValueError(f"Invalid country.")
        valid_seasons = ["spring", "summer", "autumn", "winter"]
        if values.season not in valid_seasons:
            raise ValueError(f"Invalid season. Must be one of {', '.join(valid_seasons)}")

        return values
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When a request is made to the endpoint, we generate a unique identifier (UID) for the request and return it to the user immediately.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;try:
    RecommendationsRequest(country=country, season=season)
    uid = str(uuid.uuid4())
    request_data = {'country': country, 'season': season}
    await kafka_producer(request_data, uid)
    return RecommendationSubmitResponse(uid=uid)
except ValidationError as e:
    raise HTTPException(status_code=422, detail=ErrorResponse(error="Invalid country/season", message="The input of country or season is invalid. Please try again.").dict())
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Offload the processing of the request to a background component. At this stage, we will use &lt;strong&gt;Kafka&lt;/strong&gt; to send the request as a message to a Kafka topic and consume it with a separate worker.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;async def kafka_producer(request_data, uid):
    producer = AIOKafkaProducer(
        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS
    )
    await producer.start()
    await producer.send(
        settings.KAFKA_TOPIC,
        f"{uid}:{request_data}".encode("utf-8"), partition=0
    )
    await producer.stop()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This Python code block defines an asynchronous function named &lt;em&gt;kafka_producer&lt;/em&gt; that is responsible for sending data to a Kafka topic.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;producer = AIOKafkaProducer(
        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS
    )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This initializes a &lt;strong&gt;Kafka&lt;/strong&gt; producer using the &lt;em&gt;AIOKafkaProducer&lt;/em&gt; class from the &lt;em&gt;aiokafka&lt;/em&gt; library. The producer is configured to connect to the Kafka cluster specified in &lt;em&gt;settings.KAFKA_BOOTSTRAP_SERVERS&lt;/em&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;await producer.start()
    await producer.send(
        settings.KAFKA_TOPIC,
        f"{uid}:{request_data}".encode("utf-8"), partition=0
    )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This step starts the producer so that it can send messages to the Kafka cluster, and then sends a message to the specified &lt;strong&gt;Kafka&lt;/strong&gt; topic (&lt;em&gt;settings.KAFKA_TOPIC&lt;/em&gt;). The message content is a string composed of the uid followed by a colon &lt;code&gt;(:)&lt;/code&gt; and the &lt;em&gt;request_data&lt;/em&gt;; the message is encoded as UTF-8 before sending.&lt;/p&gt;

&lt;p&gt;When a new message is sent to the &lt;strong&gt;Kafka&lt;/strong&gt; topic, &lt;em&gt;background_worker.py&lt;/em&gt; picks that message up.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;async def handle_request(uid, request_data):
    try:
        recommendations = await get_openai_recommendation(request_data)
    except Exception as e:
        recommendations = []
    result = await save_recommendations(uid, request_data, recommendations)
    print(f"Recommendations saved with ID: {result}")

async def main():
    while True:
        uid, request_data = await kafka_consumer()
        await handle_request(uid, request_data)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;At this stage, the worker consumes a message using the following code block. It defines an asynchronous function that acts as a Kafka consumer, receiving messages from a specified &lt;strong&gt;Kafka&lt;/strong&gt; topic, processing them, and committing the offsets to track the progress of message consumption.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;consumer = AIOKafkaConsumer(
        settings.KAFKA_TOPIC,
        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
        group_id=settings.KAFKA_TOPIC,
        auto_offset_reset='earliest'
    )
await consumer.start()
try:
    async for msg in consumer:
        uid, request_data = msg.value.decode("utf-8").split(":", 1)
        print(f"Processed recommendation request: {request_data}")
        await consumer.commit()
        return uid, eval(request_data)
except Exception as e:
    print(f"Consumer error: {e}")
finally:
    await consumer.stop()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;em&gt;auto_offset_reset&lt;/em&gt;=’&lt;em&gt;earliest&lt;/em&gt;’ setting specifies the behavior for handling the offset when there is no initial offset or the current offset does not exist on the server (e.g., because the data has been deleted). In this case, it's set to ‘earliest’, meaning the consumer will start reading from the earliest available message.&lt;/p&gt;

&lt;p&gt;After committing the offset of the consumed message, &lt;em&gt;background_worker.py&lt;/em&gt; makes the &lt;strong&gt;OpenAI&lt;/strong&gt; API call.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;try:
    client = OpenAI(api_key=settings.OPENAI_KEY)
    chat_completion = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": f"Provide three recommendations for doing in {request_data['country']} during {request_data['season']}.",
            }
        ],
        model="gpt-3.5-turbo",
    )
    return [chat_completion.choices[0].message.content]
except Exception as e:
    raise Exception(str(e))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The recommendations are then stored in MongoDB with the UID as the key; the document is structured to include the request_data (country, season) and the recommendations.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;async def save_recommendations(uid, request_data, recommendations):
    recommendation_doc = {
        "uid": uid,
        "request_data": request_data,
        "recommendations": recommendations
    }
    result = await loop.run_in_executor(None, recommendations_collection.insert_one, recommendation_doc)
    return result.inserted_id
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;status_route&lt;/strong&gt; accepts the UID as a query parameter and checks for the result in MongoDB.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol&gt;
&lt;li&gt;If a result is available, the status is “completed” and the recommendations are returned.&lt;/li&gt;
&lt;li&gt;If processing has not finished, the status is “pending” to indicate that the data is not yet available.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;if recommendations is None:
    raise HTTPException(status_code=404, detail=ErrorResponse(error="UID not found", message="The provided UID does not exist. Please check the UID and try again.").dict())

if recommendations:
    return RecommendationResponse(uid=uid, country=country, season=season, message="The recommendations are ready", recommendations=recommendations, status="completed")
return RecommendationCheckResponse(uid=uid, status="pending", message="The recommendations are not yet available. Please try again later.")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class RecommendationSubmitResponse(BaseModel):
    uid: str

class RecommendationCheckResponse(RecommendationSubmitResponse):
    message: Optional[str] = None
    status: str

class RecommendationResponse(RecommendationCheckResponse):
    country: Optional[str] = None
    season: Optional[Literal["spring", "summer", "autumn", "winter"]] = None
    recommendations: Optional[List[str]] = None

class ErrorResponse(BaseModel):
    error: str
    message: str
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Final Demo
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flf2ke7gz8rlev28kzhhx.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flf2ke7gz8rlev28kzhhx.png" alt="Final Demo" width="800" height="448"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;By leveraging FastAPI, Kafka, MongoDB, and OpenAI, we endeavor to deliver a sophisticated yet user-friendly platform that empowers travelers to embark on unforgettable journeys tailored to their preferences and interests. With scalability, efficiency, and personalization at its core, our system strives to redefine the way travelers explore the world, one recommendation at a time.&lt;/p&gt;

</description>
      <category>fastapi</category>
      <category>kafka</category>
      <category>mongodb</category>
      <category>openai</category>
    </item>
  </channel>
</rss>
