Email scraping tools have become essential for many businesses and marketing professionals. Whether for prospecting, contact research, or lead generation, these tools automate the collection of contact information from the web. This article covers the fundamental principles of email scraping, along with a practical Python example.
What is email scraping?
Email scraping involves automatically extracting email addresses and other contact information (names, phone numbers, etc.) from websites. This technique allows for rapidly building databases of prospects or professional contacts.
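At its core, the extraction step is plain pattern matching: a regular expression describing the shape of an email address is run over the text of a page. Here is a minimal sketch of that idea (the sample text is made up, and the pattern is a common simplification rather than a complete validator):

import re

# A deliberately simple email pattern; real-world addresses can be more varied
EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'

sample_text = "Contact us at sales@example.com or support@example.org for details."

# findall returns every non-overlapping match found in the text
print(re.findall(EMAIL_PATTERN, sample_text))
# ['sales@example.com', 'support@example.org']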
Ethical and legal considerations
Before creating or using an email scraper, it's crucial to consider several aspects:
GDPR compliance: In Europe, the General Data Protection Regulation imposes strict restrictions on the collection and use of personal data.
Respect for the terms of service of the websites you visit
Compliance with robots.txt files, which indicate the areas a site does not want crawled (a robots.txt check is sketched just after this list)
Technical protections put in place by websites (CAPTCHAs, rate limits, etc.)
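For the robots.txt point in particular, Python's standard library can perform the check. The sketch below uses urllib.robotparser to ask whether a given URL may be fetched; the URL and user agent are placeholders, and how to treat an unreachable robots.txt is a policy choice left to you:

from urllib import robotparser
from urllib.parse import urlparse, urljoin

def can_fetch(url, user_agent="*"):
    """Return True if the site's robots.txt allows fetching this URL."""
    parsed = urlparse(url)
    robots_url = urljoin(f"{parsed.scheme}://{parsed.netloc}", "/robots.txt")
    parser = robotparser.RobotFileParser()
    parser.set_url(robots_url)
    try:
        parser.read()  # Downloads and parses robots.txt
    except Exception:
        return True  # robots.txt unreachable: treated as allowed here, but you may prefer to skip the site
    return parser.can_fetch(user_agent, url)

print(can_fetch("https://example.com/contact"))  # Hypothetical URL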
Practical example: an email scraper in Python
Here's a Python code example that illustrates the basic principles of email scraping:
import requests
from bs4 import BeautifulSoup
import re
import csv
import time
import concurrent.futures
from urllib.parse import urlparse, urljoin
import argparse
class EmailScraper:
def __init__(self, max_pages=5, max_depth=2, delay=1, threads=5):
        self.max_pages = max_pages  # Maximum number of pages to crawl
        self.max_depth = max_depth  # Maximum crawl depth
        self.delay = delay  # Delay between requests, in seconds
        self.threads = threads  # Number of threads for parallel processing
        self.visited_urls = set()  # URLs already visited (shared across all sites crawled by this instance)
        # Browser-like User-Agent header to reduce the chance of being blocked
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
        # Regex patterns used for extraction
self.email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
self.phone_pattern = r'(?:\+\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}'
self.name_pattern = r'(?:Contact|About|Team|Staff).*?(?:<h\d>)(.*?)(?:</h\d>)'
def is_valid_url(self, url, base_domain):
"""Vérifie si l'URL appartient au même domaine"""
try:
parsed_url = urlparse(url)
parsed_base = urlparse(base_domain)
return parsed_url.netloc == parsed_base.netloc or parsed_url.netloc == ''
        except Exception:
return False
def get_base_url(self, url):
"""Extrait l'URL de base d'un site"""
parsed = urlparse(url)
return f"{parsed.scheme}://{parsed.netloc}"
def normalize_url(self, url, base_url):
"""Normalise les URLs relatives"""
if not url:
return None
if url.startswith(('http://', 'https://')):
return url
return urljoin(base_url, url)
def extract_company_info(self, soup, url):
"""Tente d'extraire le nom de l'entreprise"""
company_name = ""
        # Try the <title> tag first
if soup.title:
title = soup.title.string
if title:
company_name = title.split('|')[0].split('-')[0].strip()
        # Then try the og:site_name metadata (it overrides the title when present)
meta_og_site_name = soup.find('meta', property='og:site_name')
if meta_og_site_name and meta_og_site_name.get('content'):
company_name = meta_og_site_name.get('content')
        # Fall back to the domain name if nothing was found
if not company_name:
domain = urlparse(url).netloc
company_name = domain.replace('www.', '').split('.')[0].capitalize()
return company_name
def extract_data_from_page(self, url, base_url, depth=0):
"""Extrait les données d'une page et retourne les liens pour crawling"""
if url in self.visited_urls or depth >= self.max_depth:
return [], {}
self.visited_urls.add(url)
print(f"Traitement de: {url}")
try:
response = requests.get(url, headers=self.headers, timeout=10)
if response.status_code != 200:
return [], {}
soup = BeautifulSoup(response.text, 'html.parser')
page_text = soup.get_text()
            # Extract email addresses from the page text and from mailto: links
emails = set(re.findall(self.email_pattern, page_text))
for link in soup.find_all('a'):
href = link.get('href', '')
if 'mailto:' in href:
email = href.split('mailto:')[1].split('?')[0]
emails.add(email)
            # Extract phone numbers
phones = set(re.findall(self.phone_pattern, page_text))
            # Extract names (naive approach; could be improved)
names = []
for contact_section in soup.find_all(['div', 'section'], class_=lambda c: c and (
'contact' in c.lower() or 'team' in c.lower())):
person_elements = contact_section.find_all(['h2', 'h3', 'h4', 'strong'])
for element in person_elements:
                    if element.text and len(element.text.strip()) < 50:  # Avoid false positives
names.append(element.text.strip())
            # On the home, about, or contact page, try to extract the company name
company_name = ""
if depth == 0 or "about" in url.lower() or "contact" in url.lower():
company_name = self.extract_company_info(soup, url)
            # Collect further links to crawl
links_to_follow = []
if len(self.visited_urls) < self.max_pages:
for link in soup.find_all('a'):
href = link.get('href')
if href:
normalized_url = self.normalize_url(href, base_url)
if normalized_url and self.is_valid_url(normalized_url,
base_url) and normalized_url not in self.visited_urls:
                            # Give priority to contact pages
if 'contact' in normalized_url.lower():
links_to_follow.insert(0, normalized_url)
else:
links_to_follow.append(normalized_url)
data = {
'url': url,
'company_name': company_name,
'emails': list(emails),
'phones': list(phones),
'names': names
}
return links_to_follow, data
except Exception as e:
print(f"Erreur sur {url}: {e}")
return [], {}
def crawl_website(self, start_url):
"""Fonction principale pour explorer un site web"""
if not start_url.startswith(('http://', 'https://')):
start_url = 'https://' + start_url
base_url = self.get_base_url(start_url)
        # Track the crawl depth alongside each URL so that max_depth is actually enforced
        pages_to_visit = [(start_url, 0)]
        collected_data = []
        while pages_to_visit and len(self.visited_urls) < self.max_pages:
            current_url, depth = pages_to_visit.pop(0)
            links, data = self.extract_data_from_page(current_url, base_url, depth)
            if data and (data.get('emails') or data.get('phones') or data.get('names')):
                collected_data.append(data)
            pages_to_visit.extend((link, depth + 1) for link in links)
            time.sleep(self.delay)  # Respect the delay between requests
return collected_data
def process_websites(self, websites):
"""Traite une liste de sites web en parallèle"""
all_results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
future_to_url = {executor.submit(self.crawl_website, url): url for url in websites}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
results = future.result()
if results:
all_results.extend(results)
print(f"Terminé: {url} - {len(results)} page(s) avec des données")
else:
print(f"Aucune donnée trouvée sur: {url}")
except Exception as e:
print(f"Erreur lors du traitement de {url}: {e}")
return all_results
def save_to_csv(self, data, output_file):
"""Sauvegarde les résultats dans un fichier CSV"""
if not data:
print("Aucune donnée à sauvegarder.")
return
try:
with open(output_file, 'w', newline='', encoding='utf-8') as f:
fieldnames = ['company_name', 'url', 'emails', 'phones', 'names']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for item in data:
writer.writerow({
'company_name': item.get('company_name', ''),
'url': item.get('url', ''),
'emails': '; '.join(item.get('emails', [])),
'phones': '; '.join(item.get('phones', [])),
'names': '; '.join(item.get('names', []))
})
print(f"Données sauvegardées dans {output_file}")
except Exception as e:
print(f"Erreur lors de la sauvegarde: {e}")
# Program entry point
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Email & Contact Scraper')
    parser.add_argument('-i', '--input', help='File containing the list of sites (one per line)')
    parser.add_argument('-o', '--output', default='contacts_scrape_results.csv', help='Output CSV file')
    parser.add_argument('-p', '--pages', type=int, default=10, help='Maximum number of pages to crawl')
    parser.add_argument('-d', '--depth', type=int, default=2, help='Maximum crawl depth')
    parser.add_argument('-t', '--threads', type=int, default=5, help='Number of threads for parallel processing')
    parser.add_argument('-w', '--websites', nargs='+', help='List of websites to analyze')
args = parser.parse_args()
websites = []
if args.input:
with open(args.input, 'r') as f:
websites = [line.strip() for line in f if line.strip()]
elif args.websites:
websites = args.websites
else:
websites = input("Entrez les URLs des sites à analyser (séparées par des espaces): ").split()
if not websites:
print("Aucun site web à analyser.")
exit(1)
scraper = EmailScraper(max_pages=args.pages, max_depth=args.depth, threads=args.threads)
results = scraper.process_websites(websites)
scraper.save_to_csv(results, args.output)
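Assuming the script above is saved as email_scraper.py (the filename is arbitrary), it can be launched from the command line, for example with python email_scraper.py -w example.com -p 5 -o contacts.csv. The class can also be imported and driven directly from Python, as in this minimal sketch where the domain and file names are placeholders:

from email_scraper import EmailScraper  # Assumes the code above was saved as email_scraper.py

scraper = EmailScraper(max_pages=5, max_depth=2, delay=1, threads=2)
results = scraper.process_websites(["https://example.com"])
scraper.save_to_csv(results, "contacts.csv")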