DEV Community

Arik

Building scalable ML workflows

A little while back, I wrote a post introducing Tork, an open-source project I've been developing. In a nutshell, Tork is a general-purpose, distributed workflow engine suitable for various workloads. At work, we primarily use it for CPU/GPU-heavy tasks such as processing digital assets (3D, videos, images, etc.), and as the CI/CD tool for our internal PaaS.

Recently, I've been thinking about how Tork could be used to run machine learning workloads. I was particularly inspired by the Ollama project and wanted to see if I could do something similar, but using plain old Docker images rather than Ollama's Modelfile.

Given that ML workloads often consist of distinct, interdependent stages—such as data preprocessing, feature extraction, model training, and inference—it’s crucial to have an engine that can orchestrate these steps. These stages frequently require different types of compute resources (e.g., CPUs for preprocessing, GPUs for training) and can benefit greatly from parallelization.

Moreover, resiliency is a critical requirement when running machine learning workflows. Interruptions, whether due to hardware failures, network issues, or resource constraints, can result in significant setbacks, especially for long-running processes like model training.

These requirements are very similar to those of my other non-ML workloads, so I decided to put all my theories to the test and see what it would take to execute a simple ML workflow on Tork.

The experiment

For this first experiment, let's try to execute a simple sentiment analysis inference task:

Download the latest Tork binary and untar it.

tar xvzf tork_0.1.109_darwin_arm64.tgz

Start Tork in standalone mode:

./tork run standalone

If all goes well, you should see something like this:

...
10:36PM INF Coordinator listening on http://localhost:8000
...


Tork tasks typically execute within a Docker container, so next we need to build a Docker image that contains the model and the inference script.

inference.py

from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import os

MODEL_NAME = os.getenv("MODEL_NAME")

def load_model_and_tokenizer(model_name):
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)
    return tokenizer, model

def predict_sentiment(text, tokenizer, model):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)

    with torch.no_grad():
        outputs = model(**inputs)

    predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
    predicted_label = torch.argmax(predictions, dim=1).item()
    confidence = predictions[0][predicted_label].item()

    return predicted_label, confidence

if __name__ == "__main__":
    tokenizer, model = load_model_and_tokenizer(MODEL_NAME)
    text = os.getenv("INPUT_TEXT")
    label, confidence = predict_sentiment(text, tokenizer, model)
    # The SST-2 fine-tuned model uses 0 for negative and 1 for positive.
    sentiment_map = {0: "Negative", 1: "Positive"}
    print(sentiment_map[label])

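
To make the post-processing in predict_sentiment a bit more concrete, here is a minimal pure-Python sketch of the same softmax-then-argmax step. The logit values are made up for illustration, and the helper names softmax and pick_label are my own, not part of the script above.

```python
import math

def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    # Subtracting the max keeps exp() numerically stable without changing the result.
    shift = max(logits)
    exps = [math.exp(x - shift) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def pick_label(logits):
    """Return (index of the most likely class, its probability)."""
    probs = softmax(logits)
    label = max(range(len(probs)), key=probs.__getitem__)
    return label, probs[label]

# Hypothetical logits in [negative, positive] order; real values come from the model.
label, confidence = pick_label([-2.0, 3.0])
print(label, round(confidence, 3))
```

The script only prints the label, but the confidence value is there if you want to filter out low-certainty predictions.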

Dockerfile

FROM huggingface/transformers-pytorch-cpu:latest

WORKDIR /app

COPY inference.py .

# Pre-load the model during image build
RUN python3 -c "from transformers import AutoTokenizer, AutoModelForSequenceClassification; AutoTokenizer.from_pretrained('distilbert-base-uncased-finetuned-sst-2-english'); AutoModelForSequenceClassification.from_pretrained('distilbert-base-uncased-finetuned-sst-2-english')"

Build the image:

docker build -t sentiment-analysis .

Next, let's create the Tork job to run the inference.

sentiment.yaml

name: Sentiment analysis example
inputs:
  input_text: Today is a lovely day
  model_name: distilbert-base-uncased-finetuned-sst-2-english
output: "{{trim(tasks.sentimentResult)}}"
tasks:
  - name: Run sentiment analysis
    var: sentimentResult
    # the image we created in the previous step, 
    # but can be any image available from Docker hub
    # or any other image registry
    image: sentiment-analysis:latest
    run: |
      python3 inference.py > $TORK_OUTPUT
    env:
      INPUT_TEXT: "{{inputs.input_text}}"
      MODEL_NAME: "{{inputs.model_name}}"

Submit the job. Tork jobs execute asynchronously. Once a job is submitted you get back a job ID to track its progress:

JOB_ID=$(curl -s \
  -X POST \
  -H "content-type:text/yaml" \
  --data-binary @sentiment.yaml \
  http://localhost:8000/jobs | jq -r .id)
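
If you would rather submit the job from Python than from curl, the same request can be sketched with the standard library. The function names below are hypothetical; the URL, content type, and the id field are taken from the curl command above.

```python
import json
import urllib.request

def build_submit_request(job_yaml, base_url="http://localhost:8000"):
    """Build (but do not send) the POST /jobs request shown in the curl example."""
    return urllib.request.Request(
        url=f"{base_url}/jobs",
        data=job_yaml.encode("utf-8"),
        headers={"Content-Type": "text/yaml"},
        method="POST",
    )

def submit_job(job_yaml, base_url="http://localhost:8000"):
    """Send the request and return the job ID from the JSON response."""
    request = build_submit_request(job_yaml, base_url)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)["id"]
```

With Tork running locally, something like submit_job(open("sentiment.yaml").read()) should return the same ID that jq extracts in the shell version.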

Poll the job's status and wait for it to complete:

while true; do
  state=$(curl -s "http://localhost:8000/jobs/$JOB_ID" | jq -r .state)
  echo "Status: $state"
  if [ "$state" = "COMPLETED" ]; then
    break
  fi
  sleep 1
done
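
One caveat with a loop like this is that it only stops on COMPLETED, so a job that never completes would keep it spinning. Here is a hedged Python sketch of a polling helper with a timeout. It takes a fetch_state callable so it can be tried offline; the FAILED and CANCELLED state names are my assumption and should be checked against the Tork docs.

```python
import time

def wait_for_job(fetch_state, poll_seconds=1.0, timeout_seconds=600.0,
                 terminal_states=("COMPLETED", "FAILED", "CANCELLED")):
    """Call fetch_state() until it returns a terminal state or time runs out."""
    # FAILED and CANCELLED are assumed names; only COMPLETED appears in this post.
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = fetch_state()
        if state in terminal_states:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {state} after {timeout_seconds}s")
        time.sleep(poll_seconds)

# Offline demonstration with canned states instead of a real HTTP call.
fake_states = iter(["PENDING", "RUNNING", "COMPLETED"])
print(wait_for_job(lambda: next(fake_states), poll_seconds=0))
```

In real use, fetch_state would wrap a GET to http://localhost:8000/jobs/$JOB_ID and return the state field of the response.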

Inspect the job results:

curl -s http://localhost:8000/jobs/$JOB_ID | jq -r .result

Output:

Positive

Try changing the input_text in sentiment.yaml and re-submit the job for different results.
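
If you want to try many inputs without hand-editing the file, a small script can render the job definition for you. This is only a sketch: render_job and JOB_TEMPLATE are hypothetical names, and it relies on the fact that a JSON string is also a valid double-quoted YAML scalar, so json.dumps takes care of quoting.

```python
import json

# Same job as sentiment.yaml; doubled braces survive str.format() as {{ }}.
JOB_TEMPLATE = """\
name: Sentiment analysis example
inputs:
  input_text: {input_text}
  model_name: {model_name}
output: "{{{{trim(tasks.sentimentResult)}}}}"
tasks:
  - name: Run sentiment analysis
    var: sentimentResult
    image: sentiment-analysis:latest
    run: |
      python3 inference.py > $TORK_OUTPUT
    env:
      INPUT_TEXT: "{{{{inputs.input_text}}}}"
      MODEL_NAME: "{{{{inputs.model_name}}}}"
"""

def render_job(input_text,
               model_name="distilbert-base-uncased-finetuned-sst-2-english"):
    """Return the job YAML with the given inputs safely quoted."""
    return JOB_TEMPLATE.format(
        input_text=json.dumps(input_text),
        model_name=json.dumps(model_name),
    )

print(render_job("This movie was a waste of time"))
```

The rendered text can be saved to a file and posted with the same curl command as before.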

Next steps

Now that I have this basic proof of concept working on my machine, the next step is to push the Docker image to a registry so that it is available to the Tork workers on my production cluster. So far, this seems like a viable approach.

The code for this article can be found on GitHub.

If you're interested in learning more about Tork:

Documentation: https://www.tork.run
Backend: https://github.com/runabol/tork
Web UI: https://github.com/runabol/tork-web


Top comments (9)

Hassen

I love how simple Tork is. I just did my first couple of tests locally, and they have worked great so far.

Would you recommend using Tork in production? If not, what is missing to make it perform well online?

Arik

Thanks! FWIW, we've been using it in production at the company I work at, for over a year now on a rather large scale (hundreds of nodes during peak times), and we've had very few issues overall.

Hassen

Great news. Do you mind sharing how you set up the production deployment of Tork? I can't find details in the docs :)

I had in mind packaging Tork within a Docker image, but wondering if the Tork container can itself launch other docker containers.

Arik

For sure, we run on raw EC2 machines on AWS for maximum control over the nodes. We have two Coordinator nodes and several types of worker pools for different types of workloads. We have Docker installed on the worker nodes but Tork itself runs outside of Docker as a Linux service. This allows the Tork process to run more privileged while keeping the task containers sandboxed. We use RDS for the DB and Amazon MQ for RabbitMQ. We do have one extension to Tork that we use to monitor the RabbitMQ queues and spin workers up and down based on how many items are in the queue. That piece is not open source, unfortunately, but it shouldn't be hard to replicate, even outside of Tork.
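
To give a rough idea of what queue-based scaling can look like, here is a hypothetical sketch of the sizing decision only. It is not the extension described above; the function name, parameters, and limits are all assumptions, and reading the queue depth from RabbitMQ and launching instances are left out.

```python
import math

def desired_workers(queue_depth, tasks_per_worker=1, min_workers=1, max_workers=100):
    """Pick a worker count that can take the backlog in one pass, within limits."""
    if tasks_per_worker < 1:
        raise ValueError("tasks_per_worker must be at least 1")
    needed = math.ceil(queue_depth / tasks_per_worker)
    # Clamp so the pool never scales to zero or grows without bound.
    return max(min_workers, min(max_workers, needed))

for depth in (0, 7, 250, 10_000):
    print(depth, desired_workers(depth, tasks_per_worker=4))
```

A real autoscaler would likely add a cooldown between changes so that short spikes do not cause workers to flap.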

Arik

This might be helpful as well: tork.run/installation#running-in-a...

Adam

Loving this!

Nicole C. Dressler

This was a great read! The focus on scalability and Docker for ML workflows is super practical. It’s cool to see how Tork handles parallelism. This approach can be a game-changer for those needing to manage complex, interdependent tasks!

One thought: How can this method scale efficiently in cloud environments with varying resource availability, especially for large-scale models that demand significant GPU resources?

Arik

Thanks! Really appreciate the kind words and for taking the time to read.

Excellent question! This is actually what Tork is built for. You can even simulate it on your local machine.

First, start RabbitMQ, which will act as the broker between the various Tork nodes.

docker run -it --rm -p 5672:5672 rabbitmq:3-management

From another terminal, let's start Tork in coordinator mode. The Coordinator is the piece that coordinates the execution of tasks across all the Tork workers.

TORK_BROKER_TYPE=rabbitmq ./tork run coordinator

From another terminal, let's start Tork in worker mode. Workers are responsible for the actual execution of tasks.

TORK_BROKER_TYPE=rabbitmq TORK_WORKER_QUEUES_CPU=1 ./tork run worker

Notice the TORK_WORKER_QUEUES_CPU configuration. That tells the worker to listen on a queue called cpu and to accept at most one task at any given time. We'll get to that in a second.
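
The naming convention is easy to see in a few lines of Python. This is only an illustration of how a TORK_WORKER_QUEUES_<NAME>=<concurrency> variable maps to a queue name and a task limit, not Tork's actual config loader; parse_worker_queues is a made-up helper.

```python
def parse_worker_queues(environ, prefix="TORK_WORKER_QUEUES_"):
    """Map variables like TORK_WORKER_QUEUES_CPU=1 to {'cpu': 1}."""
    queues = {}
    for key, value in environ.items():
        # The suffix after the prefix is the queue name; the value is its concurrency.
        if key.startswith(prefix) and len(key) > len(prefix):
            queues[key[len(prefix):].lower()] = int(value)
    return queues

env = {"TORK_BROKER_TYPE": "rabbitmq", "TORK_WORKER_QUEUES_CPU": "1"}
print(parse_worker_queues(env))
```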

From another terminal, let's start another Tork worker, this time listening on a gpu queue.

TORK_BROKER_TYPE=rabbitmq TORK_WORKER_QUEUES_GPU=1 ./tork run worker

OK, from (last one, I promise) another terminal, let's submit a dummy job:

curl -X POST -H "content-type:text/yaml" http://localhost:8000/jobs -d '
name: my job
tasks:
  - name: fake cpu task
    image: alpine:latest
    # route this task to our cpu worker
    queue: cpu
    run: sleep 1

  - name: fake gpu task
    image: alpine:latest
    # route this task to our gpu worker
    queue: gpu 
    run: sleep 1
' | jq .

The Coordinator will ensure that tasks marked with queue: cpu are routed to the worker we configured as the CPU worker, and the same goes for the GPU worker.

In the cloud, these worker and coordinator instances typically run on separate machines, all connected to the same broker.

The more workers you add, the more work your cluster can handle in parallel. Queue names are arbitrary, so you can have as many worker types as you like.
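
Here is a toy model of that routing, assuming tasks are simply grouped by their queue field and each queue is drained by however many task slots its workers offer. The function names and the "default" fallback queue are assumptions for the sake of the sketch, and the real scheduling happens through the broker rather than in a loop like this.

```python
from collections import defaultdict

def route_tasks(tasks, default_queue="default"):
    """Group task names by the queue they ask for."""
    queues = defaultdict(list)
    for task in tasks:
        queues[task.get("queue", default_queue)].append(task["name"])
    return dict(queues)

def rounds_needed(pending, slots):
    """Rounds to drain `pending` equal-length tasks with `slots` running at once."""
    return -(-pending // slots)  # ceiling division

job = [
    {"name": "fake cpu task", "queue": "cpu"},
    {"name": "fake gpu task", "queue": "gpu"},
]
print(route_tasks(job))
# Ten queued tasks: one slot versus four slots.
print(rounds_needed(10, 1), rounds_needed(10, 4))
```

Adding workers to a queue increases its slots, which is why the backlog clears in fewer rounds.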

Hope that makes sense. Let me know if you have any further questions.

Nicole C. Dressler

Thanks for the detailed explanation Arik! This setup with RabbitMQ for job orchestration sounds like a very nice way of managing resource allocation across different types of workers. It’s great to know that the system scales efficiently with more workers. 😊
