DEV Community

monty5811
monty5811

Posted on • Originally published at deanmontgomery.com on

3 3

rich progress and multiprocessing

How to use rich with python’s multiprocessing:

  • What is this?
    • Track progress of long running tasks when using multiprocessing
  • Why would you want to do this?
    • When you are doing lots of things with multiprocessing and each task can take a long time - feedback makes it easier to see that things are actually happening

How to do it:

import multiprocessing
import random
from concurrent.futures import ProcessPoolExecutor
from time import sleep

from rich import progress

def long_running_fn(progress, task_id):
    len_of_task = random.randint(3, 20) # take some random length of time
    for n in range(0, len_of_task):
        sleep(1) # sleep for a bit to simulate work
        progress[task_id] = {"progress": n + 1, "total": len_of_task}

if __name__ == " __main__":
    n_workers = 8 # set this to the number of cores you have on your machine

    with progress.Progress(
        "[progress.description]{task.description}",
        progress.BarColumn(),
        "[progress.percentage]{task.percentage:>3.0f}%",
        progress.TimeRemainingColumn(),
        progress.TimeElapsedColumn(),
        refresh_per_second=1, # bit slower updates
    ) as progress:
        futures = [] # keep track of the jobs
        with multiprocessing.Manager() as manager:
            # this is the key - we share some state between our 
            # main process and our worker functions
            _progress = manager.dict()
            overall_progress_task = progress.add_task("[green]All jobs progress:")

            with ProcessPoolExecutor(max_workers=n_workers) as executor:
                for n in range(0, 20): # iterate over the jobs we need to run
                    # set visible false so we don't have a lot of bars all at once:
                    task_id = progress.add_task(f"task {n}", visible=False)
                    futures.append(executor.submit(long_running_fn, _progress, task_id))

                # monitor the progress:
                while (n_finished := sum([future.done() for future in futures])) < len(
                    futures
                ):
                    progress.update(
                        overall_progress_task, completed=n_finished, total=len(futures)
                    )
                    for task_id, update_data in _progress.items():
                        latest = update_data["progress"]
                        total = update_data["total"]
                        # update the progress bar for this task:
                        progress.update(
                            task_id,
                            completed=latest,
                            total=total,
                            visible=latest < total,
                        )

                # raise any errors:
                for future in futures:
                    future.result()

Enter fullscreen mode Exit fullscreen mode

Sentry image

Hands-on debugging session: instrument, monitor, and fix

Join Lazar for a hands-on session where you’ll build it, break it, debug it, and fix it. You’ll set up Sentry, track errors, use Session Replay and Tracing, and leverage some good ol’ AI to find and fix issues fast.

RSVP here →

Top comments (0)

Billboard image

Create up to 10 Postgres Databases on Neon's free plan.

If you're starting a new project, Neon has got your databases covered. No credit cards. No trials. No getting in your way.

Try Neon for Free →

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay