<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Roman</title>
    <description>The latest articles on DEV Community by Roman (@romank).</description>
    <link>https://dev.to/romank</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1979754%2Ff2dcee95-ae35-48c3-8d0d-201eaff54327.png</url>
      <title>DEV Community: Roman</title>
      <link>https://dev.to/romank</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/romank"/>
    <language>en</language>
    <item>
      <title>Processing DAGs with Async Python and graphlib</title>
      <dc:creator>Roman</dc:creator>
      <pubDate>Mon, 26 Aug 2024 20:08:16 +0000</pubDate>
      <link>https://dev.to/romank/processing-dags-with-async-python-and-graphlib-2c0g</link>
      <guid>https://dev.to/romank/processing-dags-with-async-python-and-graphlib-2c0g</guid>
      <description>&lt;p&gt;I recently came across an interesting module in Python's bottomless standard library: &lt;a href="https://docs.python.org/3/library/graphlib.html" rel="noopener noreferrer"&gt;graphlib&lt;/a&gt;. If you haven't worked with it before, it's a small utility that was added in Python 3.9 and only implements one class: &lt;a href="https://docs.python.org/3.9/library/graphlib.html#graphlib.TopologicalSorter" rel="noopener noreferrer"&gt;TopologicalSorter&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The name is self-explanatory -- this is a class for topological sorting of a graph. But I don't think it was originally written with just sorting in mind, since it has rather cryptic, but incredibly useful API, such as &lt;code&gt;prepare()&lt;/code&gt; method or &lt;code&gt;is_active()&lt;/code&gt;. This example in the documentation hints on the motivation behind it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;topological_sorter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;TopologicalSorter&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# Add nodes to 'topological_sorter'...
&lt;/span&gt;
&lt;span class="n"&gt;topological_sorter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;prepare&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="n"&gt;topological_sorter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;is_active&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;node&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;topological_sorter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_ready&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="c1"&gt;# Worker threads or processes take nodes to work on off the
&lt;/span&gt;        &lt;span class="c1"&gt;# 'task_queue' queue.
&lt;/span&gt;        &lt;span class="n"&gt;task_queue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;put&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;node&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# When the work for a node is done, workers put the node in
&lt;/span&gt;    &lt;span class="c1"&gt;# 'finalized_tasks_queue' so we can get more nodes to work on.
&lt;/span&gt;    &lt;span class="c1"&gt;# The definition of 'is_active()' guarantees that, at this point, at
&lt;/span&gt;    &lt;span class="c1"&gt;# least one node has been placed on 'task_queue' that hasn't yet
&lt;/span&gt;    &lt;span class="c1"&gt;# been passed to 'done()', so this blocking 'get()' must (eventually)
&lt;/span&gt;    &lt;span class="c1"&gt;# succeed.  After calling 'done()', we loop back to call 'get_ready()'
&lt;/span&gt;    &lt;span class="c1"&gt;# again, so put newly freed nodes on 'task_queue' as soon as
&lt;/span&gt;    &lt;span class="c1"&gt;# logically possible.
&lt;/span&gt;    &lt;span class="n"&gt;node&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;finalized_tasks_queue&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;topological_sorter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;done&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;node&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;So &lt;code&gt;graphlib&lt;/code&gt; is not a module just for sorting graphs, it's also a utility for running graphs of tasks in topological order, which is useful if your workloads have tasks depending on results of other tasks. Graphs are a great way to model this problem, and topological order is how you make sure tasks are processed in the right order.&lt;/p&gt;

&lt;p&gt;One thing that is missing from the docs is &lt;code&gt;asyncio&lt;/code&gt; example, which turns out to be quite easy to write. Since with &lt;code&gt;asyncio&lt;/code&gt; you don't have to deal with thread-safety, you can get by without using &lt;a href="https://docs.python.org/3/library/queue.html" rel="noopener noreferrer"&gt;queue&lt;/a&gt; for synchronizing threads or any other additional complexity of the sort.&lt;/p&gt;

&lt;p&gt;We'll define a simplistic async node visitor function:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;visit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;node&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;str&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sorter&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;TopologicalSorter&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;processing node &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;node&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;sorter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;done&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;node&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the real world this can be as complex as you'd like, as long as you're doing I/O bound work to reap the benefits of asyncio. The important bit is to call the &lt;code&gt;sorter.done(node)&lt;/code&gt; in the end of the function to let the instance of &lt;code&gt;TopologicalSorter&lt;/code&gt; know we're done with this node and we can progress onto the next.&lt;/p&gt;

&lt;p&gt;Then we plug the &lt;code&gt;visit&lt;/code&gt; function into our topologically ordered run:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;sorter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;TopologicalSorter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;graph&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;sorter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;prepare&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="n"&gt;sorter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;is_active&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="n"&gt;node_group&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sorter&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get_ready&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;node_group&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# no nodes are ready yet, so we sleep for a bit
&lt;/span&gt;        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;asyncio&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;0.25&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;tasks&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;node&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;node_group&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;task&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;asyncio&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;create_task&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;visit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;node&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sorter&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
            &lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_done_callback&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tasks&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;discard&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Full source code of a working script can be found in this &lt;a href="https://gist.github.com/romankolpak/9f7736d2a5c85a7b2c892d27ef1626ff" rel="noopener noreferrer"&gt;gist&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;One peculiar aspect of &lt;code&gt;graphlib&lt;/code&gt; is the format of the graph the &lt;code&gt;TopologicalSorter&lt;/code&gt; accepts as an argument -- it is in reverse order from your typical representation of a graph. E.g. if you have a graph like this &lt;code&gt;A -&amp;gt; B -&amp;gt; C&lt;/code&gt;, normally you'd to represent it like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;graph&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;A&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;B&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
  &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;B&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;C&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
  &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;C&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[],&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;but the &lt;code&gt;TopologicalSorter&lt;/code&gt; wants this graph in the with edge direction reversed:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;If the optional graph argument is provided it must be a dictionary representing a directed acyclic graph where the keys are nodes and the values are iterables of all predecessors of that node in the graph&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;So the right way to represent &lt;code&gt;A -&amp;gt; B -&amp;gt; C&lt;/code&gt; for the &lt;code&gt;TopologicalSorter&lt;/code&gt; is this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="n"&gt;graph&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;C&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;B&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
  &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;B&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;A&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
  &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;A&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[],&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;More info and a rather heated debate on this can be found here: &lt;a href="https://bugs.python.org/issue46071" rel="noopener noreferrer"&gt;https://bugs.python.org/issue46071&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Happy coding!&lt;/p&gt;

</description>
      <category>python</category>
      <category>asyncio</category>
      <category>dag</category>
    </item>
  </channel>
</rss>
