DEV Community

Cover image for DEBS Grand Challenge: Real-Time Stream Processing with YoMo
C.C.
C.C.

Posted on

DEBS Grand Challenge: Real-Time Stream Processing with YoMo

Introduction

DEBS or the ACM International Conference on Distributed Event Based Systems aims to "provide a forum dedicated to the dissemination of original research, the discussion of practical insights, and the reporting of experiences relevant to event based computing that were previously scattered across several scientific and professional communities" (reference).

The DEBS 2014 Grand Challenge - Smart Grid is the 8th ACM International Conference on Distributed Event Based Systems, focused on two problems which are relevant to the industry: real-time load prediction and anomaly detection. The data for the challenge was collected from a number of smart-home installations deployed in Germany.

In traditional processing, data is often stored in the database at first, and then processed to obtain useful information at a secondary stage. The traditional architecture:

Alt Text

However, with stream processing, we're swiftly able to run real-time analytics on incoming data streams. By utilizing Yomo, an open-source serverless stream processing framework, we can address DEBS' challenge in a real-time fashion:

Alt Text

Firstly, we will take a look at the type of data we're dealing with. Next, we will introduce two queries that were originally proposed by ACM DEBS. Lastly, we install YoMo (an open-source framework for real-time stream processing) to implement both queries as described.

Data

From Jerzak and Ziekow (2014):

For the DEBS 2014 Grand Challenge we assume a hierarchical structure with a house, identified by a unique house id, being the topmost entity. Every house contains one or more households, identified by a unique household id (within a house). Every household contains one or more smart plugs, each identified by a unique plug id (within a household). Every smart plug contains exactly two sensors:
(1) load sensor measuring current load with Watt as unit (2) work sensor measuring total accumulated work since the start (or reset) of the sensor with kWh as unit.

The input stream is defined as follows:

  • id – a unique identifier of the measurement [32 bit unsigned int]
  • timestamp – timestamp of measurement [32 bit unsigned int]
  • value – the measurement [32 bit float]
  • property – type of the measurement: 0 for work or 1 for load [boolean]
  • plug_id – a unique identifier (within a household) of the smart plug [32 bit unsigned int]
  • household_id – a unique identifier of a household (within a house) where the plug is located [32 bit unsigned int]
  • house_id – a unique identifier of a house where the household with the plug is located [32 bit unsigned int]

The complete data file is available under this link. For demonstration purposes, we will generate mock data using this file. In real life, we should be dealing with direct sensor data.

Queries

  1. Load Prediction

Let's divide the whole period from t_{start} = 1377986401 to t_{end} = 1380578399 covered by the dataset into N equal slices of |s| seconds and call them s_0, s_1, s_2, etc.

The average load for slice s_{i + 2} is given by

L(s_{i + 2}) = ( avgLoad(s_i) + median({ avgLoad(s_j) }) ) / 2
Enter fullscreen mode Exit fullscreen mode

s_j is a nonempty set defined by s_(i + 2 – n * k), where k is the number of slices within a 24-hour period and n is a natural number between 1 and floor((i + 2) / k).

  1. Outliers

For this query, we will compute the percentage of plugs that have a median load during the last hour greater than the median load of all plugs.

YoMo – what is it and why do we use it?

YoMo is an open-source serverless streaming framework for building low-latency edge computing applications. Built atop QUIC transport protocol and functional reactive programming interface, it makes real-time data processing reliable, secure, and easy.

Getting Started

  1. Follow the instructions here to install YoMo. Assuming that $GOPATH has been set on your device, you should be able to see a directory with the name $GOPATH/src/github.com/yomorun/yomo.

  2. Run the following commands to create a new project. Don't forget to replace ${YOUR_GITHUB_USERNAME} with your actual GitHub username!

$ mkdir -p $GOPATH/src/github.com/${YOUR_GITHUB_USERNAME} && cd $_
$ yomo init debs-flow
Enter fullscreen mode Exit fullscreen mode

Now, cd into $GOPATH/src/github.com/${YOUR_GITHUB_USERNAME}. It should have the following structure.

```shell
.
└── debs-flow
    ├── app.go
    ├── go.mod
    ├── go.sum
    └── sl.so
```
Enter fullscreen mode Exit fullscreen mode

We will be focusing on the Handler function in app.go, which defines how we want the input stream to be processed.

  1. Git clone the yomo-source-example repository. For clarity, we will call it debs-source.

    $ git clone git@github.com:yomorun/yomo-source-example.git
    
  2. Create a file named workflow.yaml with the following content. Place it under debs-source.

    name: service
    host: localhost
    port: 9000
    flows:
    - name: debs 2014
        host: localhost
        port: 4242
    sinks:
    - name: mock db
        host: localhost
        port: 4243
    

Make sure the $GOPATH/src/github.com/${YOUR_GITHUB_USERNAME} directory contains the following files.

```shell
.
├── debs-flow
│   ├── app.go
│   ├── go.mod
│   ├── go.sum
│   └── sl.so
└── debs-source
    ├── go.mod
    ├── go.sum
    ├── main.go
    └── workflow.yaml
```
Enter fullscreen mode Exit fullscreen mode

Algorithm Implementation

By default, the Handler function in debs-flow/app.go should look as follows.

func Handler(rxstream rx.RxStream) rx.RxStream {
    stream := rxstream.
        Subscribe(0x10).
        OnObserve(decoder).
        Map(printer).
        Encode(0x11)

    return stream
}
Enter fullscreen mode Exit fullscreen mode
  • Subscribe(0x10): Subscribe to the input stream. 0x10 is the key. It is defined in debs-source/main.go.
  • OnObserve(decoder): Decode []byte to interface{}. Empty interfaces are often used by code that handles values of unknown type.
  • Map(printer): Print out the data.

For query #1, we need to define two functions in addition to what we discussed above. We will call them average and predict.

func Handler(rxstream rx.RxStream) rx.RxStream {
    stream := rxstream.
        Subscribe(0x10).
        OnObserve(decoder).
        Map(printer).
        BufferWithTime(ss * 1e3). // ss stands for slice size
        Map(average).
        Map(predict).
        Encode(0x11)

    return stream
}
Enter fullscreen mode Exit fullscreen mode
// compute the average load for each plug and save the values to db, which is a global variable in this example
func average(_ context.Context, i interface{}) (interface{}, error) {
    // convert interface{} to []interface{}
    lst, ok := i.([]interface{})
    if !ok {
        err := fmt.Sprintf("expected type '[]interface{}', got '%v' instead",
            reflect.TypeOf(i))
        fmt.Printf("[average] %v\n", err)
        return nil, fmt.Errorf(err)
    }

    // plug # -> value
    total := make(map[string]float32)
    count := make(map[string]float32)

    for _, elem := range lst {
        // convert interface{} to measurement
        x, ok := elem.(Measurement)
        if !ok {
            err := fmt.Sprintf("expected type 'measurement', got '%v' instead",
                reflect.TypeOf(elem))
            fmt.Printf("[average] %v\n", err)
            return nil, fmt.Errorf(err)
        }

        if x.Property { // load
            plug := x.toString()
            total[plug] += x.Value
            count[plug] += 1.0
        }
    }

    // save to db
    fmt.Println("*** average ***")
    for plug, v := range total {
        avg := v / count[plug]
        fmt.Printf("[s_%v] %v %v\n", idx, plug, avg)

        _, ok := db[plug]
        if !ok {
            db[plug] = make(map[uint32]float32)
        }
        db[plug][idx] = avg
    }
    fmt.Println("***************")
    return i, nil
}

// make predictions based on what we have in db
func predict(_ context.Context, i interface{}) (interface{}, error) {
    k := t / ss

    fmt.Println("*** predict ***")
    l := (idx + 2) / k
    if l == 0 {
        fmt.Println("not enough data")
    } else {
        // possible values for j
        lst := make([]uint32, l)
        for m := range lst {
            n := uint32(m + 1)
            j := idx + 2 - n*k
            lst[m] = j
        }

        for plug := range db {
            // average load for s_j
            data := make([]float32, l)
            for m, j := range lst {
                data[m] = db[plug][j]
            }
            pred := (db[plug][idx] + median(data)) / 2
            fmt.Printf("[s_%v] %v %v\n", idx+2, plug, pred)
        }
    }
    fmt.Println("***************")

    idx += 1 // slice #
    return 0.0, nil
}
Enter fullscreen mode Exit fullscreen mode

For query #2, we will define a function called outliers.

func Handler(rxstream rx.RxStream) rx.RxStream {
    stream := rxstream.
        Subscribe(0x10).
        OnObserve(decoder).
        Map(printer).
        BufferWithTime(ss * 1e3).
        Map(outliers).
        Encode(0x11)

    return stream
}
Enter fullscreen mode Exit fullscreen mode
// which plugs have a median load greater than the median load of all plugs?
func outliers(_ context.Context, i interface{}) (interface{}, error) {
    // convert interface{} to []interface{}
    lst, ok := i.([]interface{})
    if !ok {
        err := fmt.Sprintf("expected type '[]interface{}', got '%v' instead",
            reflect.TypeOf(i))
        fmt.Printf("[outliers] %v\n", err)
        return nil, fmt.Errorf(err)
    }

    all := make([]float32, 0, len(lst))
    indiv := make(map[string][]float32) // plug # -> values

    for _, elem := range lst {
        // convert interface{} to measurement
        x, ok := elem.(Measurement)
        if !ok {
            err := fmt.Sprintf("expected type 'measurement', got '%v' instead",
                reflect.TypeOf(elem))
            fmt.Printf("[outliers] %v\n", err)
            return nil, fmt.Errorf(err)
        }

        if x.Property { // load
            all = append(all, x.Value)

            plug := x.toString()
            indiv[plug] = append(indiv[plug], x.Value)
        }
    }

    v := median(all)
    fmt.Printf("all plugs: %v\n", v)

    fmt.Println("*** outliers ***")
    for plug, vs := range indiv {
        m := median(vs)
        if m > v {
            fmt.Printf("[w_%v] %v %v\n", idx, plug, m)
        }
    }
    fmt.Println("****************")

    idx += 1
    return 0.0, nil
}
Enter fullscreen mode Exit fullscreen mode

Now to run the code, we need to:

  1. cd into debs-flow and type

    $ yomo run app.go
    
  2. cd into debs-source and type

    $ yomo wf run workflow.yaml
    $ PORT=9000 go run main.go
    

You might want to try a different set of hyperparameters.

Results

For query #1, you should see something similar to the following.

...
[1620461050] 9.910085 0-1-2 load
[1620461050] 8.087268 0-1-2 work
[1620461050] 13.468374 3-1-2 load
[1620461050] 7.742124 3-1-2 work
[1620461051] 13.738256 0-1-2 load
[1620461051] 16.59261 0-1-2 work
[1620461051] 12.84997 3-1-2 load
[1620461051] 10.838872 3-1-2 work
*** average ***
[s_18] 0-1-2 11.824171
[s_18] 3-1-2 13.159172
***************
*** predict ***
[s_20] 0-1-2 10.375039
[s_20] 3-1-2 12.0406475
***************
...
Enter fullscreen mode Exit fullscreen mode

For query #2, something like this:

...
[1620461271] 6.921172 0-1-2 load
[1620461271] 1.8683584 0-1-2 work
[1620461271] 17.251171 3-1-2 load
[1620461271] 9.761936 3-1-2 work
[1620461272] 10.758014 0-1-2 load
[1620461272] 18.668419 0-1-2 work
[1620461272] 5.806175 3-1-2 load
[1620461272] 1.8562717 3-1-2 work
[1620461273] 0.11624338 0-1-2 load
[1620461273] 5.579194 0-1-2 work
[1620461273] 17.249205 3-1-2 load
[1620461273] 4.9580107 3-1-2 work
[1620461274] 8.087428 0-1-2 load
[1620461274] 7.49426 0-1-2 work
[1620461274] 4.6709924 3-1-2 load
[1620461274] 1.793222 3-1-2 work
[1620461275] 4.8114495 0-1-2 load
[1620461275] 1.9070174 0-1-2 work
[1620461275] 19.199306 3-1-2 load
[1620461275] 9.054778 3-1-2 work
all plugs: 7.5043
*** outliers ***
[s_5] 3-1-2 17.249205
****************
...
Enter fullscreen mode Exit fullscreen mode

About Author

Ivy Guo is a Computer Science student at the University of Washington. If you have any questions, please email Ivy at zhifeig@cs.washington.edu

Further Reading

  • Rohit Gupta, Rinku Shah, and Apurva Mhetre. 2014. In-Memory, High Speed Stream Processing. In Proceedings of the 8th ACM International Conference on Distributed Event-Based Systems (DEBS '14). Association for Computing Machinery, New York, NY, USA, 306–309. DOI: https://doi.org/10.1145/2611286.2611332.

  • Abhinav Sunderrajan, Heiko Aydt, and Alois Knoll. 2014. Real-Time Load Prediction and Outliers Detection using STORM. DEBS 2014 - Proceedings of the 8th ACM International Conference on Distributed Event-Based Systems. 10.1145/2611286.2611327. ResearchGate

  • ACM DEBS Grand Challenge 2014 implementation using Apache Flink: Github

  • DEBS '14: Proceedings of the 8th ACM International Conference on Distributed Event-Based Systems all research papers

References

  • Zbigniew Jerzak and Holger Ziekow. 2014. The DEBS 2014 grand challenge. In Proceedings of the 8th ACM International Conference on Distributed Event-Based Systems (DEBS '14). Association for Computing Machinery, New York, NY, USA, 266–269. DOI: https://doi.org/10.1145/2611286.2611333.

More about YoMo

https://github.com/yomorun/yomo

Top comments (1)

Collapse
 
fanweixiao profile image
C.C.

Before YoMo, we have to focus on the whole architecture rather than the algorithm itself. YoMo introduce serverless to stream processing, reduce complexity of system.