In my previous article, I covered the salient features of Flyte to help you decide whether Flyte is the right orchestration platform for you.
In this article, you will see how Flyte tasks and workflows can be used to implement k-nearest neighbours in Python.
The building blocks of Flyte are:
- Tasks: A task is a versioned, shareable unit of execution that encapsulates your code.
- Workflows: A workflow is a directed acyclic graph (DAG) whose nodes encapsulate units of work and describe the order in which tasks execute (a minimal sketch of both follows this list).
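To make the two ideas concrete, here is a minimal, illustrative sketch; the names double and quadruple are placeholders and are not part of the k-nearest-neighbours example that follows. A task is a plain, type-annotated Python function decorated with @task, and a workflow wires task calls together into a DAG.

from flytekit import task, workflow

@task
def double(x: int) -> int:
    # a task is a type-annotated Python function
    return x * 2

@workflow
def quadruple(x: int) -> int:
    # a workflow chains task calls; arguments must be passed by keyword
    return double(x=double(x=x))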
The rest of this post walks through implementing k-nearest neighbours using these two building blocks.
Let's dive into the implementation details!
1. Import the required packages.
from typing import List, NamedTuple
import pandas as pd
from flytekit import task, workflow
from sklearn.datasets import load_wine
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
2. Define a NamedTuple that gives each output a name (useful when displaying the outputs on the console).
split_data = NamedTuple(
    "split_data",
    train_features=pd.DataFrame,
    test_features=pd.DataFrame,
    train_labels=pd.DataFrame,
    test_labels=pd.DataFrame,
)
3. Define a task that loads the wine dataset into your environment and splits it into train and test sets. Notice the @task decorator on the function.
@task
def data_processing() -> split_data:
    # load the wine dataset
    wine = load_wine()
    # convert the features and target (NumPy arrays) into pandas DataFrames
    wine_features = pd.DataFrame(data=wine.data, columns=wine.feature_names)
    wine_target = pd.DataFrame(data=wine.target, columns=["species"])
    # split the dataset into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        wine_features, wine_target, test_size=0.4, random_state=101
    )
    print("Sample data:")
    print(X_train.head(5))
    return split_data(
        train_features=X_train,
        test_features=X_test,
        train_labels=y_train,
        test_labels=y_test,
    )
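Because a @task-decorated function can still be called like an ordinary Python function when the script runs locally, a quick sanity check of this task might look like the following sketch (the shapes in the comments follow from the wine dataset's 178 rows and 13 features with the 60/40 split above):

split = data_processing()
# each output of the NamedTuple is available by name
print(split.train_features.shape)  # (106, 13)
print(split.test_labels.shape)     # (72, 1)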
4. Define another task that creates a k-nearest neighbours classifier and fits it to the training data. The predict method generates predictions for the test data, which are returned as a list to match the task's List[int] output type.
@task
def fit_and_predict(
    X_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_train: pd.DataFrame,
) -> List[int]:
    knn = KNeighborsClassifier()  # create a KNeighborsClassifier model
    knn.fit(X_train, y_train.values.ravel())  # fit the model to the training data (1-D labels)
    predicted_vals = knn.predict(X_test)  # predict labels for the test data
    return predicted_vals.tolist()  # convert to a list to match the List[int] output type
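The same local-execution behaviour lets you chain the two tasks outside of a workflow while developing; a hedged sketch:

split = data_processing()
preds = fit_and_predict(
    X_train=split.train_features,
    X_test=split.test_features,
    y_train=split.train_labels,
)
print(preds[:5])  # the first few predicted class labels (integers 0-2 for the wine dataset)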
5. Define another task that computes the model's accuracy from the actual and predicted values using the accuracy_score function.
@task
def calc_accuracy(y_test: pd.DataFrame, predicted_vals_list: List[int]) -> float:
    return accuracy_score(y_test, predicted_vals_list)
6. Define a workflow (decorated with @workflow) that wires the tasks together in the order in which they are to be executed.
@workflow
def pipeline() -> float:
    split_data_vals = data_processing()
    predicted_vals_output = fit_and_predict(
        X_train=split_data_vals.train_features,
        X_test=split_data_vals.test_features,
        y_train=split_data_vals.train_labels,
    )
    return calc_accuracy(
        y_test=split_data_vals.test_labels, predicted_vals_list=predicted_vals_output
    )
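One detail worth keeping in mind: when Flyte compiles this workflow into a DAG, split_data_vals and predicted_vals_output are promises rather than materialised DataFrames and lists, so data inspection (for example, calling .head() on a DataFrame) belongs inside a task rather than in the workflow body. A hedged sketch of that pattern, using a hypothetical peek task:

@task
def peek(df: pd.DataFrame) -> None:
    # real data is only available inside tasks, not in the workflow body
    print(df.head())

@workflow
def pipeline_with_peek() -> float:
    split_data_vals = data_processing()
    # passing the promise to a task is fine; printing it in the workflow body is not
    peek(df=split_data_vals.train_features)
    predicted_vals_output = fit_and_predict(
        X_train=split_data_vals.train_features,
        X_test=split_data_vals.test_features,
        y_train=split_data_vals.train_labels,
    )
    return calc_accuracy(
        y_test=split_data_vals.test_labels, predicted_vals_list=predicted_vals_output
    )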
7. Invoke the pipeline, which prints the accuracy of the model.
if __name__ == "__main__":
    print(f"Accuracy of the model is {pipeline() * 100:.2f}%")
Full code can be found here.