Smriti S

'Hello World' in Flyte

My previous article covered the salient features of Flyte, to help you decide whether Flyte is the right orchestration platform for you.

In this article, you will learn how to use Flyte tasks and workflows to implement k-nearest neighbours in Python.

The building blocks of Flyte are:

  • Tasks: A task is a versioned, shareable unit of execution that encapsulates your code.
  • Workflows: A workflow is a directed acyclic graph (DAG) whose nodes encapsulate units of work; it describes the order in which tasks execute.
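
To make the DAG idea concrete without Flyte itself, here is a minimal sketch that uses Python's standard-library graphlib module. The task names mirror the ones defined later in this post, but the dependency table is written by hand purely for illustration; in Flyte, the graph comes from how task outputs feed task inputs inside a workflow.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency table: each task maps to the tasks it depends on.
dependencies = {
    "data_processing": set(),
    "fit_and_predict": {"data_processing"},
    "calc_accuracy": {"data_processing", "fit_and_predict"},
}

# A topological sort yields an order in which every task runs after its inputs exist.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because each task here depends on the one before it, only one valid execution order exists.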

Let's dive into the implementation details!

1. Import the required packages.

from typing import List, NamedTuple

import pandas as pd
from flytekit import task, workflow
from sklearn.datasets import load_wine
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier

2. Define a NamedTuple that gives each task output a name. Named outputs are easier to reference in the workflow and easier to read when printed to the console.

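# Named outputs returned by the data_processing task.
# The list-of-pairs form is used because the keyword-argument form
# of NamedTuple() is deprecated in recent Python versions.
split_data = NamedTuple(
    "split_data",
    [
        ("train_features", pd.DataFrame),
        ("test_features", pd.DataFrame),
        ("train_labels", pd.DataFrame),
        ("test_labels", pd.DataFrame),
    ],
)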
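
As a rough illustration of what the named fields buy you, the sketch below builds a smaller tuple of the same kind. SplitDemo and its plain-list fields are hypothetical stand-ins for split_data and its DataFrames, so that the example runs with the standard library alone.

```python
from typing import List, NamedTuple

# Hypothetical stand-in for split_data, using plain lists instead of DataFrames.
SplitDemo = NamedTuple(
    "SplitDemo",
    [("train_features", List[float]), ("test_features", List[float])],
)

demo = SplitDemo(train_features=[1.0, 2.0, 3.0], test_features=[4.0])

# Fields can be read by name...
print(demo.train_features)

# ...or unpacked by position, like an ordinary tuple.
train, test = demo
print(train, test)
```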

3. Define a task that loads the wine dataset and splits it into train and test data. Notice the @task decorator placed directly above the function definition.

@task
def data_processing() -> split_data:
    # load wine dataset
    wine = load_wine()

    # convert features and target (numpy arrays) into pandas DataFrames
    wine_features = pd.DataFrame(data=wine.data, columns=wine.feature_names)
    wine_target = pd.DataFrame(data=wine.target, columns=["species"])

    # split the dataset
    X_train, X_test, y_train, y_test = train_test_split(
        wine_features, wine_target, test_size=0.4, random_state=101
    )
    print("Sample data:")
    print(X_train.head(5))
    return split_data(
        train_features=X_train,
        test_features=X_test,
        train_labels=y_train,
        test_labels=y_test,
    )
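
To see what test_size and random_state control, here is a hand-rolled split in plain Python. It is a sketch only, not scikit-learn's implementation: simple_split is a hypothetical helper, and rounding the test-set size up is an assumption of this sketch.

```python
import math
import random
from typing import List, Tuple


def simple_split(rows: List[int], test_size: float, seed: int) -> Tuple[List[int], List[int]]:
    """Shuffle row indices with a fixed seed, then cut off the test portion."""
    indices = list(range(len(rows)))
    random.Random(seed).shuffle(indices)  # same seed, same shuffle
    n_test = math.ceil(test_size * len(rows))  # assumption: round the test size up
    test_idx, train_idx = indices[:n_test], indices[n_test:]
    return [rows[i] for i in train_idx], [rows[i] for i in test_idx]


rows = list(range(10))
train, test = simple_split(rows, test_size=0.4, seed=101)
print(len(train), len(test))
```

Fixing the seed makes the split reproducible, which is the role random_state=101 plays in the task above.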

4. Define another task that creates a k-nearest neighbours model and fits it to the training data. The model's predict method then predicts labels for the test data, which are returned as a list.

@task
def fit_and_predict(
    X_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_train: pd.DataFrame,
) -> List[int]:
    knn = KNeighborsClassifier()  # create a KNeighborsClassifier model (5 neighbours by default)
    knn.fit(X_train, y_train.values.ravel())  # fit the model; labels are flattened to a 1-D array
    predicted_vals = knn.predict(X_test)  # predict labels for the test data
    return predicted_vals.tolist()
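
For intuition about what the classifier does, here is a minimal pure-Python sketch of the k-nearest neighbours idea: find the k closest training points and take a majority vote over their labels. The knn_predict helper and the toy data are hypothetical, and this is not how scikit-learn implements it internally.

```python
from collections import Counter
from math import dist
from typing import List, Sequence


def knn_predict(
    train_X: List[Sequence[float]],
    train_y: List[int],
    query: Sequence[float],
    k: int = 3,
) -> int:
    """Return the most common label among the k training points nearest to query."""
    by_distance = sorted(zip(train_X, train_y), key=lambda pair: dist(pair[0], query))
    votes = Counter(label for _, label in by_distance[:k])
    return votes.most_common(1)[0][0]


# Two well-separated toy clusters, labelled 0 and 1.
train_X = [(1.0, 1.0), (1.2, 0.8), (0.9, 1.1), (5.0, 5.0), (5.2, 4.9), (4.8, 5.1)]
train_y = [0, 0, 0, 1, 1, 1]

print(knn_predict(train_X, train_y, (1.1, 1.0)))  # 0
print(knn_predict(train_X, train_y, (5.1, 5.0)))  # 1
```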

5. Define another task that computes the accuracy of the model by comparing the actual labels with the predicted labels using scikit-learn's accuracy_score function.

@task
def calc_accuracy(y_test: pd.DataFrame, predicted_vals_list: List[int]) -> float:
    return accuracy_score(y_test, predicted_vals_list)
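
Accuracy here is simply the fraction of predictions that match the actual labels. The sketch below computes it by hand; the accuracy helper and the label lists are made up for illustration.

```python
from typing import List


def accuracy(actual: List[int], predicted: List[int]) -> float:
    """Fraction of positions where the predicted label equals the actual label."""
    if len(actual) != len(predicted):
        raise ValueError("actual and predicted must have the same length")
    matches = sum(a == p for a, p in zip(actual, predicted))
    return matches / len(actual)


# Four of the five made-up predictions are correct.
score = accuracy([0, 1, 2, 1, 0], [0, 1, 1, 1, 0])
print(score)  # 0.8
```

Note that the result is a fraction between 0 and 1, not a percentage.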

6. Define a workflow (a function annotated with the @workflow decorator). The workflow calls the tasks and passes each task's outputs to the next one, which determines the order in which they execute.

@workflow
def pipeline() -> float:
    split_data_vals = data_processing()
    predicted_vals_output = fit_and_predict(
        X_train=split_data_vals.train_features,
        X_test=split_data_vals.test_features,
        y_train=split_data_vals.train_labels,
    )
    return calc_accuracy(
        y_test=split_data_vals.test_labels, predicted_vals_list=predicted_vals_output
    )

7. Invoke the pipeline, which prints the accuracy of the model.

if __name__ == "__main__":
    # accuracy_score returns a fraction in [0, 1]; ".2%" formats it as a percentage
    print(f"Accuracy of the model is {pipeline():.2%}")
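
Since accuracy_score returns a fraction between 0 and 1, Python's percent format specifier is one way to display it as a percentage. The value below is hypothetical and chosen only to show the formatting; it is not a result from this pipeline.

```python
# Hypothetical accuracy value, used only to demonstrate the format specifier.
example_accuracy = 0.75

# ".2%" multiplies by 100, keeps two decimal places, and appends a percent sign.
as_percent = f"{example_accuracy:.2%}"
print(f"Accuracy of the model is {as_percent}")
```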

Full code can be found here.
