En el artículo anterior vimos una introducción al concepto de Channel. En este, vamos a profundizar un poco más en el concepto de process
- WARNING
-
Como ya dijimos un process es la unidad básica en el DSL de Nextflow y usamos los canales para conectarlos, por lo que parecería lógico empezar explicandolos primero. Sin embargo he optado por explicar primero los canales porque así ahora podemos hacer cosas más interesantes con ellos
El process es la parte del DSL mediante la que definimos una entrada, una salida y una ejecución de sistema. Es decir, Nextflow es agnóstico de lo que quieras procesar (sólo requiere que se pueda ejecutar en Linux). Tú compones el/los comando(s) a ejecutar y él se encarga de orquestar las ejecuciones
De esta forma tu pipeline puede ser tan simple como buscar una cadena en un fichero (básicamente ejecutar un grep) hasta leer un fichero de gigas y procesarlos de forma paralela, ejecutando un Python por cada línea, etc y todo ello con la ventaja de que el mismo pipeline puedes ejecutarlo en tu local, en un contenedor docker, usando AWS Batch, Google, etc
Contando lineas de un fichero
Vamos a empezar definiendo un proceso simple:
uno.nf
process example {
input:
path 'entrada'
output:
stdout
"wc -l $entrada"
}
workflow{
example( params.param1 ) | view
}
Ejecutamos el pipeline
nextflow run uno.nf --param1 $(pwd)/uno.nf
y veremos que la salida nos dice el número de líneas del fichero uno.nf
N E X T F L O W ~ version 22.04.5
Launching `uno.nf` [exotic_jones] DSL2 - revision: c79a922618
executor > local (1)
[28/a1eb27] process > example [100%] 1 of 1 ✔
12 input
Como vemos, el proceso example
requiere un fichero de entrada (dentro de su ejecución va a crear una variable entrada
que podremos referenciarla dentro de la misma), y que va a emitir una salida tomándola del stdout del comando que va a ejecutar. Más adelante veremos qué otras posibilidades podemos usar
Comando
El comando a ejecutar se puede definir de varias formas:
- La más simple es con un string al final de la definición del proceso
process example{
.. ... ...
'echo hola'
}
El comando que ejecutará example
será un simple echo hola
- Un string multilinea, usando triple comillas
process example{
.. .. ...
'''
echo hola > f
echo hi >> f
echo ciao >> f
cat f
'''
}
Así mismo el comando a ejecutar puede ser "personalizado" usando los inputs del process (atención al uso de comillas dobles en lugar de simples!!) como en el caso de uno.nf
:
"wc -l $entrada"
En este caso Nextflow ejecutará un wc
sobre un fichero cuya ruta no está predefinida, sino que se usará la que se le proporcione al process. Esto es así por el uso de comillas dobles que lo que hace es que la cadena sea "compilada" y se use el resultado
Uniendo procesos
Como puedes imaginar, podemos "concatenar" process uniendo las salidas de uno con las entradas de otro (en realidad usando canales)
dos.nf
process lines {
input:
path 'input'
output:
stdout
// wc es un programa de la shell
"wc -l $input"
}
process cut {
input:
val lines
output:
stdout
// cut es un programa de la shell
// indicamos que use el espacio como delimitador y extraiga el primer campo
// le pasamos una cadena con espacios como entrada, por eso la entrecomillamos
"cut -d ' ' -f 1 <<< '$lines'"
}
workflow{
lines( params.param1 )
| cut
| view
}
En este caso el workflow creará dos procesos, lines
y cut
. Este último permanecerá a la espera de recibir valores por su canal de entrada (espera recibir un simple objeto val
y lo llamará 'line' ). Por su parte lines
(el proceso, no confundir con la variable del proceso cut) permanecerá a la espera de recibir un fichero de entrada (al declarar un path
como entrada)
Mediante el caracter pipe "|" indicamos al workflow cómo unir estos procesos de tal forma que la salida de uno sea la entrada de otro
Una vez que se inicia el worflow, Nextflow alimenta el canal params
lo que hace que lines
se ejecute y su salida alimente a cut
Paralelización. Divide y vencerás
El ejemplo anterior muestra cómo definimos los procesos así como el "flujo de trabajo". Como lines emite un sólo valor (el stdout generado por wc) el worflow es lineal.
En el ejemplo siguiente vamos a ver la capacidad de definir ejecuciones de procesos en paralelo. El ejemplo va a consistir en generar un fichero de N líneas, y por cada línea ejecutaremos un process que "haga algo" con ella (simplemente imprimirla por su stdout). Para poder comprobar la paralelización cada ejecución realizará primero una espera aleatoria mediante un random
tres.nf
process generarFicheroEntrada {
input:
val size
output:
path 'origin.txt'
// el script puede ser multilinea
// y podemos "mezclar" variables del proceso con las del script a ejecutar
// si es variable del proceso no la escapamos con una barra: $size
// si es variable del script la escapamos con la barra: \$x
"""
for x in {0..$size}
do
echo \$x >> origin.txt
done
"""
}
process simulate{
input:
val value
output:
stdout
"""
sleep \$((RANDOM % 10))
echo $value
"""
}
workflow{
simulate(
generarFicheroEntrada(params.size).splitText()
) | view
}
Si por ejemplo ejecutas
nextflow run tres.nf --size=10
repetidas veces verás que la salida es diferente en cada ejecución. Esto es debido a que el proceso simulate
es ejecutado en paralelo (en realidad todos los valores en paralelo a la vez no, sino en bloques que puedes definir como parametro de splitText)
El workflow de este ejemplo es sencillo de leer pero por detrás se están ejecutando conceptos bastante importantes:
se crea un process
simulate
que permanecerá a la espera de que se vayan generando valores en su canal de entrada. Mientras no reciba un EOF en el canal, el proceso se ejecutará repetidamente por cada valor en el mismo.generarFicheroEntrada leerá de su canal de entrada un valor y enviará por el de salida un Path (no el contenido, sino un objeto Path que apunta al fichero).
splitText es un operador que actua sobre los valores emitidos por un canal. Lee sobre el canal de entrada (en este caso un Path) y emite valores, uno por cada linea
La definición del workflow es en realidad una Closure, un bloque de código, lo que permite definir el worflow de muchas formas. Por ejemplo el mismo workflow puede escribirse:
workflow{
def fichero = generarFicheroEntrada(params.size)
def split = fichero.splitText()
simulate( split ).view()
}
Conclusión
En este post hemos visto cómo usar los process para ejecutar comandos de sistema dentro de un workflow. Básicamente su función es generar y ejecutar un fichero .sh con el comando que definas, integrando dicha ejecución en el flujo de trabajo permitiendo unir y controlar todas las ejecuciones.
También hemos visto una pequeña introducción al concepto de workflow el cual trataremos más adelante
Top comments (0)