Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
4.1 Router
Now, KisFlow provides the capability to externally register Functions. First, we need to define some prototypes for the registered functions and the type of Router that manages these Function mappings.
Create kis-flow/kis/router.go and define the prototypes as follows:
kis-flow/kis/router.go
package kis

import "context"

// FaaS Function as a Service
type FaaS func(context.Context, Flow) error

// funcRouter
// key: Function Name
// value: Function callback for custom business logic
type funcRouter map[string]FaaS

// flowRouter
// key: Flow Name
// value: Flow
type flowRouter map[string]Flow
FaaS: The prototype of the business callback that developers register with KisFlow. It takes two parameters: Context mainly carries the business context, and Flow mainly carries the KisFlow context. Through Flow, a callback can obtain the current Function's configuration and data, as well as information about the other Functions on the Flow.
funcRouter: Manages the mapping between Function names and FaaS business callbacks. It is a private type and is not exposed externally. The key of funcRouter is the Function name rather than the Function ID, because the ID is randomly generated and developers cannot predict or read it when registering routes.
flowRouter: Manages the mapping between Flow names and Flow instances. It is also a private type and is not exposed externally. Like funcRouter, it is keyed by name.
4.2 KisPool
KisFlow provides a type called KisPool to manage all global mapping relationships. KisPool contains the Routers defined above and provides management capabilities for them.
4.2.1 Definition of KisPool
Create the kis-flow/kis/pool.go file to create the kis_pool module.
kis-flow/kis/pool.go
package kis

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"kis-flow/log"
)

var _poolOnce sync.Once

// kisPool is used to manage all Function and Flow configuration pools
type kisPool struct {
	fnRouter funcRouter   // All Function management routes
	fnLock   sync.RWMutex // Lock for fnRouter

	flowRouter flowRouter   // All flow objects
	flowLock   sync.RWMutex // Lock for flowRouter
}

// Singleton
var _pool *kisPool

// Pool Singleton constructor
func Pool() *kisPool {
	_poolOnce.Do(func() {
		// Create kisPool object
		_pool = new(kisPool)

		// Initialize fnRouter
		_pool.fnRouter = make(funcRouter)

		// Initialize flowRouter
		_pool.flowRouter = make(flowRouter)
	})

	return _pool
}
kisPool adopts the singleton pattern, and the Pool() function returns the current singleton. fnRouter and flowRouter are initialized only once during the lifetime of the process, which is guaranteed by sync.Once.
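The once-only guarantee can be checked with a small standalone program. This is a sketch under stated assumptions: registry, getRegistry, and initCount are hypothetical names that play the roles of kisPool, Pool(), and the initializer body, and they are not part of KisFlow.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for kisPool.
type registry struct {
	items map[string]int
}

var (
	once      sync.Once
	instance  *registry
	initCount int // counts how many times the initializer body ran
)

// getRegistry returns the process-wide registry, creating it on first use.
func getRegistry() *registry {
	once.Do(func() {
		initCount++
		instance = &registry{items: make(map[string]int)}
	})
	return instance
}

func main() {
	var wg sync.WaitGroup
	ptrs := make([]*registry, 8)

	// Call getRegistry from several goroutines at once.
	for i := range ptrs {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ptrs[i] = getRegistry()
		}(i)
	}
	wg.Wait()

	same := true
	for _, p := range ptrs {
		if p != ptrs[0] {
			same = false
		}
	}
	fmt.Println("initializer runs:", initCount, "same instance:", same)
}
```

Every goroutine receives the same pointer and the initializer body runs exactly once, so callers never need to coordinate the first call themselves.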
4.2.2 Registering and Getting Flow
KisPool can provide interfaces to add and retrieve Flow information as follows:
kis-flow/kis/pool.go
func (pool *kisPool) AddFlow(name string, flow Flow) {
	pool.flowLock.Lock()
	defer pool.flowLock.Unlock()

	if _, ok := pool.flowRouter[name]; !ok {
		pool.flowRouter[name] = flow
	} else {
		errString := fmt.Sprintf("Pool AddFlow Repeat FlowName=%s\n", name)
		panic(errString)
	}

	log.Logger().InfoF("Add FlowRouter FlowName=%s\n", name)
}

func (pool *kisPool) GetFlow(name string) Flow {
	pool.flowLock.RLock()
	defer pool.flowLock.RUnlock()

	if flow, ok := pool.flowRouter[name]; ok {
		return flow
	}
	return nil
}
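AddFlow checks for duplicates by Flow name and panics if a Flow with the same name has already been registered, so the same Flow cannot be registered more than once. GetFlow returns nil when no Flow is registered under the given name.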
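Since a duplicate registration panics, a caller that prefers an error value can wrap the call with recover. The following is a minimal sketch of that idea, assuming a simplified pool whose values are plain strings; flowPool, add, and tryAdd are hypothetical names and not part of the KisFlow API.

```go
package main

import (
	"fmt"
	"sync"
)

// flowPool is a simplified stand-in for kisPool's flow router.
// Values are plain strings here to keep the sketch self-contained.
type flowPool struct {
	mu    sync.RWMutex
	flows map[string]string
}

func newFlowPool() *flowPool {
	return &flowPool{flows: make(map[string]string)}
}

// add panics on a duplicate name, mirroring the duplicate check in AddFlow.
func (p *flowPool) add(name, flow string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if _, ok := p.flows[name]; ok {
		panic(fmt.Sprintf("duplicate flow name %q", name))
	}
	p.flows[name] = flow
}

// tryAdd (hypothetical wrapper) converts the panic into an error.
func (p *flowPool) tryAdd(name, flow string) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("add flow: %v", r)
		}
	}()
	p.add(name, flow)
	return nil
}

func main() {
	p := newFlowPool()
	fmt.Println("first registration error:", p.tryAdd("flowName1", "first"))
	fmt.Println("second registration error:", p.tryAdd("flowName1", "second"))
}
```

The deferred Unlock in add still runs while the panic unwinds, so the pool remains usable after a rejected registration.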
4.2.3 Registering and Scheduling Functions
KisPool provides methods to register Function callbacks and schedule Functions as follows.
kis-flow/kis/pool.go
// FaaS registers Function computing business logic, indexed and registered by Function Name
func (pool *kisPool) FaaS(fnName string, f FaaS) {
	pool.fnLock.Lock()
	defer pool.fnLock.Unlock()

	if _, ok := pool.fnRouter[fnName]; !ok {
		pool.fnRouter[fnName] = f
	} else {
		errString := fmt.Sprintf("KisPool FaaS Repeat FuncName=%s", fnName)
		panic(errString)
	}

	log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
}

// CallFunction schedules a Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
	// Look up the callback under the read lock, then release the lock before
	// invoking it so that a long-running callback does not block registration
	pool.fnLock.RLock()
	f, ok := pool.fnRouter[fnName]
	pool.fnLock.RUnlock()

	if ok {
		return f(ctx, flow)
	}

	log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)

	return errors.New("FuncName: " + fnName + " Can not find in KisPool, Not Added.")
}
CallFunction() requires the Flow parameter as the context for data stream scheduling. Developers can use Flow inside a custom FaaS to obtain information about the current Function, so Flow needs a few more interfaces for retrieving its name and the current Function's configuration. These interfaces are added as follows:
kis-flow/kis/flow.go
type Flow interface {
	// Run schedules the Flow, sequentially schedules Functions in the Flow and executes them
	Run(ctx context.Context) error
	// Link connects the Functions in the Flow according to the configuration in the configuration file
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow submits Flow data to the upcoming Function layer
	CommitRow(row interface{}) error
	// Input gets the input source data for the currently executing Function in the Flow
	Input() common.KisRowArr

	// ++++++++++++++++++++++++++++++++++
	// GetName gets the name of the Flow
	GetName() string
	// GetThisFunction gets the currently executing Function
	GetThisFunction() Function
	// GetThisFuncConf gets the configuration of the currently executing Function
	GetThisFuncConf() *config.KisFuncConfig
}
kis-flow/flow/kis_flow.go
func (flow *KisFlow) GetName() string {
	return flow.Name
}

func (flow *KisFlow) GetThisFunction() kis.Function {
	return flow.ThisFunction
}

func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig {
	return flow.ThisFunction.GetConfig()
}
4.3 KisFunction Refers to KisPool for Scheduling
Now we can use the Pool for scheduling within the Call() method of each KisFunctionX. Let's modify the Call() method of each Function accordingly.
kis-flow/function/kis_function_c.go
package function

import (
	"context"

	"kis-flow/kis"
	"kis-flow/log"
)

type KisFunctionC struct {
	BaseFunction
}

func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
	log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow)

	// Route to the specific executing computation Function through KisPool
	if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
		log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
		return err
	}

	return nil
}
kis-flow/function/kis_function_e.go
package function

import (
	"context"

	"kis-flow/kis"
	"kis-flow/log"
)

type KisFunctionE struct {
	BaseFunction
}

func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
	log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow)

	// Route to the specific executing computation Function through KisPool
	if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
		log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
		return err
	}

	return nil
}
kis-flow/function/kis_function_l.go
package function

import (
	"context"

	"kis-flow/kis"
	"kis-flow/log"
)

type KisFunctionL struct {
	BaseFunction
}

func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
	log.Logger().InfoF("KisFunctionL, flow = %+v\n", flow)

	// Route to the specific executing computation Function through KisPool
	if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
		log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
		return err
	}

	return nil
}
kis-flow/function/kis_function_s.go
package function

import (
	"context"

	"kis-flow/kis"
	"kis-flow/log"
)

type KisFunctionS struct {
	BaseFunction
}

func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
	log.Logger().InfoF("KisFunctionS, flow = %+v\n", flow)

	// Route to the specific executing computation Function through KisPool
	if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
		log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
		return err
	}

	return nil
}
kis-flow/function/kis_function_v.go
package function

import (
	"context"

	"kis-flow/kis"
	"kis-flow/log"
)

type KisFunctionV struct {
	BaseFunction
}

func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
	log.Logger().InfoF("KisFunctionV, flow = %+v\n", flow)

	// Route to the specific executing computation Function through KisPool
	if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
		log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
		return err
	}

	return nil
}
4.4 KisPool Unit Testing
Next, let's perform unit testing for KisPool.
4.4.1 Custom FaaS
kis-flow/test/kis_pool_test.go
package test

import (
	"context"
	"fmt"
	"testing"

	"kis-flow/common"
	"kis-flow/config"
	"kis-flow/flow"
	"kis-flow/kis"
)

func funcName1Handler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call funcName1Handler ----")

	for index, row := range flow.Input() {
		// Print data
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)

		// Calculate result data
		resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

		// Commit result data
		_ = flow.CommitRow(resultStr)
	}

	return nil
}

func funcName2Handler(ctx context.Context, flow kis.Flow) error {
	for _, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)
	}

	return nil
}
4.4.2 Register FaaS and Start Flow
kis-flow/test/kis_pool_test.go
func TestNewKisPool(t *testing.T) {
	ctx := context.Background()

	// 0. Register Functions
	kis.Pool().FaaS("funcName1", funcName1Handler)
	kis.Pool().FaaS("funcName2", funcName2Handler)

	// 1. Create 2 KisFunction configuration instances
	source1 := config.KisSource{
		Name: "Public account data from TikTok Mall",
		Must: []string{"order_id", "user_id"},
	}

	source2 := config.KisSource{
		Name: "User order error rate",
		Must: []string{"order_id", "user_id"},
	}

	myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
	if myFuncConfig1 == nil {
		panic("myFuncConfig1 is nil")
	}

	myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil)
	if myFuncConfig2 == nil {
		panic("myFuncConfig2 is nil")
	}

	// 2. Create a KisFlow configuration instance
	myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

	// 3. Create a KisFlow object
	flow1 := flow.NewKisFlow(myFlowConfig1)

	// 4. Link Functions to Flow
	if err := flow1.Link(myFuncConfig1, nil); err != nil {
		panic(err)
	}
	if err := flow1.Link(myFuncConfig2, nil); err != nil {
		panic(err)
	}

	// 5. Commit original data
	_ = flow1.CommitRow("This is Data1 from Test")
	_ = flow1.CommitRow("This is Data2 from Test")
	_ = flow1.CommitRow("This is Data3 from Test")

	// 6. Run flow1
	if err := flow1.Run(ctx); err != nil {
		panic(err)
	}
}
Execute the following command in the kis-flow/test/ directory:
go test -test.v -test.paniconexit0 -test.run TestNewKisPool
The result is as follows:
=== RUN TestNewKisPool
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionC, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c0190 ThisFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-51527b72a4ee447fb0bd494bda9a84ad, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-1fdae2bfac684f1d8edf89d9000208c0
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionE, flow = &{Id:flow-1fdae2bfac684f1d8edf89d9000208c0 Name:flowName1 Conf:0xc0000e27c0 Funcs:map[func-51527b72a4ee447fb0bd494bda9a84ad:0xc0000c0190 func-9cd2ab870b384794b312d2be10bb06fa:0xc0000c01e0] FlowHead:0xc0000c0190 FlowTail:0xc0000c01e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000c01e0 ThisFunctionId:func-9cd2ab870b384794b312d2be10bb06fa PrevFunctionId:func-51527b72a4ee447fb0bd494bda9a84ad funcParams:map[func-51527b72a4ee447fb0bd494bda9a84ad:map[] func-9cd2ab870b384794b312d2be10bb06fa:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-51527b72a4ee447fb0bd494bda9a84ad:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}
In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-9cd2ab870b384794b312d2be10bb06fa, row = data from funcName[funcName1], index = 2
--- PASS: TestNewKisPool (0.00s)
PASS
ok kis-flow/test 0.520s
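The log confirms the expected behavior: funcName1 receives the three original rows and commits three result rows, and funcName2 then receives those three result rows as its input.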
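The hand-off visible in the log, where the rows committed by one Function become the input of the next, can be modelled with a few lines of plain Go. This is a toy model under stated assumptions, not KisFlow's scheduler: stage and runPipeline are hypothetical names, and rows are simplified to strings.

```go
package main

import "fmt"

// stage receives the previous stage's rows and returns the rows it commits.
type stage func(input []string) []string

// runPipeline feeds each stage's committed rows to the next stage as input
// and returns whatever the last stage committed.
func runPipeline(rows []string, stages ...stage) []string {
	for _, s := range stages {
		rows = s(rows)
	}
	return rows
}

func main() {
	// first commits one result row per input row, like funcName1Handler.
	first := func(input []string) []string {
		out := make([]string, 0, len(input))
		for i := range input {
			out = append(out, fmt.Sprintf("data from funcName[funcName1], index = %d", i))
		}
		return out
	}

	// second only prints its input and commits nothing, like funcName2Handler.
	second := func(input []string) []string {
		for _, row := range input {
			fmt.Println("funcName2 got:", row)
		}
		return nil
	}

	original := []string{
		"This is Data1 from Test",
		"This is Data2 from Test",
		"This is Data3 from Test",
	}
	runPipeline(original, first, second)
}
```

As in the test output, the second stage never sees the original rows; it sees only what the first stage committed.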
Now that the business capabilities of Functions have been exposed to developers, let's continue to enhance the capabilities of KisFlow.
4.5 [V0.3] Source Code
https://github.com/aceld/kis-flow/releases/tag/v0.3
Author: Aceld
GitHub: https://github.com/aceld
KisFlow Open Source Project Address: https://github.com/aceld/kis-flow