I wanted to learn how to build a pipeline on Windows, and I came across Luigi, a tool created by Spotify that is an alternative to Apache Airflow. It's a Python package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, failure handling, command-line integration, and much more.
The two things to know before coding with Luigi are the structure of Tasks and Targets.
Target
Target is an abstract class that represents a resource a Task produces, such as a file on disk. Targets are where the state of the workflow lives.
Typically, a Task defines one or more Targets as output, and the Task is considered complete if and only if each of its output Targets exists.
It is probably not strictly correct, but I think of Targets as a sort of pointer that tells the program where the data lives.
Task
Tasks are the basic building blocks of Luigi.
Within a Luigi Task class, three methods are the most important:
• requires()
• run()
• output()
requires()
The requires() method declares the Tasks that the current Task depends on, and when present it is the first method Luigi looks at.
It returns one or more Task instances. Luigi then moves on to those Tasks, looks for their requires() methods, and repeats the process until it reaches a Task that has none. At that point, it executes that Task's run() method.
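The walk described above can be sketched in plain Python. This is only a toy model of the idea: SketchTask and execute are hypothetical names I made up, and Luigi's real scheduler and workers are considerably more involved.

```python
class SketchTask:
    """Toy stand-in for a Luigi Task; not part of the Luigi API."""

    def __init__(self, name, deps=()):
        self.name = name
        self.deps = list(deps)
        self.done = False

    def requires(self):
        return self.deps

    def complete(self):
        return self.done

    def run(self):
        self.done = True


def execute(task, log):
    """Run `task` after its requirements, skipping anything already complete."""
    if task.complete():
        return
    for dep in task.requires():
        execute(dep, log)  # follow requires() until a task has none
    task.run()
    log.append(task.name)


def demo_order():
    hello = SketchTask("Hello")
    world = SketchTask("World")
    hello_world = SketchTask("HelloWorld", deps=[hello, world])
    log = []
    execute(hello_world, log)
    return log
```

Calling demo_order() returns the names in the order they ran, with both requirements ahead of the Task that needs them.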
run()
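Once the pipeline reaches a Task with no requires(), or one whose required Tasks are all complete, it executes that Task's run() method. This is where the actual work of the Task goes.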
output()
The output() method returns one or more Target objects.
As with requires(), you can return them wrapped up in any way that's convenient for you.
It is recommended that each Task return only a single Target from output().
Now let's move on to the coding section. As always, let's begin with a Hello World program.
You can install Luigi using pip:
pip install luigi
We'll create a program that writes 'Hello' to one file, 'World' to another, and then combines the two into a third file.
For our first Task, let's write 'Hello' into a file.
import luigi

class HelloTask(luigi.Task):
    def run(self):
        # The with block closes the file automatically.
        with open('hello.txt', 'w') as hello_file:
            hello_file.write('Hello')

    def output(self):
        # Luigi treats the Task as complete once this file exists.
        return luigi.LocalTarget('hello.txt')

if __name__ == '__main__':
    luigi.run()
All we do in the run function is open a file called hello.txt and write 'Hello' into it; the with block then closes the file. In the output function, LocalTarget is a concrete implementation of the Target class that points to a file on the local file system.
Let's run the program. To do that, we need to specify a scheduler and the Task to run. We'll use the local scheduler in this program, which we can do by typing:
python Hello.py HelloTask --local-scheduler
When the code runs successfully, Luigi prints an execution summary to the console. You can also add your own print statements to a Task if you want to display anything else. If you open the file named in the output function, you'll see that Hello has been written to it.
Now that we've successfully run the HelloTask, let's move on to the WorldTask. It's the same Task with the 'Hello' replaced with 'World'.
import luigi

class WorldTask(luigi.Task):
    def run(self):
        # The with block closes the file automatically.
        with open('world.txt', 'w') as world_file:
            world_file.write('World')

    def output(self):
        # Luigi treats the Task as complete once this file exists.
        return luigi.LocalTarget('world.txt')

if __name__ == '__main__':
    luigi.run()
Let's run the program.
python World.py WorldTask --local-scheduler
Now that we've created the dependencies for HelloWorldTask, let's define the Task itself. We'll implement the requires function, which we didn't do in the previous two Tasks. This tells HelloWorldTask to check whether the two Tasks listed there are complete; only once they are will HelloWorldTask execute its run function.
import luigi

class HelloTask(luigi.Task):
    def run(self):
        with open('hello.txt', 'w') as hello_file:
            hello_file.write('Hello')

    def output(self):
        return luigi.LocalTarget('hello.txt')

class WorldTask(luigi.Task):
    def run(self):
        with open('world.txt', 'w') as world_file:
            world_file.write('World')

    def output(self):
        return luigi.LocalTarget('world.txt')

class HelloWorldTask(luigi.Task):
    def requires(self):
        # Both Tasks must be complete before run() is called.
        return [HelloTask(), WorldTask()]

    def run(self):
        # Read the two upstream files and join their contents.
        with open('hello.txt', 'r') as hello_file:
            hello = hello_file.read()
        with open('world.txt', 'r') as world_file:
            world = world_file.read()
        with open('helloworld.txt', 'w') as helloworld_file:
            helloworld_file.write(hello)
            helloworld_file.write(world)

    def output(self):
        return luigi.LocalTarget('helloworld.txt')

if __name__ == '__main__':
    luigi.run()
Now that we've created our program, let's execute it.
python HelloWorld.py HelloWorldTask --local-scheduler
Keep in mind that the Tasks shown in the output are listed in alphabetical order, not in execution order. The execution order follows the dependencies we declared in requires(), and Luigi always runs a Task's requirements before the Task itself.
The best part about Luigi is that once it encounters a Task that has already been completed, meaning its output Target exists, it doesn't execute it again. This can be really useful when creating scheduled pipelines.
That is how you create a pipelined execution using Luigi; I hope it made sense. In the next post, I'll talk about sending IDs and paths to our Tasks, which will give a bit more flexibility to our programs.