Real-Time Slack Bots Powered By Generative AI and Data Flows
Using WatsonX.AI LLM foundation models with Cloudera DataFlow and Apache NiFi to send, receive and process Slack messages.
Real-time Integration of WatsonX.AI Foundation Models with NiFi
Hi, I am Timothy Spann, Principal Developer Advocate for Streaming at Cloudera.
In this article I will show you how to use Cloudera DataFlow powered by Apache NiFi to interact with IBM WatsonX.AI foundation large language models in real-time. We can work with any of the foundation models such as Google FLAN T5 XXL or IBM Granite models.
I'll show you how easy it is to build a real-time data pipeline that feeds questions from applications like Slack and mobile apps directly to secure WatsonX.AI models running in IBM Cloud. We will handle all the security, management, lineage and governance with Cloudera DataFlow. As part of decision making, we can choose different WatsonX.AI models on the fly based on what type of prompt it is. For example, I can pick a different model depending on whether we want to continue a sentence or answer a question. For question answering, Google FLAN T5 XXL works well. If I want to continue sentences, I would use one of the IBM Granite models.
You will notice how amazingly fast the WatsonX.AI models return the results we need. I do some quick enrichment and transformation and then send them on their way to Cloudera Apache Kafka to be used for continuous analytics and distribution to many other applications, systems, platforms and downstream consumers. We will also output our answers to the original requester, which could be someone in a Slack channel or someone in an application. All of this happens in real time, with no code, full governance, lineage, data management and security at any scale and on any platform.
The power of IBM and Cloudera together in private, public and hybrid cloud environments for real-time data and AI is just getting started. Try it today.
Step by Step Real-Time Flow
First, in Slack I type a question,
"Q: What is a good way to integrate Generative AI and Apache NiFi?"
NiFi Flow Top
Once that question is typed, the Slack server sends these events to our registered service. This can be hosted anywhere publicly facing.
https://api.slack.com/apps/myapp/event-subscriptions
Slack API
Once enabled, your server will start receiving JSON events for each Slack post. This is easy to receive and parse in NiFi. Cloudera DataFlow enables receiving secure HTTPS REST calls in the public cloud hosted edition with ease, even in Designer mode.
NiFi Top Flow 2
In the first part of the flow we receive the REST JSON POST, which is as follows.
Slackbot 1.0 (+https://api.slack.com/robots)
application/json
POST
HTTP/1.1
This is a very rich detailed JSON file that we could push immediately raw to an Apache Iceberg Open Cloud Lakehouse, a Kafka topic or an object store as a JSON document. (Enhancement Option) I am just going to parse what I need.
EvaluateJsonPath
We parse out the channel ID and plain text of the post. I only want messages from general ("C1SD6N197"). Then I copy the text to an inputs field, as is required for Hugging Face.
We check our input: if it is about stocks or weather (more to come), we avoid calling the LLM.
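As a rough illustration of what this parsing step does, here is a minimal Python sketch. It assumes the payload follows the Slack Events API event_callback shape, with the channel and text nested under "event". The sample payload and the helper name extract_inputs are hypothetical; only the channel ID comes from the flow above.

```python
import json

GENERAL_CHANNEL = "C1SD6N197"  # channel ID used in the flow above


def extract_inputs(raw_body, allowed_channel=GENERAL_CHANNEL):
    """Return {'inputs': text} for posts in the allowed channel, else None."""
    payload = json.loads(raw_body)
    event = payload.get("event", {})
    if event.get("channel") != allowed_channel:
        return None  # ignore posts from other channels
    text = event.get("text")
    if not text:
        return None  # nothing to send downstream
    return {"inputs": text}


# Hypothetical, trimmed-down event payload for illustration only.
sample_event = json.dumps({
    "type": "event_callback",
    "event": {
        "type": "message",
        "channel": "C1SD6N197",
        "text": "Q: What is a good way to integrate Generative AI and Apache NiFi?",
    },
})

print(extract_inputs(sample_event))
```

In NiFi the same values land in flow file attributes rather than a dictionary, but the filtering idea is the same.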
SELECT * FROM FLOWFILE
WHERE upper(inputs) LIKE '%WEATHER%'
AND NOT upper(inputs) LIKE '%LLM SKIPPED%'

SELECT * FROM FLOWFILE
WHERE upper(inputs) LIKE '%STOCK%'
AND NOT upper(inputs) LIKE '%LLM SKIPPED%'

SELECT * FROM FLOWFILE
WHERE (upper(inputs) LIKE 'QUESTION:%'
OR upper(inputs) LIKE 'Q:%')
AND NOT upper(inputs) LIKE '%WEATHER%'
AND NOT upper(inputs) LIKE '%STOCK%'
For Stocks processing:
To work out which stock we need, I use my OpenNLP processor, so you will need to download the processor and the entity extraction models.
GitHub - tspannhw/nifi-nlp-processor: Apache NiFi NLP Processor (github.com)
Open NLP Example Apache NiFi Processor (community.cloudera.com)
Then we pass that company name to an HTTP REST endpoint from AlphaVantage that converts company names to stock symbols. With free accounts you only get a few calls a day, so if the call fails we bypass this step and just try to use whatever you passed in.
https://www.alphavantage.co/query?function=SYMBOL_SEARCH&keywords=${nlp_org_1:trim()}&apikey=GetYourselfAKey&datatype=csv
Using RouteOnContent we filter any error messages out.
Then we use a QueryRecord processor to convert from CSV to JSON and filter.
SELECT name as companyName, symbol FROM FLOWFILE
ORDER BY matchScore DESC
LIMIT 1
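For readers who want to see the "best match" selection outside NiFi, here is a minimal Python sketch. It assumes the CSV has at least the symbol, name and matchScore columns used in the query above. The sample rows and the helper name best_match are made up for illustration. Scores are compared as numbers, which is an explicit choice in this sketch.

```python
import csv
import io


def best_match(csv_text):
    """Return the highest-scoring row as companyName/symbol, or None if empty."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    if not rows:
        return None
    top = max(rows, key=lambda row: float(row["matchScore"]))
    return {"companyName": top["name"], "symbol": top["symbol"]}


# Hypothetical search results, not real AlphaVantage data.
sample_csv = """symbol,name,matchScore
ACMX,Acme Extra Holdings,0.6000
ACME,Acme Corp,1.0000
"""

print(best_match(sample_csv))
```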
We do a SplitRecord to ensure we have only one record. We then run EvaluateJsonPath to get our fields as attributes.
In an UpdateAttribute we trim the symbol just in case.
${stockSymbol:trim()}
We then pass that stock symbol to Twelve Data via InvokeHTTP to get our stock data.
https://api.twelvedata.com/time_series?apikey=GetAnAPIKey&interval=1min&symbol=${stockSymbol}&format=JSON
We then get a lot of stock data back.
{
"meta" : {
"symbol" : "IBM",
"interval" : "1min",
"currency" : "USD",
"exchange_timezone" : "America/New_York",
"exchange" : "NYSE",
"mic_code" : "XNYS",
"type" : "Common Stock"
},
"values" : [ {
"datetime" : "2023-11-15 10:37:00",
"open" : "152.07001",
"high" : "152.08000",
"low" : "151.99500",
"close" : "152.00999",
"volume" : "8525"
}, {
"datetime" : "2023-11-15 10:36:00",
"open" : "152.08501",
"high" : "152.12250",
"low" : "152.08000",
"close" : "152.08501",
"volume" : "15204"
} ...
We then run EvaluateJSONPath to grab the exchange information.
We fork the record to just get one record as this is just to return to Slack. We use UpdateRecord calls to enrich the stock data with other values. We then run a QueryRecord to limit us to 1 record to send to Slack.
SELECT * FROM FLOWFILE
ORDER BY "datetime" DESC
LIMIT 1
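The same "latest record only" idea can be sketched in Python. The sample below is trimmed from the Twelve Data response shown earlier. The helper name latest_bar and the mapping of "close" to closeStockValue are assumptions made for this sketch, based on the attribute names used in the Slack message.

```python
import json


def latest_bar(response_text):
    """Pick the most recent time-series entry plus the exchange metadata."""
    data = json.loads(response_text)
    values = data.get("values", [])
    if not values:
        return None
    # "YYYY-MM-DD HH:MM:SS" strings sort chronologically as plain text.
    newest = max(values, key=lambda value: value["datetime"])
    return {
        "stockSymbol": data["meta"]["symbol"],
        "exchange": data["meta"]["exchange"],
        "stockdateTime": newest["datetime"],
        "closeStockValue": newest["close"],
    }


stock_sample = json.dumps({
    "meta": {"symbol": "IBM", "exchange": "NYSE"},
    "values": [
        {"datetime": "2023-11-15 10:36:00", "close": "152.08501"},
        {"datetime": "2023-11-15 10:37:00", "close": "152.00999"},
    ],
})

print(latest_bar(stock_sample))
```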
We run an EvaluateJsonPath to get the most valuable fields to display.
We then run a PutSlack with our message.
LLM Skipped. Stock Value for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date} is ${closeStockValue}. stock date ${stockdateTime}. stock exchange ${exchange}
We also have a separate flow that branches off from the company name.
In the first step we call Yahoo Finance to get RSS headlines for that stock.
https://feeds.finance.yahoo.com/rss/2.0/headline?s=${stockSymbol:trim()}&region=US&lang=en-US
We use QueryRecord to convert RSS/XML Records to JSON.
We then run a SplitJSON to break out the news items.
We run a SplitRecord to limit to 1 record. We use EvaluateJSONPath to get the fields we need for our Slack message.
We then run UpdateRecord to finalize our JSON.
We then send this message to Slack.
LLM Skipped. Stock News Information for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date}
${title} : ${description}.
${guid} article date ${pubdate}
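As a rough picture of the news branch, this Python sketch pulls the first item out of an RSS 2.0 document and exposes the same four fields used in the Slack message. The feed content here is invented for illustration, and first_headline is a hypothetical helper name. It assumes a standard RSS layout with items under channel.

```python
import xml.etree.ElementTree as ET


def first_headline(rss_text):
    """Return title, description, guid and pubdate of the first RSS item."""
    root = ET.fromstring(rss_text)
    item = root.find("./channel/item")
    if item is None:
        return None
    return {
        "title": item.findtext("title", default=""),
        "description": item.findtext("description", default=""),
        "guid": item.findtext("guid", default=""),
        "pubdate": item.findtext("pubDate", default=""),
    }


# Invented feed content; only the element names follow RSS 2.0.
sample_rss = """<rss version="2.0"><channel>
<title>Hypothetical feed</title>
<item><title>Example headline</title><description>Example summary</description>
<guid>example-guid-1</guid><pubDate>Wed, 15 Nov 2023 15:00:00 GMT</pubDate></item>
<item><title>Older headline</title><description>Older summary</description>
<guid>example-guid-2</guid><pubDate>Tue, 14 Nov 2023 15:00:00 GMT</pubDate></item>
</channel></rss>"""

print(first_headline(sample_rss))
```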
For those who selected weather, we follow a similar route to stocks (we should add caching with Redis @ Aiven). We use my OpenNLP processor to extract the locations you might want weather for.
The next step is taking the output of the processor and building a value to send to our geocoder.
weatherlocation = ${nlp_location_1:notNull():ifElse(${nlp_location_1}, "New York City")}
If we couldn't find a valid location, I default to "New York City". We could use some other lookup. I am doing some work on loading all locations and could do some advanced PostgreSQL searches on that, or perhaps use OpenSearch or a vectorized datastore.
I pass that location to Open Meteo via InvokeHTTP to find its coordinates.
https://geocoding-api.open-meteo.com/v1/search?name=${weatherlocation:trim():urlEncode()}&count=1&language=en&format=json
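Here is a minimal Python sketch of the fallback and URL construction, using the same query parameters as the URL above. The helper name geocoding_url is hypothetical. One small difference from the expression above: this sketch also treats an empty or blank string as a missing location.

```python
from urllib.parse import urlencode

DEFAULT_LOCATION = "New York City"  # same default as the flow above


def geocoding_url(nlp_location):
    """Build the Open Meteo geocoding URL, falling back to the default city."""
    location = (nlp_location or "").strip() or DEFAULT_LOCATION
    query = urlencode({
        "name": location,
        "count": 1,
        "language": "en",
        "format": "json",
    })
    return "https://geocoding-api.open-meteo.com/v1/search?" + query


print(geocoding_url(None))
print(geocoding_url(" Boston "))
```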
We then parse the values we need from the results.
{
"results" : [ {
"id" : 5128581,
"name" : "New York",
"latitude" : 40.71427,
"longitude" : -74.00597,
"elevation" : 10.0,
"feature_code" : "PPL",
"country_code" : "US",
"admin1_id" : 5128638,
"timezone" : "America/New_York",
"population" : 8175133,
"postcodes" : [ "10001", "10002", "10003", "10004", "10005", "10006", "10007", "10008", "10009", "10010", "10011", "10012", "10013", "10014", "10016", "10017", "10018", "10019", "10020", "10021", "10022", "10023", "10024", "10025", "10026", "10027", "10028", "10029", "10030", "10031", "10032", "10033", "10034", "10035", "10036", "10037", "10038", "10039", "10040", "10041", "10043", "10044", "10045", "10055", "10060", "10065", "10069", "10080", "10081", "10087", "10090", "10101", "10102", "10103", "10104", "10105", "10106", "10107", "10108", "10109", "10110", "10111", "10112", "10113", "10114", "10115", "10116", "10117", "10118", "10119", "10120", "10121", "10122", "10123", "10124", "10125", "10126", "10128", "10129", "10130", "10131", "10132", "10133", "10138", "10150", "10151", "10152", "10153", "10154", "10155", "10156", "10157", "10158", "10159", "10160", "10161", "10162", "10163", "10164", "10165", "10166", "10167", "10168", "10169", "10170", "10171", "10172", "10173", "10174", "10175", "10176", "10177", "10178", "10179", "10185", "10199", "10203", "10211", "10212", "10213", "10242", "10249", "10256", "10258", "10259", "10260", "10261", "10265", "10268", "10269", "10270", "10271", "10272", "10273", "10274", "10275", "10276", "10277", "10278", "10279", "10280", "10281", "10282", "10285", "10286" ],
"country_id" : 6252001,
"country" : "United States",
"admin1" : "New York"
} ],
"generationtime_ms" : 0.92196465
}
We then parse the results so we can call another API to get the current weather for that latitude and longitude via InvokeHTTP.
https://api.weather.gov/points/${latitude:trim()},${longitude:trim()}
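The hand-off from the geocoding result to the weather lookup can be sketched like this in Python. The sample is trimmed from the geocoding response shown above, and points_url is a hypothetical helper name.

```python
import json


def points_url(geocoding_response):
    """Build the weather.gov points URL from the first geocoding result."""
    results = json.loads(geocoding_response).get("results") or []
    if not results:
        return None  # no location found, nothing to look up
    first = results[0]
    return "https://api.weather.gov/points/{},{}".format(
        first["latitude"], first["longitude"]
    )


geo_sample = json.dumps({
    "results": [{"name": "New York", "latitude": 40.71427, "longitude": -74.00597}]
})

print(points_url(geo_sample))
```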
The results are GeoJSON.
We use EvaluateJsonPath to grab a forecast URL.
Then we call that forecast URL via InvokeHTTP.
That produces a larger JSON output that we will parse for the results we want to return to Slack.
We parse the data with EvaluateJSONPath to get primary fields for weather.
We then format those fields to PutSlack.
LLM Skipped. Read forecast on ${date} for ${weatherlocation} @ ${latitude},${longitude}
Used ${forecasturl} ${icon} Temp: ${temperature} ${temperatureunit} - ${temperaturetrend}
There is a wind ${winddirection} at ${windspeed}. ${detailedforecast}
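To show roughly where those fields come from, here is a Python sketch of the two parsing steps. It assumes the points response carries the forecast URL under properties.forecast and that the forecast response lists periods under properties.periods. Both sample documents are invented placeholders, and the helper names are hypothetical.

```python
import json


def forecast_url(points_response):
    """Extract the forecast URL from a weather.gov points response."""
    return json.loads(points_response)["properties"]["forecast"]


def first_period(forecast_response):
    """Map the first forecast period to the attribute names used in Slack."""
    period = json.loads(forecast_response)["properties"]["periods"][0]
    return {
        "temperature": period.get("temperature"),
        "temperatureunit": period.get("temperatureUnit"),
        "temperaturetrend": period.get("temperatureTrend"),
        "winddirection": period.get("windDirection"),
        "windspeed": period.get("windSpeed"),
        "icon": period.get("icon"),
        "detailedforecast": period.get("detailedForecast"),
    }


# Invented placeholder responses; values are not real forecasts.
points_sample = json.dumps({
    "properties": {"forecast": "https://api.weather.gov/gridpoints/XXX/1,2/forecast"}
})
forecast_sample = json.dumps({
    "properties": {"periods": [{
        "temperature": 50, "temperatureUnit": "F", "temperatureTrend": None,
        "windDirection": "NW", "windSpeed": "10 mph",
        "icon": "https://api.weather.gov/icons/example",
        "detailedForecast": "Sunny.",
    }]}
})

print(forecast_url(points_sample))
print(first_period(forecast_sample))
```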
Slack Output
If we do have an LLM question, let's make sure it's just one record.
We use a few different models that are available at IBM WatsonX.AI on IBM Cloud and can be quickly accessed by our REST prompts.
I tested and built the prompts initially at IBM's Prompt Lab and then copied the initial curl statement from there.
IBM watsonx.ai
https://www.ibm.com/docs/en/watsonx-as-a-service?topic=models-
ibm/mpt-7b-instruct2
meta-llama/llama-2-70b-chat
ibm/granite-13b-chat-v1
We have to send our unique secure key to IBM and they will give us a token to use in our next call.
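For readers doing this step outside NiFi, here is a hedged Python sketch of the token exchange. It assumes the standard IBM Cloud IAM token endpoint and the API key grant type, neither of which is named in this article. The helper name build_token_request and the placeholder key are hypothetical. The sketch only builds the request, so it can be inspected without making a network call.

```python
import urllib.parse
import urllib.request

# Assumption: the standard IBM Cloud IAM endpoint; the article does not name it.
IAM_TOKEN_URL = "https://iam.cloud.ibm.com/identity/token"


def build_token_request(api_key):
    """Build (but do not send) the POST that trades an API key for a token."""
    body = urllib.parse.urlencode({
        "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
        "apikey": api_key,
    }).encode("utf-8")
    return urllib.request.Request(
        IAM_TOKEN_URL,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
        },
    )


token_request = build_token_request("YOUR_IBM_CLOUD_API_KEY")
print(token_request.get_method(), token_request.full_url)
# To send it for real:
#   json.load(urllib.request.urlopen(token_request))["access_token"]
```

The returned access token would then go into the Authorization header of the generation call as a bearer token.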
We parse out the question and then send it to WatsonX.AI via the REST API.
We build a prompt to send to IBM as follows.
{
"model_id": "meta-llama/llama-2-70b-chat",
"input": "${inputs:urlEncode()}",
"parameters": {
"decoding_method": "greedy",
"max_new_tokens": 200,
"min_new_tokens": 50,
"stop_sequences": [],
"repetition_penalty": 1
},
"project_id": "0ead8ec4-d137-4f9c-8956-50b0da4a7068" }
We parse the generated text which is our Generative AI results plus some helpful metadata on timings.
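The request and response handling can be sketched in Python as follows. The payload mirrors the prompt JSON above, but this sketch lets json.dumps escape the question text instead of URL-encoding it, which is a swapped-in choice. The response shape (a results list with generated_text, token counts and stop_reason) and the mapping onto the tokencount and generated_token names are assumptions. The helper names are hypothetical, and the sample values are taken from the run shown in this article.

```python
import json


def build_prompt(inputs, project_id, model_id="meta-llama/llama-2-70b-chat"):
    """Serialize the generation request; json.dumps handles quoting."""
    payload = {
        "model_id": model_id,
        "input": inputs,
        "parameters": {
            "decoding_method": "greedy",
            "max_new_tokens": 200,
            "min_new_tokens": 50,
            "stop_sequences": [],
            "repetition_penalty": 1,
        },
        "project_id": project_id,
    }
    return json.dumps(payload)


def parse_generation(response_text):
    """Flatten the first result into the fields posted to Slack and Kafka."""
    body = json.loads(response_text)
    result = body["results"][0]
    return {
        "model_id": body.get("model_id"),
        "created_at": body.get("created_at"),
        "generated_text": result.get("generated_text"),
        "generated_token": result.get("generated_token_count"),
        "tokencount": result.get("input_token_count"),
        "stop_reason": result.get("stop_reason"),
    }


# Assumed response shape, filled with values from the run in this article.
sample_response = json.dumps({
    "model_id": "meta-llama/llama-2-70b-chat",
    "created_at": "2023-11-15T15:43:29.248Z",
    "results": [{
        "generated_text": "You can use Apache NiFi to integrate Generative AI models in several ways:",
        "generated_token_count": 200,
        "input_token_count": 38,
        "stop_reason": "max_tokens",
    }],
})

print(build_prompt("Q: What is a good way to integrate Generative AI and Apache NiFi?", "YOUR_PROJECT_ID"))
print(parse_generation(sample_response))
```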
The result posted to Slack is:
"You can use Apache NiFi to integrate Generative AI models in several ways:
- Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.
- Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.
- Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.
- Real-time Inference: You can use NiFi's StreamingJobs"
After the Slack bot posted the results, it posted metrics and debugging information to the chat channel. All of the metadata is posted to another Slack channel for administrator monitoring.
====
NiFi to IBM WatsonX.AI LLM Answers
On Date: Wed, 15 Nov 2023 15:43:29 GMT
Created: 2023-11-15T15:43:29.248Z
Prompt: Q: What is a good way to integrate Generative AI and Apache NiFi?
Response: ) You can use Apache NiFi to integrate Generative AI models in several ways:
- Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.
- Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.
- Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.
- Real-time Inference: You can use NiFi's StreamingJobs
Token: 200
Req Duration: 8153
HTTP TX ID: 89d71099-da23-4e7e-89f9-4e8f5620c0fb
IBM Msg: This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. URL: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx
IBM Msg ID: disclaimer_warning
Model ID: meta-llama/llama-2-70b-chat
Stop Reason: max_tokens
Token Count: 38
TX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756
UUID: da0806cb-6133-4bf4-808e-1fbf419c09e3
Corr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756
Global TX ID: 20c3a9cf276c38bcdaf26e3c27d0479b
Service Time: 478
Request ID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9c
File Name: 1a3c4386-86d2-4969-805b-37649c16addb
Request Duration: 8153
Request URL: https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29
cf-ray: 82689bfd28e48ce2-EWR
=====
Source
GitHub - tspannhw/FLaNK-watsonx.ai: FLaNK Stack with watsonx.ai for google/flan-ul2, google/flan-t5-xxl, Granite and other foundation models (github.com)
Make your own Slack Bot
Slack Output
Kafka Distribute
Apache Flink SQL Table Creation DDL
CREATE TABLE `ssb`.`Meetups`.`watsonairesults` (
  `date` VARCHAR(2147483647),
  `x_global_transaction_id` VARCHAR(2147483647),
  `x_request_id` VARCHAR(2147483647),
  `cf_ray` VARCHAR(2147483647),
  `inputs` VARCHAR(2147483647),
  `created_at` VARCHAR(2147483647),
  `stop_reason` VARCHAR(2147483647),
  `x_correlation_id` VARCHAR(2147483647),
  `x_proxy_upstream_service_time` VARCHAR(2147483647),
  `message_id` VARCHAR(2147483647),
  `model_id` VARCHAR(2147483647),
  `invokehttp_request_duration` VARCHAR(2147483647),
  `message` VARCHAR(2147483647),
  `uuid` VARCHAR(2147483647),
  `generated_text` VARCHAR(2147483647),
  `transaction_id` VARCHAR(2147483647),
  `tokencount` VARCHAR(2147483647),
  `generated_token` VARCHAR(2147483647),
  `ts` VARCHAR(2147483647),
  `advisoryId` VARCHAR(2147483647),
  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
  'deserialization.failure.policy' = 'ignore_and_log',
  'properties.request.timeout.ms' = '120000',
  'format' = 'json',
  'properties.bootstrap.servers' = 'kafka:9092',
  'connector' = 'kafka',
  'properties.transaction.timeout.ms' = '900000',
  'topic' = 'watsonxaillmanswers',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'watsonxaillmconsumer'
);
CREATE TABLE `ssb`.`Meetups`.`watsonxresults` (
  `date` VARCHAR(2147483647),
  `x_global_transaction_id` VARCHAR(2147483647),
  `x_request_id` VARCHAR(2147483647),
  `cf_ray` VARCHAR(2147483647),
  `inputs` VARCHAR(2147483647),
  `created_at` VARCHAR(2147483647),
  `stop_reason` VARCHAR(2147483647),
  `x_correlation_id` VARCHAR(2147483647),
  `x_proxy_upstream_service_time` VARCHAR(2147483647),
  `message_id` VARCHAR(2147483647),
  `model_id` VARCHAR(2147483647),
  `invokehttp_request_duration` VARCHAR(2147483647),
  `message` VARCHAR(2147483647),
  `uuid` VARCHAR(2147483647),
  `generated_text` VARCHAR(2147483647),
  `transaction_id` VARCHAR(2147483647),
  `tokencount` VARCHAR(2147483647),
  `generated_token` VARCHAR(2147483647),
  `ts` VARCHAR(2147483647),
  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
  'deserialization.failure.policy' = 'ignore_and_log',
  'properties.request.timeout.ms' = '120000',
  'format' = 'json',
  'properties.bootstrap.servers' = 'kafka:9092',
  'connector' = 'kafka',
  'properties.transaction.timeout.ms' = '900000',
  'topic' = 'watsonxaillm',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'allwatsonx1'
);
Example Prompt
{"inputs":"Please answer to the following question. What is the capital of the United States?"}
IBM DB2 SQL
alter table "DB2INST1"."TRAVELADVISORY"
add column "summary" VARCHAR(2048);
-- DB2INST1.TRAVELADVISORY definition
CREATE TABLE "DB2INST1"."TRAVELADVISORY" (
"TITLE" VARCHAR(250 OCTETS) ,
"PUBDATE" VARCHAR(250 OCTETS) ,
"LINK" VARCHAR(250 OCTETS) ,
"GUID" VARCHAR(250 OCTETS) ,
"ADVISORYID" VARCHAR(250 OCTETS) ,
"DOMAIN" VARCHAR(250 OCTETS) ,
"CATEGORY" VARCHAR(4096 OCTETS) ,
"DESCRIPTION" VARCHAR(4096 OCTETS) ,
"UUID" VARCHAR(250 OCTETS) NOT NULL ,
"TS" BIGINT NOT NULL ,
"summary" VARCHAR(2048 OCTETS) )
IN "IBMDB2SAMPLEREL"
ORGANIZE BY ROW;
ALTER TABLE "DB2INST1"."TRAVELADVISORY"
ADD PRIMARY KEY
("UUID")
ENFORCED;
GRANT CONTROL ON TABLE "DB2INST1"."TRAVELADVISORY" TO USER "DB2INST1";
GRANT CONTROL ON INDEX "SYSIBM "."SQL230620142604860" TO USER "DB2INST1";
SELECT "summary", TITLE , ADVISORYID , TS, PUBDATE FROM DB2INST1.TRAVELADVISORY t
WHERE "summary" IS NOT NULL
ORDER BY ts DESC
For an example output email, see:
https://github.com/tspannhw/FLaNK-watsonx.ai/blob/main/example.email.txt
References
The Latest in Real-Tim(e) Analytics: Generative AI, LLM and Beyond
Thursday October 26th, we had a meetup with two Tim's speaking on real-time analytics plus LLM and more. It was a great… (medium.com)
Streaming LLM with Apache NiFi (HuggingFace)
See my talk on August 23, 2023 at NYC AI Dev Day. (medium.com)
IBM Documentation (www.ibm.com)
Supported foundation models available with watsonx.ai
A collection of open source and IBM foundation models are deployed in IBM watsonx.ai. (www.ibm.com)
Tips for writing foundation model prompts: prompt engineering
Part art, part science, prompt engineering is the process of crafting prompt text to best effect for a given model and… (www.ibm.com)
Sample foundation model prompts for common tasks
Try these samples to learn how different prompts can guide foundation models to do common tasks. (www.ibm.com)
IBM Technology Chooses Cloudera as its Preferred Partner for Addressing Real Time Data Movement…
Organizations increasingly rely on streaming data sources not only to bring data into the enterprise but also to… (blog.cloudera.com)
Travel Advisories
https://travel.state.gov/content/travel/en/traveladvisories/traveladvisories.html/
https://travel.state.gov/_res/rss/TAsTWs.xml
https://medium.com/@tspann/building-a-travel-advisory-app-with-apache-nifi-in-k8-969b44c84958
https://github.com/tspannhw/FLaNK-TravelAdvisory
Video
https://www.youtube.com/watch?v=RPz7Xm4fLF4&t=6s
Source Code
https://github.com/tspannhw/FLaNK-watsonx.ai
Models
https://www.ibm.com/docs/en/watsonx-as-a-service?topic=models-
https://medium.com/cloudera-inc/ingesting-events-into-dockerized-ibm-db2-jdbc-with-apache-nifi-f0ca452d1351
https://www.youtube.com/watch?v=-r8zf_nfxCw
https://medium.com/cloudera-inc/no-code-sentiment-analysis-with-hugging-face-and-apache-nifi-for-article-summaries-cf06d1df1283
https://medium.com/cloudera-inc/cdc-not-cat-data-capture-e43713879c03
https://medium.com/cloudera-inc/building-a-real-time-data-pipeline-a-comprehensive-tutorial-on-minifi-nifi-kafka-and-flink-ee03ee6722cb
Other Related Articles
Evolve NYC 2023 Wrap-Up
Evolve NYC 2023 was an intense event with a ton of speakers, sessions, topics and cool people. (medium.com)
No Code Sentiment Analysis with Hugging Face and Apache NiFi for Article Summaries
Apache NiFi, Hugging Face, NY Times RSS, Sentiment Analysis, Deep Learning, REST API, Classification, Distilbert… (medium.com)