<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Fylype Wase</title>
    <description>The latest articles on DEV Community by Fylype Wase (@fylype_wase_f8964e4b3fe8f).</description>
    <link>https://dev.to/fylype_wase_f8964e4b3fe8f</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F2955712%2F0bd0fb8e-c236-433a-b403-369922b8757b.png</url>
      <title>DEV Community: Fylype Wase</title>
      <link>https://dev.to/fylype_wase_f8964e4b3fe8f</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/fylype_wase_f8964e4b3fe8f"/>
    <language>en</language>
    <item>
      <title>Dynamic Task Mapping (Airflow)</title>
      <dc:creator>Fylype Wase</dc:creator>
      <pubDate>Mon, 31 Mar 2025 14:43:11 +0000</pubDate>
      <link>https://dev.to/fylype_wase_f8964e4b3fe8f/dynamic-task-mapping-airflow-d63</link>
      <guid>https://dev.to/fylype_wase_f8964e4b3fe8f/dynamic-task-mapping-airflow-d63</guid>
      <description>&lt;h2&gt;
  
  
  1. Contexto
&lt;/h2&gt;

&lt;p&gt;Durante o desenvolvimento de uma DAG no Airflow surgiu uma necessidade de criar múltiplas tasks de forma dinâmica, as tasks deveriam ser criadas em tempo de execução e não se sabe previamente a quantidade de tasks.&lt;/p&gt;

&lt;p&gt;Um caso prático seria ter um conjunto com &lt;strong&gt;n&lt;/strong&gt; números e esses números devem passar por um processamento. Supondo que esse conjunto de valores tem 3 elementos, portanto seria a necessária a declaração de 3 tasks. Para valores pré-definidos a declaração dessas tasks são simples.&lt;/p&gt;

&lt;p&gt;Abaixo um exemplo de código que processa números multiplicando-os por 2:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# imports

def get_values():
    return [10, 20, 30]

def double_value(value):
    return int(value) * 2

# DAG code
get_values_task = PythonOperator(
    task_id=f"get_values",
    python_callable=get_values
)

double_value_1_task = PythonOperator(
    task_id=f"double_value_1",
    python_callable=double_value,
    op_args=["{{ task_instance.xcom_pull('get_values')[0] }}"]
)

double_value_2_task = PythonOperator(
    task_id=f"double_value_2",
    python_callable=double_value,
    op_args=["{{ task_instance.xcom_pull('get_values')[1] }}"]
)
double_value_3_task = PythonOperator(
    task_id=f"double_value_3",
    python_callable=double_value,
    op_args=["{{ task_instance.xcom_pull('get_values')[2] }}"]
)

get_values_task &amp;gt;&amp;gt; [double_value_1_task, double_value_2_task, double_value_3_task]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Entretanto há 2 limitações:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;A criação de tasks no Airflow é declarativa. Necessariamente se precisa declarar um Operator e definir a ordem de execução dessa task. Se surgir a necessidade de criar tasks que tem a mesma finalidade com valores dos parâmetros diferentes é necessário declarar um Operator individualmente.&lt;br&gt;
É possível a criação das tasks com um loop, entretanto é necessário saber previamente quantas tasks é necessário criar.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Resgatar os valores via XCOM não são acessáveis fora de uma Task Instance. Esse fato acontece pois os valores via XCOM são serealizados/deserealizados e renderizados em tempo de execução ao inicializar a execução da task, a renderização faz parte da execução da task. &lt;br&gt;
Reutilizando o exemplo acima da função double_value, se os valores necessários para esse processamento estiver na XCOM então só seria possível acessá-los durante a execução de outra task. Criar uma task dentro de uma outra task não é recomendável.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Então como posso criar tasks dinamicamente baseado nos valores de tasks anteriores? No Airflow há um recurso para resolver esse problema: &lt;strong&gt;Dynamic Task Mapping&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  2. O que é Dynamic Task Mapping?
&lt;/h2&gt;

&lt;p&gt;O Dynamic Task Mapping permite criar um conjuntos de tasks em tempo de execução baseado em parâmetros, sem o autor da DAG saber quantas tasks são necessárias previamente. É similar a definir as tasks num loop, entretanto o scheduler do Airflow usa como base o output da task anterior.&lt;/p&gt;

&lt;p&gt;Com esse recurso é possível paralelizar a execução da task, onde a task declarada terá uma lista de subtasks, cada substasks será criada dinamicamente baseado no parâmetro passado.&lt;/p&gt;

&lt;p&gt;Esse recurso no Airflow tornou-se disponível a partir da versão 2.3.0.&lt;/p&gt;

&lt;h2&gt;
  
  
  3. Um exemplo prático
&lt;/h2&gt;

&lt;p&gt;A ideia do Dynamic Task Mapping é definir os parâmetros onde um desses parâmetros será utilizado para a criação das tasks dinâmicas. A sintaxe de declaração do Operator é semelhante a um Operator convencional, haverá a adição de 2 métodos: partial e expand.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;expand&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Como o próprio nome diz significa expandir o parâmetro utilizado. Dado a lista do parâmetro definido então sera expandido essa lista. Para cada expansão é criado uma nova task.&lt;/p&gt;

&lt;p&gt;É possível definir 1 ou mais parâmetros para expandir. Caso haja mais de 1 parâmetro então será feito o produto cartesiano dos parâmetros.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;PythonOperator.partial(...).expand(parameter_1, parameter_2,...)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;partial&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Valores intermediários onde serão fixos para cada expansão dos parâmetros do expand, esses valores não são expandidos.&lt;/p&gt;

&lt;p&gt;No partial terá os parâmetros da task como task_id, python_callable, parâmetros definidos e etc.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;PythonOperator.partial(task_id="task_id", python_callable=...).expand(...)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Abrindo um parênteses, há uma forma de pegar o output de uma task da seguinte maneira:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Task arbitraty
arbitraty_task = PythonOperator(...)
arbitraty_task.output
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Juntando essas informações e definindo a Dynamic Task Mapping para o primeiro exemplo dado de processa os números multiplicando-os por 2:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;double_value_task = PythonOperator.partial(
    task_id=f"double_value",
    python_callable=double_value,
).expand(op_args=get_values_task.output)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Na prática não se torna necessário declarar explicitamente a task para cada valor retornado da task anterior.&lt;/p&gt;

&lt;p&gt;Como se interpreta o PythonOperator criado com Dynamic Task Mapping?&lt;/p&gt;

&lt;p&gt;Para cada valor retornado da task get_values será invocado a função double_value.&lt;/p&gt;

&lt;p&gt;Como o Airflow mostra essa informação na UI?&lt;/p&gt;

&lt;p&gt;O grafo mostrado será diferente, onde terá apenas 1 PythonOperator declarado.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffohlmafte0tpx3orklnw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffohlmafte0tpx3orklnw.png" alt="Image description" width="" height=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Para acessar a informação de cada task criada dinamicamente será necessário clicar na task e acessar o campo Mapped Tasks. O Map Index é criado de 0 até n, onde n é n-ésimo índice que indica a quantidade de elementos retornados da task anterior.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F459og0bi3s6sg7ipknwf.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F459og0bi3s6sg7ipknwf.png" alt="Image description" width="" height=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Por fim a DAG descrita no primeiro exemplo, que processa números multiplicando-os por 2, pode ser reduzida para:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# imports

def get_values():
    return [[10], [20], [30]]

def double_value(value):
    return int(value) * 2

# DAG code
get_values_task = PythonOperator(
    task_id=f"get_values",
    python_callable=get_values
)

double_value_task = PythonOperator.partial(
    task_id=f"double_value",
    python_callable=double_value,
).expand(op_args=get_values_task.output)

get_values_task &amp;gt;&amp;gt; double_value_task
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Em resumo o Dynamic Task Mapping pode ser utilizado para criar tasks de forma dinâmica e paralelizar processamentos. Abaixo é possível acessar a documentação para entender com mais detalhes.&lt;/p&gt;

&lt;h2&gt;
  
  
  4. Referências
&lt;/h2&gt;

&lt;p&gt;[1] Airflow. Dynamic Task Mapping. Disponível em: &lt;a href="https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html" rel="noopener noreferrer"&gt;https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html&lt;/a&gt;. Acessado em: 09/12/2024.&lt;/p&gt;

</description>
      <category>python</category>
      <category>airflow</category>
    </item>
  </channel>
</rss>
