Profiling multi-threaded Python applications can be challenging, especially in production environments. Unfortunately, 90% of the guides don’t explain how to really do it, especially with remote production multi-threaded code and no cli, and they will lead you to use something like python -m cProfile -s time my_app.py <args>
Don't worry, this is not another one of those.
So why is it “hard” to do? You can’t always run your program in production using the command-line interface (CLI), and cProfile only offers limited support for multi-threaded programming.
So, How to do Multi-Threading profiling?
To perform multi-threading profiling, we can use one of the following approaches:
- Sum different threads’ stats together to create one set of statistics using
stats.add()
. - Create a separate profiler for each thread.
Since our program is built from 3 different engine threads, I will cover only the second approach in this guide.
So, I created the following utility that allows me to create my threads:
import cProfile
import threading
import time
class ProfilerManager:
def __init__(self, is_enabled, dump_interval_seconds, profiler_dir_path):
self._is_enabled = is_enabled
self._logger = "YOUR LOGGER"
self._profiler_dir_path = profiler_dir_path
self._profiler_dump_interval_seconds = dump_interval_seconds
if self._profiler_dump_interval_seconds <= 0:
self._logger.warning('invalid _profiler_dump_interval_seconds value {}, setting default'.format(self._profiler_dump_interval_seconds))
self._profiler_dump_interval_seconds = 60
self._profilers = {}
def thread_with_profiler(self, target, tag):
if not self._is_enabled:
return threading.Thread(target=target)
self._disable_profiler_if_exists(tag)
profiler = cProfile.Profile()
self._profilers[tag] = profiler
return threading.Thread(target=self._target_with_profiler(target, profiler, tag))
def run(self, target, tag):
if not self._is_enabled:
return target
profiler = cProfile.Profile()
self._profilers[tag] = profiler
return self._target_with_profiler(target, profiler, tag)
def _disable_profiler_if_exists(self, tag):
if tag in self._profilers:
self._logger.warning('tag {} already exists, disabling, and overriding with new target'.format(tag))
existing_profiler = self._profilers[tag]
self._disable_profiler(existing_profiler, tag)
def _target_with_profiler(self, target, profiler, tag):
def target_with_profiler():
self._enable_profiler(profiler, tag)
target()
self._disable_profiler(profiler, tag)
return target_with_profiler
def _enable_profiler(self, profiler, tag):
self._logger.debug('enabling profiler {}'.format(tag))
profiler.enable()
def _disable_profiler(self, profiler, tag):
self._logger.debug('disabling profiler {}'.format(tag))
profiler.disable()
def _dump_periodic_profilers(self):
while True:
time.sleep(self._profiler_dump_interval_seconds)
self._logger.trace("dumping profiles")
for tag, profiler in self._profilers.items():
dump_file_path = '{}/{}.dump'.format(self._profiler_dir_path, tag)
self._logger.trace("dumping profile, {}".format(dump_file_path))
profiler.dump_stats(dump_file_path)
Let’s take a look at the thread_with_profiler
use which is similar to threading.Thread
the following:
ip_manager_thread = profiler_manager.thread_with_profiler(target=self._run_ip_monitor, tag='ip_manager')
ip_manager_thread.start()
let’s do a quick dive into thread_with_profiler
:
def thread_with_profiler(self, target, tag):
if not self._is_enabled:
return threading.Thread(target=target)
self._disable_profiler_if_exists(tag)
profiler = cProfile.Profile()
self._profilers[tag] = profiler
return threading.Thread(target=self._target_with_profiler(target, profiler, tag))
All that this function does is disable the profiler with the same tag, in case it already exists. It creates a new profiler and a new thread, then runs _target_with_profiler
, which is the real magic behind the scenes
def _target_with_profiler(self, target, profiler, tag):
def target_with_profiler():
self._enable_profiler(profiler, tag)
target()
self._disable_profiler(profiler, tag)
return target_with_profiler
As we can see, we create and return a new function wrapper that our thread will execute. All that the wrapper does is enable the profiler (start it), call your actual function, and disable it when/if it ends.
Then, we have the following function, which actually dumps the current stats to files by tags every X seconds
def _dump_periodic_profilers(self):
while True:
time.sleep(self._profiler_dump_interval_seconds)
self._logger.trace("dumping profiles")
for tag, profiler in self._profilers.items():
dump_file_path = '{}/{}.dump'.format(self._profiler_dir_path, tag)
self._logger.trace("dumping profile, {}".format(dump_file_path))
profiler.dump_stats(dump_file_path)
so, now that we got how this all thing works, let’s see a full use example!
def async_worker1():
#work
def async_worker2():
#work
def main_worker():
#work
def main():
profiler_manager = ProfilerManager(is_enabled=True, dump_interval_seconds=10, profiler_dir_path="/var/logs")
profiler_manager.thread_with_profiler(target=self._run_ip_monitor, tag='async_worker1')
profiler_manager.thread_with_profiler(target=self._run_ip_monitor, tag='async_worker2')
profiler_manager.run(target=self._run_ip_monitor, tag='main_worker')profiler_manager.thread_with_profiler(target=self._run_ip_monitor, tag='ip_manager')
if __name__ == '__main__':
main()
happy profiling! :)
Top comments (0)