DNS records are crucial for translating human-friendly domain names, like medium.com, into IP addresses that computers use to identify each other on the network. This translation is fundamental for accessing websites, sending emails, and other internet functionalities. WHOIS information provides details about the domain registration, including the registrant's contact information, domain status, and key dates.
Project Structure
DNS-Lookup/
├── app/
│ ├── core/
│ │ └── _config.py
│ │ └── settings.cfg
│ ├── routes/
│ │ └── _dnsLookup.py
│ ├── schemas/
│ │ ├── _dnsRequest.py
│ │ └── _whoisInfo.py
│ └── utils/
│ └── _dns.py
├── main.py
├── requirements.txt
├── static/
│ └── ... (static files)
└── templates/
└── index.html
Installation
Clone the repository:
git clone https://github.com/riottecboi/DNS-Lookup.git
This project running on Python 3.11, so please be sure that you are install Python 3.11 on your machine.
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update
sudo apt install python3.11 -y
sudo apt-get install python3-pip -y
Install virtual environment library if it’s not on your machine and initialize your virtual environment in your project and activate the virtual environment.
python3.11 -m pip install virtualenv
python3.11 -m venv <virtual-environment-name>
source <virtual-environment-name>/bin/activate
Install all required libraries for this project from the file requirements.txt.
python3.11 -m pip install -r requirements.txt
Usage
- Start the FastAPI server:
uvicorn main:app --reload
- Open your web browser and navigate to
http://localhost:8000 - Enter a domain name in the provided input field.
- View the DNS records and WHOIS information displayed on the web page.
Dockerize
We can also run application as container as well. Please build an image from Dockerfile in the project.
docker build -f docker/Dockerfile -t dns-lookup .
Then, running a container with the above image by
docker run -n DNS-Lookup -p 8000:8000 dns-lookup -d
Explanation
This code defines two Pydantic models:
_dnsRequest.py
from pydantic import BaseModel, model_validator
import validators
import socket
class DomainOrIP(BaseModel):
domain_or_ip: str
@model_validator(mode='after')
def validate_domain_or_ip(cls, value):
try:
# Check if input is IP address
socket.inet_pton(socket.AF_INET, value.domain_or_ip)
return value
except socket.error:
pass
try:
# Check if input is IPv6 address
socket.inet_pton(socket.AF_INET6, value.domain_or_ip)
return value
except socket.error:
pass
try:
# Check if input is domain name
validators.domain(value.domain_or_ip)
return value
except socket.error:
pass
raise ValueError(f"Invalid Domain or IP.")
class ErrorResponse(BaseModel):
error: str
message: str
DomainOrIP:
This model has a single field called domain_or_ip of type str.
It has a model_validator decorator that applies the validate_domain_or_ip function after the model has been initialized.
The validate_domain_or_ip function performs the following validations:
- First, it tries to validate if the input
domain_or_ipis a valid IPv4 address using thesocket.inet_ptonfunction withsocket.AF_INET. - If the input is not a valid IPv4 address, it tries to validate if it's a valid IPv6 address using
socket.inet_ptonwithsocket.AF_INET6. - If the input is neither an IPv4 nor an IPv6 address, it tries to validate if it's a valid domain name using the
validators.domainfunction from thevalidatorslibrary. - If the input fails all three validations, it raises a
ValueErrorwith the message "Invalid Domain or IP." - If the input passes any of the validations, the function returns the original value without any modifications.
ErrorResponse:
- This model has two fields:
error(str) andmessage(str). - It is likely used to represent an error response payload when an error occurs in the application.
The purpose of this code is to provide a way to validate user input and ensure that it is either a valid IP address (IPv4 or IPv6) or a valid domain name. This validation is important because the application likely needs to handle both IP addresses and domain names correctly.
The ErrorResponse model is used to create a structured error response that can be returned to the client when an error occurs, such as when the input is invalid.
_dns.py
import asyncio
import validators
import whois
import folium
from geopy import Nominatim
from schemas._whoisInfo import WhoisInfo
class DomainLocator:
def __init__(self, domain: str):
self.geolocator = Nominatim(user_agent="DNS_lookup")
self.domain = domain
self.domain_info = None
self.return_info = None
self.domain_map = None
async def fetch_domain_info(self):
try:
loop = asyncio.get_event_loop()
self.domain_info = await loop.run_in_executor(None, whois.whois, self.domain)
except Exception as e:
print(f"Error fetching WHOIS information for {self.domain}: {e}")
async def get_coordinates(self, location):
try:
location = self.geolocator.geocode(location)
if location:
return location.latitude, location.longitude
else:
return None, None
except Exception as e:
print(f"Error fetching coordinates for {location}: {e}")
return None, None
async def plot_domain_location(self):
if self.domain_info and self.domain_info.registrar:
location = self.domain_info.address
if self.domain_info.country and isinstance(self.domain_info.country, str):
location = self.domain_info.country
if self.domain_info.city and isinstance(self.domain_info.city, str):
location = self.domain_info.city
lat, lon = await self.get_coordinates(location)
if lat and lon:
map = folium.Map(location=[lat, lon], zoom_start=4)
folium.Marker([lat, lon], popup=f"{self.domain}").add_to(map)
self.domain_map = map.get_root()._repr_html_()
else:
print(f"Unable to find coordinates for location: {location}")
else:
print(f"No registrar information found for {self.domain}")
self.domain_map = ''
async def map_whois_data(self, data):
if self.domain_info.domain_name and "Name or service not known" not in self.domain_info.text:
whois_fields = list(WhoisInfo.model_fields.keys())
self.return_info = {}
for field in whois_fields:
if field in data:
self.return_info[field] = data[field]
return self.return_info
else:
return {}
async def process_domain(self):
if self.domain:
print(f"Processing domain: {self.domain}")
await self.fetch_domain_info()
await self.map_whois_data(self.domain_info)
await self.plot_domain_location()
return self.return_info, self.domain_map
else:
print("No valid domain to process")
return None, None
This code defines a DomainLocator class that is responsible for fetching WHOIS information for a given domain, mapping the WHOIS data to a structured model, and plotting the location of the domain on a map using the folium library.
-
__init__(self, domain: str):- Initializes the
DomainLocatorinstance with a domain name. - Creates a
Nominatiminstance from thegeopylibrary for geocoding purposes. - Sets the
domain_info,return_info, anddomain_mapattributes toNoneinitially.
- Initializes the
-
async fetch_domain_info(self):- Fetches the WHOIS information for the given domain using the
whoislibrary. - Runs the
whois.whoisfunction in an executor to avoid blocking the event loop. - Stores the WHOIS information in the
domain_infoattribute.
- Fetches the WHOIS information for the given domain using the
-
async get_coordinates(self, location):- Takes a location string as input and uses the
Nominatimgeocoder to retrieve the latitude and longitude coordinates. - Returns the coordinates as a tuple
(latitude, longitude)or(None, None)if the location could not be geocoded.
- Takes a location string as input and uses the
-
async plot_domain_location(self):- Attempts to determine the location of the domain based on the available WHOIS information (address, country, or city).
- Calls the
get_coordinatesmethod to obtain the latitude and longitude coordinates for the location. - If coordinates are found, creates a
foliummap centered on those coordinates and adds a marker for the domain. - Stores the HTML representation of the map in the
domain_mapattribute.
-
async map_whois_data(self, data):- Maps the WHOIS data to the
WhoisInfomodel defined in theschemas._whoisInfomodule. - Iterates over the fields in the
WhoisInfomodel and populates thereturn_infodictionary with the corresponding values from the WHOIS data. - Returns the
return_infodictionary if the WHOIS data is valid, or an empty dictionary if the data is not available or invalid.
- Maps the WHOIS data to the
-
async process_domain(self):- The main method that orchestrates the entire process of fetching WHOIS information, mapping the data, and plotting the domain location.
- Calls the
fetch_domain_info,map_whois_data, andplot_domain_locationmethods in sequence. - Returns the
return_infodictionary and thedomain_mapHTML content.
This class is designed to be used asynchronously, as indicated by the async keyword on the methods. It leverages the asyncio library and the run_in_executor function to offload blocking operations (like fetching WHOIS data) to a separate executor, allowing the event loop to remain responsive.
The process_main method can be called with a valid domain name, and it will return the structured WHOIS information and a map showing the location of the domain (if available).
_dnsLookup.py
import json
import os
from fastapi import APIRouter, HTTPException, Response, Request, Form, status, Depends
from fastapi.responses import RedirectResponse
from utils._dns import DomainLocator
from schemas._dnsRequest import DomainOrIP, ErrorResponse
from pydantic_core._pydantic_core import ValidationError
from cryptography.fernet import Fernet
from fastapi.templating import Jinja2Templates
key = Fernet.generate_key()
cipher_suite = Fernet(key)
folder = os.getcwd()
os.chdir("..")
path = os.getcwd()
templates = Jinja2Templates(directory=path+"/templates")
router = APIRouter()
def serialize_datetime(dt_or_list):
if isinstance(dt_or_list, list):
return [dt.isoformat() for dt in dt_or_list]
if dt_or_list is None:
return ''
else:
return dt_or_list.isoformat()
async def process_and_encrypt_data(domain_or_ip: str):
try:
validated_input = DomainOrIP(domain_or_ip=domain_or_ip)
domain_or_ip = validated_input.domain_or_ip
locator = DomainLocator(domain_or_ip)
domain_info, domain_map = await locator.process_domain()
if domain_info:
domain_info['updated_date'] = serialize_datetime(domain_info['updated_date'])
domain_info['creation_date'] = serialize_datetime(domain_info['creation_date'])
domain_info['expiration_date'] = serialize_datetime(domain_info['expiration_date'])
encrypted_domain_info = cipher_suite.encrypt(json.dumps(domain_info).encode()).decode()
encrypted_domain_map = cipher_suite.encrypt(domain_map.encode()).decode()
return encrypted_domain_info, encrypted_domain_map
else:
return None, None
except ValidationError as e:
raise HTTPException(status_code=400, detail={"error": "Not processing Domain/IP",
"message": "The input cannot process. Please try again."})
@router.post("/lookup", responses={400: {"model": ErrorResponse}})
async def dns_lookup(domain_or_ip: str = Form(...)):
try:
encrypted_domain_info, encrypted_domain_map = await process_and_encrypt_data(domain_or_ip)
return RedirectResponse(url=f"/result?domain={domain_or_ip}&domain_info={encrypted_domain_info}&domain_map={encrypted_domain_map}", status_code=302)
except ValidationError as e:
raise HTTPException(status_code=400, detail=ErrorResponse(error="Not processing Domain/IP",
message="The input cannot process. Please try again.").dict())
@router.get("/home")
async def get_template(request: Request):
return templates.TemplateResponse("index.html", {"request": request, "domain_info": None, "domain_map": None, "found": True})
@router.get("/result")
async def get_template(request: Request):
found = True
search_domain = request.query_params.get('domain')
domain_info = request.query_params.get('domain_info')
domain_map = request.query_params.get('domain_map')
if domain_info == 'None':
domain_info = eval(domain_info)
domain_map = eval(domain_map)
found = False
else:
decrypted_domain_info_json = cipher_suite.decrypt(domain_info.encode()).decode() if domain_info else None
domain_info = json.loads(decrypted_domain_info_json)
domain_map = cipher_suite.decrypt(domain_map.encode()).decode() if domain_map else None
return templates.TemplateResponse("index.html", {"request": request, "domain": search_domain, "domain_info": domain_info, "domain_map": domain_map, "found": found})
Imports: The code imports the necessary modules and classes, such as
jsonfor working with JSON data,osfor interacting with the operating system, and various classes from the FastAPI framework.Initialization: The code generates a key using the
Fernetclass from thecryptographymodule, which is likely used for encryption and decryption purposes. It also sets up a Jinja2 template environment for rendering HTML templates and creates an instance of theAPIRouterclass from FastAPI.-
Helper Functions:
-
serialize_datetime: This function converts datetime objects or a list of datetime objects to ISO format strings. -
process_and_encrypt_data: This async function takes a domain or IP address as input, validates it using theDomainOrIPschema, retrieves domain information using theDomainLocatorclass, and encrypts the domain information and domain map using theFernetcipher suite.
-
-
API Routes:
-
@router.post("/lookup"): This route accepts a domain or IP address as form data, calls theprocess_and_encrypt_datafunction, and redirects the user to the/resultroute with the encrypted domain information and domain map as query parameters. -
@router.get("/home"): This route renders theindex.htmltemplate without any domain information. -
@router.get("/result"): This route retrieves the domain, encrypted domain information, and encrypted domain map from the query parameters. If the domain information isNone, it sets afoundflag toFalse. Otherwise, it decrypts the domain information and domain map using theFernetcipher suite and renders theindex.htmltemplate with the retrieved data.
-
The code is a part of a larger application that retrieves and processes DNS information for a given domain or IP address.
It uses encryption and decryption techniques to securely transmit and store the domain information and map. The application provides a web interface ( through the index.html template) where users can input a domain or IP address, and the application retrieves and displays the corresponding DNS information.
main.py
from fastapi import FastAPI
from routes import _dnsLookup
from fastapi.staticfiles import StaticFiles
import os
app = FastAPI(
title="DNS Lookup API",
description="API for getting whois information and location of domain or IP",
version="1.0",
docs_url="/docs",
openapi_url="/openapi.json",
contact={
"name": "Tran Vinh Liem",
"email": "riottecboi@gmail.com",
"url": "https://about.riotteboi.com"
}
)
folder = os.getcwd()
app.mount("/static", StaticFiles(directory=folder+"/static", html=True), name="static")
app.include_router(_dnsLookup.router)
-
Imports:
-
FastAPIis imported from thefastapimodule to create the FastAPI application instance. -
_dnsLookupis imported from theroutesmodule, which likely contains the API routes for handling DNS lookup requests. -
StaticFilesis imported fromfastapi.staticfilesto serve static files. -
osis imported to interact with the operating system and get the current working directory.
-
-
FastAPI Application Instance:
- A new
FastAPIinstance is created and assigned to theappvariable. - The application is configured with metadata such as the title, description, version, documentation URLs, and contact information.
- A new
-
Static Files:
- The
StaticFilesclass is used to mount a directory containing static files (e.g., CSS, JavaScript, images) at the/staticURL path. - The
foldervariable stores the current working directory, and the static files are located in thestaticsubdirectory. - The
html=Trueparameter is set to allow serving HTML files from the static directory.
- The
-
Router Inclusion:
- The
include_routermethod is called on theappinstance to include the routes defined in the_dnsLookupmodule. - This means that all the routes defined in
_dnsLookup.routerwill be added to the FastAPI application.
- The
Final — Results
Conclusion
Overall, this application provides a web-based interface and API endpoints for users to look up WHOIS information and geographical location details for domains or IP addresses. The combination of FastAPI, Pydantic, encryption, geocoding, and mapping libraries enables a comprehensive and secure solution for DNS lookup functionality.





Top comments (0)