We sometimes need to collect data through API requests rather than extracting it from a database directly. The database may be restricted by authorization rules, or the backend may already implement the complicated data processing logic and expose it through an API, which saves us from performing ETL on the raw data ourselves. However, given the large number of API requests involved and the unreliable nature of the network (or server), a careful implementation is needed to avoid partial data loss or unexpected responses.
Imagine that there is an API that returns a list of employees' information through pagination. There are M rows of data per page and N pages in total.
The first implementation
import time

import requests

url = "https://www.example.com/employees/"
# Assume you have obtained the total number of pages
total_page = N
result = []

def get_data_from_api(url, page_number):
    # Returns a list of employees' data, assuming the API responds with a JSON array
    response = requests.get(url, params={'page': page_number})
    response.raise_for_status()
    return response.json()

# Assumes pages are numbered from 1 to total_page
for i in range(1, total_page + 1):
    try:
        result += get_data_from_api(url, i)
    except Exception:
        # sleep for a bit in case that helps
        time.sleep(1)
        # try again
        result += get_data_from_api(url, i)
This is not a recommended solution. It puts faith in the server that any problem will be resolved by the second attempt. You could tweak it with a while loop that keeps retrying, but that is not an elegant solution either: the code will never stop running if the API server stays down.
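To illustrate the point about retries that never stop, here is a minimal sketch of a retry helper that gives up after a fixed number of attempts. The names `call_with_retries`, `max_attempts` and `base_delay` are hypothetical and not part of the original code, and `flaky` only stands in for an unreliable API call.

```python
import time


def call_with_retries(func, max_attempts=3, base_delay=0.1, sleep=time.sleep):
    """Call func() until it succeeds or max_attempts is reached.

    The delay doubles after each failed attempt. Once the attempts are
    exhausted the last exception is re-raised, so the loop always terminates.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))


# Stand-in for an unreliable API call: fails twice, then succeeds.
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated network failure")
    return ["employee-1", "employee-2"]


# The demo replaces sleep with a no-op so it finishes immediately.
data = call_with_retries(flaky, sleep=lambda seconds: None)
print(data, calls["count"])
```

Bounding the attempts keeps a dead server from stalling the job forever, at the cost of having to handle the final exception yourself.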
Improved implementation
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
    session=None,
):
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    # Use the same retry policy for both HTTP and HTTPS requests
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
In the code above, a request is retried at most 3 times on a response status code of 500, 502 or 504, with a backoff factor of 0.3.
The urllib3 documentation describes the backoff factor as follows: a backoff factor to apply between attempts after the second try (most errors are resolved immediately by a second try without a delay). urllib3 will sleep for: {backoff factor} * (2 ^ ({number of total retries} - 1)) seconds. If the backoff_factor is 0.1, then sleep() will sleep for [0.0s, 0.2s, 0.4s, ...] between retries. It will never be longer than Retry.BACKOFF_MAX. By default, backoff is disabled (set to 0).
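The formula quoted above can be worked through with a few lines of plain Python. This is only a sketch of the documented arithmetic, not urllib3's own code: the helper name `backoff_schedule` is hypothetical, the 120-second cap is assumed from urllib3's documented default, and exact timings may differ between urllib3 versions.

```python
BACKOFF_MAX = 120  # assumed cap in seconds, mirroring urllib3's documented default


def backoff_schedule(backoff_factor, retries, backoff_max=BACKOFF_MAX):
    """Return the sleep time before each retry, following the quoted formula."""
    delays = []
    for n in range(1, retries + 1):
        if n <= 1:
            # The first retry happens immediately, without a delay
            delays.append(0.0)
        else:
            delays.append(min(backoff_max, backoff_factor * 2 ** (n - 1)))
    return delays


print(backoff_schedule(0.1, 3))  # the example from the quoted documentation
print(backoff_schedule(0.3, 3))  # the settings used in requests_retry_session
```

With a backoff factor of 0.3 and 3 retries, the total time spent sleeping stays under two seconds, which is why these defaults are cheap to use even across many pages.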
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

url = "https://www.example.com/employees/"
# Assume you have obtained the total number of pages
total_page = N
result = []

def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
    session=None,
):
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

def get_data_from_api(url, page_number):
    # Returns a list of employees' data, assuming the API responds with a JSON array
    response = requests_retry_session().get(url, params={'page': page_number})
    response.raise_for_status()
    return response.json()

# Assumes pages are numbered from 1 to total_page
for i in range(1, total_page + 1):
    try:
        result += get_data_from_api(url, i)
    except Exception as e:
        # Raised only after the retries for this page are exhausted
        print('It failed :(', e.__class__.__name__)
By introducing retries and a backoff factor, transient failures are retried automatically, and a page that still fails after all retries raises an exception that we can catch and handle explicitly.
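One way to handle a failure rather than only printing it is to record which pages failed, so they can be re-requested later and no data is lost silently. The sketch below assumes a hypothetical helper `fetch_all_pages`, and `fake_fetch` stands in for `get_data_from_api` so that the example runs without a network.

```python
def fetch_all_pages(fetch_page, page_numbers):
    """Fetch every page, keeping successes and failures separate."""
    rows = []
    failed_pages = {}
    for page in page_numbers:
        try:
            rows += fetch_page(page)
        except Exception as exc:
            # Remember which page failed and why, instead of dropping it
            failed_pages[page] = exc.__class__.__name__
    return rows, failed_pages


# Stand-in for get_data_from_api: page 2 always fails.
def fake_fetch(page):
    if page == 2:
        raise TimeoutError("simulated timeout")
    return [f"row-{page}-{i}" for i in range(2)]


rows, failed_pages = fetch_all_pages(fake_fetch, range(1, 4))
print(len(rows), failed_pages)
```

The `failed_pages` mapping can then drive a second pass over just the missing pages, or be logged for manual follow-up.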
Speed it up!
Firing requests one after another takes a long time. We can make it faster by using a pool of threads to execute the requests asynchronously.
import concurrent.futures

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

url = "https://www.example.com/employees/"
# Assume you have obtained the total number of pages
total_page = N
result = []
number_of_workers = W

def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
    session=None,
):
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

def get_data_from_api(url, page_number):
    # Returns a list of employees' data, assuming the API responds with a JSON array
    response = requests_retry_session().get(url, params={'page': page_number})
    response.raise_for_status()
    return response.json()

with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_workers) as executor:
    # Map each future to its page number (pages assumed to run from 1 to total_page)
    future_result = {
        executor.submit(get_data_from_api, url, i): i
        for i in range(1, total_page + 1)
    }
    for future in concurrent.futures.as_completed(future_result):
        try:
            result += future.result()
        except Exception as e:
            # Report the failed page and keep collecting the others
            print('It failed :(', future_result[future], e.__class__.__name__)
Session to the rescue!
Good. Now we are able to fire multiple requests at the same time asynchronously. However, we have one more issue to solve. In our current implementation, each request initiates a new TCP connection to the server. Too many TCP connections slow down the server and add overhead such as the TCP three-way handshake. In one of my tasks, the excessive number of connections crippled the server, causing half of the requests to time out even after retries!
Introducing the Session object. The Session object allows you to persist certain parameters across requests. It also persists cookies across all requests made from the Session instance and will use urllib3’s connection pooling. So if you’re making several requests to the same host, the underlying TCP connection will be reused, which can result in a significant performance increase.
One idea is to create as many sessions as there are thread workers, so that the workers reuse these sessions to perform their requests. Note that requests_retry_session returns a Session object.
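A variation on this idea is to let each worker thread lazily create and keep its own session through `threading.local`, which guarantees that no two threads ever touch the same session at once. This is a sketch of a swapped-in technique rather than the approach used in this post: `get_thread_session` is a hypothetical helper, and `FakeSession` stands in for the object returned by `requests_retry_session` so that the example runs without a network.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_thread_local = threading.local()


def get_thread_session(session_factory):
    """Return the calling thread's session, creating it on first use."""
    if not hasattr(_thread_local, "session"):
        _thread_local.session = session_factory()
    return _thread_local.session


# Stand-in for requests_retry_session that records every session created.
created = []
created_lock = threading.Lock()


class FakeSession:
    def __init__(self):
        with created_lock:
            created.append(self)

    def get(self, page):
        return [f"row-{page}"]


def fetch(page):
    # Each thread reuses its own session for every page it handles
    return get_thread_session(FakeSession).get(page)


with ThreadPoolExecutor(max_workers=3) as executor:
    rows = [row for page_rows in executor.map(fetch, range(1, 21)) for row in page_rows]

print(len(rows), len(created))
```

With a real client, `session_factory` would be `requests_retry_session`, so each worker thread builds one retrying session and reuses its pooled connections for all of its pages.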
The ultimate solution
import concurrent.futures

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

url = "https://www.example.com/employees/"
# Assume you have obtained the total number of pages
total_page = N
result = []
number_of_workers = W

def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
    session=None,
):
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

def get_data_from_api(session, url, page_number):
    # Returns a list of employees' data, assuming the API responds with a JSON array
    response = session.get(url, params={'page': page_number})
    response.raise_for_status()
    return response.json()

# Create one reusable session per worker
sessions = [requests_retry_session() for _ in range(number_of_workers)]

with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_workers) as executor:
    # We will assign the (i modulo number_of_workers)th session to page i for equal distribution
    future_result = {
        executor.submit(get_data_from_api, sessions[i % number_of_workers], url, i): i
        for i in range(1, total_page + 1)
    }
    for future in concurrent.futures.as_completed(future_result):
        try:
            result += future.result()
        except Exception as e:
            # Report the failed page and keep collecting the others
            print('It failed :(', future_result[future], e.__class__.__name__)
With this solution in mind, you can now make predictable requests, collecting data from an API concurrently and asynchronously while reusing sessions. Note that it is an opinionated solution, but it demonstrates how the pieces fit together so that you can adapt it to your own use case.
I adapted the solution from the implementation proposed here. Feel free to read more there. Have a nice day!