DEV Community

David Mezzetti for NeuML

Posted on • Updated on • Originally published at neuml.hashnode.dev

Workflow Scheduling

Workflows are a simple yet powerful construct that takes a callable and returns elements. They are streaming and work on data in batches, allowing large volumes of data to be processed efficiently. When working with streaming data, workflows continually run until the data stream is exhausted.

Workflows can also be scheduled to run. In this case, a static set of elements, dynamically expands. For example, an API service endpoint that returns items, or polling a directory with files coming in and out.

This article will show how to use workflow scheduling in txtai.

Install dependencies

Install txtai and all dependencies.

pip install txtai[workflow]
Enter fullscreen mode Exit fullscreen mode

Create workflow action

Workflows run a series of tasks to transform and process data. This section creates a callable object that can be used as a workflow action. The object iterates over a dataset, returning a batch of data.

from datasets import load_dataset

class Stream:
  def __init__(self):
    self.dataset = load_dataset("ag_news", split="train")
    self.index, self.size = 0, 2500

  def __call__(self, fields):
    outputs = []
    for field in fields:
      output = []
      for row in self.dataset.select(range(self.index, self.index+self.size)):
        output.append((self.index, row[field], None))
        self.index += 1

      outputs.append(output)

    return outputs
Enter fullscreen mode Exit fullscreen mode

Build workflow

Next we'll create the workflow. The workflow reads batches of data from a stream and loads it into an Embeddings index. We'll run this workflow four times on a scheduled interval to demonstrate a scheduled workflow.

from txtai.app import Application

# Run up to every 5 seconds 4 times
workflow = """
writable: true
embeddings:
  path: sentence-transformers/nli-mpnet-base-v2
  content: true

workflow:
  index:
    schedule:
      cron: '* * * * * 0/5'
      elements:
        - text
      iterations: 4
    tasks:
      - __main__.Stream
      - upsert
"""
app = API(workflow)
app.wait()
Enter fullscreen mode Exit fullscreen mode
2022-02-03 02:12:06,751 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5
2022-02-03 02:12:06,757 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:12:10+00:00
2022-02-03 02:12:34,937 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:12:35+00:00
2022-02-03 02:12:59,967 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:13:00+00:00
2022-02-03 02:13:23,349 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:13:25+00:00
2022-02-03 02:13:49,621 [INFO] schedule: 'index' max iterations (4) reached
Enter fullscreen mode Exit fullscreen mode

Reviewing the log above, we see the index job ran four times. Now let's query the index and see what was loaded.

Run an embeddings search

Let's run a search against the newly created index.

import json

# Show total number of records
print(f"Total records: {app.count()}")

# Run a search
print("Search:")
print(json.dumps(app.search("life on mars", limit=1), indent=2))
Enter fullscreen mode Exit fullscreen mode
Total records: 10000
Search:
[
  {
    "id": "119",
    "text": "Life on Mars Likely, Scientist Claims (SPACE.com) SPACE.com - DENVER, COLORADO -- Those twin robots hard at work on Mars have transmitted teasing views that reinforce the prospect that microbial life may exist on the red planet.",
    "score": 0.7236138582229614
  }
]
Enter fullscreen mode Exit fullscreen mode

The index has 10,000 records. We also see the top result for the query on life on mars.

Run a scheduled embeddings search

Now let's incrementally load the dataset with a scheduled workflow and run a scheduled search after each batch is loaded.

from txtai.app import Application

# Run every 5 seconds up to 4 times
workflow = """
writable: true
embeddings:
  path: sentence-transformers/nli-mpnet-base-v2
  content: true

workflow:
  index:
    schedule:
      cron: '* * * * * 0/5'
      elements:
        - text
      iterations: 4
    tasks:
      - __main__.Stream
      - upsert
  search:
    schedule:
      cron: '* * * * * 0/5'
      elements:
        - life on mars
      iterations: 4
    tasks:
      - action: search
        args: [3]
        task: console
"""

app = API(workflow)
app.wait()
Enter fullscreen mode Exit fullscreen mode
2022-02-03 02:13:55,789 [WARNING] _create_builder_config: Using custom data configuration default
2022-02-03 02:13:55,797 [WARNING] download_and_prepare: Reusing dataset ag_news (/root/.cache/huggingface/datasets/ag_news/default/0.0.0/bc2bcb40336ace1a0374767fc29bb0296cdaf8a6da7298436239c54d79180548)
2022-02-03 02:13:55,808 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5
2022-02-03 02:13:55,808 [INFO] schedule: 'search' scheduler started with schedule * * * * * 0/5
2022-02-03 02:13:55,810 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:14:00+00:00
2022-02-03 02:13:55,814 [INFO] schedule: 'search' next run scheduled for 2022-02-03T02:14:00+00:00
2022-02-03 02:14:00,001 [INFO] schedule: 'search' next run scheduled for 2022-02-03T02:14:05+00:00

Inputs: [
  "life on mars"
]
Outputs: [
  null
]

2022-02-03 02:14:24,500 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:14:25+00:00
2022-02-03 02:14:24,522 [INFO] schedule: 'search' next run scheduled for 2022-02-03T02:14:25+00:00

Inputs: [
  "life on mars"
]
Outputs: [
  {
    "id": "119",
    "text": "Life on Mars Likely, Scientist Claims (SPACE.com) SPACE.com - DENVER, COLORADO -- Those twin robots hard at work on Mars have transmitted teasing views that reinforce the prospect that microbial life may exist on the red planet.",
    "score": 0.7236138582229614
  },
  {
    "id": "271",
    "text": "Saturn's Moon Titan: Prebiotic Laboratory by Harry Bortman    In this second and final part of the interview, Lunine explains how Huygens may help scientists understand the origin of life on Earth, even if it doesn't detect life on Titan.    Astrobiology Magazine -- Titan is the only moon in our solar system with an atmosphere, and it is the organic chemistry that has been detected in that atmosphere that has sparked the imagination of planetary scientists like Lunine...",
    "score": 0.4750666916370392
  },
  {
    "id": "1132",
    "text": "Is Mercury the Incredible Shrinking Planet? MESSENGER Spacecraft May Find Out (SPACE.com) SPACE.com - With a new spacecraft bound for Mercury, that tiny planet nbsp;near the heart of the solar system, researchers are hoping to solve a slew of riddles about the small world.",
    "score": 0.47124743461608887
  }
]

2022-02-03 02:14:25,496 [INFO] schedule: 'search' next run scheduled for 2022-02-03T02:14:30+00:00

Inputs: [
  "life on mars"
]
Outputs: [
  {
    "id": "119",
    "text": "Life on Mars Likely, Scientist Claims (SPACE.com) SPACE.com - DENVER, COLORADO -- Those twin robots hard at work on Mars have transmitted teasing views that reinforce the prospect that microbial life may exist on the red planet.",
    "score": 0.7236138582229614
  },
  {
    "id": "271",
    "text": "Saturn's Moon Titan: Prebiotic Laboratory by Harry Bortman    In this second and final part of the interview, Lunine explains how Huygens may help scientists understand the origin of life on Earth, even if it doesn't detect life on Titan.    Astrobiology Magazine -- Titan is the only moon in our solar system with an atmosphere, and it is the organic chemistry that has been detected in that atmosphere that has sparked the imagination of planetary scientists like Lunine...",
    "score": 0.4750666916370392
  },
  {
    "id": "1132",
    "text": "Is Mercury the Incredible Shrinking Planet? MESSENGER Spacecraft May Find Out (SPACE.com) SPACE.com - With a new spacecraft bound for Mercury, that tiny planet nbsp;near the heart of the solar system, researchers are hoping to solve a slew of riddles about the small world.",
    "score": 0.47124743461608887
  }
]

2022-02-03 02:14:50,112 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:14:55+00:00
2022-02-03 02:14:50,138 [INFO] schedule: 'search' max iterations (4) reached

Inputs: [
  "life on mars"
]
Outputs: [
  {
    "id": "119",
    "text": "Life on Mars Likely, Scientist Claims (SPACE.com) SPACE.com - DENVER, COLORADO -- Those twin robots hard at work on Mars have transmitted teasing views that reinforce the prospect that microbial life may exist on the red planet.",
    "score": 0.7236138582229614
  },
  {
    "id": "3300",
    "text": "Mars Hills, Crater Yield Evidence of Flowing Water LOS ANGELES (Reuters) - The hills of Mars yielded more tantalizing clues about how water shaped the Red Planet in tests by NASA #39;s robotic geologist, Spirit, while its twin, Opportunity, observed the deep crater it climbed into two months ...",
    "score": 0.6666488647460938
  },
  {
    "id": "4201",
    "text": "Martian hill shows signs of ancient water LOS ANGELES - NASA #39;s Spirit rover has found more evidence of past water on the hills of Mars, while its twin, Opportunity, has observed a field of dunes inside a crater. ",
    "score": 0.6453495621681213
  }
]

2022-02-03 02:15:18,333 [INFO] schedule: 'index' next run scheduled for 2022-02-03T02:15:20+00:00
2022-02-03 02:15:44,592 [INFO] schedule: 'index' max iterations (4) reached
Enter fullscreen mode Exit fullscreen mode

The workflow above runs up to every 5 seconds. Note that since the index job takes longer than 5 seconds, the time difference between jobs is longer.

The index job loads the next batch of data and the search job runs a recurring search.

See how the search results change over time as more relevant results are found.

Wrapping up

This article covered how to use workflow scheduling with txtai. While there are existing ways to schedule jobs (system cron, serverless, and so on), this is another easy and quick way to do it.

Top comments (0)