DEV Community

David Mezzetti for NeuML

Posted on • Updated on • Originally published at neuml.hashnode.dev

Push notifications with workflows

Workflows are a simple yet powerful construct that takes a callable and returns elements. They are streaming and work on data in batches, allowing large volumes of data to be processed efficiently.

This article will show how workflows can be used to push notifications upon certain event triggers. Using this method, an activity feed of content can be created.

Install dependencies

Install txtai and all dependencies.

pip install txtai[pipeline,workflow]
Enter fullscreen mode Exit fullscreen mode

Create workflow notification action

Workflows run a series of tasks to transform and process data. This section creates a callable object that can be used as a workflow action.

The action below pushes events to Slack. While Slack is used here, any notification service can easily be substituted in (Zapier, IFTT etc).

It is assumed there is a Slack workspace and application installed and ready to use. See this comprehensive example for more information on setting up a new Slack app and posting messages via the API.

The channel id can be found from the Slack web interface. Log into Slack and click on the channel where messages will be posted. The channel id is the last part of the URL.

https://app.slack.com/client/<team id>/<channel id>
Enter fullscreen mode Exit fullscreen mode
import logging
import requests

# Uncomment and set. The following are dummy parameters. Your parameters should not be publicly shared!
# AUTH = "xoxb-not-a-real-token-this-will-not-work"
# CHANNEL = "C0XXXXXXXXX"
# URL = "https://slack.com/api/chat.postMessage"

# Logging configuration
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

class Slack:
    def __init__(self):
        self.alerts = set()

    def __call__(self, elements):
        for alert in elements:
            uid, text, _ = self.extract(alert)
            if uid not in self.alerts:
                logger.info("Sending alert: %s", alert)
                self.alerts.add(uid)

                headers = {
                    "Content-type": "application/json",
                    "Authorization": f"Bearer {AUTH}"
                }

                response = requests.post(URL, headers=headers, json={
                    "channel": CHANNEL,
                    "text": f"{text} {uid}"
                }).json()

                if not response["ok"]:
                    logger.error(response)

        return elements

    def extract(self, alert):
        if isinstance(alert, dict):
            return (alert["id"], alert["text"], None)

        return alert
Enter fullscreen mode Exit fullscreen mode

Build a semantic notification workflow

Next we'll create a notification workflow. The example below indexes the top trending Hacker News articles and pushes an alert when an article matches an embeddings query for software development library.

from txtai.app import Application

workflow = """
writable: true

embeddings:
  path: sentence-transformers/nli-mpnet-base-v2
  content: true

tabular:
  idcolumn: url
  textcolumns:
    - title

__main__.Slack:

workflow:
  index:
    schedule:
      cron: "* * * * * 0/5"
      elements:
        - front_page
      iterations: 1
    tasks:
      - batch: false
        extract:
          - hits
        method: get
        params:
          tags: null
        task: service
        url: https://hn.algolia.com/api/v1/search?hitsPerPage=50
      - action: tabular
      - action: upsert
  alert:
    schedule:
      cron: 0/1 * * * *
      elements:
        - select id, text, score from txtai where similar('software development library') and score >= 0.4 and id like 'http%'
      iterations: 1
    tasks:
      - action: search
      - action: __main__.Slack
"""

app = API(workflow)
app.wait()
Enter fullscreen mode Exit fullscreen mode
2022-02-10 15:12:42,838 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5
2022-02-10 15:12:42,839 [INFO] schedule: 'alert' scheduler started with schedule 0/1 * * * *
2022-02-10 15:12:42,843 [INFO] schedule: 'index' next run scheduled for 2022-02-10T15:12:45+00:00
2022-02-10 15:12:42,851 [INFO] schedule: 'alert' next run scheduled for 2022-02-10T15:13:00+00:00
2022-02-10 15:12:45,884 [INFO] schedule: 'index' max iterations (1) reached
2022-02-10 15:13:00,042 [INFO] __call__: Sending alert: {'id': 'https://datastation.multiprocess.io/blog/2022-02-08-the-world-of-postgresql-wire-compatibility.html', 'text': 'The world of PostgreSQL wire compatibility', 'score': 0.40123000741004944}
2022-02-10 15:13:00,254 [INFO] schedule: 'alert' max iterations (1) reached
Enter fullscreen mode Exit fullscreen mode

The log above shows the indexing and alerting jobs each ran once. There was a single match and it was sent to Slack. The score threshold of 0.4 is relatively low, it can be raised if more strict matches are desired.

Build a notification workflow using SQL

The next example is similar but it instead runs a SQL search using another column.

This workflow indexes the top trending sports events as identified by neuspo. An alert is generated when an excitement factor of 40 or above is met (the field for excitement is called weight).

from txtai.app import Application

workflow = """
writable: true

embeddings:
  path: sentence-transformers/nli-mpnet-base-v2
  content: true

tabular:
  idcolumn: url
  textcolumns:
    - summary
  content: true

__main__.Slack:

workflow:
  index:
    schedule:
      cron: "* * * * * 0/5"
      elements:
        - 10
      iterations: 1
    tasks:
      - batch: false
        extract:
          - rows
        method: get
        params:
          size: null
        task: service
        url: https://neuspo.com/data/articles/list?category=Top&from=0
      - action: tabular
      - action: upsert
  alert:
    schedule:
      cron: 0/1 * * * *
      elements:
        - select 'https://neuspo.com' || id id, text from txtai where weight >= 40
      iterations: 1
    tasks:
      - action: search
      - action: __main__.Slack
"""

app = Application(workflow)
app.wait()
Enter fullscreen mode Exit fullscreen mode
2022-02-10 15:16:49,702 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5
2022-02-10 15:16:49,704 [INFO] schedule: 'index' next run scheduled for 2022-02-10T15:16:50+00:00
2022-02-10 15:16:49,704 [INFO] schedule: 'alert' scheduler started with schedule 0/1 * * * *
2022-02-10 15:16:49,714 [INFO] schedule: 'alert' next run scheduled for 2022-02-10T15:17:00+00:00
2022-02-10 15:16:50,474 [INFO] schedule: 'index' max iterations (1) reached
2022-02-10 15:17:00,010 [INFO] __call__: Sending alert: {'id': 'https://neuspo.com/stream/be15c852925b53639b63feb7169a2842', 'text': 'Islanders 6, Canucks 3: Five-goal first period keys Islanders win in first game post-'}
2022-02-10 15:17:00,215 [INFO] schedule: 'alert' max iterations (1) reached
Enter fullscreen mode Exit fullscreen mode

And the notification in Slack for this job.

Search-Summarize-Notify

The next section builds on the Hacker News example. Instead of just sending a notification on a match, this workflow will summarize the match first.

There are a number of alternative combinations. For example, the summaries could be built at index time. But this example will do everything on the fly when searching.

from txtai.app import Application

workflow = """
writable: true

embeddings:
  path: sentence-transformers/nli-mpnet-base-v2
  content: true

summary:
  path: sshleifer/distilbart-cnn-12-6

tabular:
  idcolumn: url
  textcolumns:
    - title

textractor:
  join: true
  minlength: 100
  paragraphs: true

__main__.Slack:

workflow:
  index:
    schedule:
      cron: "* * * * * 0/5"
      elements:
        - front_page
      iterations: 1
    tasks:
      - batch: false
        extract:
          - hits
        method: get
        params:
          tags: null
        task: service
        url: https://hn.algolia.com/api/v1/search?hitsPerPage=50
      - action: tabular
      - action: upsert
  alert:
    schedule:
      cron: 0/1 * * * *
      elements:
        - select id url, id title from txtai where similar('software development library') and score >= 0.4 and id like 'http%'
      iterations: 1
    tasks:
      - action: search
      - action: tabular
      - action: textractor
      - action: summary
      - action: __main__.Slack
        unpack: false
"""

app = API(workflow)
app.wait()
Enter fullscreen mode Exit fullscreen mode
2022-02-10 17:19:48,847 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5
2022-02-10 17:19:48,857 [INFO] schedule: 'index' next run scheduled for 2022-02-10T17:19:50+00:00
2022-02-10 17:19:48,848 [INFO] schedule: 'alert' scheduler started with schedule 0/1 * * * *
2022-02-10 17:19:48,864 [INFO] schedule: 'alert' next run scheduled for 2022-02-10T17:20:00+00:00
2022-02-10 17:19:50,368 [INFO] schedule: 'index' max iterations (1) reached
2022-02-10 17:20:02,233 [INFO] __call__: Sending alert: ('https://datastation.multiprocess.io/blog/2022-02-08-the-world-of-postgresql-wire-compatibility.html', 'Every server-client database has a wire protocol. A wire protocol is the format for interactions between a database server and its clients. It does NOT encompass the actual query language itself, let alone database semantics. Proprietary databases like Oracle and IBM Db2 find value in developing their own drivers.', None)
2022-02-10 17:20:02,496 [INFO] schedule: 'alert' max iterations (1) reached
Enter fullscreen mode Exit fullscreen mode

And the result in Slack. See how the text is now the article summary vs. the title.

Search-Summarize-Translate-Notify

One more example to really drive this home. Let's do the same as the last example and add a translate to French step.

from txtai.app import Application

workflow = """
writable: true

embeddings:
  path: sentence-transformers/nli-mpnet-base-v2
  content: true

summary:
  path: sshleifer/distilbart-cnn-12-6

tabular:
  idcolumn: url
  textcolumns:
    - title

textractor:
  join: true
  minlength: 100
  paragraphs: true

translation:

__main__.Slack:

workflow:
  index:
    schedule:
      cron: "* * * * * 0/5"
      elements:
        - front_page
      iterations: 1
    tasks:
      - batch: false
        extract:
          - hits
        method: get
        params:
          tags: null
        task: service
        url: https://hn.algolia.com/api/v1/search?hitsPerPage=50
      - action: tabular
      - action: upsert
  alert:
    schedule:
      cron: 0/1 * * * *
      elements:
        - select id url, id title from txtai where similar('software development library') and score >= 0.4 and id like 'http%'
      iterations: 1
    tasks:
      - action: search
      - action: tabular
      - action: textractor
      - action: summary
      - action: translation
        args:
          - fr
      - action: __main__.Slack
        unpack: false
"""

app = API(workflow)
app.wait()
Enter fullscreen mode Exit fullscreen mode
2022-02-10 17:25:04,448 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5
2022-02-10 17:25:04,449 [INFO] schedule: 'alert' scheduler started with schedule 0/1 * * * *
2022-02-10 17:25:04,451 [INFO] schedule: 'index' next run scheduled for 2022-02-10T17:25:05+00:00
2022-02-10 17:25:04,457 [INFO] schedule: 'alert' next run scheduled for 2022-02-10T17:26:00+00:00
2022-02-10 17:25:05,357 [INFO] schedule: 'index' max iterations (1) reached
2022-02-10 17:26:08,125 [INFO] __call__: Sending alert: ('https://datastation.multiprocess.io/blog/2022-02-08-the-world-of-postgresql-wire-compatibility.html', "Chaque base de données serveur-client a un protocole filaire. Un protocole filaire est le format pour les interactions entre un serveur de base de données et ses clients. Il n'inclut PAS le langage de requête réel lui-même, et encore moins la sémantique de la base de données.", None)
2022-02-10 17:26:08,310 [INFO] schedule: 'alert' max iterations (1) reached
Enter fullscreen mode Exit fullscreen mode

And just like before, Slack has a summary and a link but this time in French!

Wrapping up

This article covered how to build workflow notifications with txtai. There are many directions one could go with this. Build an activity feed, alert when semantic events occur and more. More ideas can be found in the txtai application on Hugging Face Spaces.

Everything in this article can also be written in Python. The benefits of YAML workflows are that they require little to no-code. Work is ongoing as of txtai 4.1 to make workflows easier to containerize and ultimately run in serverless environments. Keep an eye on this!

Top comments (0)