Dominique Megnidro

Email Scraping: Techniques and Ethical Considerations with Python



Email scraping tools have become essential for many businesses and marketing professionals. Whether for prospecting, contact research, or lead generation, these tools automate the collection of contact information from the web. This article presents the fundamental principles of email scraping, along with a practical Python example.
What is email scraping?
Email scraping involves automatically extracting email addresses and other contact information (names, phone numbers, etc.) from websites. This technique allows for rapidly building databases of prospects or professional contacts.
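
At its core, the technique boils down to fetching a page's HTML and matching an email pattern against its text. Here is a minimal sketch of that idea (https://example.com is a placeholder URL, and the regex is a deliberately simplified pattern rather than a strict email validator):

import re
import requests

# Fetch a single page and extract anything that looks like an email address
EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'

response = requests.get("https://example.com", timeout=10)
emails = set(re.findall(EMAIL_PATTERN, response.text))
print(emails)

The full scraper shown later in this article builds on exactly this pattern, adding crawling, parallelism, and CSV export.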
Ethical and legal considerations
Before creating or using an email scraper, it's crucial to consider several aspects:

GDPR compliance: In Europe, the General Data Protection Regulation imposes strict restrictions on the collection and use of personal data.
Respect for terms of service of visited websites
Observance of robots.txt files, which indicate the areas a site does not want crawled (see the sketch after this list)
Respect for the technical limits websites put in place (CAPTCHAs, request rate limits, etc.)
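
For the robots.txt point above, Python's standard library already includes a parser. This short sketch (using the placeholder domain https://example.com) checks whether a page may be fetched before crawling it:

from urllib.robotparser import RobotFileParser

# Read the site's robots.txt; https://example.com is a placeholder domain
robot_parser = RobotFileParser()
robot_parser.set_url("https://example.com/robots.txt")
robot_parser.read()

# can_fetch() returns True only if the rules allow this user agent to request the URL
if robot_parser.can_fetch("*", "https://example.com/contact"):
    print("Allowed to crawl this page")
else:
    print("robots.txt disallows this page - skip it")

The scraper below does not perform this check itself, so a call like this would be a natural addition before each request.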

Practical example: an email scraper in Python
Here's a Python code example that illustrates the basic principles: it crawls a site's pages, extracts email addresses, phone numbers, and names, and saves the results to a CSV file.

import requests
from bs4 import BeautifulSoup
import re
import csv
import time
import concurrent.futures
from urllib.parse import urlparse, urljoin
import argparse


class EmailScraper:
    def __init__(self, max_pages=5, max_depth=2, delay=1, threads=5):
        self.max_pages = max_pages  # Maximum number of pages to crawl
        self.max_depth = max_depth  # Maximum crawl depth
        self.delay = delay  # Delay between requests (in seconds)
        self.threads = threads  # Number of threads for parallel processing
        self.visited_urls = set()  # Already-visited URLs (shared across all crawled sites)

        # Browser-like User-Agent header to reduce the chance of being blocked
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

        # Regex patterns used for extraction
        self.email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        self.phone_pattern = r'(?:\+\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}'
        self.name_pattern = r'(?:Contact|About|Team|Staff).*?(?:<h\d>)(.*?)(?:</h\d>)'

    def is_valid_url(self, url, base_domain):
        """Vérifie si l'URL appartient au même domaine"""
        try:
            parsed_url = urlparse(url)
            parsed_base = urlparse(base_domain)
            return parsed_url.netloc == parsed_base.netloc or parsed_url.netloc == ''
        except:
            return False

    def get_base_url(self, url):
        """Extrait l'URL de base d'un site"""
        parsed = urlparse(url)
        return f"{parsed.scheme}://{parsed.netloc}"

    def normalize_url(self, url, base_url):
        """Normalise les URLs relatives"""
        if not url:
            return None
        if url.startswith(('http://', 'https://')):
            return url
        return urljoin(base_url, url)

    def extract_company_info(self, soup, url):
        """Tente d'extraire le nom de l'entreprise"""
        company_name = ""

        # Try the page title first
        if soup.title:
            title = soup.title.string
            if title:
                company_name = title.split('|')[0].split('-')[0].strip()

        # Then try the og:site_name metadata
        meta_og_site_name = soup.find('meta', property='og:site_name')
        if meta_og_site_name and meta_og_site_name.get('content'):
            company_name = meta_og_site_name.get('content')

        # Fall back to the domain name if nothing was found
        if not company_name:
            domain = urlparse(url).netloc
            company_name = domain.replace('www.', '').split('.')[0].capitalize()

        return company_name

    def extract_data_from_page(self, url, base_url, depth=0):
        """Extrait les données d'une page et retourne les liens pour crawling"""
        if url in self.visited_urls or depth >= self.max_depth:
            return [], {}

        self.visited_urls.add(url)
        print(f"Traitement de: {url}")

        try:
            response = requests.get(url, headers=self.headers, timeout=10)
            if response.status_code != 200:
                return [], {}

            soup = BeautifulSoup(response.text, 'html.parser')
            page_text = soup.get_text()

            # Extract email addresses from the page text and mailto: links
            emails = set(re.findall(self.email_pattern, page_text))
            for link in soup.find_all('a'):
                href = link.get('href', '')
                if 'mailto:' in href:
                    email = href.split('mailto:')[1].split('?')[0]
                    emails.add(email)

            # Extract phone numbers
            phones = set(re.findall(self.phone_pattern, page_text))

            # Extract names (naive approach - could be improved)
            names = []
            for contact_section in soup.find_all(['div', 'section'], class_=lambda c: c and (
                    'contact' in c.lower() or 'team' in c.lower())):
                person_elements = contact_section.find_all(['h2', 'h3', 'h4', 'strong'])
                for element in person_elements:
                    if element.text and len(element.text.strip()) < 50:  # Skip long strings to avoid false positives
                        names.append(element.text.strip())

            # On the home page (or an about/contact page), try to extract the company name
            company_name = ""
            if depth == 0 or "about" in url.lower() or "contact" in url.lower():
                company_name = self.extract_company_info(soup, url)

            # Collect further links to explore
            links_to_follow = []
            if len(self.visited_urls) < self.max_pages:
                for link in soup.find_all('a'):
                    href = link.get('href')
                    if href:
                        normalized_url = self.normalize_url(href, base_url)
                        if normalized_url and self.is_valid_url(normalized_url,
                                                                base_url) and normalized_url not in self.visited_urls:
                            # Prioritize contact pages
                            if 'contact' in normalized_url.lower():
                                links_to_follow.insert(0, normalized_url)
                            else:
                                links_to_follow.append(normalized_url)

            data = {
                'url': url,
                'company_name': company_name,
                'emails': list(emails),
                'phones': list(phones),
                'names': names
            }

            return links_to_follow, data

        except Exception as e:
            print(f"Erreur sur {url}: {e}")
            return [], {}

    def crawl_website(self, start_url):
        """Main entry point for crawling a single website"""
        if not start_url.startswith(('http://', 'https://')):
            start_url = 'https://' + start_url

        base_url = self.get_base_url(start_url)
        pages_to_visit = [(start_url, 0)]  # (url, depth) pairs so max_depth is actually enforced
        collected_data = []

        while pages_to_visit and len(self.visited_urls) < self.max_pages:
            current_url, depth = pages_to_visit.pop(0)
            links, data = self.extract_data_from_page(current_url, base_url, depth)

            if data and (data.get('emails') or data.get('phones') or data.get('names')):
                collected_data.append(data)

            # Queue newly discovered links one level deeper
            pages_to_visit.extend((link, depth + 1) for link in links)
            time.sleep(self.delay)  # Respect the delay between requests

        return collected_data

    def process_websites(self, websites):
        """Traite une liste de sites web en parallèle"""
        all_results = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
            future_to_url = {executor.submit(self.crawl_website, url): url for url in websites}
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    results = future.result()
                    if results:
                        all_results.extend(results)
                        print(f"Terminé: {url} - {len(results)} page(s) avec des données")
                    else:
                        print(f"Aucune donnée trouvée sur: {url}")
                except Exception as e:
                    print(f"Erreur lors du traitement de {url}: {e}")

        return all_results

    def save_to_csv(self, data, output_file):
        """Sauvegarde les résultats dans un fichier CSV"""
        if not data:
            print("Aucune donnée à sauvegarder.")
            return

        try:
            with open(output_file, 'w', newline='', encoding='utf-8') as f:
                fieldnames = ['company_name', 'url', 'emails', 'phones', 'names']
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()

                for item in data:
                    writer.writerow({
                        'company_name': item.get('company_name', ''),
                        'url': item.get('url', ''),
                        'emails': '; '.join(item.get('emails', [])),
                        'phones': '; '.join(item.get('phones', [])),
                        'names': '; '.join(item.get('names', []))
                    })

            print(f"Données sauvegardées dans {output_file}")
        except Exception as e:
            print(f"Erreur lors de la sauvegarde: {e}")


# Program entry point
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Email & Contact Scraper')
    parser.add_argument('-i', '--input', help='File containing the list of sites (one per line)')
    parser.add_argument('-o', '--output', default='contacts_scrape_results.csv', help='Output CSV file')
    parser.add_argument('-p', '--pages', type=int, default=10, help='Maximum number of pages to crawl per site')
    parser.add_argument('-d', '--depth', type=int, default=2, help='Maximum crawling depth')
    parser.add_argument('-t', '--threads', type=int, default=5, help='Number of threads for parallel processing')
    parser.add_argument('-w', '--websites', nargs='+', help='List of sites to analyze')

    args = parser.parse_args()

    websites = []
    if args.input:
        with open(args.input, 'r') as f:
            websites = [line.strip() for line in f if line.strip()]
    elif args.websites:
        websites = args.websites
    else:
        websites = input("Entrez les URLs des sites à analyser (séparées par des espaces): ").split()

    if not websites:
        print("Aucun site web à analyser.")
        exit(1)

    scraper = EmailScraper(max_pages=args.pages, max_depth=args.depth, threads=args.threads)
    results = scraper.process_websites(websites)
    scraper.save_to_csv(results, args.output)
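Assuming the script is saved as email_scraper.py (a file name chosen for this example), it can be run against one or more sites from the command line:

python email_scraper.py -w https://example.com -p 5 -o contacts.csv
python email_scraper.py -i sites.txt -t 3

Note that visited_urls is shared by all worker threads, so the page limit applies across every site crawled in a single run; for strict per-site limits, each crawl would need its own visited set.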
