This post is the second in a series on how to use the Stream Machine platform. The first post covered getting started.
This time, I'll look at how to get Stream Machine events into a Google Cloud Storage bucket and from there into a Pandas DataFrame. Let's get started.
I assume you have a Google Cloud Storage bucket somewhere, and that you have created a service account:
- IAM -> Service Accounts -> Create Service Account
The post assumes you've stored the Service Account credentials in a file credentials.json
in the working directory. The file contents are something like this:
{
"type": "service_account",
"project_id": "stream-machine-development",
"private_key_id": "bae39......d6efda",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG......3kS+0=\n-----END PRIVATE KEY-----\n",
...
}
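To quickly check that the service account can actually reach your bucket, here's a minimal sketch using gcsfs (which we'll install later in this post anyway); the project and bucket names below are placeholders for your own:
import gcsfs

# placeholders: use your own project id and bucket name
project = "stream-machine-development"
bucket = "strm-demo"

# token points at the service-account key file from above
fs = gcsfs.GCSFileSystem(project=project, token="credentials.json")

# list the objects at the top of the bucket; an error here usually means
# the service account lacks storage permissions on the bucket
print(fs.ls(bucket))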
Creating a sink and an exporter
In order to store event data in a cloud bucket, you need to:
- configure the bucket (name, credentials and path) in a so-called sink entity
- create a batch-exporter from a stream to that sink
strm create sink demo strm-demo --credentials-file=credentials.json
{
"ref": { "billingId": "strmbart5986941267", "name": "demo" },
"sinkType": "GCLOUD",
"bucket": { "bucketName": "strm-demo", "credentials": "..." }
}
# batch exporter for the encrypted stream at path demo-in on the bucket
strm create batch-exporter demo --interval 30 --path-prefix demo-in --sink demo
# batch exporter for the decrypted level 2 stream at path demo-2 on the bucket
strm create batch-exporter demo-2 --interval 30 --sink demo --path-prefix demo-2
{
"ref": {
"billingId": "strmbart5986941267",
"name": "demo-demo-2"
},
"streamRef": {
"billingId": "strmbart5986941267",
"name": "demo-2"
},
"interval": "30s",
"sinkName": "demo",
"pathPrefix": "demo-2"
}
Start sending some data
The CLI has a simulator on board. Use version 1.4.0 of the CLI; other versions generate different simulated events.
strm sim run-random demo --interval 100
Starting to simulate random streammachine/demo/1.0.2 events to stream demo.
Sending one event every 100 ms.
Sent 50 events
Sent 100 events
Sent 150 events
Sent 200 events
...
We should see some data in our bucket:
gsutil ls gs://strm-demo/demo-in | head -5
gs://strm-demo/demo-in/2021-08-12T09:38:00-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
gs://strm-demo/demo-in/2021-08-12T09:38:30-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
gs://strm-demo/demo-in/2021-08-12T11:09:00-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
gs://strm-demo/demo-in/2021-08-12T11:09:30-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
gs://strm-demo/demo-in/2021-08-12T11:10:00-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
gsutil ls gs://strm-demo/demo-2 | head -5
gs://strm-demo/demo-2/2021-08-12T09:38:00-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
gs://strm-demo/demo-2/2021-08-12T09:38:30-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
gs://strm-demo/demo-2/2021-08-12T09:39:00-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
gs://strm-demo/demo-2/2021-08-12T11:09:00-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
gs://strm-demo/demo-2/2021-08-12T11:09:30-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
and in one of the files we see newline-delimited JSON events (for readability I've formatted an example event below).
gsutil cat gs://strm-demo/demo-in/.....jsonl | head -1
{
"strmMeta": {
"eventContractRef": "streammachine/example/1.3.0",
"nonce": -1557054268,
"timestamp": 1628761079144,
"keyLink": "1d960e7a-4169-4bcf-bece-e8fcc3243c06",
"billingId": "strmbart5986941267",
"consentLevels": [ 0, 1 ]
},
"uniqueIdentifier": "AQq8Ihq3DBOahkZNXpBfdky8m04pb6c02RIUNOHo",
"consistentValue": "AQq8IhqopLqWCpIDd1+xZUw/KCtXObL7irK5NbgE1I4=",
"someSensitiveValue": "AQq8Ihq62mxCv1fqEZ+0bcijMEFZ/VFnpA4EEs8XRp0P",
"notSensitiveValue": "not-sensitive-30"
}
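The strmMeta.timestamp field is the event time in epoch milliseconds (the notebook below relies on this); converting the value from the example event is a one-liner, and the result lines up with the 09:38 timestamps in the file listing above:
import pandas as pd

# epoch milliseconds from the example event above
pd.to_datetime(1628761079144, unit="ms")
# Timestamp('2021-08-12 09:37:59.144000')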
The demo-2 directory contains the decrypted data. Note in the example below that uniqueIdentifier and consistentValue are now plaintext, while someSensitiveValue is still encrypted, as it apparently requires a consent level higher than 2.
gsutil cat gs://strm-demo/demo-2/.....jsonl | head -1
{
"strmMeta": {
"eventContractRef": "streammachine/example/1.3.0",
"nonce": -62169113,
"timestamp": 1628761077936,
"keyLink": "1f118159-a27a-4468-a540-23a4938bce14",
"billingId": "strmbart5986941267",
"consentLevels": [ 0, 1, 2 ]
},
"uniqueIdentifier": "unique-25",
"consistentValue": "session-488",
"someSensitiveValue": "AXHCR9j0KLyYy7Bivvrk+xfU0D4pRJkIlHAE/PtvtsPx",
"notSensitiveValue": "not-sensitive-19"
}
To Jupyter and Pandas
The next step is to get these events into a Pandas DataFrame in a Jupyter notebook. You need the credentials.json file that contains the Google Cloud service account credentials.
The essence of the Jupyter notebook is:
pip install jupyter gcsfs pandas
import pandas as pd
from pandas import json_normalize
import gcsfs
import json
# set these to your own project and bucket
bucket = "strm-demo"
project = "stream-machine-development"
Load the data
fs = gcsfs.GCSFileSystem(project=project, token="credentials.json")
def one_object_to_df(path):
    # read one newline-delimited JSON object from the bucket and flatten it
    with fs.open(path, "r") as _f:
        df = json_normalize([json.loads(line) for line in _f.readlines()])
    # strmMeta.timestamp is in epoch milliseconds; use it as a sorted index
    df['strmMeta.timestamp'] = pd.to_datetime(df['strmMeta.timestamp'], unit='ms')
    return df.set_index('strmMeta.timestamp').sort_index()

def make_df(folder):
    print(folder)
    # one DataFrame per exported object, concatenated into a single frame
    # (pd.concat replaces DataFrame.append, which newer pandas versions removed)
    frames = [one_object_to_df(f) for f in fs.find(f"{bucket}/{folder}")]
    return pd.concat(frames)
demo = make_df("demo-in")
demo_2 = make_df("demo-2")
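A quick sanity check on the loaded frames (output omitted; the exact sizes depend on how long the simulator ran):
# both frames are indexed by event time and share the flattened strmMeta.* columns
demo.info()
demo_2.head()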
Show some data
demo.consistentValue.value_counts()
AScCSOgnaoVZw2nqRtTSBlwV8pWe5R7SXXJcL1tXC5M= 55
AV3MqegMMQ3Bikr1klqdNN+X+6rykyNtftZizyRKA6U= 52
AQMdVpGbhAScpImKh1I5Lx1FLhb19N97/1reQhZd0ig= 50
AV4eJeE0kNrUw5svSpJTUryc+C4ZZ/zCdZL++VEgY6g= 48
AX2OWCN2zha+odokuX9rTDwOkM47lbNBGnMDPbJZieU= 48
..
AV8dQPY3mopfROkxlMDeLqcYAxA3qqbYG89J0SSEDio= 19
AVn0ykPCKcX5vtT07DjoV+tcTgUeLmknOytDWzcPAQ== 19
ATKFsJ5+fXeCnsLOc1k+IA+VSYtWR+wT7Iq/4IfwPjA= 19
AXRHjzm9RTN60ocpLNVgMvdW8mSEKKg0/fezhT/We78= 19
ARtzjnb15acL/xVvsS6dslv3M7A8WHUkCmy94LmK5g== 19
Name: consistentValue, Length: 1000, dtype: int64
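Running the same count against the decrypted stream returns the original session values rather than the encrypted tokens, a quick way to confirm that demo-2 really holds plaintext (a sketch; your counts will differ):
demo_2.consistentValue.value_counts().head()
# values look like session-488 instead of base64 ciphertext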