DEV Community

Aceld

(Part 4) Golang Framework Hands-on - KisFlow Stream Computing Framework - Function Scheduling

#go

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications


4.1 Router

In this part, we give KisFlow the ability to register Functions from outside the framework. First, we need to define the prototype of a registered function and the Router types that manage the mappings for Functions and Flows.

Create kis-flow/kis/router.go and define the prototypes as follows:

kis-flow/kis/router.go

package kis

import "context"

// FaaS Function as a Service
type FaaS func(context.Context, Flow) error

// funcRouter
// key: Function Name
// value: Function callback for custom business logic
type funcRouter map[string]FaaS

// flowRouter
// key: Flow Name
// value: Flow
type flowRouter map[string]Flow

FaaS: The prototype of the business callback that developers register with KisFlow. It takes two parameters: Context carries the business context, and Flow carries the KisFlow context. Through Flow, a callback can obtain the current Function's configuration and data, as well as information about the other Functions on the same Flow.

funcRouter: Manages the mapping between Function names and FaaS business callbacks. It's a private type and not exposed externally. It's worth noting that the key of funcRouter is the Function name because the Function ID is a generated random ID, and developers cannot predict or read it during route registration. Therefore, the business callback is mapped to the Function name.

flowRouter: Manages the mapping between Flow names and Flow instances. It's a private type and not exposed externally. Like funcRouter, it is keyed by name, in this case the Flow name.

4.2 KisPool

KisFlow provides a type called KisPool to manage all global mappings. KisPool holds the Routers defined above and provides management capabilities for them.

4.2.1 Definition of KisPool

Create the kis-flow/kis/pool.go file to hold the KisPool module.

kis-flow/kis/pool.go

package kis

import (
    "context"
    "errors"
    "fmt"
    "kis-flow/log"
    "sync"
)

var _poolOnce sync.Once

// kisPool is used to manage all Function and Flow configuration pools
type kisPool struct {
    fnRouter funcRouter   // All Function management routes
    fnLock   sync.RWMutex // Lock for fnRouter

    flowRouter flowRouter   // All flow objects
    flowLock   sync.RWMutex // Lock for flowRouter
}

// Singleton
var _pool *kisPool

// Pool Singleton constructor
func Pool() *kisPool {
    _poolOnce.Do(func() {
        // Create kisPool object
        _pool = new(kisPool)

        // Initialize fnRouter
        _pool.fnRouter = make(funcRouter)

        // Initialize flowRouter
        _pool.flowRouter = make(flowRouter)
    })

    return _pool
}

kisPool follows the singleton pattern: Pool() returns the single instance, and sync.Once guarantees that fnRouter and flowRouter are initialized exactly once during the process lifetime, even if Pool() is first called from several goroutines at the same time.
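The sync.Once behavior can be checked in isolation with a small sketch. The `registry` type, `getRegistry`, and the `initRuns` counter are hypothetical names used only for this illustration; they stand in for `kisPool` and `Pool()`.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for kisPool.
type registry struct {
	names map[string]bool
}

var (
	once     sync.Once
	instance *registry
	initRuns int // counts how often the initializer body runs
)

// getRegistry returns the process-wide singleton, creating it on first use.
func getRegistry() *registry {
	once.Do(func() {
		initRuns++
		instance = &registry{names: make(map[string]bool)}
	})
	return instance
}

func main() {
	const workers = 50
	results := make([]*registry, workers)

	// Call getRegistry from many goroutines at once.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = getRegistry()
		}(i)
	}
	wg.Wait()

	same := true
	for _, r := range results {
		if r != results[0] {
			same = false
		}
	}
	fmt.Println("all goroutines got the same instance:", same)
	fmt.Println("initializer ran", initRuns, "time(s)")
}
```

Note that sync.Once only protects the initialization. Later reads and writes of the maps still need their own locks, which is why kisPool carries fnLock and flowLock.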

4.2.2 Registering and Getting Flow

KisPool provides methods to add and retrieve Flows:

kis-flow/kis/pool.go

func (pool *kisPool) AddFlow(name string, flow Flow) {
    pool.flowLock.Lock()
    defer pool.flowLock.Unlock()

    if _, ok := pool.flowRouter[name]; !ok {
        pool.flowRouter[name] = flow
    } else {
        errString := fmt.Sprintf("Pool AddFlow Repeat FlowName=%s\n", name)
        panic(errString)
    }

    log.Logger().InfoF("Add FlowRouter FlowName=%s\n", name)
}

func (pool *kisPool) GetFlow(name string) Flow {
    pool.flowLock.RLock()
    defer pool.flowLock.RUnlock()

    if flow, ok := pool.flowRouter[name]; ok {
        return flow
    }

    return nil
}

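AddFlow checks for duplicates by FlowName: registering a second Flow under a name that already exists triggers a panic, so each Flow name can be registered only once. GetFlow returns nil when no Flow is registered under the given name.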
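Callers that would rather handle a duplicate name as an ordinary error can wrap the panicking call with recover. The sketch below is one possible approach, not something KisFlow ships: `flowRegistry`, `add`, and `tryAdd` are hypothetical names, and flows are plain strings instead of `kis.Flow` to keep the example self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// flowRegistry is a hypothetical, simplified stand-in for the flow half of kisPool.
type flowRegistry struct {
	mu    sync.RWMutex
	flows map[string]string
}

func newFlowRegistry() *flowRegistry {
	return &flowRegistry{flows: make(map[string]string)}
}

// add registers a flow and panics on a duplicate name, like AddFlow.
func (r *flowRegistry) add(name, flow string) {
	r.mu.Lock()
	defer r.mu.Unlock() // deferred, so the lock is released even when add panics

	if _, ok := r.flows[name]; ok {
		panic(fmt.Sprintf("duplicate flow name: %s", name))
	}
	r.flows[name] = flow
}

// tryAdd wraps add and converts a panic into an error value.
func (r *flowRegistry) tryAdd(name, flow string) (err error) {
	defer func() {
		if rec := recover(); rec != nil {
			err = fmt.Errorf("add flow failed: %v", rec)
		}
	}()
	r.add(name, flow)
	return nil
}

func main() {
	reg := newFlowRegistry()
	fmt.Println(reg.tryAdd("flowName1", "first"))
	fmt.Println(reg.tryAdd("flowName1", "second"))
}
```

The deferred Unlock matters here: without it, a recovered panic would leave the mutex held and every later registration would block.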

4.2.3 Registering and Scheduling Functions

KisPool provides methods to register Function callbacks and schedule Functions as follows.

kis-flow/kis/pool.go

// FaaS registers Function computing business logic, indexed and registered by Function Name
func (pool *kisPool) FaaS(fnName string, f FaaS) {
    pool.fnLock.Lock()
    defer pool.fnLock.Unlock()

    if _, ok := pool.fnRouter[fnName]; !ok {
        pool.fnRouter[fnName] = f
    } else {
        errString := fmt.Sprintf("KisPool FaaS Repeat FuncName=%s", fnName)
        panic(errString)
    }

    log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
}

// CallFunction schedules a Function
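func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
    // Read fnRouter under the read lock, then release the lock before invoking
    // the callback so that a long-running Function does not block registration
    pool.fnLock.RLock()
    f, ok := pool.fnRouter[fnName]
    pool.fnLock.RUnlock()

    if ok {
        return f(ctx, flow)
    }

    log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)

    return errors.New("FuncName: " + fnName + " Can not find in KisPool, Not Added.")
}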

In CallFunction(), the Flow parameter serves as the context for data stream scheduling. Developers can use Flow inside a custom FaaS to obtain information about the current Function. To support that, we add some accessor methods to the Flow interface:

kis-flow/kis/flow.go

type Flow interface {
    // Run schedules the Flow, sequentially schedules Functions in the Flow and executes them
    Run(ctx context.Context) error
    // Link connects the Functions in the Flow according to the configuration in the configuration file
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow submits Flow data to the upcoming Function layer
    CommitRow(row interface{}) error
    // Input gets the input source data for the currently executing Function in the Flow
    Input() common.KisRowArr

    // ++++++++++++++++++++++++++++++++++
    // GetName gets the name of the Flow
    GetName() string
    // GetThisFunction gets the currently executing Function
    GetThisFunction() Function
    // GetThisFuncConf gets the configuration of the currently executing Function
    GetThisFuncConf() *config.KisFuncConfig
}

kis-flow/flow/kis_flow.go

func (flow *KisFlow) GetName() string {
    return flow.Name
}

func (flow *KisFlow) GetThisFunction() kis.Function {
    return flow.ThisFunction
}

func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig {
    return flow.ThisFunction.GetConfig()
}

4.3 KisFunction Refers to KisPool for Scheduling

Now, we can use the Pool for scheduling within the Call() method of KisFunctionX. Let's modify the Call() method for each Function accordingly.

kis-flow/function/kis_function_c.go

package function

import (
    "context"
    "kis-flow/kis"
    "kis-flow/log"
)

type KisFunctionC struct {
    BaseFunction
}

func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
    log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow)

    // Route to the specific executing computation Function through KisPool
    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
        log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
        return err
    }

    return nil
}

kis-flow/function/kis_function_e.go

package function

import (
    "context"
    "kis-flow/kis"
    "kis-flow/log"
)

type KisFunctionE struct {
    BaseFunction
}

func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
    log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow)

    // Route to the specific executing computation Function through KisPool
    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
        log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
        return err
    }

    return nil
}

kis-flow/function/kis_function_l.go

package function

import (
    "context"
    "kis-flow/kis"
    "kis-flow/log"
)

type KisFunctionL struct {
    BaseFunction
}

func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
    log.Logger().InfoF("KisFunctionL, flow = %+v\n", flow)

    // Route to the specific executing computation Function through KisPool
    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
        log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
        return err
    }

    return nil
}

kis-flow/function/kis_function_s.go

package function

import (
    "context"
    "kis-flow/kis"
    "kis-flow/log"
)

type KisFunctionS struct {
    BaseFunction
}

func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
    log.Logger().InfoF("KisFunctionS, flow = %+v\n", flow)

    // Route to the specific executing computation Function through KisPool
    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
        log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
        return err
    }

    return nil
}

kis-flow/function/kis_function_v.go

package function

import (
    "context"
    "kis-flow/kis"
    "kis-flow/log"
)

type KisFunctionV struct {
    BaseFunction
}

func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
    log.Logger().InfoF("KisFunctionV, flow = %+v\n", flow)

    // Route to the specific executing computation Function through KisPool
    if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
        log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
        return err
    }

    return nil
}
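All five Call() methods reach the configured name through `f.Config.FName`, which works because each type embeds BaseFunction and Go promotes the embedded struct's fields. The sketch below shows that mechanism with hypothetical stand-ins (`funcConfig`, `baseFunction`, `functionC`, `functionE`) and an illustrative `handlers` table in place of KisPool.

```go
package main

import (
	"errors"
	"fmt"
)

// funcConfig and baseFunction are hypothetical stand-ins for
// config.KisFuncConfig and BaseFunction.
type funcConfig struct{ FName string }

type baseFunction struct{ Config *funcConfig }

// functionC and functionE embed baseFunction, so Config is promoted
// and reachable as f.Config on both types.
type functionC struct{ baseFunction }
type functionE struct{ baseFunction }

// handlers is an illustrative name-keyed callback table standing in for KisPool.
var handlers = map[string]func() string{
	"funcName1": func() string { return "handled by funcName1" },
}

// route looks up the callback by the configured Function name.
func route(conf *funcConfig) (string, error) {
	h, ok := handlers[conf.FName]
	if !ok {
		return "", errors.New("function not registered: " + conf.FName)
	}
	return h(), nil
}

// Both modes route the same way: by the name stored in their config.
func (f *functionC) Call() (string, error) { return route(f.Config) }
func (f *functionE) Call() (string, error) { return route(f.Config) }

func main() {
	c := &functionC{baseFunction{Config: &funcConfig{FName: "funcName1"}}}
	e := &functionE{baseFunction{Config: &funcConfig{FName: "funcName2"}}}

	fmt.Println(c.Call())
	fmt.Println(e.Call())
}
```

In this sketch the mode-specific type decides nothing about routing; only the configured name does.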

4.4 KisPool Unit Testing

Next, let's perform unit testing for KisPool.

4.4.1 Custom FaaS

kis-flow/test/kis_pool_test.go

package test

import (
    "context"
    "fmt"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/flow"
    "kis-flow/kis"
    "testing"
)

func funcName1Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName1Handler ----")

    for index, row := range flow.Input() {
        // Print data
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        // Calculate result data
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // Commit result data
        _ = flow.CommitRow(resultStr)
    }

    return nil
}

func funcName2Handler(ctx context.Context, flow kis.Flow) error {

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return nil
}


4.4.2 Register FaaS and Start Flow

kis-flow/test/kis_pool_test.go

func TestNewKisPool(t *testing.T) {

    ctx := context.Background()

    // 0. Register Functions
    kis.Pool().FaaS("funcName1", funcName1Handler)
    kis.Pool().FaaS("funcName2", funcName2Handler)

    // 1. Create 2 KisFunction configuration instances
    source1 := config.KisSource{
        Name: "Public account data from TikTok Mall",
        Must: []string{"order_id", "user_id"},
    }

    source2 := config.KisSource{
        Name: "User order error rate",
        Must: []string{"order_id", "user_id"},
    }

    myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
    if myFuncConfig1 == nil {
        panic("myFuncConfig1 is nil")
    }

    myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil)
    if myFuncConfig2 == nil {
        panic("myFuncConfig2 is nil")
    }

    // 2. Create a KisFlow configuration instance
    myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

    // 3. Create a KisFlow object
    flow1 := flow.NewKisFlow(myFlowConfig1)

    // 4. Link Functions to Flow
    if err := flow1.Link(myFuncConfig1, nil); err != nil {
        panic(err)
    }
    if err := flow1.Link(myFuncConfig2, nil); err != nil {
        panic(err)
    }

    // 5. Commit original data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 6. Run flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

Execute the following command in the kis-flow/test/ directory:

go test -test.v -test.paniconexit0 -test.run TestNewKisPool

The result is as follows:

=== RUN   TestNewKisPool
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionC, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c0190 ThisFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionE, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c01e0 ThisFunctionId:func-9cd2ab870b384794b312d2be10bb06fa PrevFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}

In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 2
--- PASS: TestNewKisPool (0.00s)
PASS
ok      kis-flow/test   0.520s

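The log confirms the expected behavior: funcName1 receives the three source rows and commits three result rows, and funcName2 then receives those three result rows as its input.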
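The handoff visible in the log can be modeled with a few lines of plain Go. This is a simplified sketch, not KisFlow's scheduler: `stage`, `runStages`, `indexRows`, and `printRows` are hypothetical names, and each stage simply returns the rows it would commit.

```go
package main

import "fmt"

// stage is a hypothetical, simplified FaaS: it receives the previous
// stage's rows and returns the rows it commits for the next stage.
type stage func(input []string) []string

// runStages feeds each stage the output of the one before it and records
// every level of data, loosely mirroring the "All Level Data" log lines.
func runStages(source []string, stages ...stage) [][]string {
	levels := [][]string{source}
	current := source
	for _, s := range stages {
		current = s(current)
		levels = append(levels, current)
	}
	return levels
}

// indexRows plays the role of funcName1: one result row per input row.
func indexRows(input []string) []string {
	out := make([]string, 0, len(input))
	for i := range input {
		out = append(out, fmt.Sprintf("data from funcName[funcName1], index = %d", i))
	}
	return out
}

// printRows plays the role of funcName2: it reads its input and commits nothing.
func printRows(input []string) []string {
	for _, row := range input {
		fmt.Println("funcName2 got:", row)
	}
	return nil
}

func main() {
	source := []string{
		"This is Data1 from Test",
		"This is Data2 from Test",
		"This is Data3 from Test",
	}
	levels := runStages(source, indexRows, printRows)
	fmt.Println("rows per level:", len(levels[0]), len(levels[1]), len(levels[2]))
}
```

Asserting on the recorded levels, rather than reading logs, is one way such a check could be turned into an automated test.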

Now that the business capabilities of Functions have been exposed to developers, let's continue to enhance the capabilities of KisFlow.

4.5 [V0.3] Source Code

https://github.com/aceld/kis-flow/releases/tag/v0.3


Author: Aceld
GitHub: https://github.com/aceld
