Aceld

(Part 2.2) Golang Framework Hands-on - KisFlow Stream Computing Framework - Project Construction / Basic Modules

#go

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection

Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications


First, we need to define the core structure of KisFlow, the KisFlow struct. Based on the design philosophy described above, we understand that KisFlow represents the structure of an entire data computing stream. In this structure, each piece of data in a flow is processed sequentially by the functions attached to that flow.

2.2.1 KisFunction Family

KisFunction should form a chain of calls, so the basic shape of the struct is a linked list: after one function finishes executing, scheduling moves automatically to the next function node. KisFlow has several types of functions, namely save, load, calculate, extend (sink), and verify. We therefore define a separate type for each of these five function types, which keeps them flexible and makes it easier to isolate and modify the features of each type in the future.
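The chain-of-calls idea can be sketched with a plain doubly linked list. This is a minimal illustration only: the node type and the buildChain and runChain helpers are hypothetical names, not part of KisFlow, and each node just records its name instead of doing real work.

```go
package main

import "fmt"

// node is a hypothetical stand-in for a KisFunction: one step in a chain.
type node struct {
	name string
	prev *node
	next *node
}

// buildChain links one node per name, in order, and returns the head.
func buildChain(names ...string) *node {
	var head, tail *node
	for _, name := range names {
		n := &node{name: name}
		if head == nil {
			head = n
		} else {
			tail.next = n
			n.prev = tail
		}
		tail = n
	}
	return head
}

// runChain visits every node from head to tail and returns the visit order.
func runChain(head *node) []string {
	var order []string
	for cur := head; cur != nil; cur = cur.next {
		order = append(order, cur.name)
	}
	return order
}

func main() {
	head := buildChain("verify", "calculate", "save")
	fmt.Println(runChain(head)) // [verify calculate save]
}
```

The loop in runChain plays the role of the scheduler: once a node is done, the next pointer decides what runs afterwards.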

The overall class diagram design for KisFunction is as follows:

[Image: KisFunction class diagram]

2.2.2 Definition of Abstraction Layer KisFunction

Create a new directory, function/, in kis-flow to store the function code.
The abstract interface itself lives in the kis/ directory, so we write that first.

kis-flow/kis/function.go

package kis

import (
    "context"
    "kis-flow/config"
)

// Function is the basic calculation module for streaming computing.
// KisFunction is a fundamental logic unit for streaming computation, 
// and any number of KisFunctions can be combined into a KisFlow.
type Function interface {
    // Call executes the streaming computation logic.
    Call(ctx context.Context, flow Flow) error

    // SetConfig configures the current Function instance.
    SetConfig(s *config.KisFuncConfig) error
    // GetConfig gets the configuration policy of the current Function instance.
    GetConfig() *config.KisFuncConfig

    // SetFlow sets the Flow instance that the current Function instance depends on.
    SetFlow(f Flow) error
    // GetFlow gets the Flow instance that the current Function instance depends on.
    GetFlow() Flow

    // CreateId generates a random instance KisID for the current Function instance.
    CreateId()
    // GetId gets the FID of the current Function.
    GetId() string
    // GetPrevId gets the FID of the previous Function node on the current Function.
    GetPrevId() string
    // GetNextId gets the FID of the next Function node on the current Function.
    GetNextId() string

    // Next returns the next layer of calculation flow Function. 
    // If the current layer is the last layer, it returns nil.
    Next() Function
    // Prev returns the previous layer of calculation flow Function.
    // If the current layer is the first layer, it returns nil.
    Prev() Function
    // SetN sets the next Function instance.
    SetN(f Function)
    // SetP sets the previous Function instance.
    SetP(f Function)
}

2.2.3 KisId: Random Unique Instance ID

This introduces a new concept, KisId. KisId is the instance ID of a Function, used within KisFlow to distinguish different instance objects. It differs from the Fid in the Function config: Fid identifies a type of Function strategy, while KisId identifies one instantiated Function object in KisFlow and is randomly generated and unique.

Create a kis-flow/id/ directory and create a kis_id.go file to implement the algorithm for generating kis_id.

kis-flow/id/kis_id.go

package id

import (
    "github.com/google/uuid"
    "kis-flow/common"
    "strings"
)

// KisID generates a random instance ID.
// The format is "prefix1-[prefix2-][prefix3-]ID"
// Examples: 
// flow-1234567890
// func-1234567890
// conn-1234567890
// func-1-1234567890
func KisID(prefix ...string) (kisId string) {

    idStr := strings.ReplaceAll(uuid.New().String(), "-", "")
    kisId = formatKisID(idStr, prefix...)

    return
}

func formatKisID(idStr string, prefix ...string) string {
    var kisId string

    for _, fix := range prefix {
        kisId += fix
        kisId += common.KisIdJoinChar
    }

    kisId += idStr

    return kisId
}

The id module provides the KisID() function, which depends on the third-party UUID library github.com/google/uuid. The generated ID is a string: a UUID with its dashes removed. The caller can pass any number of prefixes, which are joined to the random part with the - character, e.g., func-1234567890.
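To see the prefix handling in isolation, here is a small standard-library sketch. It is an illustration under assumptions: formatID and randomHex are hypothetical names, and crypto/rand stands in for github.com/google/uuid so that the example has no external dependency. The joining rule is the same as in formatKisID above.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strings"
)

// joinChar mirrors common.KisIdJoinChar from the article.
const joinChar = "-"

// formatID joins the prefixes and the random part with joinChar,
// producing the same layout as formatKisID in the article.
func formatID(idStr string, prefix ...string) string {
	parts := append(append([]string{}, prefix...), idStr)
	return strings.Join(parts, joinChar)
}

// randomHex returns 32 hex characters (16 random bytes). It is a
// standard-library stand-in for the dash-stripped UUID, not the real thing.
func randomHex() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	fmt.Println(formatID("1234567890", "func"))      // func-1234567890
	fmt.Println(formatID("1234567890", "func", "1")) // func-1-1234567890
	fmt.Println(formatID("1234567890"))              // 1234567890

	id, err := randomHex()
	if err != nil {
		panic(err)
	}
	fmt.Println(formatID(id, "flow")) // flow-<32 hex characters>
}
```

With no prefix at all, the result is just the random part, which matches the behavior of the loop in formatKisID.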

For the prefixes of KisId, some string enumerations are provided as follows:

kis-flow/common/const.go

// KisIdType is used to generate the KisId string prefix
const (
    KisIdTypeFlow       = "flow"
    KisIdTypeConnnector = "conn"
    KisIdTypeFunction   = "func"
    KisIdTypeGlobal     = "global"
    KisIdJoinChar       = "-"
)

2.2.4 BaseFunction: Basic Parent Class

According to the design, we need to provide a BaseFunction type that implements the basic methods of the Function interface. Its Call() method is left empty so that each concrete KisFunctionX type, which embeds BaseFunction, can override it. Let's define the BaseFunction structure below.

A. Structure Definition

kis-flow/function/kis_base_function.go

package function

import (
    "context"
    "errors"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/id"
    "kis-flow/kis"
)

type BaseFunction struct {
    // Id is the instance ID of KisFunction, used within KisFlow to distinguish different instance objects
    Id     string
    Config *config.KisFuncConfig

    // flow
    Flow kis.Flow // Contextual KisFlow

    // link
    N kis.Function // Next streaming computation Function
    P kis.Function // Previous streaming computation Function
}

B. Method Implementation

kis-flow/function/kis_base_function.go

// Call is an empty implementation in BaseFunction. Concrete KisFunction types,
// such as KisFunctionV, embed BaseFunction and override this method.
func (base *BaseFunction) Call(ctx context.Context, flow kis.Flow) error { return nil }

func (base *BaseFunction) Next() kis.Function {
    return base.N
}

func (base *BaseFunction) Prev() kis.Function {
    return base.P
}

func (base *BaseFunction) SetN(f kis.Function) {
    base.N = f
}

func (base *BaseFunction) SetP(f kis.Function) {
    base.P = f
}

func (base *BaseFunction) SetConfig(s *config.KisFuncConfig) error {
    if s == nil {
        return errors.New("KisFuncConfig is nil")
    }

    base.Config = s

    return nil
}

func (base *BaseFunction) GetId() string {
    return base.Id
}

func (base *BaseFunction) GetPrevId() string {
    if base.P == nil {
        // Function is the first node
        return common.FunctionIdFirstVirtual
    }
    return base.P.GetId()
}

func (base *BaseFunction) GetNextId() string {
    if base.N == nil {
        // Function is the last node
        return common.FunctionIdLastVirtual
    }
    return base.N.GetId()
}

func (base *BaseFunction) GetConfig() *config.KisFuncConfig {
    return base.Config
}

func (base *BaseFunction) SetFlow(f kis.Flow) error {
    if f == nil {
        return errors.New("KisFlow is nil")
    }
    base.Flow = f
    return nil
}

func (base *BaseFunction) GetFlow() kis.Flow {
    return base.Flow
}

func (base *BaseFunction) CreateId() {
    base.Id = id.KisID(common.KisIdTypeFunction)
}

Note the implementation of the GetPrevId() and GetNextId() methods. If the current Function is the first or last node of the doubly linked list, its previous or next node does not exist, so there is no ID to return. To spare callers from handling a missing ID, we provide two virtual FIDs for these boundary cases, defined in const.go.

kis-flow/common/const.go

const (
    // FunctionIdFirstVirtual is the previous virtual Function ID for the first node
    FunctionIdFirstVirtual = "FunctionIdFirstVirtual"
    // FunctionIdLastVirtual is the next virtual Function ID for the last node
    FunctionIdLastVirtual = "FunctionIdLastVirtual"
)
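The boundary handling can be tried out on a two-node list. The sketch below is a trimmed-down illustration: the step type and the link helper are hypothetical, and only the two sentinel strings are taken from const.go.

```go
package main

import "fmt"

// Sentinel IDs, with the same values as the constants in common/const.go.
const (
	firstVirtual = "FunctionIdFirstVirtual"
	lastVirtual  = "FunctionIdLastVirtual"
)

// step is a hypothetical node that keeps only an ID and its two links.
type step struct {
	id   string
	prev *step
	next *step
}

// prevID returns the previous node's ID, or the sentinel for the first node.
func (s *step) prevID() string {
	if s.prev == nil {
		return firstVirtual
	}
	return s.prev.id
}

// nextID returns the next node's ID, or the sentinel for the last node.
func (s *step) nextID() string {
	if s.next == nil {
		return lastVirtual
	}
	return s.next.id
}

// link connects a and b so that b follows a.
func link(a, b *step) {
	a.next = b
	b.prev = a
}

func main() {
	a, b := &step{id: "func-a"}, &step{id: "func-b"}
	link(a, b)

	fmt.Println(a.prevID(), a.nextID()) // FunctionIdFirstVirtual func-b
	fmt.Println(b.prevID(), b.nextID()) // func-a FunctionIdLastVirtual
}
```

Callers always get a non-empty string back, so they can log or compare IDs without a nil check at either end of the list.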

2.2.5 Definition of KisFunction V/S/L/C/E Mode Classes

Next, we'll implement the concrete KisFunction types for the five modes: V, S, L, C, and E. Each one goes in its own file.

A. KisFunctionV

kis-flow/function/kis_function_v.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionV struct {
    BaseFunction
}

func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
    fmt.Printf("KisFunctionV, flow = %+v\n", flow)

    // TODO: Invoke the specific function execution method

    return nil
}

B. KisFunctionS

kis-flow/function/kis_function_s.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionS struct {
    BaseFunction
}

func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
    fmt.Printf("KisFunctionS, flow = %+v\n", flow)

    // TODO: Invoke the specific function execution method

    return nil
}

C. KisFunctionL

kis-flow/function/kis_function_l.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionL struct {
    BaseFunction
}

func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
    fmt.Printf("KisFunctionL, flow = %+v\n", flow)

    // TODO: Invoke the specific function execution method

    return nil
}

D. KisFunctionC

kis-flow/function/kis_function_c.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionC struct {
    BaseFunction
}

func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
    fmt.Printf("KisFunctionC, flow = %+v\n", flow)

    // TODO: Invoke the specific function execution method

    return nil
}

E. KisFunctionE

kis-flow/function/kis_function_e.go

package function

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

type KisFunctionE struct {
    BaseFunction
}

func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
    fmt.Printf("KisFunctionE, flow = %+v\n", flow)

    // TODO: Invoke the specific function execution method

    return nil
}
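All five types rely on the same Go mechanism: struct embedding with a method override. The following self-contained sketch shows that mechanism on its own. The caller interface and the base and verifier types are hypothetical, reduced stand-ins for kis.Function, BaseFunction, and KisFunctionV.

```go
package main

import "fmt"

// caller is a hypothetical, reduced version of the kis.Function interface.
type caller interface {
	Call() string
	ID() string
}

// base supplies default behavior, like BaseFunction in the article.
type base struct {
	id string
}

func (b *base) Call() string { return "base: nothing to do" }
func (b *base) ID() string   { return b.id }

// verifier embeds base, so ID() is promoted, and it overrides Call().
type verifier struct {
	base
}

func (v *verifier) Call() string { return "verifier: checking data" }

// Compile-time checks that both pointer types satisfy the interface.
var (
	_ caller = (*base)(nil)
	_ caller = (*verifier)(nil)
)

func main() {
	var c caller = &verifier{base: base{id: "func-1"}}
	fmt.Println(c.ID())   // func-1
	fmt.Println(c.Call()) // verifier: checking data
}
```

One caveat worth keeping in mind: embedding is not inheritance. If a method defined on base called b.Call() internally, it would still run the base version, because the receiver is the embedded value and not the outer struct.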

2.2.6 Creating KisFunction Instances

Here, we provide a method to create specific Function modes using the simple factory method pattern.

kis-flow/function/kis_base_function.go

// NewKisFunction creates a KisFunction.
// flow: the flow instance the function is associated with
// conf: the configuration strategy for the function
func NewKisFunction(flow kis.Flow, conf *config.KisFuncConfig) kis.Function {
    var f kis.Function

    // The factory picks the concrete type from the configured mode
    switch common.KisMode(conf.FMode) {
    case common.V:
        f = new(KisFunctionV)
    case common.S:
        f = new(KisFunctionS)
    case common.L:
        f = new(KisFunctionL)
    case common.C:
        f = new(KisFunctionC)
    case common.E:
        f = new(KisFunctionE)
    default:
        // Unknown mode: nothing to create (TODO: log an error)
        return nil
    }

    // Generate a random, unique instance ID
    f.CreateId()

    // Set basic information properties
    if err := f.SetConfig(conf); err != nil {
        panic(err)
    }

    if err := f.SetFlow(flow); err != nil {
        panic(err)
    }

    return f
}

Note that the NewKisFunction() method returns an abstract interface Function.
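The simple factory pattern is easy to try in a stand-alone program. The sketch below is an illustration under assumptions: the mode type, its two values, and the worker types are hypothetical stand-ins for common.KisMode and the KisFunction types. It also shows one possible variation, reporting an unknown mode as an error, which is a different choice from the nil return used in the article.

```go
package main

import (
	"errors"
	"fmt"
)

// mode and its values are hypothetical stand-ins for common.KisMode.
type mode string

const (
	modeVerify    mode = "Verify"
	modeCalculate mode = "Calculate"
)

// worker is a reduced stand-in for kis.Function.
type worker interface {
	Name() string
}

type verifyWorker struct{}
type calcWorker struct{}

func (verifyWorker) Name() string { return "verify" }
func (calcWorker) Name() string   { return "calculate" }

// newWorker picks a concrete type from the mode and returns it behind the
// interface. Unlike the article's factory, it reports an unknown mode as an
// error instead of returning nil.
func newWorker(m mode) (worker, error) {
	switch m {
	case modeVerify:
		return verifyWorker{}, nil
	case modeCalculate:
		return calcWorker{}, nil
	default:
		return nil, errors.New("unknown mode: " + string(m))
	}
}

func main() {
	w, err := newWorker(modeCalculate)
	if err != nil {
		panic(err)
	}
	fmt.Println(w.Name()) // calculate

	_, err = newWorker("Bogus")
	fmt.Println(err) // unknown mode: Bogus
}
```

Because the caller only sees the interface, new modes can be added inside the switch without touching the code that uses the returned value.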

Also note that we have not implemented the Flow object yet, but creating a KisFunction requires passing one in. For now, we can create a temporary constructor for a Flow object, and we will refine this part of the code in the Flow section later on.
Create a flow.go file in kis-flow/kis/.

kis-flow/kis/flow.go

package kis

// Flow is a placeholder for now; it is refined in the Flow section.
type Flow interface {
    // TODO
}

Create a kis_flow.go file under kis-flow/flow/ with the following:

kis-flow/flow/kis_flow.go

package flow

import (
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/id"
    "kis-flow/kis"
)

// KisFlow is used to traverse the entire streaming computation context
type KisFlow struct {
    Id   string
    Name string
    // TODO
}

// TODO for test
// NewKisFlow creates a KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
    flow := new(KisFlow)

    // Basic information
    flow.Id = id.KisID(common.KisIdTypeFlow)
    flow.Name = conf.FlowName

    return flow
}

2.2.7 Unit Testing for KisFunction Creation Instances

Now, let's create a simple unit test for the KisFunction instance creation. Create a kis_function_test.go file in kis-flow/test/.

kis-flow/test/kis_function_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
    "kis-flow/flow"
    "kis-flow/function"
    "testing"
)

func TestNewKisFunction(t *testing.T) {
    ctx := context.Background()

    // 1. Create a KisFunction configuration instance
    source := config.KisSource{
        Name: "Public Account TikTok Store User Order Data",
        Must: []string{"order_id", "user_id"},
    }

    myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source, nil)
    if myFuncConfig1 == nil {
        panic("myFuncConfig1 is nil")
    }

    // 2. Create a KisFlow configuration instance
    myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

    // 3. Create a KisFlow object
    flow1 := flow.NewKisFlow(myFlowConfig1)

    // 4. Create a KisFunction object
    func1 := function.NewKisFunction(flow1, myFuncConfig1)

    if err := func1.Call(ctx, flow1); err != nil {
        t.Errorf("func1.Call() error = %v", err)
    }
}

The process is simple and divided into four small steps:

  • Create a KisFunction configuration instance
  • Create a KisFlow configuration instance
  • Create a KisFlow object
  • Create a KisFunction object

Navigate to the kis-flow/test/ directory and execute:

go test -test.v -test.paniconexit0 -test.run TestNewKisFunction

The result is as follows (the fields printed for the flow depend on how far the KisFlow struct has been filled in):

=== RUN   TestNewKisFunction
KisFunctionC, flow = &{Id:flow-866de5abc8134fc9bb8e5248a3ce7137 Name:flowName1 Conf:0xc00014e780 Funcs:map[] FlowHead:<nil> FlowTail:<nil> flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:<nil> ThisFunctionId: PrevFunctionId: funcParams:map[] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

--- PASS: TestNewKisFunction (0.00s)
PASS
ok      kis-flow/test   1.005s


We have successfully called the Call() method of the concrete KisFunctionC instance.

2.5 [V0.1] Source Code

https://github.com/aceld/kis-flow/releases/tag/v0.1
