DEV Community

Dominique Megnidro
Dominique Megnidro

Posted on

Email Scraping : Techniques et Considérations Éthiques avec Python

Les outils de scraping d'emails sont devenus un élément essentiel pour de nombreuses entreprises et professionnels du marketing. Que ce soit pour la prospection, la recherche de contacts ou la génération de leads, ces outils permettent d'automatiser la collecte d'informations de contact sur le web. Cet article vous présente les principes fondamentaux du scraping d'emails, avec un exemple pratique en Python.
Qu'est-ce que le scraping d'emails ?
Le scraping d'emails consiste à extraire automatiquement des adresses email et autres informations de contact (noms, numéros de téléphone, etc.) à partir de sites web. Cette technique permet de constituer rapidement des bases de données de prospects ou de contacts professionnels.
Considérations éthiques et légales
Avant de vous lancer dans la création ou l'utilisation d'un scraper d'emails, il est crucial de prendre en compte plusieurs aspects :

Respect du RGPD : En Europe, le Règlement Général sur la Protection des Données impose des restrictions strictes sur la collecte et l'utilisation de données personnelles.
Respect des conditions d'utilisation des sites web visités
Respect du fichier robots.txt qui indique les zones interdites au scraping
Limitations techniques mises en place par les sites (CAPTCHA, limite de requêtes, etc.)

Exemple pratique : un scraper d'emails en Python
Voici un exemple de code Python qui illustre les principes de base du scraping d'emails :

import requests
from bs4 import BeautifulSoup
import re
import csv
import time
import concurrent.futures
from urllib.parse import urlparse, urljoin
import argparse


class EmailScraper:
    def __init__(self, max_pages=5, max_depth=2, delay=1, threads=5):
        self.max_pages = max_pages  # Maximum de pages à explorer par domaine
        self.max_depth = max_depth  # Profondeur maximale de crawling
        self.delay = delay  # Délai entre les requêtes
        self.threads = threads  # Nombre de threads pour le traitement parallèle
        self.visited_urls = set()  # URLs déjà visitées

        # User-Agent aléatoires pour éviter les blocages
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

        # Patterns pour la recherche
        self.email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        self.phone_pattern = r'(?:\+\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}'
        self.name_pattern = r'(?:Contact|About|Team|Staff).*?(?:<h\d>)(.*?)(?:</h\d>)'

    def is_valid_url(self, url, base_domain):
        """Vérifie si l'URL appartient au même domaine"""
        try:
            parsed_url = urlparse(url)
            parsed_base = urlparse(base_domain)
            return parsed_url.netloc == parsed_base.netloc or parsed_url.netloc == ''
        except:
            return False

    def get_base_url(self, url):
        """Extrait l'URL de base d'un site"""
        parsed = urlparse(url)
        return f"{parsed.scheme}://{parsed.netloc}"

    def normalize_url(self, url, base_url):
        """Normalise les URLs relatives"""
        if not url:
            return None
        if url.startswith(('http://', 'https://')):
            return url
        return urljoin(base_url, url)

    def extract_company_info(self, soup, url):
        """Tente d'extraire le nom de l'entreprise"""
        company_name = ""

        # Essaie de trouver le nom via le title
        if soup.title:
            title = soup.title.string
            if title:
                company_name = title.split('|')[0].split('-')[0].strip()

        # Essaie de trouver via les métadonnées
        meta_og_site_name = soup.find('meta', property='og:site_name')
        if meta_og_site_name and meta_og_site_name.get('content'):
            company_name = meta_og_site_name.get('content')

        # Si on n'a pas trouvé de nom, utiliser le domaine
        if not company_name:
            domain = urlparse(url).netloc
            company_name = domain.replace('www.', '').split('.')[0].capitalize()

        return company_name

    def extract_data_from_page(self, url, base_url, depth=0):
        """Extrait les données d'une page et retourne les liens pour crawling"""
        if url in self.visited_urls or depth >= self.max_depth:
            return [], {}

        self.visited_urls.add(url)
        print(f"Traitement de: {url}")

        try:
            response = requests.get(url, headers=self.headers, timeout=10)
            if response.status_code != 200:
                return [], {}

            soup = BeautifulSoup(response.text, 'html.parser')
            page_text = soup.get_text()

            # Extraction des emails
            emails = set(re.findall(self.email_pattern, page_text))
            for link in soup.find_all('a'):
                href = link.get('href', '')
                if 'mailto:' in href:
                    email = href.split('mailto:')[1].split('?')[0]
                    emails.add(email)

            # Extraction des téléphones
            phones = set(re.findall(self.phone_pattern, page_text))

            # Extraction des noms (simple - peut être améliorée)
            names = []
            for contact_section in soup.find_all(['div', 'section'], class_=lambda c: c and (
                    'contact' in c.lower() or 'team' in c.lower())):
                person_elements = contact_section.find_all(['h2', 'h3', 'h4', 'strong'])
                for element in person_elements:
                    if element.text and len(element.text.strip()) < 50:  # Éviter les faux positifs
                        names.append(element.text.strip())

            # Si on est sur la page d'accueil, essayer d'extraire le nom de l'entreprise
            company_name = ""
            if depth == 0 or "about" in url.lower() or "contact" in url.lower():
                company_name = self.extract_company_info(soup, url)

            # Trouver d'autres liens à explorer
            links_to_follow = []
            if len(self.visited_urls) < self.max_pages:
                for link in soup.find_all('a'):
                    href = link.get('href')
                    if href:
                        normalized_url = self.normalize_url(href, base_url)
                        if normalized_url and self.is_valid_url(normalized_url,
                                                                base_url) and normalized_url not in self.visited_urls:
                            # Prioriser les pages de contact
                            if 'contact' in normalized_url.lower():
                                links_to_follow.insert(0, normalized_url)
                            else:
                                links_to_follow.append(normalized_url)

            data = {
                'url': url,
                'company_name': company_name,
                'emails': list(emails),
                'phones': list(phones),
                'names': names
            }

            return links_to_follow, data

        except Exception as e:
            print(f"Erreur sur {url}: {e}")
            return [], {}

    def crawl_website(self, start_url):
        """Fonction principale pour explorer un site web"""
        if not start_url.startswith(('http://', 'https://')):
            start_url = 'https://' + start_url

        base_url = self.get_base_url(start_url)
        pages_to_visit = [start_url]
        collected_data = []

        while pages_to_visit and len(self.visited_urls) < self.max_pages:
            current_url = pages_to_visit.pop(0)
            links, data = self.extract_data_from_page(current_url, base_url)

            if data and (data.get('emails') or data.get('phones') or data.get('names')):
                collected_data.append(data)

            pages_to_visit.extend(links)
            time.sleep(self.delay)  # Respecter le délai entre les requêtes

        return collected_data

    def process_websites(self, websites):
        """Traite une liste de sites web en parallèle"""
        all_results = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.threads) as executor:
            future_to_url = {executor.submit(self.crawl_website, url): url for url in websites}
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    results = future.result()
                    if results:
                        all_results.extend(results)
                        print(f"Terminé: {url} - {len(results)} page(s) avec des données")
                    else:
                        print(f"Aucune donnée trouvée sur: {url}")
                except Exception as e:
                    print(f"Erreur lors du traitement de {url}: {e}")

        return all_results

    def save_to_csv(self, data, output_file):
        """Sauvegarde les résultats dans un fichier CSV"""
        if not data:
            print("Aucune donnée à sauvegarder.")
            return

        try:
            with open(output_file, 'w', newline='', encoding='utf-8') as f:
                fieldnames = ['company_name', 'url', 'emails', 'phones', 'names']
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()

                for item in data:
                    writer.writerow({
                        'company_name': item.get('company_name', ''),
                        'url': item.get('url', ''),
                        'emails': '; '.join(item.get('emails', [])),
                        'phones': '; '.join(item.get('phones', [])),
                        'names': '; '.join(item.get('names', []))
                    })

            print(f"Données sauvegardées dans {output_file}")
        except Exception as e:
            print(f"Erreur lors de la sauvegarde: {e}")


# Point d'entrée du programme
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Email & Contact Scraper')
    parser.add_argument('-i', '--input', help='Fichier contenant la liste des sites (un par ligne)')
    parser.add_argument('-o', '--output', default='contacts_scrape_results.csv', help='Fichier de sortie CSV')
    parser.add_argument('-p', '--pages', type=int, default=10, help='Nombre maximum de pages à explorer par site')
    parser.add_argument('-d', '--depth', type=int, default=2, help='Profondeur maximale de crawling')
    parser.add_argument('-t', '--threads', type=int, default=5, help='Nombre de threads pour le traitement parallèle')
    parser.add_argument('-w', '--websites', nargs='+', help='Liste de sites à analyser')

    args = parser.parse_args()

    websites = []
    if args.input:
        with open(args.input, 'r') as f:
            websites = [line.strip() for line in f if line.strip()]
    elif args.websites:
        websites = args.websites
    else:
        websites = input("Entrez les URLs des sites à analyser (séparées par des espaces): ").split()

    if not websites:
        print("Aucun site web à analyser.")
        exit(1)

    scraper = EmailScraper(max_pages=args.pages, max_depth=args.depth, threads=args.threads)
    results = scraper.process_websites(websites)
    scraper.save_to_csv(results, args.output)
Enter fullscreen mode Exit fullscreen mode

Ce script est beaucoup plus avancé et vous permet de :

Traiter plusieurs sites en masse soit via un fichier d'entrée, soit en ligne de commande
Explorer automatiquement les sites en profondeur (crawling)
Extraire davantage d'informations : emails, numéros de téléphone, noms potentiels, nom de l'entreprise
Utiliser le multithreading pour accélérer le processus
Exporter les résultats dans un fichier CSV

Top comments (0)