DEV Community

Brad
Brad

Posted on

Python API Client: Connect to Any REST API in 5 Minutes

Python API Client: Connect to Any REST API in 5 Minutes

Almost every web service exposes an API. Here's how to build a robust Python API client that handles authentication, retries, rate limiting, and pagination.

Base API Client Class

import requests
import time
import logging
from typing import Any, Optional, Dict
from urllib.parse import urljoin

# Module-level logger named after this module; APIClient logs its
# rate-limit and server-error retry warnings through it.
logger = logging.getLogger(__name__)

class APIClient:
    """Reusable REST API client with auth, retry, and rate limiting.

    All requests go through one ``requests.Session``. Calls are throttled
    client-side to ``rate_limit_per_second``, HTTP 429 and 5xx responses and
    connection errors are retried up to ``max_retries`` attempts, and
    ``paginate`` walks page-numbered list endpoints.
    """

    def __init__(
        self,
        base_url: str,
        api_key: Optional[str] = None,
        timeout: int = 30,
        max_retries: int = 3,
        rate_limit_per_second: float = 10
    ):
        """Configure the client.

        Args:
            base_url: Root URL of the API; a trailing slash is stripped.
            api_key: Optional token. When given it is sent as a Bearer
                ``Authorization`` header together with a JSON
                ``Content-Type`` header.
            timeout: Per-request timeout in seconds.
            max_retries: Maximum number of attempts per request.
            rate_limit_per_second: Maximum request rate; must be positive.

        Raises:
            ValueError: If ``rate_limit_per_second`` is not greater than 0.
        """
        # Validate before dividing: 0 would raise an opaque ZeroDivisionError
        # and a negative rate would silently disable throttling.
        if rate_limit_per_second <= 0:
            raise ValueError("rate_limit_per_second must be greater than 0")

        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.max_retries = max_retries
        self.min_interval = 1.0 / rate_limit_per_second
        # -inf guarantees the very first request is never delayed.
        self._last_request_time = float('-inf')

        self.session = requests.Session()

        if api_key:
            self.session.headers.update({
                'Authorization': f'Bearer {api_key}',
                'Content-Type': 'application/json'
            })

    def _wait_for_rate_limit(self):
        """Sleep just long enough to keep ``min_interval`` between requests."""
        # A monotonic clock is immune to wall-clock adjustments, which could
        # otherwise produce a negative elapsed time and an over-long sleep.
        elapsed = time.monotonic() - self._last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last_request_time = time.monotonic()

    @staticmethod
    def _parse_retry_after(value, default: int = 60) -> int:
        """Convert a ``Retry-After`` header value into whole seconds to wait.

        The header may carry either a number of seconds or an HTTP-date.

        Args:
            value: Raw header value, or ``None`` when the header is absent.
            default: Seconds to wait when the value is missing or unparseable.

        Returns:
            A non-negative number of seconds.
        """
        if value is None:
            return default
        try:
            seconds = int(value)
        except (TypeError, ValueError):
            # Not a plain integer: try the HTTP-date form.
            import math
            from datetime import datetime, timezone
            from email.utils import parsedate_to_datetime

            try:
                when = parsedate_to_datetime(value)
            except (TypeError, ValueError, IndexError):
                return default
            if when.tzinfo is None:
                # A date without zone information is treated as UTC.
                when = when.replace(tzinfo=timezone.utc)
            delta = (when - datetime.now(timezone.utc)).total_seconds()
            seconds = math.ceil(delta)
        # time.sleep() rejects negative values, so clamp at zero.
        return max(0, seconds)

    def _request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> Any:
        """Make an HTTP request with retry logic.

        Args:
            method: HTTP verb, e.g. ``'GET'``.
            endpoint: Path relative to ``base_url``; a leading slash is optional.
            **kwargs: Passed through to ``Session.request``.

        Returns:
            The decoded JSON body, or ``{}`` when the response body is empty.

        Raises:
            requests.exceptions.HTTPError: For a 4xx/5xx response that is not
                retried, or is still failing on the final attempt.
            requests.exceptions.ConnectionError: If the final attempt still
                cannot connect.
            Exception: If no attempt was made (``max_retries`` below 1).
        """

        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        for attempt in range(self.max_retries):
            is_last_attempt = attempt >= self.max_retries - 1
            try:
                self._wait_for_rate_limit()

                response = self.session.request(
                    method,
                    url,
                    timeout=self.timeout,
                    **kwargs
                )

                # Handle rate limiting. On the final attempt there is nothing
                # left to retry, so skip the wait and let raise_for_status()
                # report the 429 (mirrors the server-error handling below).
                if response.status_code == 429 and not is_last_attempt:
                    retry_after = self._parse_retry_after(
                        response.headers.get('Retry-After')
                    )
                    logger.warning("Rate limited. Waiting %ss...", retry_after)
                    time.sleep(retry_after)
                    continue

                # Handle server errors with exponential backoff (1s, 2s, 4s...)
                if response.status_code >= 500 and not is_last_attempt:
                    wait_time = 2 ** attempt
                    logger.warning(
                        "Server error %s. Retrying in %ss...",
                        response.status_code,
                        wait_time,
                    )
                    time.sleep(wait_time)
                    continue

                response.raise_for_status()

                return response.json() if response.content else {}

            except requests.exceptions.ConnectionError:
                if not is_last_attempt:
                    time.sleep(2 ** attempt)
                    continue
                raise

        # Only reachable when the loop body never ran (max_retries < 1).
        raise Exception(f"Max retries exceeded for {url}")

    def get(self, endpoint: str, params: Optional[dict] = None) -> Any:
        """Send a GET request with optional query parameters."""
        return self._request('GET', endpoint, params=params)

    def post(self, endpoint: str, data: Optional[dict] = None, json: Optional[dict] = None) -> Any:
        """Send a POST request with a form-encoded (``data``) or JSON (``json``) body."""
        return self._request('POST', endpoint, data=data, json=json)

    def put(self, endpoint: str, json: Optional[dict] = None) -> Any:
        """Send a PUT request with a JSON body."""
        return self._request('PUT', endpoint, json=json)

    def delete(self, endpoint: str) -> Any:
        """Send a DELETE request."""
        return self._request('DELETE', endpoint)

    def paginate(
        self,
        endpoint: str,
        page_param: str = 'page',
        per_page: int = 100
    ):
        """Yield all items from a paginated endpoint.

        Pages are requested starting at 1 until the API signals the end: an
        empty page, a page shorter than ``per_page``, a reached
        ``total_pages``/``pages`` count, ``has_next`` being ``False``, or
        ``next_page`` being present but ``None``/``False``.

        Args:
            endpoint: Path of the list endpoint.
            page_param: Name of the query parameter carrying the page number.
            per_page: Page size, sent as the ``per_page`` query parameter.

        Yields:
            Each item of every page, in order.
        """

        page = 1
        while True:
            data = self.get(endpoint, params={page_param: page, 'per_page': per_page})

            # Handle different pagination formats: a bare list, or a dict
            # wrapping the list under one of several common keys.
            if isinstance(data, list):
                items = data
            elif isinstance(data, dict):
                for key in ('data', 'results', 'items'):
                    if key in data:
                        items = data[key]
                        break
                else:
                    items = [data]
            else:
                # Scalar JSON payload: treat it as a single item.
                items = [data]

            if not items:
                break

            yield from items

            # Check if there are more pages
            if isinstance(data, dict):
                total_pages = data.get('total_pages') or data.get('pages')
                if total_pages and page >= total_pages:
                    break

                # Test each marker on its own: combining them with `or`
                # turns `has_next: False` into None when `next_page` is
                # absent, so the stop condition would never fire.
                if data.get('has_next') is False:
                    break
                if 'next_page' in data and (
                    data['next_page'] is None or data['next_page'] is False
                ):
                    break

            if len(items) < per_page:
                break

            page += 1
Enter fullscreen mode Exit fullscreen mode

Real Example: GitHub API Client

class GitHubClient(APIClient):
    """APIClient preconfigured for the GitHub REST API (https://api.github.com)."""

    def __init__(self, token: str):
        """Create a client authenticated with a GitHub token.

        Args:
            token: Token sent as a Bearer ``Authorization`` header.
        """
        super().__init__(
            base_url="https://api.github.com",
            api_key=token,
            rate_limit_per_second=1  # GitHub: 5000 requests/hour
        )
        self.session.headers.update({
            'Accept': 'application/vnd.github.v3+json'
        })

    @staticmethod
    def _with_state(path: str, state: str) -> str:
        """Return ``path`` with ``state`` appended as a URL-encoded query string."""
        from urllib.parse import urlencode

        return f"{path}?{urlencode({'state': state})}"

    def get_repos(self, username: str) -> list:
        """Get all repositories for a user."""
        return list(self.paginate(f"/users/{username}/repos"))

    def get_issues(self, repo: str, state: str = 'open') -> list:
        """Get all issues for a repository.

        Args:
            repo: Repository in ``owner/name`` form.
            state: Issue state filter forwarded to GitHub (e.g. ``'open'``,
                ``'closed'`` or ``'all'``).
        """
        # The filter travels in the endpoint's query string; the paging
        # parameters added by paginate() are appended to it by requests.
        return list(self.paginate(
            self._with_state(f"/repos/{repo}/issues", state),
            per_page=100
        ))

    def create_issue(self, repo: str, title: str, body: str, labels: list = None) -> dict:
        """Create a new issue.

        Args:
            repo: Repository in ``owner/name`` form.
            title: Issue title.
            body: Issue description.
            labels: Optional label names; an empty list is sent when omitted.
        """
        return self.post(f"/repos/{repo}/issues", json={
            'title': title,
            'body': body,
            'labels': labels or []
        })

    def get_pull_requests(self, repo: str, state: str = 'open') -> list:
        """Get all pull requests.

        Args:
            repo: Repository in ``owner/name`` form.
            state: Pull request state filter forwarded to GitHub (e.g.
                ``'open'``, ``'closed'`` or ``'all'``).
        """
        return list(self.paginate(
            self._with_state(f"/repos/{repo}/pulls", state),
            per_page=100
        ))

# Usage: count a user's repositories, then list one repository's open issues.
github = GitHubClient(token="your_token_here")

repos = github.get_repos("lukassbrad")
repo_count = len(repos)
print(f"Found {repo_count} repos")

issues = github.get_issues("myuser/myrepo")
for issue in issues:
    number, title = issue['number'], issue['title']
    print(f"#{number}: {title}")
Enter fullscreen mode Exit fullscreen mode

Handling OAuth 2.0 (Client Credentials Flow)

class OAuth2Client(APIClient):
    """APIClient that authenticates with the OAuth 2.0 client-credentials grant.

    An access token is fetched from ``{base_url}/oauth/token`` on first use,
    cached, and refreshed shortly before it expires.
    """

    def __init__(self, base_url: str, client_id: str, client_secret: str):
        """Store the client credentials; no token is requested yet.

        Args:
            base_url: Root URL of the API, also used for the token endpoint.
            client_id: OAuth client identifier.
            client_secret: OAuth client secret.
        """
        super().__init__(base_url)
        self.client_id = client_id
        self.client_secret = client_secret
        self._access_token = None
        self._token_expires_at = 0

    def _get_access_token(self) -> str:
        """Get or refresh the access token.

        Returns:
            The cached token while it has more than 60 seconds of life left,
            otherwise a freshly requested one.

        Raises:
            requests.exceptions.HTTPError: If the token endpoint returns an
                error status.
        """

        # Refresh 60 seconds early so a token never expires mid-request.
        if self._access_token and time.time() < self._token_expires_at - 60:
            return self._access_token

        # Pass the client's timeout: without one, requests waits indefinitely
        # on an unresponsive token endpoint.
        response = requests.post(
            f"{self.base_url}/oauth/token",
            data={
                'grant_type': 'client_credentials',
                'client_id': self.client_id,
                'client_secret': self.client_secret
            },
            timeout=self.timeout
        )
        response.raise_for_status()

        token_data = response.json()
        self._access_token = token_data['access_token']

        # Fall back to one hour when the lifetime is missing or null, and
        # coerce so a value sent as a string still works.
        expires_in = token_data.get('expires_in')
        if expires_in is None:
            expires_in = 3600
        self._token_expires_at = time.time() + float(expires_in)

        return self._access_token

    def _request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make a request after ensuring the session carries a valid Bearer token."""
        # Automatically add/refresh token
        token = self._get_access_token()
        self.session.headers['Authorization'] = f'Bearer {token}'
        return super()._request(method, endpoint, **kwargs)
Enter fullscreen mode Exit fullscreen mode

Want More API Integration Scripts?

This client library is part of my Python automation toolkit.

👉 Get 50+ ready-to-use Python scripts — API clients, web scrapers, data processors, email automators, and more.

Build integrations in minutes, not hours.

Top comments (0)