Reuven K for Aerospike

Managing Time Series Data with Aerospike, viewing with Grafana

A common use case implemented with Aerospike involves storing time series data that arrives at a very high throughput rate. The ability to view that data in real time is also a desired component of this use case. A demo using equity (stock) price data has been created to demonstrate both of those capabilities.

First, we want to populate a baseline of historical opening and closing stock price data for a few symbols. To do so, a Java program loads source data for the stock symbols IBM, WMT, XOM, BRK, and AMZN from freely available Yahoo Finance export files. That source data includes only one record per day, with the opening, closing, high, and low prices. The program generates a price for each second of the trading day (9:30 AM to 4:00 PM) within the price range provided in the file. All the generated tick price data for one stock symbol on a single day is stored in a single sorted List, which is then written to the Aerospike database as one record. A separate List entry is appended to another record, which holds all closing prices over a single year for that same stock symbol.

The code is as follows:

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.cdt.ListOperation;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.cdt.ListOrder;
import com.aerospike.client.cdt.ListPolicy;
import com.aerospike.client.cdt.ListWriteFlags;
import com.aerospike.client.Value.ListValue;
import com.aerospike.client.Value.LongValue;
import com.aerospike.client.Value;
import com.aerospike.client.Value.StringValue;
import java.util.Vector;
import java.util.ArrayList;
import java.text.SimpleDateFormat;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
/**
*
* @author reuven
*/
public class PopulateTimestampList {
// The ticker symbols that we will load
enum StockSymbols {
IBM,
WMT,
XOM,
BRK,
AMZN
}
public static void main(String[] args) throws Exception {
// Connect to Aerospike. Change the IP address as appropriate
AerospikeClient client = new AerospikeClient("192.168.1.25", 3000);
WritePolicy policy = new WritePolicy();
// Since some of the records written are fairly large (and I had a WIFI connection), need to change the timeouts from the default
policy.totalTimeout = 10000;
policy.socketTimeout = 7500;
// Create a ListPolicy that will force the List of Lists to be ordered by timestamp (the first member of each tick List)
ListPolicy lpolicy = new ListPolicy(ListOrder.ORDERED, ListWriteFlags.DEFAULT);
String cvsSplitBy = ",";
// For each of the stock symbols
for (StockSymbols aStock: StockSymbols.values()){
// Open up the associated input file
// The first few records of one of the files, extracted from Yahoo Finance, is as follows:
// Date,Open,High,Low,Close,Adj Close,Volume
// 2014-11-11,163.699997,163.899994,162.600006,163.300003,131.590302,3534400
// 2014-11-12,162.279999,163.000000,161.759995,161.919998,130.478241,3378200
// 2014-11-13,162.000000,162.800003,161.800003,162.789993,131.179306,3239700
String csvFile = "/Users/reuven/Downloads/" + aStock + ".csv";
System.out.println("File: " + csvFile);
BufferedReader br = null;
String line = "";
String[] dailyData = new String[0];
Vector allTickers = new Vector();
ArrayList<Value> dailyCloses = new ArrayList<Value>();
int currentYear = -1; // Initialized to a dummy year
try {
br = new BufferedReader(new FileReader(csvFile));
// Skip the header line
br.readLine();
// For each input record
while ((line = br.readLine()) != null) {
dailyData = line.split(cvsSplitBy);
allTickers = new Vector();
Vector closeTickers = new Vector();
Vector tickerEntry = new Vector();
// Create a ListValue for the Daily open price (9:30AM) and the close price (4:00PM) based upon the input data
Long tickMsec = new Long(new SimpleDateFormat("yyyy-MM-dd HH:mm").parse(dailyData[0] + " 09:30").getTime());
tickerEntry.add(new LongValue(tickMsec)); // Opening Price Time
tickerEntry.add(new StringValue(dailyData[1]));
allTickers.add(new ListValue(tickerEntry));
// Add the CLOSE value to a list for the symbol
// If there's a change in year, then write the record of CLOSING prices for the year
if ((Integer.valueOf(dailyData[0].substring(0,4))) != currentYear) {
if (currentYear != -1) {
Key key = new Key ("test", "tickerSet", aStock.toString() + "-" + String.valueOf(currentYear));
client.operate(policy, key, ListOperation.appendItems(lpolicy, "datapoints", dailyCloses));
dailyCloses = new ArrayList();
}
currentYear = Integer.valueOf(dailyData[0].substring(0,4));
}
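// Create an entry for the closing price alone: 4:00PM is 6.5 hours after the 9:30AM open, Close is [4]
Vector closeEntry = new Vector();
closeEntry.add(new LongValue(tickMsec + (long) (6.5 * 60 * 60 * 1000)));
closeEntry.add(new StringValue(dailyData[4]));
dailyCloses.add(new ListValue(closeEntry));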
// Now need to generate random prices for a List within a single daily range
// Open is [1], High is [2], Low is [3], Close is [4] (Adj Close is [5])
// Open is 9:30AM Close is 4PM
// Each second should have a value between the high and low, with a random change of no more than 5% of the difference.
double prevTick = Float.parseFloat(dailyData[1]);
float dailyHigh = Float.parseFloat(dailyData[2]);
float dailyLow = Float.parseFloat(dailyData[3]);
float dailyHighLowDiff = dailyHigh - dailyLow;
for (int sec = 1; sec < (6.5 * 60 * 60); sec++) {
tickerEntry = new Vector();
tickMsec = tickMsec + 1000; //add one second
tickerEntry.add(new LongValue(tickMsec));
double tryTick = -1;
// If the random tick value is less than the daily low or greater than the daily high, try again
while (tryTick < dailyLow || tryTick > dailyHigh) {
tryTick = prevTick - (dailyHighLowDiff * .05) + ((Math.random()) * (dailyHighLowDiff * .10));
}
// Convert the tick value to a string, truncated to six decimal places
String[] decSplit = String.valueOf(tryTick).split("\\.");
try {
tickerEntry.add(new StringValue(decSplit[0] + "." + decSplit[1].substring(0,6)));
} catch (StringIndexOutOfBoundsException e) {
// Fewer than six decimal places: store the value as-is
tickerEntry.add(new StringValue(String.valueOf(tryTick)));
}
// Add the [timestamp, tick] list value to the list of ticks
allTickers.add(new ListValue(tickerEntry));
prevTick = tryTick;
}
// Now create a record for the symbol per day with all the ticks in a sorted list
Key key = new Key ("test", "tickerSet", aStock.toString() + "-" + dailyData[0].replaceAll("-", ""));
client.operate(policy, key, ListOperation.appendItems(lpolicy, "datapoints", allTickers));
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
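// Write the closing prices for the final year, but only if any records were read
if (currentYear != -1) {
Key key = new Key ("test", "tickerSet", aStock.toString() + "-" + String.valueOf(currentYear));
client.operate(policy, key, ListOperation.appendItems(lpolicy, "datapoints", dailyCloses));
}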
}
client.close();
System.out.println("DONE!");
System.exit(0);
}
}
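
The generation step in the program above can be summarized in a short Python sketch. This is not the author's code: it is a minimal illustration, without any Aerospike calls, of the bounded random walk (each step moves at most 5% of the daily high/low spread and is retried until it falls inside the range) and of the two record key formats. The function names are hypothetical, and the sketch rounds prices to six decimals where the Java program truncates them.

```python
import random
from datetime import datetime

SESSION_SECONDS = int(6.5 * 60 * 60)  # 09:30 to 16:00 is 23,400 seconds


def generate_day_ticks(date_str, open_price, high, low, rng=None):
    """Return [[timestamp_ms, price_string], ...] for one trading day."""
    if rng is None:
        rng = random.Random()
    # The session opens at 09:30 local time on the given date
    start = datetime.strptime(date_str + " 09:30", "%Y-%m-%d %H:%M")
    start_ms = int(start.timestamp() * 1000)
    spread = high - low
    ticks = [[start_ms, f"{open_price:.6f}"]]
    prev = open_price
    for sec in range(1, SESSION_SECONDS):
        candidate = None
        # Step by at most 5% of the spread; retry until inside [low, high]
        while candidate is None or not (low <= candidate <= high):
            candidate = prev - spread * 0.05 + rng.random() * spread * 0.10
        ticks.append([start_ms + sec * 1000, f"{candidate:.6f}"])
        prev = candidate
    return ticks


def daily_key(symbol, date_str):
    """Key of the per-day tick record, e.g. IBM-20141111."""
    return f"{symbol}-{date_str.replace('-', '')}"


def yearly_key(symbol, date_str):
    """Key of the per-year daily-price record, e.g. IBM-2014."""
    return f"{symbol}-{date_str[:4]}"
```

Calling generate_day_ticks("2014-11-11", 163.699997, 163.899994, 162.600006) yields one entry per second of the session, which corresponds to the list the Java program writes under the key returned by daily_key.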

The next Java program endlessly generates random real-time ticker values for selected stock symbols. As its input, it relies on a CSV file that provides, for each stock symbol, the low/high range of values and the maximum percentage change per second. For each of those symbols, a random stock price is generated for the current time (at one-second resolution). That price is added to the sorted List of ticker prices in that symbol's record for the current date in the Aerospike database.

Note that the “operate” API call is used to add the entry to the ticker price List for that symbol and date on the server, without requiring the entire List to be downloaded, edited, and written back by the client. The ability to execute operations on the database server is what makes storing and manipulating large Lists practical: downloading a large record to the client over the network, making a minor change, and then writing it back to the server simply cannot scale to large data volumes.

After generating the values for each symbol, the application will sleep for a second.

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.cdt.ListOperation;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.cdt.ListOrder;
import com.aerospike.client.cdt.ListPolicy;
import com.aerospike.client.cdt.ListWriteFlags;
import com.aerospike.client.Value.ListValue;
import com.aerospike.client.Value.LongValue;
import com.aerospike.client.Value.StringValue;
import java.util.ArrayList;
import java.util.Vector;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
/**
*
* @author reuven
*/
class tickRange {
String symbol;
int lowValue;
int highValue;
int tickPctRange;
double lastTick;
}
public class PopulateStreamingTicks {
public static void main(String[] args) throws Exception {
// Connect to Aerospike. Change the IP address as appropriate
AerospikeClient client = new AerospikeClient("192.168.1.25", 3000);
WritePolicy policy = new WritePolicy();
// Since some of the records written are fairly large (and I had a WIFI connection), need to change the timeouts from the default
policy.totalTimeout = 10000;
policy.socketTimeout = 7500;
System.out.println("Connected to Aerospike");
// Set the ListPolicy so that the list of lists will be ordered
ListPolicy lpolicy = new ListPolicy(ListOrder.ORDERED, ListWriteFlags.DEFAULT);
// Read from a csv file the parameters for the generation of real-time tick data
// A sample of the file:
// Symbol,Min,Max,PctTickChange
// IBM,130,145,20
// WMT,101,120,20
// XOM,65,75,20
String csvFile = "/Users/reuven/TestData/LiveFeed.csv";
String cvsSplitBy = ",";
BufferedReader br = null;
String[] tickerInfo = new String[0];
ArrayList<tickRange> tickerList = new ArrayList<tickRange>();
try {
br = new BufferedReader(new FileReader(csvFile));
br.readLine();
// Get the desired tick parameters for generating data
String line = "";
while ((line = br.readLine()) != null) {
tickerInfo = line.split(cvsSplitBy);
tickRange aTickRange = new tickRange();
aTickRange.symbol = tickerInfo[0];
aTickRange.lowValue = Integer.valueOf(tickerInfo[1]);
aTickRange.highValue = Integer.valueOf(tickerInfo[2]);
aTickRange.tickPctRange = Integer.valueOf(tickerInfo[3]);
// Start from the midpoint of the low/high range
aTickRange.lastTick = aTickRange.lowValue + (aTickRange.highValue - aTickRange.lowValue) / 2.0;
tickerList.add(aTickRange);
}
// Now run infinite loop starting from the current time
// Generate a random tick within the desired range for all symbols, then sleep for a second
while (true) {
Long tickMsec = System.currentTimeMillis();
for (tickRange aTickRange: tickerList) {
Vector tickerEntry = new Vector();
tickerEntry.add(new LongValue (tickMsec));
double dailyHighLowDiff = aTickRange.highValue - aTickRange.lowValue;
double tryTick = -1;
// Generate a value; if out of range, generate again
while (tryTick < aTickRange.lowValue || tryTick > aTickRange.highValue) {
tryTick = aTickRange.lastTick - (dailyHighLowDiff * aTickRange.tickPctRange * .01) + ((Math.random()) *
(dailyHighLowDiff * aTickRange.tickPctRange * 2 * .01));
}
tickerEntry.add(new StringValue(String.valueOf(tryTick)));
aTickRange.lastTick = tryTick;
Key key = new Key ("test", "tickerSet", aTickRange.symbol + "-" + new SimpleDateFormat("yyyyMMdd").format(new Date(tickMsec)));
client.operate(policy, key, ListOperation.append(lpolicy, "datapoints", new ListValue(tickerEntry)));
//System.out.println("Symbol, Value: " + aTickRange.symbol + ", " + aTickRange.lastTick);
}
TimeUnit.SECONDS.sleep(1);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
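
To make the effect of the ordered append more concrete, here is a small client-side simulation in Python. It is purely illustrative: in the demo the insertion happens on the Aerospike server through the operate call, and no Aerospike API is used here. The sketch assumes, as the code comments above state, that the list of [timestamp, price] entries is kept ordered by the first member of each entry; the helper name ordered_append is hypothetical.

```python
import bisect


def ordered_append(datapoints, entry):
    """Insert entry so the list stays sorted, mimicking an ORDERED list.

    Python compares lists element by element, so [timestamp, price]
    entries end up sorted by timestamp first.
    """
    bisect.insort(datapoints, entry)
    return len(datapoints)


# Entries arrive out of order but are stored in timestamp order
datapoints = []
ordered_append(datapoints, [1700000002000, "131.2"])
ordered_append(datapoints, [1700000000000, "130.9"])
ordered_append(datapoints, [1700000001000, "131.0"])
```

Because the list stays sorted by timestamp, a later read can ask for a time window of the list instead of the whole record.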

Last, but not least, we’d like to display the real-time stock ticker data graphically. For this, we’ve integrated Grafana with Aerospike. For purposes of this demo, we modified a publicly available sample for building a Grafana datasource into one for use with Aerospike. Query requests are parsed and then fulfilled by retrieving the desired records from the Aerospike database, which is being populated in real time by the Java program described above. If the requested range is less than 3 days, then the detailed tick data with a granularity of seconds (one database record per symbol per date) is retrieved; when the range is less than a single day, a List operation returns only the ticks within the requested time window of that day's record. If the range is 3 days or greater, then the data is retrieved with a granularity of days (one database record per symbol per year). Note that retrieval of the multiple Aerospike database records required to fulfill the client request is accomplished via a single API call (get_many). As the database is distributed among multiple servers, the request is fanned out in parallel to the servers that hold the requested data.
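
The record-selection rule described above can be sketched on its own, without Flask or the Aerospike client. This is a minimal illustration under the key naming used in this post (SYMBOL-YYYYMMDD for daily tick records, SYMBOL-YYYY for yearly records); the function name keys_for_range is hypothetical and is not part of the demo.

```python
from datetime import datetime, timedelta


def keys_for_range(symbol, low, high):
    """Return the record keys needed to cover the range [low, high]."""
    span_days = (high - low).days
    if span_days < 3:
        # Short range: one per-day record of second-level ticks for each day
        return [
            f"{symbol}-{(low + timedelta(days=i)).strftime('%Y%m%d')}"
            for i in range(span_days + 1)
        ]
    # Long range: one per-year record of daily prices for each year touched
    return [f"{symbol}-{year}" for year in range(low.year, high.year + 1)]
```

For example, keys_for_range("IBM", datetime(2019, 12, 30), datetime(2020, 1, 5)) spans six days and therefore resolves to the two yearly keys IBM-2019 and IBM-2020, which can then be fetched in one batch call.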

#
# Simple demo of Aerospike - Grafana integration to display stock ticker values based on data in an Aerospike DB
#
from flask import Flask, request, jsonify, json, abort
from flask_cors import CORS, cross_origin
import aerospike
from aerospike_helpers.operations import list_operations as list_ops
import sys
import time
from datetime import datetime, timedelta
from dateutil import tz
import dateutil
app = Flask(__name__)
cors = CORS(app)
app.config['CORS_HEADERS'] = 'Content-Type'
methods = ('GET', 'POST')
metric_finders = {}
metric_readers = {}
annotation_readers = {}
panel_readers = {}
# Configure the Aerospike client (change the host IP address as appropriate)
config = {
    'hosts': [ ('127.0.0.1', 3000) ],
    'policies': {'batch': {'total_timeout': 5000}}
}
# Create a client object and connect it to the Aerospike cluster
try:
    client = aerospike.client(config).connect()
except Exception:
    print("failed to connect to the cluster with", config['hosts'])
    sys.exit(1)
print("Connected to Aerospike")
# Respond to an empty request with a message
@app.route('/', methods=methods)
@cross_origin()
def hello_world():
    print(request.headers, request.get_json())
    return 'Aerospike timeseries tick data demonstration'
# Respond to a '/search' request with the list of available ticker symbols
@app.route('/search', methods=methods)
@cross_origin()
def find_metrics():
    print(request.headers, request.get_json())
    req = request.get_json()
    metrics = [ "IBM", "WMT", "XOM", "BRK", "AMZN" ]
    return jsonify(metrics)
# Respond to the /query request via data from Aerospike
@app.route('/query', methods=methods)
@cross_origin(max_age=600)
def query_metrics():
    req = request.get_json()
    # Determine the local and UTC timezones
    localTZ = tz.tzlocal()
    utcTZ = tz.tzutc()
    # Extract the range of requested times from the request and adjust to the local timezone
    lowDate = datetime.strptime(req['range']['from'], "%Y-%m-%dT%H:%M:%S.%fZ")
    lowDate = lowDate.replace(tzinfo=utcTZ)
    lowDate = lowDate.astimezone(localTZ)
    highDate = datetime.strptime(req['range']['to'], "%Y-%m-%dT%H:%M:%S.%fZ")
    highDate = highDate.replace(tzinfo=utcTZ)
    highDate = highDate.astimezone(localTZ)
    timeDiff = highDate - lowDate
    # Extract the requested ticker symbols from the request
    targetTickers = []
    for aTarget in req['targets']:
        targetTickers.append(aTarget['target'])
    # Set up a batch read for the range of dates
    results = []
    # If the requested timeframe is less than a day...
    if timeDiff.days < 1:
        for aTicker in targetTickers:
            key = ('test', 'tickerSet', aTicker + "-" + lowDate.strftime("%Y%m%d"))
            # Set up a List operation to ONLY retrieve the range of list values within a single record for the requested timestamp range
            # This will greatly minimize the data that has to be sent from the server to the client.
            ops = [
                list_ops.list_get_by_value_range(
                    'datapoints',
                    aerospike.LIST_RETURN_VALUE,
                    [int(lowDate.timestamp() * 1000), aerospike.CDTWildcard()],
                    [int(highDate.timestamp() * 1000), aerospike.CDTWildcard()])
            ]
            _, _, as_results = client.operate(key, ops)
            res = {"target": aTicker}
            res["datapoints"] = []
            # Swap each stored [timestamp, price] pair into [price, timestamp] order for the response
            for bin in as_results["datapoints"]:
                tmp_bin = bin[0]
                bin[0] = float(bin[1])
                bin[1] = tmp_bin
                res["datapoints"].append(bin)
            results.append(res)
    else:
        for aTicker in targetTickers:
            as_keys = []
            if timeDiff.days < 3:
                for i in range(0, timeDiff.days + 1):
                    # If the range of days is less than 3...
                    # Then we want to retrieve the detailed ticks (with a second granularity) from Aerospike
                    key = ('test', 'tickerSet', aTicker + "-" + (lowDate + timedelta(days=i)).strftime("%Y%m%d"))
                    as_keys.append(key)
            else: # For >= 3 days, get a record for each year with that year's closing daily prices
                print("Low and High year: " + str(lowDate.year) + " " + str(highDate.year))
                for aYear in range(lowDate.year, highDate.year + 1):
                    print("Key Value")
                    print(aTicker + "-" + str(aYear))
                    key = ('test', 'tickerSet', aTicker + "-" + str(aYear))
                    # Append to the list of keys
                    as_keys.append(key)
            # This is the Aerospike call that will retrieve the multiple records from the Aerospike DB in parallel
            as_results = client.get_many(as_keys)
            # The remainder is all formatting for the return from the HTTP request
            res = {"target": aTicker}
            res["datapoints"] = []
            for aRes in as_results:
                # Skip keys for which no record exists
                if aRes[1] is None:
                    continue
                dps = aRes[2]
                for bin in dps["datapoints"]:
                    tmp_bin = bin[0]
                    bin[0] = float(bin[1])
                    bin[1] = tmp_bin
                    res["datapoints"].append(bin)
            results.append(res)
    # Return the results to the caller
    return jsonify(results)
if __name__ == '__main__':
    # Start listening on port 3334. Change as desired.
    app.run(host='0.0.0.0', port=3334, debug=True)
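
The two data-shaping steps in the query handler, returning only a time window of a sorted tick list and swapping each stored [timestamp, price] pair into the [value, timestamp] order the handler sends back, can be tried out with plain Python. This is a client-side sketch only: in the demo the windowing is done on the server by the list range operation, and the half-open treatment of the bounds here is an assumption rather than a statement about the exact wildcard semantics. The function names are hypothetical.

```python
import bisect


def slice_by_time(datapoints, low_ms, high_ms):
    """Return entries with low_ms <= timestamp < high_ms from a sorted list."""
    timestamps = [entry[0] for entry in datapoints]
    start = bisect.bisect_left(timestamps, low_ms)
    end = bisect.bisect_left(timestamps, high_ms)
    return datapoints[start:end]


def to_grafana(datapoints):
    """Convert stored [timestamp_ms, "price"] pairs to [price, timestamp_ms]."""
    return [[float(price), ts] for ts, price in datapoints]


# Three stored ticks, one second apart
stored = [[1000, "1.5"], [2000, "2.5"], [3000, "3.5"]]
window = slice_by_time(stored, 2000, 3000)
points = to_grafana(window)
```

Unlike the handler above, to_grafana builds new pairs instead of swapping the stored entries in place, which keeps the input list unchanged.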

A sample of the results of the demo project in a Grafana window is visible at the top of this blog post.

I hope that this demo code is useful to the reader and provides some ideas on how to employ Aerospike for capturing and viewing “real-time” time series data, such as stock ticker data (this demo) or IIoT applications. Aerospike’s high throughput, low latency, and low TCO make it well suited to these types of applications.
