There’s a bunch of improvements in the works for how ksqlDB handles code deployments and migrations. For now though, for deploying queries there’s the option of using headless mode (which is limited to one query file and disables subsequent interactive work on the server from a CLI), manually running commands (yuck), or using the REST endpoint to deploy queries automagically. Here’s an example of doing that.
Deploying statements using /ksql
endpoint
#!/bin/bash
function deploy_ksql {
echo "Deploying "$1
grep -v '^--' $1 | \
tr '\n' ' ' | \
sed 's/;/;\'$'\n''/g' | \
sed 's/"/\\\\"/g' | \
while read stmt; do
echo '------'
echo "$stmt"
if [${#stmt} -gt 0] ; then
echo '{"ksql":"'"$stmt"'", "streamsProperties": { "ksql.streams.auto.offset.reset": "earliest" }}' | \
curl -s -X "POST" "http://localhost:8088/ksql" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d @- | \
jq
fi
done
# Give ourselves chance to breath
sleep 2
}
With this bash function defined (put it in a .sh
file and source
it) you can deploy multiple files (each with multiple statements in) thus:
echo "Creating ksqlDB tables for reference data"
deploy_ksql ./data/ksql/01_location/00_location.ksql
deploy_ksql ./data/ksql/03_movements/01_canx_reason.ksql
deploy_ksql ./data/ksql/02_cif_schedule/01_schedule_raw.ksql
Running queries using the /query-stream
endpoint
For queries (SELECT…
) you need to use a different endpoint.
function query_ksql {
echo "Querying "$1
grep -v '^--' $1 | \
tr '\n' ' ' | \
sed 's/;/;\'$'\n''/g' | \
sed 's/"/\\\\"/g' | \
while read stmt; do
echo '------'
echo "$stmt"
if [${#stmt} -gt 0] ; then
echo '{"sql":"'"$stmt"'", "properties": { "ksql.streams.auto.offset.reset": "earliest" }}' | \
curl --silent --show-error --http2 -X "POST" "http://localhost:8088/query-stream" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d @- | \
jq
fi
done
}
Use is similar to the deploy_ksql
function - pass it a file with SQL in to run:
deploy_ksql ./data/ksql/03_movements/02_activations.ksql
query_ksql ./data/ksql/03_movements/03_activations_query.ksql
Note that you can’t use SET
in the SQL passed to the endpoint. The above bash function hardcodes 'auto.offset.reset' = 'earliest'
- you can customise it as needed to set other values.
Improvements
There’s a ton to improve here. For example
-
Checking the return code of the REST call and aborting if there’s an error condition
- Perhaps muting verbose output if everything’s OK
Funky characters will invariably trip up the bash code - things like quotation marks and asterisks are handled, but there’s plenty else that probably isn’t
Automagically deploying streams and table to Confluent Cloud
The above scripts can be tailored to work with Confluent Cloud too. Make sure you set the necessary environment variables first. See this article on how to obtain the ksqlDB API key pair
CCLOUD_KSQL_ENDPOINT
CCLOUD_KSQL_API_KEY
CCLOUD_KSQL_API_SECRET
#!/bin/bash
function deploy_ccloud_ksql {
echo "Deploying "$1
grep -v '^--' $1 | \
tr '\n' ' ' | \
sed 's/;/;\'$'\n''/g' | \
sed 's/'\''/'"'"'/g' | \
sed 's/"/\\\\"/g' | \
while read stmt; do
echo '------'
if [${#stmt} -gt 0] ; then
echo '{"ksql":"'"$stmt"'", "streamsProperties": { "ksql.streams.auto.offset.reset": "earliest" }}' | \
curl --silent --show-error \
-u $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET \
-X "POST" $CCLOUD_KSQL_ENDPOINT"/ksql" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d @- | \
jq
fi
done
# Give ourselves chance to breath
sleep 2
}
function query_ksql {
echo "Querying "$1
grep -v '^--' $1 | \
tr '\n' ' ' | \
sed 's/;/;\'$'\n''/g' | \
sed 's/"/\\\\"/g' | \
while read stmt; do
echo '------'
echo "$stmt"
if [${#stmt} -gt 0] ; then
echo '{"sql":"'"$stmt"'", "properties": { "ksql.streams.auto.offset.reset": "earliest" }}' | \
curl --silent --show-error \
-u $CCLOUD_KSQL_API_KEY:$CCLOUD_KSQL_API_SECRET \
-X "POST" $CCLOUD_KSQL_ENDPOINT"/query-stream" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d @- | \
jq
fi
done
}
Top comments (0)