Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
First, we need to define the core structure of KisFlow, the KisFlow struct. Based on the design philosophy described above, we understand that KisFlow represents the structure of an entire data computing stream. In this structure, each piece of data in a flow is processed sequentially by the functions attached to that flow.
2.2.1 KisFunction Family
KisFunction should be a chain of calls, so the basic form of the struct is a linked list: after one function finishes executing, scheduling moves automatically to the next function node. KisFlow has several types of functions, such as save, load, calculate, extend (sink), and verify. We therefore model these five types as separate template classes, which keeps the design flexible and makes it easier to isolate and modify the features of each function type in the future.
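The linked-list idea can be sketched in a few lines. This is only an illustration under simplified assumptions: the node type and the link and run helpers are hypothetical names, not part of KisFlow, and each node carries just a name instead of real processing logic.

```go
package main

import "fmt"

// node is a hypothetical stand-in for a KisFunction: one link in a
// doubly linked chain of processing steps.
type node struct {
	name string
	prev *node
	next *node
}

// link appends n after tail (if any) and returns n as the new tail.
func link(tail, n *node) *node {
	if tail != nil {
		tail.next = n
		n.prev = tail
	}
	return n
}

// run walks the chain from head and returns the names in call order.
func run(head *node) []string {
	var order []string
	for fn := head; fn != nil; fn = fn.next {
		order = append(order, fn.name)
	}
	return order
}

func main() {
	head := &node{name: "verify"}
	tail := link(head, &node{name: "calculate"})
	link(tail, &node{name: "save"})
	fmt.Println(run(head)) // [verify calculate save]
}
```

Each node only knows its neighbours, so a step can be inserted or removed by relinking two pointers rather than rebuilding the whole chain.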
The overall class diagram design for KisFunction is as follows:
2.2.2 Definition of Abstraction Layer KisFunction
Create a new directory function in kis-flow to store the function code. First, write the abstract interface in the kis/ directory.
kis-flow/kis/function.go
package kis
import (
"context"
"kis-flow/config"
)
// Function is the basic calculation module for streaming computing.
// KisFunction is a fundamental logic unit for streaming computation,
// and any number of KisFunctions can be combined into a KisFlow.
type Function interface {
// Call executes the streaming computation logic.
Call(ctx context.Context, flow Flow) error
// SetConfig configures the current Function instance.
SetConfig(s *config.KisFuncConfig) error
// GetConfig gets the configuration policy of the current Function instance.
GetConfig() *config.KisFuncConfig
// SetFlow sets the Flow instance that the current Function instance depends on.
SetFlow(f Flow) error
// GetFlow gets the Flow instance that the current Function instance depends on.
GetFlow() Flow
// CreateId generates a random instance KisID for the current Function instance.
CreateId()
// GetId gets the FID of the current Function.
GetId() string
// GetPrevId gets the FID of the previous Function node on the current Function.
GetPrevId() string
// GetNextId gets the FID of the next Function node on the current Function.
GetNextId() string
// Next returns the next layer of calculation flow Function.
// If the current layer is the last layer, it returns nil.
Next() Function
// Prev returns the previous layer of calculation flow Function.
// If the current layer is the first layer, it returns nil.
Prev() Function
// SetN sets the next Function instance.
SetN(f Function)
// SetP sets the previous Function instance.
SetP(f Function)
}
2.2.3 KisId: Random Unique Instance ID
This section introduces a new concept, KisId. KisId serves as the instance ID of a Function and is used within KisFlow to distinguish different instance objects. The difference between KisId and the Fid in the Function Config is that Fid identifies a type of Function strategy, while KisId identifies an instantiated Function object in KisFlow; it is randomly generated and unique.
Create a kis-flow/id/ directory and, inside it, a kis_id.go file that implements the algorithm for generating a KisId.
kis-flow/id/kis_id.go
package id
import (
"github.com/google/uuid"
"kis-flow/common"
"strings"
)
// KisID generates a random instance ID.
// The format is "prefix1-[prefix2-][prefix3-]ID"
// Examples:
// flow-1234567890
// func-1234567890
// conn-1234567890
// func-1-1234567890
func KisID(prefix ...string) (kisId string) {
idStr := strings.Replace(uuid.New().String(), "-", "", -1)
kisId = formatKisID(idStr, prefix...)
return
}
func formatKisID(idStr string, prefix ...string) string {
var kisId string
for _, fix := range prefix {
kisId += fix
kisId += common.KisIdJoinChar
}
kisId += idStr
return kisId
}
The id module provides the KisID() function, which depends on the third-party UUID generation library github.com/google/uuid. The generated random ID is a string. The caller can pass multiple prefixes, which are joined to the random part with the - character to form the final ID, e.g., func-1234567890.
For the prefixes of KisId, some string enumerations are provided as follows:
kis-flow/common/const.go
// KisIdType is used to generate the KisId string prefix
const (
KisIdTypeFlow = "flow"
KisIdTypeConnnector = "conn"
KisIdTypeFunction = "func"
KisIdTypeGlobal = "global"
KisIdJoinChar = "-"
)
2.2.4 BaseFunction: Basic Parent Class
According to the design, we need to provide a BaseFunction that implements the Function interface and serves as the parent class of the concrete types, supplying the basic interface methods. Its Call() method is left empty for the concrete KisFunctionX types to override. Let's define the BaseFunction structure below.
A. Structure Definition
kis-flow/function/kis_base_function.go
package function
import (
"context"
"errors"
"kis-flow/common"
"kis-flow/config"
"kis-flow/id"
"kis-flow/kis"
)
type BaseFunction struct {
// Id is the instance ID of KisFunction, used within KisFlow to distinguish different instance objects
Id string
Config *config.KisFuncConfig
// flow
Flow kis.Flow // Contextual KisFlow
// link
N kis.Function // Next streaming computation Function
P kis.Function // Previous streaming computation Function
}
B. Method Implementation
kis-flow/function/kis_base_function.go
// Call is an empty implementation in BaseFunction.
// Concrete KisFunction types, such as KisFunctionV, embed BaseFunction and override this method.
func (base *BaseFunction) Call(ctx context.Context, flow kis.Flow) error { return nil }
func (base *BaseFunction) Next() kis.Function {
return base.N
}
func (base *BaseFunction) Prev() kis.Function {
return base.P
}
func (base *BaseFunction) SetN(f kis.Function) {
base.N = f
}
func (base *BaseFunction) SetP(f kis.Function) {
base.P = f
}
func (base *BaseFunction) SetConfig(s *config.KisFuncConfig) error {
if s == nil {
return errors.New("KisFuncConfig is nil")
}
base.Config = s
return nil
}
func (base *BaseFunction) GetId() string {
return base.Id
}
func (base *BaseFunction) GetPrevId() string {
if base.P == nil {
// Function is the first node
return common.FunctionIdFirstVirtual
}
return base.P.GetId()
}
func (base *BaseFunction) GetNextId() string {
if base.N == nil {
// Function is the last node
return common.FunctionIdLastVirtual
}
return base.N.GetId()
}
func (base *BaseFunction) GetConfig() *config.KisFuncConfig {
return base.Config
}
func (base *BaseFunction) SetFlow(f kis.Flow) error {
if f == nil {
return errors.New("KisFlow is nil")
}
base.Flow = f
return nil
}
func (base *BaseFunction) GetFlow() kis.Flow {
return base.Flow
}
func (base *BaseFunction) CreateId() {
base.Id = id.KisID(common.KisIdTypeFunction)
}
Note the implementation of the GetPrevId() and GetNextId() methods. If the current Function is the first or last node in the doubly linked list, its previous or next node does not exist, so the corresponding ID does not exist either. To avoid cases where no ID can be obtained, we provide two virtual FIDs for boundary handling, defined in const.go.
kis-flow/common/const.go
const (
// FunctionIdFirstVirtual is the previous virtual Function ID for the first node
FunctionIdFirstVirtual = "FunctionIdFirstVirtual"
// FunctionIdLastVirtual is the next virtual Function ID for the last node
FunctionIdLastVirtual = "FunctionIdLastVirtual"
)
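The boundary handling can be tried out in isolation. The sketch below assumes a stripped-down node type (fn, prevID, and nextID are hypothetical names) and reuses only the two constant values shown above.

```go
package main

import "fmt"

// Sentinel IDs, with the same values as the constants in const.go.
const (
	firstVirtual = "FunctionIdFirstVirtual"
	lastVirtual  = "FunctionIdLastVirtual"
)

// fn is a hypothetical, stripped-down function node.
type fn struct {
	id   string
	prev *fn
	next *fn
}

// prevID returns the previous node's ID, or the sentinel for the first node.
func (f *fn) prevID() string {
	if f.prev == nil {
		return firstVirtual
	}
	return f.prev.id
}

// nextID returns the next node's ID, or the sentinel for the last node.
func (f *fn) nextID() string {
	if f.next == nil {
		return lastVirtual
	}
	return f.next.id
}

func main() {
	a := &fn{id: "func-a"}
	b := &fn{id: "func-b"}
	a.next, b.prev = b, a

	fmt.Println(a.prevID(), a.nextID()) // FunctionIdFirstVirtual func-b
	fmt.Println(b.prevID(), b.nextID()) // func-a FunctionIdLastVirtual
}
```

Callers always get a non-empty string back, so they can compare against the sentinel instead of checking for a nil neighbour.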
2.2.5 Definition of KisFunction V/S/L/C/E Mode Classes
Next, we'll implement subclasses of KisFunction for the five different modes: V, S, L, C, and E. We'll use separate files to implement each.
A. KisFunctionV
kis-flow/function/kis_function_v.go
package function
import (
"context"
"fmt"
"kis-flow/kis"
)
type KisFunctionV struct {
BaseFunction
}
func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionV, flow = %+v\n", flow)
// TODO: Invoke the specific function execution method
return nil
}
B. KisFunctionS
kis-flow/function/kis_function_s.go
package function
import (
"context"
"fmt"
"kis-flow/kis"
)
type KisFunctionS struct {
BaseFunction
}
func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionS, flow = %+v\n", flow)
// TODO: Invoke the specific function execution method
return nil
}
C. KisFunctionL
kis-flow/function/kis_function_l.go
package function
import (
"context"
"fmt"
"kis-flow/kis"
)
type KisFunctionL struct {
BaseFunction
}
func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionL, flow = %+v\n", flow)
// TODO: Invoke the specific function execution method
return nil
}
D. KisFunctionC
kis-flow/function/kis_function_c.go
package function
import (
"context"
"fmt"
"kis-flow/kis"
)
type KisFunctionC struct {
BaseFunction
}
func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionC, flow = %+v\n", flow)
// TODO: Invoke the specific function execution method
return nil
}
E. KisFunctionE
kis-flow/function/kis_function_e.go
package function
import (
"context"
"fmt"
"kis-flow/kis"
)
type KisFunctionE struct {
BaseFunction
}
func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
fmt.Printf("KisFunctionE, flow = %+v\n", flow)
// TODO: Invoke the specific function execution method
return nil
}
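All five types follow the same pattern: embed BaseFunction and redefine Call(). The sketch below isolates that pattern with hypothetical, trimmed-down names (caller, base, verify) so it runs without the rest of the project.

```go
package main

import "fmt"

// caller is a hypothetical, trimmed-down stand-in for kis.Function.
type caller interface {
	Call() string
	ID() string
}

// base plays the role of BaseFunction: shared state plus a no-op Call.
type base struct{ id string }

func (b *base) Call() string { return "" }
func (b *base) ID() string   { return b.id }

// verify plays the role of KisFunctionV: it embeds base and shadows Call.
type verify struct{ base }

func (v *verify) Call() string { return "verify called on " + v.id }

func main() {
	var f caller = &verify{base{id: "func-1"}}
	fmt.Println(f.ID())   // func-1 (method promoted from base)
	fmt.Println(f.Call()) // verify called on func-1
}
```

One caveat worth keeping in mind: Go embedding is composition rather than virtual dispatch. If a method defined on the embedded type calls Call() itself, it reaches the embedded type's version, not the outer type's override.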
2.2.6 Creating KisFunction Instances
Here, we provide a method to create specific Function modes using the simple factory method pattern.
kis-flow/function/kis_base_function.go
// NewKisFunction creates a KisFunction.
// flow: the flow instance the function is attached to
// config: the configuration strategy for the current function
func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
var f kis.Function
// Factory produces generic objects
switch common.KisMode(config.FMode) {
case common.V:
f = new(KisFunctionV)
case common.S:
f = new(KisFunctionS)
case common.L:
f = new(KisFunctionL)
case common.C:
f = new(KisFunctionC)
case common.E:
f = new(KisFunctionE)
default:
//LOG ERROR
return nil
}
// Generate a random instance unique ID
f.CreateId()
// Set basic information properties
if err := f.SetConfig(config); err != nil {
panic(err)
}
if err := f.SetFlow(flow); err != nil {
panic(err)
}
return f
}
Note that the NewKisFunction() method returns an abstract interface, Function.
Also note that we have not implemented the Flow object yet, but creating a KisFunction requires passing one in. For now, we temporarily create a simple constructor for a Flow object and will refine this code in the Flow section later on.
Create a flow.go file in kis-flow/kis/.
kis-flow/kis/flow.go
package kis
// Flow is the abstract interface of a KisFlow.
type Flow interface {
// TODO
}
Create a kis_flow.go file under kis-flow/flow/ with the following:
kis-flow/flow/kis_flow.go
package flow
import (
"kis-flow/common"
"kis-flow/config"
"kis-flow/id"
"kis-flow/kis"
)
// KisFlow is used to traverse the entire streaming computation context
type KisFlow struct {
Id string
Name string
// TODO
}
// TODO for test
// NewKisFlow creates a KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
flow := new(KisFlow)
// Basic information
flow.Id = id.KisID(common.KisIdTypeFlow)
flow.Name = conf.FlowName
return flow
}
2.2.7 Unit Testing for KisFunction Creation Instances
Now, let's create a simple unit test for KisFunction instance creation. Create a kis_function_test.go file in kis-flow/test/.
kis-flow/test/kis_function_test.go
package test
import (
"context"
"kis-flow/common"
"kis-flow/config"
"kis-flow/flow"
"kis-flow/function"
"testing"
)
func TestNewKisFunction(t *testing.T) {
ctx := context.Background()
// 1. Create a KisFunction configuration instance
source := config.KisSource{
Name: "Public Account TikTok Store User Order Data",
Must: []string{"order_id", "user_id"},
}
myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source, nil)
if myFuncConfig1 == nil {
panic("myFuncConfig1 is nil")
}
// 2. Create a KisFlow configuration instance
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
// 3. Create a KisFlow object
flow1 := flow.NewKisFlow(myFlowConfig1)
// 4. Create a KisFunction object
func1 := function.NewKisFunction(flow1, myFuncConfig1)
if err := func1.Call(ctx, flow1); err != nil {
t.Errorf("func1.Call() error = %v", err)
}
}
The process is simple and divided into four small steps:
- Create a KisFunction configuration instance
- Create a KisFlow configuration instance
- Create a KisFlow object
- Create a KisFunction object
Navigate to the kis-flow/test/ directory and execute:
go test -test.v -test.paniconexit0 -test.run TestNewKisFunction
The result is as follows:
=== RUN TestNewKisFunction
KisFunctionC, flow = &{Id:flow-866de5abc8134fc9bb8e5248a3ce7137 Name:flowName1 Conf:0xc00014e780 Funcs:map[] FlowHead:<nil> FlowTail:<nil> flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:<nil> ThisFunctionId: PrevFunctionId: funcParams:map[] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
--- PASS: TestNewKisFunction (0.00s)
PASS
ok kis-flow/test 1.005s
We have successfully called the Call() method of the concrete KisFunctionC instance.
2.5 [V0.1] Source Code
https://github.com/aceld/kis-flow/releases/tag/v0.1