Bart van Deenen

First steps with Streammachine.io

This post shows some first steps with the Stream Machine platform. The target audience is developers, data engineers and possibly data scientists. The examples use Python, but the same can be done with Stream Machine's other language drivers.

About me: I'm Bart van Deenen, lead engineer at Stream Machine, so I should know what I'm talking about :-). That said, I've tried to write this post from the perspective of a third-party developer.

Stream Machine

Stream Machine promises to provide "lightning fast, privacy secured, customer data - you can actually use".

So what does this actually mean?

  • Stream Machine accepts events with a strictly defined serialization schema (currently Apache Avro and JSON Schema are supported). Any valid schema can be used, but it needs to be registered with Stream Machine.
  • Events are subject to an event-contract that defines which fields in the serialization schema contain Personally Identifiable Information (referred to as PII or PII data).
  • Event contracts may contain customizable validation rules that define which field values are valid.
  • Stream Machine processes events with a highly available, fault-tolerant stream processing system that encrypts all PII field values. The encryption keys are rotated every 24 hours, which leads to a GDPR-compliant stream of event data that can be used by everyone in your company. Within those 24 hours, the encrypted value for a given input remains the same.
  • Events carry consent-level information: PII can only be decrypted, into separate decrypted stream(s), for the purposes the data owner consented to, and those streams are only accessible to the people in your company who are allowed to use them.
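
As a toy illustration of the last two bullets, here is why an encrypted value can stay "consistent" within a key-rotation window and still change after rotation. This assumes nothing about Stream Machine's actual cryptography (which uses real encryption, not hashing); it only mimics the observable behaviour:

```python
import hashlib
from datetime import date

def pseudonymize(value: str, rotation_day: date, master_secret: bytes = b"toy-secret") -> str:
    """Toy stand-in for Stream Machine's PII encryption: derive a per-day key,
    then produce a value that is stable within that day but changes when the
    key rotates. Illustration only, not the real algorithm."""
    day_key = hashlib.sha256(master_secret + rotation_day.isoformat().encode()).digest()
    return hashlib.sha256(day_key + value.encode()).hexdigest()

# Same input on the same day -> same pseudonym, so events remain linkable...
a = pseudonymize("a-user-session", date(2021, 8, 11))
b = pseudonymize("a-user-session", date(2021, 8, 11))
# ...but after the daily key rotation, the pseudonym changes.
c = pseudonymize("a-user-session", date(2021, 8, 12))
print(a == b, a == c)  # True False
```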

This post uses a debugging output of the stream data, delivered over a websocket. Production-level output streams require hooking up Stream Machine's internal Apache Kafka streams; AWS S3 and Google Cloud Storage buckets can be used for batch processing. This will be explored in a future blog post.

The plan

I'm going to build a Python application that mimics users clicking around on a dummy web shop, and sends the resulting click stream to Stream Machine. I want to retrieve the anonymized data from a Google Cloud bucket and show it in a Jupyter notebook, and verify that I only receive decrypted click-stream events for those simulated users who have given full personalized-marketing permission. This first post just gets the basics working: sending events to Stream Machine, and retrieving them.

The steps

An account

I went to streammachine.io to register an account, and after confirming my email, I was shown this page:

[screenshot: the page shown after registration]

Get the CLI

The CLI is the most mature way to interact with Stream Machine. Install the most recent version, and don't forget to install the shell auto-completion.

Let's create a new stream

strm auth login your-email@wherever
strm create stream demo --save

The --save option causes the credentials to be saved in
~/.config/stream-machine/Stream/demo.json.
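
Based on how the file is used later in this post, the saved demo.json roughly has this shape (values elided here; the real file holds your secrets, so keep it out of version control):

```python
import json

# Inferred shape of ~/.config/stream-machine/Stream/demo.json, reconstructed
# from the keys accessed later in this post; the actual file may contain more.
example = """
{
  "ref": {"billingId": "strm..."},
  "credentials": [
    {"clientId": "...", "clientSecret": "..."}
  ]
}
"""
creds = json.loads(example)
print(sorted(creds))  # ['credentials', 'ref']
```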

Let's send an event!

I'm following along with the Python example in the documentation. I'm going to use the synchronous sender (SyncSender) because I want to play with it in IPython.

The demo example uses the streammachine/demo/1.0.2 schema. You can see its full definition via strm get schema streammachine/demo/1.0.2. The corresponding event-contract that defines validation rules and such is streammachine/example/1.3.0.
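
For orientation, the demo schema's shape looks roughly like the sketch below. This is reconstructed from the field names that appear later in this post, not the authoritative definition; run strm get schema streammachine/demo/1.0.2 for the real thing:

```python
# Rough sketch of the streammachine/demo/1.0.2 Avro schema, reconstructed
# from the fields used later in this post (types and details elided).
demo_schema_sketch = {
    "type": "record",
    "name": "DemoEvent",
    "namespace": "io.streammachine.schemas.demo.v1",
    "fields": [
        {"name": "strmMeta"},            # platform metadata: contract ref, consent levels, ...
        {"name": "uniqueIdentifier"},    # PII per the event-contract
        {"name": "consistentValue"},     # PII; ties events into a session/sequence
        {"name": "someSensitiveValue"},  # PII
        {"name": "notSensitiveValue"},   # stays in the clear
    ],
}
print([f["name"] for f in demo_schema_sketch["fields"]])
```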

For convenience I've copied the saved credentials file (see above) into the working directory.

Keep an eye on the stream

strm egress demo

This starts a websocket connection where you'll see a json serialization of the messages on a stream.

python3 -m venv .venv
. .venv/bin/activate
make
# observe the installed streammachine stuff.
pip freeze | grep streammachine
streammachine-driver==1.0.0
streammachine-schemas-common==1.0.0
streammachine-schemas-demo-avro==1.0.2

OK, let's start IPython:

python3 -m pip install ipython
$> ipython

# import a class that matches the structure of the demo schema
# note that this package name is so long because it's derived from the Stream
# Machine schema ref streammachine/demo and also the `namespace` inside the Avro
# schema (which is io.streammachine.schemas.demo.v1)

from streammachine_io_streammachine_schemas_demo_v1.io.streammachine.schemas.demo.v1 import DemoEvent

from streammachine.driver import StreamMachineEvent, current_time_millis
from streammachine.driver.client.syncsender import SyncSender

event = DemoEvent()

import json
creds = json.load(open("demo.json"))
sender = SyncSender(creds['ref']['billingId'], creds['credentials'][0]['clientId'],
    creds['credentials'][0]['clientSecret'])
sender.start()
sender.wait_ready()
sender.send_event(event)

After the send_event(event) I get the following error:

Error while sending event to Stream Machine (https://in.strm.services/event),
    response status = 400, response: Invalid event contract: . Not supported.

We need to set strmMeta.eventContractRef to a valid contract reference (see strm list event-contracts). While a schema defines the shape/structure of an event, an event-contract defines the rules that apply to it: which validations run, which fields contain personally identifiable data, which field ties events into a sequence, and so on. We'll use streammachine/example/1.3.0 (inspect it with strm get event-contract ..)

event.strmMeta.eventContractRef="streammachine/example/1.3.0"
sender.send_event(event)
Error while sending event to Stream Machine (https://in.strm.services/event),
    response status = 400, response: Field: 'consistentValue' in event with schema: 'streammachine/demo/1.0.2'
    with value: '' doesn't match regex: '^.+$'

So the field consistentValue needs at least one character?

event.consistentValue="hi there"
sender.send_event(event)
# nothing ....

A whole lot of nothing is returned. Stream Machine is built for very high throughput, and logging something for every event at thousands of events per second would quickly break the bank if you're using Stackdriver. So a None return value (HTTP status 204) is the indicator that everything went fine.

But with strm egress demo still running, we see something like:

strm egress demo
{
  "strmMeta": {
    "eventContractRef": "streammachine/example/1.3.0",
    "nonce": -1515353731,
    "timestamp": 1628688372401,
    "keyLink": "cd181172-4ec7-4f0d-86bb-662fc0ee854b",
    "billingId": "strmbart5986941267",
    "consentLevels": []
  },
  "uniqueIdentifier": null,
  "consistentValue": "AWpvnLU8hBPWRfYrAjWPs0wWt6vBMMXXEnSqGTw=",
  "someSensitiveValue": null,
  "notSensitiveValue": null
}


We can observe that the consistentValue field has some data in it: it's actually the base64-encoded encryption of "hi there". The various fields in strmMeta are explained in the documentation.
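
You can check for yourself that this is opaque ciphertext rather than a plain encoding: base64-decoding it does not recover "hi there". The value below is copied from the egress output above:

```python
import base64

# consistentValue as it appeared in the egress output
ciphertext = base64.b64decode("AWpvnLU8hBPWRfYrAjWPs0wWt6vBMMXXEnSqGTw=")
# The decoded bytes are opaque: without the platform's (rotating) key,
# the original "hi there" cannot be recovered.
print(len(ciphertext), ciphertext != b"hi there")
```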

Let's fill in some more fields:

import random, uuid

def create_avro_event():
    event = DemoEvent()

    event.strmMeta.eventContractRef = "streammachine/example/1.3.0"
    event.strmMeta.consentLevels = [random.randint(0, 3)]

    event.uniqueIdentifier = str(uuid.uuid4())
    event.someSensitiveValue = "A value that should be encrypted"
    event.consistentValue = "a-user-session"
    event.notSensitiveValue = "Anyone is free to see this text."
    return event

event = create_avro_event()

r = sender.send_event(event)
print(r)
None

In the strm egress demo pane we see:

{
  "strmMeta": {
    "eventContractRef": "streammachine/example/1.3.0",
    "nonce": 1820364498,
    "timestamp": 1628689078226,
    "keyLink": "5074c4de-c51b-4321-a70b-c87db4c79bde",
    "billingId": "strmbart5986941267",
    "consentLevels": [ 0 ]
  },
  "uniqueIdentifier": "AUvu95+NUDFf9krvvVUSU+pJsRBl9XahrMVCTpjqHDa9lTHBTzbRdjazyyMVi3xDy2Ps7HDxJHWA",
  "consistentValue": "AUvu958J4Lf8JWlxwEfdMXXSZpjxdkBSL4hl8Tk5MVHp3L4=",
  "someSensitiveValue": "AUvu95+bq6bw4Z1l9pTYLNQwd/ecdtntrH5mcBJNWv8n6n9jzYxKwEuSDUjig5lPNYqpZpU=",
  "notSensitiveValue": "Anyone is free to see this text."
}

Just looking at it, I can see that all the PII fields defined in the event-contract have been encrypted.

Decrypting

We can create a decrypted stream that does contain personal data.

strm create stream --derived-from demo --levels 2 --save

Start sending data in a loop

import time
while True:
    time.sleep(0.1)
    sender.send_event(create_avro_event())

And observe in the egress:

strm egress demo-2
{
  "strmMeta": {
    "eventContractRef": "streammachine/example/1.3.0",
    "nonce": 1165221388,
    "timestamp": 1628689364936,
    "keyLink": "5074c4de-c51b-4321-a70b-c87db4c79bde",
    "billingId": "strmbart5986941267",
    "consentLevels": [
      3
    ]
  },
  "uniqueIdentifier": "279e9dcb-a9a5-497b-8d46-b213106f7fab",
  "consistentValue": "a-user-session",
  "someSensitiveValue": "AUvu95+bq6bw4Z1l9pTYLNQwd/ecdtntrH5mcBJNWv8n6n9jzYxKwEuSDUjig5lPNYqpZpU=",
  "notSensitiveValue": "Anyone is free to see this text."
}

Note that someSensitiveValue is still encrypted (observe the base64 encoding).
This is because

  • we asked for a stream with consent levels up to 2;
  • the data owner has given consent level 3, but we didn't ask for it, so we don't get it. If we had asked for level 3, this field would also have been decrypted.
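
The filtering rule this suggests (my reading of the observed behaviour, not Stream Machine's documented algorithm) can be sketched as:

```python
def exported_to_derived_stream(event_consent_levels, stream_level):
    """Sketch of the observed behaviour: an event only reaches a derived
    stream when the consent its owner granted reaches the stream's level."""
    return bool(event_consent_levels) and max(event_consent_levels) >= stream_level

# With a stream created via --levels 2, only events carrying consent
# level 2 or 3 come through:
print([lv for lv in ([0], [1], [2], [3]) if exported_to_derived_stream(lv, 2)])  # [[2], [3]]
```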

If we observe the consent levels in the decrypted stream:

strm egress demo-2 | jq -c .strmMeta.consentLevels
[3]
[3]
[2]
[2]

we only see [2] and [3]. Events with a lower consent level are not exported to this decrypted stream at all.

Conclusions

We can send data into Stream Machine, and receive it back privacy-safe and of guaranteed quality.

The next post in this series will show how to get this data into a Google Cloud bucket, and how to use it in a Jupyter notebook.
