DEV Community

Aceld
Aceld

Posted on • Edited on

(Part 7)Golang Framework Hands-on - KisFlow Stream Computing Framework-KisFlow Action

#go

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications


7.1 Action Abort

KisFlow Action refers to controlling the flow's scheduling logic while executing a Function. KisFlow provides some Action options for developers to choose from. This section introduces the simplest Action, Abort, which terminates the current Flow.

The final usage of Abort is as follows:

func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call AbortFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionAbort)  // Terminate the Flow
}
Enter fullscreen mode Exit fullscreen mode

AbortFuncHandler() is a business callback method of a Function, defined by the developer. After the current Function is executed, the normal situation is to continue to the next Function. However, if flow.Next(kis.ActionAbort) is returned as the current Function's return value, the next Function will not be executed. Instead, the scheduling computation flow of the current Flow is directly terminated.

Let's implement the Abort Action mode of KisFlow below.

7.1.1 Abort Interface Definition

First, let's define the Abort() interface for the Flow.

kis-flow/kis/flow.go

type Flow interface {
    // Run schedules the Flow, sequentially scheduling and executing Functions in the Flow
    Run(ctx context.Context) error
    // Link connects the Functions in the Flow according to the configuration file
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow submits Flow data to the Function layer about to be executed
    CommitRow(row interface{}) error
    // Input obtains the input source data of the currently executing Function in the Flow
    Input() common.KisRowArr
    // GetName retrieves the name of the Flow
    GetName() string
    // GetThisFunction obtains the currently executing Function
    GetThisFunction() Function
    // GetThisFuncConf obtains the configuration of the currently executing Function
    GetThisFuncConf() *config.KisFuncConfig
    // GetConnector obtains the Connector of the currently executing Function
    GetConnector() (Connector, error)
    // GetConnConf obtains the configuration of the Connector of the currently executing Function
    GetConnConf() (*config.KisConnConfig, error)
    // GetConfig obtains the configuration of the current Flow
    GetConfig() *config.KisFlowConfig
    // GetFuncConfigByName retrieves the configuration of the specified Function in the Flow
    GetFuncConfigByName(funcName string) *config.KisFuncConfig

    // --- KisFlow Action ---
    // Next proceeds to the next Function in the Flow with the specified Action
    Next(acts ...ActionFunc) error
}
Enter fullscreen mode Exit fullscreen mode

Here, an interface Next(acts ...ActionFunc) error is provided, where the parameter is a variadic parameter of type ActionFunc. This is the method related to Actions that we define for KisFlow. The module for the definition of Action is as follows:

7.1.2 Action Module Definition

Action is a configuration module used to control special behaviors in the Flow execution process through Functions. This includes the Abort behavior mentioned above, which is one of the Actions. The module definition for Action is as follows. Create a file action.go under kis-flow/kis/ and implement it:

kis-flow/kis/action.go

package kis

// Action represents the Actions to be taken during the execution of KisFlow
type Action struct {
    // Abort indicates whether to terminate the Flow execution
    Abort bool
}

// ActionFunc is a type for KisFlow Functional Option
type ActionFunc func(ops *Action)

// LoadActions loads Actions and sequentially executes the ActionFunc functions
func LoadActions(acts []ActionFunc) Action {
    action := Action{}

    if acts == nil {
        return action
    }

    for _, act := range acts {
        act(&action)
    }

    return action
}

// ActionAbort sets the Action to terminate the Flow execution
func ActionAbort(action *Action) {
    action.Abort = true
}
Enter fullscreen mode Exit fullscreen mode

First, currently, Action has only one behavior, Abort, which is represented by a boolean type. If true, it indicates that the Flow needs to be terminated.

Next, type ActionFunc func(ops *Action) is a function type where the parameter is a pointer to an Action{}. The function func ActionAbort(action *Action) is a specific instance of this function type. The purpose of the ActionAbort() method is to set the Abort member of the Action struct to true.

Finally, let's look at the func LoadActions(acts []ActionFunc) Action method. The parameter is an array of ActionFunc functions. LoadActions() creates a new Action{}, then sequentially executes the functions in the []ActionFunc array to modify the members of Action{}. It finally returns the modified Action{} to the upper layer.

7.1.3 Implementation of the Next Method

Next, we need to implement this interface for the KisFlow module. First, we need to add an Action{} member to KisFlow, indicating the action to be taken after each Function execution.

kis-flow/flow/kis_flow.go

// KisFlow provides the context for the entire streaming computation
type KisFlow struct {
    // Basic information
    Id   string                // Distributed instance ID of the Flow (used internally in KisFlow to distinguish different instances)
    Name string                // Readable name of the Flow
    Conf *config.KisFlowConfig // Flow configuration strategy

    // List of Functions
    Funcs          map[string]kis.Function // All Function objects managed by the current flow, key: FunctionName
    FlowHead       kis.Function            // Head of the Function list owned by the current Flow
    FlowTail       kis.Function            // Tail of the Function list owned by the current Flow
    flock          sync.RWMutex            // Lock for managing list insertion and reading/writing
    ThisFunction   kis.Function            // The KisFunction object currently being executed by the Flow
    ThisFunctionId string                  // ID of the currently executing Function
    PrevFunctionId string                  // ID of the previous Function executed

    // Function list parameters
    funcParams map[string]config.FParam // Custom fixed configuration parameters of the current Function in the flow, key: Function instance KisID, value: FParam
    fplock     sync.RWMutex             // Lock for managing reading/writing of funcParams

    // Data
    buffer common.KisRowArr  // Internal buffer for temporarily storing input byte data, a single piece of data is an interface{}, multiple pieces of data are []interface{}, i.e., KisBatch
    data   common.KisDataMap // Data source for each layer of streaming computation
    inPut  common.KisRowArr  // Input data for the currently executing Function

    // +++++++++++++++++++++

    // KisFlow Action
    action kis.Action        // Action to be taken by the current Flow
}
Enter fullscreen mode Exit fullscreen mode

Then implement the Next() interface for KisFlow as follows:

kis-flow/flow/kis_flow.go

// Next proceeds to the next Function in the Flow with the specified Action
func (flow *KisFlow) Next(acts ...kis.ActionFunc) error {

    // Load Actions passed by the Function FaaS
    flow.action = kis.LoadActions(acts)

    return nil
}
Enter fullscreen mode Exit fullscreen mode

Each time a developer executes a custom business callback in a Function, flow.Next() is called to pass the Action at the end. Therefore, Next(acts ...kis.ActionFunc) error loads the passed Action properties and saves them in flow.action.

7.1.4 Abort to Control Flow Execution

Now that we have an Abort action to control the Flow, we need to add a member to KisFlow to represent this state.

kis-flow/flow/kis_flow.go

// KisFlow provides the context for the entire streaming computation
type KisFlow struct {
    // Basic information
    Id   string                // Distributed instance ID of the Flow (used internally in KisFlow to distinguish different instances)
    Name string                // Readable name of the Flow
    Conf *config.KisFlowConfig // Flow configuration strategy

    // List of Functions
    Funcs          map[string]kis.Function // All Function objects managed by the current flow, key: FunctionName
    FlowHead       kis.Function            // Head of the Function list owned by the current Flow
    FlowTail       kis.Function            // Tail of the Function list owned by the current Flow
    flock          sync.RWMutex            // Lock for managing list insertion and reading/writing
    ThisFunction   kis.Function            // The KisFunction object currently being executed by the Flow
    ThisFunctionId string                  // ID of the currently executing Function
    PrevFunctionId string                  // ID of the previous Function executed

    // Function list parameters
    funcParams map[string]config.FParam // Custom fixed configuration parameters of the current Function in the flow, key: Function instance KisID, value: FParam
    fplock     sync.RWMutex             // Lock for managing reading/writing of funcParams

    // Data
    buffer common.KisRowArr  // Internal buffer for temporarily storing input byte data, a single piece of data is an interface{}, multiple pieces of data are []interface{}, i.e., KisBatch
    data   common.KisDataMap // Data source for each layer of streaming computation
    inPut  common.KisRowArr  // Input data for the currently executing Function
    action kis.Action        // Action to be taken by the current Flow

    // +++++++++
    abort  bool              // Indicates whether to abort the Flow
}
Enter fullscreen mode Exit fullscreen mode

Each time the flow.Run() method is executed, the abort variable needs to be reset. Additionally, the loop scheduling needs to check the flow.abort status.

kis-flow/flow/kis_flow.go

// Run starts the streaming computation of KisFlow, executing the stream from the starting Function
func (flow *KisFlow) Run(ctx context.Context) error {

    // +++++++++
    // Reset abort
    flow.abort = false  // Reset the abort state each time scheduling starts

    // ... ...

    // ... ...

    // Stream chain call
    for fn != nil && flow.abort != true { // ++++ Do not enter the next loop if abort is set

        // ... ...
        // ... ...

        if err := fn.Call(ctx, flow); err != nil {
            // Error
            return err
        } else {
            // Success

            // ... ...

            fn = fn.Next()
        }
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

When Call() schedules the custom method of the Function, if return flow.Next(ActionAbort) is called, it will change the Action state of the Flow, thereby controlling the termination of the Flow execution. Finally, the Abort state of the Action is transferred to the Abort state of KisFlow.

Since we have the Abort state, we can add a condition during the Flow execution. If the current Function does not submit its result data (i.e., flow.buffer is empty), the Flow will not proceed to the next layer and will directly exit the Run() call.

kis-flow/flow/kis_flow_data.go

// commitCurData submits the result data of the currently executing Function in the Flow
func (flow *KisFlow) commitCurData(ctx context.Context) error {

    // Check if there is result data for the current computation; if not, exit the current Flow Run loop
    if (len(flow.buffer) == 0) {
        // ++++++++++++
        flow.abort = true
        return nil
    }

    // ... ...
    // ... ...

    return nil
}
Enter fullscreen mode Exit fullscreen mode

7.1.5 Capturing and Handling Actions

Next, we will implement a method specifically for handling Actions. This method will be defined in the kis-flow/flow/kis_flow_action.go file as follows:

kis-flow/flow/kis_flow_action.go

package flow

import (
    "context"
    "errors"
    "fmt"
    "kis-flow/kis"
)

// dealAction handles Actions and decides the subsequent flow direction
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    if err := flow.commitCurData(ctx); err != nil {
        return nil, err
    }

    // Update the previous FunctionId cursor
    flow.PrevFunctionId = flow.ThisFunctionId
    fn = fn.Next()

    // Abort Action forces termination
    if flow.action.Abort {
        flow.abort = true
    }

    // Clear Action
    flow.action = kis.Action{}

    return fn, nil
}
Enter fullscreen mode Exit fullscreen mode

Next, we'll slightly modify the KisFlow Run() process to incorporate the dealAction() method.

kis-flow/flow/kis_flow.go

// Run starts the streaming computation of KisFlow, executing the stream from the starting Function
func (flow *KisFlow) Run(ctx context.Context) error {

    var fn kis.Function

    fn = flow.FlowHead
    flow.abort = false

    if flow.Conf.Status == int(common.FlowDisable) {
        // Flow is configured to be disabled
        return nil
    }

    // Since no Function has been executed at this point, PrevFunctionId is FirstVirtual as there is no previous Function
    flow.PrevFunctionId = common.FunctionIdFirstVirtual

    // Submit the original data stream
    if err := flow.commitSrcData(ctx); err != nil {
        return err
    }

    // Stream chain call
    for fn != nil && flow.abort == false {

        // Flow records the current executing Function
        fid := fn.GetId()
        flow.ThisFunction = fn
        flow.ThisFunctionId = fid

        // Get the source data to be processed by the current Function
        if inputData, err := flow.getCurData(); err != nil {
            log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
            return err
        } else {
            flow.inPut = inputData
        }

        if err := fn.Call(ctx, flow); err != nil {
            // Error
            return err
        } else {
            // Success

            // +++++++++++++++++++++++++++++++
            fn, err = flow.dealAction(ctx, fn)
            if err != nil {
                return err
            }
            // +++++++++++++++++++++++++++++++
        }
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

7.1.6 Action Abort Unit Test

First, let's create a Function configuration file as follows:

kis-flow/test/load_conf/func/func-AbortFunc.yml

kistype: func
fname: abortFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
Enter fullscreen mode Exit fullscreen mode

The name of the current Function is abortFunc. Next, we implement its FaaS function as follows:

kis-flow/test/faas/faas_abort.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call AbortFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionAbort)
}
Enter fullscreen mode Exit fullscreen mode

This Function will eventually call flow.Next(kis.ActionAbort) to terminate the Flow. Next, we create a Flow that uses the above Function as an intermediate Function to test if it will terminate before executing subsequent Functions. The new flow configuration is as follows:

kis-flow/test/load_conf/flow/flow-FlowName2.yml

kistype: flow
status: 1
flow_name: flowName2
flows:
  - fname: funcName1
  - fname: abortFunc
  - fname: funcName3
Enter fullscreen mode Exit fullscreen mode

The name of the current Flow is flowName2, which contains three Functions: funcName1, abortFunc, and funcName3. If the abort functionality works correctly, funcName3 should not be executed.

Next, we implement the unit test case.

kis-flow/test/kis_action_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/file"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
)

func TestActionAbort(t *testing.T) {
    ctx := context.Background()

    // 0. Register Function callbacks
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // Add abortFunc handler
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register ConnectorInit and Connector callbacks
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Load configuration files and build Flow
    if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get Flow
    flow1 := kis.Pool().GetFlow("flowName2")

    // 3. Submit original data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. Execute flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}
Enter fullscreen mode Exit fullscreen mode

The following code registers the initial callbacks. You can also write this code in another file to avoid repeating it each time:

// 0. Register Function callbacks
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // Add abortFunc handler
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

// 0. Register ConnectorInit and Connector callbacks
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
Enter fullscreen mode Exit fullscreen mode

Change to the kis-flow/test/ directory and run the following command:

go test -test.v -test.paniconexit0 -test.run  TestActionAbort
Enter fullscreen mode Exit fullscreen mode

The result is as follows:

=== RUN   TestActionAbort
Add KisPool FuncName=funcName1
Add KisPool FuncName=abortFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1

Add FlowRouter FlowName=flowName2

context.Background
====> After CommitSrcData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094cc0 ThisFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] jumpFunc:NoJump abort:false nextOpt:<nil>}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094d20 ThisFunctionId:func-7f5af1521fd64d08839d5bdd26de5254 PrevFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] jumpFunc:NoJump abort:false nextOpt:<nil>}

---> Call AbortFuncHandler ----
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 0
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 1
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 2
--- PASS: TestActionAbort (0.00s)
PASS
ok      kis-flow/test   0.487s
Enter fullscreen mode Exit fullscreen mode

From the result, we can see that after executing the AbortFuncHandler, it did not continue executing and exited the Flow's Run() method.

7.2 Action DataReuse (Reuse Upper-Level Data)

The Action DataReuse is designed to reuse data from the previous function, meaning the current function's submitted result will not be used. Instead, the result data from the previous function will be reused for the next function as its data source.

Let's implement the Action DataReuse functionality.

7.2.1 Adding DataReuse Action

Add a DataReuse member to the Action, which is of boolean type.

kis-flow/kis/action.go

// Action KisFlow execution process Actions
type Action struct {
    // +++++++++++++
    // DataReuse indicates whether to reuse upper-level function data
    DataReuse bool

    // Abort terminates the execution of the Flow
    Abort bool
}

// ActionDataReuse sets the DataReuse option to true
func ActionDataReuse(act *Action) {
    act.DataReuse = true
}
Enter fullscreen mode Exit fullscreen mode

Then provide an Action function named ActionDataReuse, which sets the DataReuse status to true.

7.2.2 Reusing Upper-Level Data to the Next Layer

Here, we need to implement a method for submitting reused data. The logic is as follows:

kis-flow/flow/kis_flow_data.go

// commitReuseData submits reused data from the previous function
func (flow *KisFlow) commitReuseData(ctx context.Context) error {

    // Check if the previous layer has result data, if not, exit the current Flow Run loop
    if len(flow.data[flow.PrevFunctionId]) == 0 {
        flow.abort = true
        return nil
    }

    // The current layer's result data equals the previous layer's result data (reuse upper-level result data)
    flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]

    // Clear the buffer (if ReuseData is selected, all submitted data will not be carried to the next layer)
    flow.buffer = flow.buffer[0:0]

    log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}
Enter fullscreen mode Exit fullscreen mode

The logic is simple. Unlike commitCurData(), which submits flow.buffer data to flow.data[flow.ThisFunctionId], commitReuseData() submits the previous layer's result data to flow.data[flow.ThisFunctionId].

7.2.3 Handling the DataReuse Action

Then, add handling for the DataReuse action in the dealAction() method:

kis-flow/flow/kis_flow_action.go

// dealAction processes actions and determines the next steps for the Flow
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    // ++++++++++++++++
    // DataReuse Action
    if flow.action.DataReuse {
        if err := flow.commitReuseData(ctx); err != nil {
            return nil, err
        }
    } else {
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    }

    // Update the previous function ID cursor
    flow.PrevFunctionId = flow.ThisFunctionId
    fn = fn.Next()

    // Abort Action force termination
    if flow.action.Abort {
        flow.abort = true
    }

    // Clear the Action
    flow.action = kis.Action{}

    return fn, nil
}
Enter fullscreen mode Exit fullscreen mode

This captures and processes the DataReuse action, deciding whether to reuse upper-level data or to commit current data based on the action settings.

7.2.4 Unit Testing

Let's proceed with unit testing for DataReuse. First, create a function named dataReuseFunc, and create its configuration file:

kis-flow/test/load_conf/func/func-dataReuseFunc.yml

kistype: func
fname: dataReuseFunc
fmode: Calculate
source:
  name: User Order Error Rate
  must:
    - order_id
    - user_id
Enter fullscreen mode Exit fullscreen mode

Also, create a new Flow called flowName3 with the following configuration:

kis-flow/test/load_conf/flow/func-FlowName3.yml

kistype: flow
status: 1
flow_name: flowName3
flows:
  - fname: funcName1
  - fname: dataReuseFunc
  - fname: funcName3

Enter fullscreen mode Exit fullscreen mode

For the logic of the dataReuseFunc function, here's the implementation:

kis-flow/test/faas/faas_data_reuse.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

func DataReuseFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call DataReuseFuncHandler ----")

    for index, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        // Calculate result data
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // Commit result data
        _ = flow.CommitRow(resultStr)
    }

    return flow.Next(kis.ActionDataReuse)
}

Enter fullscreen mode Exit fullscreen mode

Finally, implement the test case:

kis-flow/test/kis_action_test.go

func TestActionDataReuse(t *testing.T) {
    ctx := context.Background()

    // 0. Register Function callback business
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // Adding dataReuseFunc business
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register ConnectorInit and Connector callback business
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Load configuration files and build Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get Flow
    flow1 := kis.Pool().GetFlow("flowName3")

    // 3. Submit raw data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. Execute flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}
Enter fullscreen mode Exit fullscreen mode

Navigate to kis-flow/test/ and execute:

go test -test.v -test.paniconexit0 -test.run  TestActionDataReuse
Enter fullscreen mode Exit fullscreen mode
=== RUN   TestActionDataReuse
Add KisPool FuncName=funcName1
Add KisPool FuncName=dataReuseFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000955c0 ThisFunctionId:func-7886178381634f05b302841141382e59 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data3 from Test
context.Background

 ====> After commitCurData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095620 ThisFunctionId:func-ef567879d0dd45b287ed709e549e9d32 PrevFunctionId:func-7886178381634f05b302841141382e59 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call DataReuseFuncHandler ----
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 0
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 1
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 2
context.Background
 ====> After commitReuseData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095680 ThisFunctionId:func-cfe66e39aba54ff989d6764cc4edda20 PrevFunctionId:func-ef567879d0dd45b287ed709e549e9d32 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 0
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 1
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 2
--- PASS: TestActionDataReuse (0.02s)
PASS
ok      kis-flow

-flow/test   0.523s

Enter fullscreen mode Exit fullscreen mode

The test execution output provides valuable insights into the functioning of the DataReuse feature. Initially, the test sets up various functions and connectors and then loads the configurations. After submitting raw data to flowName3, the flow is executed.

During execution, the data flows through different functions, starting with funcName1, then dataReuseFunc, and finally funcName3. At each step, we observe the input data and the resulting data transformation.

For funcName1, we see the original data being processed, and then for dataReuseFunc, the data from funcName1 is reused. Finally, in funcName3, we observe the reused data being further processed.

This comprehensive unit test ensures that the DataReuse functionality behaves as expected, effectively passing data from one function to another without loss or corruption.

7.3 Action ForceEntryNext (Forcing Entry to the Next Layer)

7.3.1 ForceEntryNext Action Attribute

In the current KisFlow implementation, if the current Function does not commit any data (results data for this layer), the Flow will not proceed to the next layer of Functions after the current one finishes. However, in some streaming computations, it might be necessary to continue executing downward even if there is no data available. Therefore, we can introduce a ForceEntryNext action to trigger this behavior.

Firstly, we add a ForceEntryNext attribute to the Action:

kis-flow/kis/action.go

// Action KisFlow execution Actions
type Action struct {
    // DataReuse indicates whether to reuse data from the upper layer Function
    DataReuse bool

    // By default, if the current Function calculates 0 rows of data, subsequent Functions will not execute
    // ForceEntryNext overrides the above default rule and forces entry to the next layer of Functions even if there's no data
    ForceEntryNext bool

    // Abort terminates the execution of the Flow
    Abort bool
}

// ActionForceEntryNext sets the ForceEntryNext attribute to true
func ActionForceEntryNext(act *Action) {
    act.ForceEntryNext = true
}
Enter fullscreen mode Exit fullscreen mode

We also provide a configuration function ActionForceEntryNext() to modify this attribute's status.

7.3.2 Capturing the Action

In the dealAction() method, which captures the Action, we add a check for this status. If set, the flow.abort status needs to be changed to false, allowing the flow to continue to the next layer.

kis-flow/flow/kis_flow_action.go

// dealAction processes the Action and determines the next steps of the Flow
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    // DataReuse Action
    if flow.action.DataReuse {
        if err := flow.commitReuseData(ctx); err != nil {
            return nil, err
        }
    } else {
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    }

    // ++++++++++++++++++++++++++++
    // ForceEntryNext Action
    if flow.action.ForceEntryNext {
        if err := flow.commitVoidData(ctx); err != nil {
            return nil, err
        }
        flow.abort = false
    }

    // Update the previous FunctionId cursor
    flow.PrevFunctionId = flow.ThisFunctionId
    fn = fn.Next()

    // Abort Action
    if flow.action.Abort {
        flow.abort = true
    }

    // Clear the Action
    flow.action = kis.Action{}

    return fn, nil
}
Enter fullscreen mode Exit fullscreen mode

Here is a detail: we need to call a commitVoidData() method, which commits empty data. The reason is that if empty data is not committed, the flow.buffer remains empty, preventing the commit action. This would result in the key flow.data[flow.ThisFunctionId] not existing, causing a key-not-found exception and panic when flow.getCurData() is executed. Therefore, empty data needs to be committed to flow.data[flow.ThisFunctionId].

The specific implementation of commitVoidData() is as follows:

kis-flow/flow/kis_flow_data.go

func (flow *KisFlow) commitVoidData(ctx context.Context) error {
    if len(flow.buffer) != 0 {
        return nil
    }

    // Create empty data
    batch := make(common.KisRowArr, 0)

    // Commit the buffer data to the result data of this layer
    flow.data[flow.ThisFunctionId] = batch

    log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}
Enter fullscreen mode Exit fullscreen mode

7.3.3 Unit Test Without Setting ForceEntryNext

First, create a Function configuration for ·noResultFunc· and implement the corresponding callback business function.

kis-flow/test/load_conf/func/func-NoResultFunc.yml

kistype: func
fname: noResultFunc
fmode: Calculate
source:
  name: user_order_error_rate
  must:
    - order_id
    - user_id
Enter fullscreen mode Exit fullscreen mode

kis-flow/test/faas/faas_no_result.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call NoResultFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next()
}
Enter fullscreen mode Exit fullscreen mode

In this Function, at the end, we only call flow.Next() without passing any Action.

Next, create a new Flow FlowName4 with the following configuration:

kis-flow/test/load_conf/flow-FlowName4.yml

kistype: flow
status: 1
flow_name: flowName4
flows:
  - fname: funcName1
  - fname: noResultFunc
  - fname: funcName3
Enter fullscreen mode Exit fullscreen mode

Finally, we write a unit test case code with noResultFunc in the middle.

kis-flow/test/kis_action_test.go

func TestActionForceEntry(t *testing.T) {
    ctx := context.Background()

    // 0. Register Function callback business
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // Add noResultFunc business
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register ConnectorInit and Connector callback business
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Load configuration files and build the Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get the Flow
    flow1 := kis.Pool().GetFlow("flowName4")

    // 3. Submit original data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. Execute flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}
Enter fullscreen mode Exit fullscreen mode

Navigate to kis-flow/test/ and execute:

go test -test.v -test.paniconexit0 -test.run  TestActionForceEntry
Enter fullscreen mode Exit fullscreen mode

The results are as follows:

=== RUN   TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d560 ThisFunctionId:func-4d113d6a8e744d30a906db310f2d7818 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d5c0 ThisFunctionId:func-47cb6f9ae464484aa779c18284035705 PrevFunctionId:func-4d113d6a8e744d30a906db310f2d7818 funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 2
--- PASS: TestActionForceEntry (0.02s)
PASS
ok      kis-flow/test   0.958s
Enter fullscreen mode Exit fullscreen mode

Because noResultFunc does not generate any result data, the next Function will not be executed. The execution ends with:

---> Call NoResultFuncHandler ----
Enter fullscreen mode Exit fullscreen mode

7.3.4 Unit Testing with ForceEntryNext

Next, we will add the ForceEntryNext action. In NoResultFuncHandler(), we add flow.Next(kis.ActionForceEntryNext) as shown below:

kis-flow/test/faas/faas_no_result.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call NoResultFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionForceEntryNext)
}
Enter fullscreen mode Exit fullscreen mode

Navigate to the kis-flow/test/ directory and execute:

go test -test.v -test.paniconexit0 -test.run TestActionForceEntry
Enter fullscreen mode Exit fullscreen mode

The results are as follows:

=== RUN   TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode=Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-ecddaee7d7d447a9852d07088732f509 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013740 ThisFunctionId:func-c9817c7993894919b8463dea1757544e PrevFunctionId:func-ecddaee7d7d447a9852d07088732f509 funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 2
context.Background
 ====> After commitVoidData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000137a0 ThisFunctionId:func-5729600ae6ea4d6f879eb5832c638e1a PrevFunctionId:func-c9817c7993894919b8463dea1757544e funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName3Handler ----
--- PASS: TestActionForceEntry (0.01s)
PASS
ok      kis-flow/test   0.348s
Enter fullscreen mode Exit fullscreen mode

It is observed that the function in the third layer, funcName3Handler, is executed, but it has no data.

7.4 Action JumpFunc (Flow Jump)

Next, we will implement the JumpFunc Action. JumpFunc allows jumping to a specified FuncName within the current Flow and continuing execution (provided that the target FuncName exists in the current Flow).

Note: JumpFunc can lead to infinite loops, so use it cautiously in business logic.

7.4.1 Adding JumpFunc to Action

First, add a JumpFunc property to the Action. Note that JumpFunc is not a boolean state but a string representing the specific FunctionName to jump to.

kis-flow/kis/action.go

// Action KisFlow execution flow actions
type Action struct {
    // DataReuse indicates whether to reuse data from the previous function
    DataReuse bool

    // By default, Next() will not execute the subsequent function if the current function's result set is empty.
    // ForceEntryNext forces the next function to execute even if the current function's result set is empty.
    ForceEntryNext bool

    // ++++++++++
    // JumpFunc specifies the function to jump to for further execution
    JumpFunc string

    // Abort terminates the flow execution
    Abort bool
}

// ActionJumpFunc returns an ActionFunc function that sets the JumpFunc property in Action
// (Note: Can easily cause flow loops leading to infinite loops)
func ActionJumpFunc(funcName string) ActionFunc {
    return func(act *Action) {
        act.JumpFunc = funcName
    }
}
Enter fullscreen mode Exit fullscreen mode

Then provide a method to modify the JumpFunc configuration ActionJumpFunc(). Note that this method differs from previous ones as it returns an anonymous function and executes it to modify the JumpFunc property in Action.

7.4.2 Capturing the Action

Next, we capture the JumpFunc action by checking if JumpFunc is an empty string.

kis-flow/flow/kis_flow_action.go

// dealAction processes the Action and determines the next step in the Flow
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    // DataReuse Action
    if flow.action.DataReuse {
        if err := flow.commitReuseData(ctx); err != nil {
            return nil, err
        }
    } else {
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    }

    // ForceEntryNext Action
    if flow.action.ForceEntryNext {
        if err := flow.commitVoidData(ctx); err != nil {
            return nil, err
        }
        flow.abort = false
    }

    // ++++++++++++++++++++++++++++++++
    // JumpFunc Action
    if flow.action.JumpFunc != "" {
        if _, ok := flow.Funcs[flow.action.JumpFunc]; !ok {
            // JumpFunc is not in the flow
            return nil, errors.New(fmt.Sprintf("Flow Jump -> %s is not in Flow", flow.action.JumpFunc))
        }

        jumpFunction := flow.Funcs[flow.action.JumpFunc]
        // Update the previous function
        flow.PrevFunctionId = jumpFunction.GetPrevId()
        fn = jumpFunction

        // If a jump is set, force the jump
        flow.abort = false
    // ++++++++++++++++++++++++++++++++

    } else {

        // Update the previous function ID cursor
        flow.PrevFunctionId = flow.ThisFunctionId
        fn = fn.Next()
    }

    // Abort Action forcibly terminates the flow
    if flow.action.Abort {
        flow.abort = true
    }

    // Clear the Action
    flow.action = kis.Action{}

    return fn, nil
}
Enter fullscreen mode Exit fullscreen mode

If JumpFunc is set, the next function fn pointer needs to be updated accordingly. Otherwise, the normal address fn.Next() is used.

7.4.3 Unit Testing

Next, let's define a function with a jump action configuration as follows:

kis-flow/test/load_conf/func/func-jumpFunc.yml

kistype: func
fname: jumpFunc
fmode: Calculate
source:
  name: User Order Error Rate
  must:
    - order_id
    - user_id
Enter fullscreen mode Exit fullscreen mode

And implement the related function business logic as follows:

kis-flow/test/faas/faas_jump.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func JumpFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call JumpFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionJumpFunc("funcName1"))
}
Enter fullscreen mode Exit fullscreen mode

Here, flow.Next(kis.ActionJumpFunc("funcName1")) specifies the jump to the function named funcName1.

Create a new flow named FlowName5 with the following configuration:

kis-flow/test/load_conf/flow/flow-FlowName5.yml

kistype: flow
status: 1
flow_name: flowName5
flows:
  - fname: funcName1
  - fname: funcName2
  - fname: jumpFunc
Enter fullscreen mode Exit fullscreen mode

Next, implement the unit test case code as follows:

kis-flow/test/kis_action_test.go

func TestActionJumpFunc(t *testing.T) {
    ctx := context.Background()

    // 0. Register Function callback business logic
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // Add jumpFunc business logic

    // 0. Register ConnectorInit and Connector callback business logic
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Load configuration files and build the flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get the flow
    flow1 := kis.Pool().GetFlow("flowName5")

    // 3. Commit raw data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. Execute flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}
Enter fullscreen mode Exit fullscreen mode

Change the directory to kis-flow/test/ and execute:

go test -test.v -test.paniconexit0 -test.run TestActionJumpFunc
Enter fullscreen mode Exit fullscreen mode

The result is as follows:

... 
...

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionS, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013680 ThisFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 PrevFunctionId:func-f6ca8010d66744429bf6069c9897a928 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
 ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-4faf8f019f4a4a48b84ef27abfad53d1 PrevFunctionId
funcParams
func-4faf8f019f4a4a48b84ef27abfad53d1
func-5800567c4cd842b6b377c2b0c0fd81c2
func-f6ca8010d66744429bf6069c9897a928
fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data
FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]
inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort
action:{DataReuse
ForceEntryNext
JumpFunc: Abort
}}

---> Call JumpFuncHandler ----
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 0
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 1
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 2
KisFunctionV, flow = &{Id
Name
Conf:0xc000028f80 Funcs
funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0
FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013620 ThisFunctionId
PrevFunctionId
funcParams
func-4faf8f019f4a4a48b84ef27abfad53d1
func-5800567c4cd842b6b377c2b0c0fd81c2
func-f6ca8010d66744429bf6069c9897a928
fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data
FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]
inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort
action:{DataReuse
ForceEntryNext
JumpFunc: Abort
}}

---> Call funcName1Handler ----

...
...

Enter fullscreen mode Exit fullscreen mode

We observe that the Flow keeps looping, indicating that our JumpFunc Action has taken effect.

7.5 [V0.6] Source Code

You can find the source code for version 0.6 of the project at:

https://github.com/aceld/kis-flow/releases/tag/v0.6


Author: Aceld
GitHub: https://github.com/aceld

KisFlow Open Source Project Address: https://github.com/aceld/kis-flow

Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications

Top comments (1)

Collapse
 
billy_lee_e4cb8c40f07ee46 profile image
Billy Lee

666666