Or Yaacov

How to profile your multi-threaded python production code programmatically

Profiling multi-threaded Python applications can be challenging, especially in production environments. Unfortunately, most guides don’t explain how to really do it for remote, multi-threaded production code with no CLI access, and they will lead you to something like python -m cProfile -s time my_app.py <args>.
Don't worry, this is not another one of those.

So why is it “hard” to do? You can’t always run your program in production using the command-line interface (CLI), and cProfile only offers limited support for multi-threaded programming.
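When the CLI is not an option, cProfile can be driven from inside the program instead. Here is a minimal sketch of that idea; busy_work and profile_call are hypothetical names used only for illustration, not part of the utility shown later:

```python
import cProfile
import io
import pstats


def busy_work():
    # Hypothetical stand-in for real application code.
    return sum(i * i for i in range(10_000))


def profile_call(func):
    """Profile a single call to func and return (result, report text)."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        result = func()
    finally:
        profiler.disable()

    # Render the stats into a string instead of printing to stdout,
    # so the report can be logged or shipped elsewhere.
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats("time").print_stats()
    return result, buffer.getvalue()


result, report = profile_call(busy_work)
print(report)
```

The same enable/disable pair is what the per-thread utility in this guide builds on.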

So, how do we profile multi-threaded code?

To perform multi-threading profiling, we can use one of the following approaches:

  1. Sum different threads’ stats together to create one set of statistics using pstats.Stats.add().
  2. Create a separate profiler for each thread.
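
For reference, the first approach could look roughly like the sketch below. It is only an illustration under simplifying assumptions: work_a, work_b and run_profiled are hypothetical names, and the threads run one after another to keep the example small.

```python
import cProfile
import pstats
import threading


def work_a():
    return sum(range(1_000))


def work_b():
    return sorted(range(1_000), reverse=True)


def run_profiled(target, profiler):
    # Enable inside the thread so the profiler records that thread's calls.
    profiler.enable()
    try:
        target()
    finally:
        profiler.disable()


profilers = []
for target in (work_a, work_b):
    profiler = cProfile.Profile()
    thread = threading.Thread(target=run_profiled, args=(target, profiler))
    thread.start()
    thread.join()  # sequential on purpose, to keep the sketch simple
    profilers.append(profiler)

# Merge every thread's stats into a single Stats object.
combined = pstats.Stats(profilers[0])
for profiler in profilers[1:]:
    combined.add(pstats.Stats(profiler))

profiled_names = {func_name for (_file, _line, func_name) in combined.stats}
combined.sort_stats("cumulative").print_stats(5)
```

The merged view is handy for a whole-program picture, but it loses the per-thread breakdown that the second approach keeps.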

Since our program is built from 3 different engine threads, I will cover only the second approach in this guide.

So, I created the following utility that allows me to create my threads:

import cProfile
import logging
import threading
import time


class ProfilerManager:
    def __init__(self, is_enabled, dump_interval_seconds, profiler_dir_path):
        self._is_enabled = is_enabled
        self._logger = logging.getLogger(__name__)  # swap in your own logger
        self._profiler_dir_path = profiler_dir_path
        self._profiler_dump_interval_seconds = dump_interval_seconds

        if self._profiler_dump_interval_seconds <= 0:
            self._logger.warning('invalid _profiler_dump_interval_seconds value {}, setting default'.format(self._profiler_dump_interval_seconds))
            self._profiler_dump_interval_seconds = 60

        self._profilers = {}

        if self._is_enabled:
            # Run the periodic dump loop in a daemon thread so it never blocks exit.
            threading.Thread(target=self._dump_periodic_profilers, daemon=True).start()

    def thread_with_profiler(self, target, tag):
        if not self._is_enabled:
            return threading.Thread(target=target)

        self._disable_profiler_if_exists(tag)

        profiler = cProfile.Profile()

        self._profilers[tag] = profiler

        return threading.Thread(target=self._target_with_profiler(target, profiler, tag))

    def run(self, target, tag):
        # Returns a callable; the caller invokes it in the current thread.
        if not self._is_enabled:
            return target

        profiler = cProfile.Profile()

        self._profilers[tag] = profiler

        return self._target_with_profiler(target, profiler, tag)

    def _disable_profiler_if_exists(self, tag):
        if tag in self._profilers:
            self._logger.warning('tag {} already exists, disabling, and overriding with new target'.format(tag))
            existing_profiler = self._profilers[tag]
            self._disable_profiler(existing_profiler, tag)

    def _target_with_profiler(self, target, profiler, tag):
        def target_with_profiler():
            self._enable_profiler(profiler, tag)
            target()
            self._disable_profiler(profiler, tag)

        return target_with_profiler

    def _enable_profiler(self, profiler, tag):
        self._logger.debug('enabling profiler {}'.format(tag))
        profiler.enable()

    def _disable_profiler(self, profiler, tag):
        self._logger.debug('disabling profiler {}'.format(tag))
        profiler.disable()

    def _dump_periodic_profilers(self):
        while True:
            time.sleep(self._profiler_dump_interval_seconds)

            self._logger.debug("dumping profiles")

            # Iterate over a copy: other threads may register profilers meanwhile.
            for tag, profiler in list(self._profilers.items()):
                dump_file_path = '{}/{}.dump'.format(self._profiler_dir_path, tag)
                self._logger.debug("dumping profile, {}".format(dump_file_path))

                profiler.dump_stats(dump_file_path)

Let’s take a look at how thread_with_profiler is used, which is similar to threading.Thread:

ip_manager_thread = profiler_manager.thread_with_profiler(target=self._run_ip_monitor, tag='ip_manager')
ip_manager_thread.start()

Let’s take a quick dive into thread_with_profiler:

def thread_with_profiler(self, target, tag):
    if not self._is_enabled:
        return threading.Thread(target=target)

    self._disable_profiler_if_exists(tag)

    profiler = cProfile.Profile()

    self._profilers[tag] = profiler

    return threading.Thread(target=self._target_with_profiler(target, profiler, tag))

All this function does is disable any existing profiler registered under the same tag, create a new profiler, and return a new thread whose target is the wrapper built by _target_with_profiler, which is the real magic behind the scenes:

def _target_with_profiler(self, target, profiler, tag):
    def target_with_profiler():
        self._enable_profiler(profiler, tag)
        target()
        self._disable_profiler(profiler, tag)

    return target_with_profiler

As we can see, we create and return a new function wrapper that our thread will execute. All that the wrapper does is enable the profiler (start it), call your actual function, and disable it when/if it ends.
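
One case the wrapper does not cover is a target that raises: the disable call is then skipped. A hedged variant, which is not the author's code, wraps the call in try/finally. The names profiled and failing_target are hypothetical:

```python
import cProfile


def profiled(target, profiler):
    """Return a wrapper that profiles target and always disables the profiler."""
    def wrapper():
        profiler.enable()
        try:
            target()
        finally:
            # Runs whether target() returns normally or raises.
            profiler.disable()

    return wrapper


def failing_target():
    raise RuntimeError("worker crashed")


profiler = cProfile.Profile()
wrapped = profiled(failing_target, profiler)

try:
    wrapped()
    error_message = None
except RuntimeError as exc:
    # The exception still propagates to the caller.
    error_message = str(exc)

# Stats collected up to the failure remain available.
profiler.create_stats()
recorded = {name for (_file, _line, name) in profiler.stats}
```

The exception is not swallowed, so the thread still fails the way it would without profiling.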

Then, we have the following function, which dumps the current stats to one file per tag every X seconds:

    def _dump_periodic_profilers(self):
        while True:
            time.sleep(self._profiler_dump_interval_seconds)

            self._logger.debug("dumping profiles")

            # Iterate over a copy: other threads may register profilers meanwhile.
            for tag, profiler in list(self._profilers.items()):
                dump_file_path = '{}/{}.dump'.format(self._profiler_dir_path, tag)
                self._logger.debug("dumping profile, {}".format(dump_file_path))

                profiler.dump_stats(dump_file_path)
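
Once a dump file exists, it can be inspected with pstats. The sketch below writes a dump to a temporary directory and reads it back; sample_work and the sample_tag.dump file name are made up for the example, and in practice you would point pstats.Stats at a file copied from your dump directory:

```python
import cProfile
import os
import pstats
import tempfile


def sample_work():
    # Hypothetical workload, just to produce some stats.
    return sorted(str(i) for i in range(5_000))


profiler = cProfile.Profile()
profiler.enable()
sample_work()
profiler.disable()

with tempfile.TemporaryDirectory() as dump_dir:
    dump_path = os.path.join(dump_dir, "sample_tag.dump")
    profiler.dump_stats(dump_path)

    # Load the dump from disk, as you would with a file from production.
    stats = pstats.Stats(dump_path)
    stats.sort_stats("cumulative")
    loaded_names = {name for (_file, _line, name) in stats.stats}
    stats.print_stats(10)  # show the 10 most expensive entries
```

Sorting by "cumulative" surfaces the call paths that dominate a thread's run time; "time" would instead rank functions by time spent in their own body.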

So, now that we understand how the whole thing works, let’s see a full usage example!

def async_worker1():
    pass  # work

def async_worker2():
    pass  # work

def main_worker():
    pass  # work

def main():
    profiler_manager = ProfilerManager(is_enabled=True, dump_interval_seconds=10, profiler_dir_path="/var/logs")

    # Each worker gets its own thread and its own profiler.
    profiler_manager.thread_with_profiler(target=async_worker1, tag='async_worker1').start()
    profiler_manager.thread_with_profiler(target=async_worker2, tag='async_worker2').start()

    # run() returns the wrapped function; call it to profile main_worker in the current thread.
    profiler_manager.run(target=main_worker, tag='main_worker')()

if __name__ == '__main__':
    main()

happy profiling! :)
