Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
Next, we will enhance the Function in KisFlow to better focus on processing business data. We will change the previous Function implementation:
func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName3Handler ----")
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return nil
}
In this implementation, raw data is obtained from flow.Input(). We will modify it so that the business can directly obtain the specific data structure type it wants without assertions and type conversions. The modified Function extended parameter usage is roughly as follows:
proto
type StuScores struct {
StuId int `json:"stu_id"`
Score1 int `json:"score_1"`
Score2 int `json:"score_2"`
Score3 int `json:"score_3"`
}
type StuAvgScore struct {
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}
FaaS
type AvgStuScoreIn struct {
serialize.DefaultSerialize
proto.StuScores
}
type AvgStuScoreOut struct {
serialize.DefaultSerialize
proto.StuAvgScore
}
// AvgStuScore(FaaS) calculates the average score of students
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
for _, row := range rows {
avgScore := proto.StuAvgScore{
StuId: row.StuId,
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
}
// Commit the result data
_ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
}
return nil
}
In this way, we can directly obtain the input data, already converted to the desired structure, through the third parameter rows, without needing assertions and conversions, so the business code can focus on its own logic.
Of course, if you want to obtain the raw data, you can still get it from flow.Input().
This chapter will implement the above functionality in KisFlow.
11.1 Self-Describing FaaS Business Callback Functions
In this section, we will complete the conceptual transformation for self-describing FaaS. Previously, the FaaS callback was defined as:
type FaaS func(context.Context, Flow) error
We need a structure to describe this function's properties, including its name, address, number of parameters, parameter types, return type, etc.
11.1.1 FaaSDesc: Self-Describing Callback Type
Create a new file faas.go under kis-flow/kis/ and define the following structure:
kis-flow/kis/faas.go
// FaaS: Function as a Service
// Change
// type FaaS func(context.Context, Flow) error
// to
// type FaaS func(context.Context, Flow, ...interface{}) error
// to allow data transmission through arbitrary input types in variadic parameters
type FaaS interface{}
// FaaSDesc: Description of the FaaS callback business function
type FaaSDesc struct {
FnName string // Function name registered in KisFlow
f interface{} // FaaS function
fName string // Name of the Go function f
ArgsType []reflect.Type // List of function parameter types
ArgNum int // Number of function parameters
FuncType reflect.Type // Function type
FuncValue reflect.Value // Function value (function address)
}
The previous FaaS type is changed to interface{}, and FaaSDesc has the following attributes:
- FnName: The name of the current Function, such as "funcDemo1" in our previous examples. This is used to identify the function in KisFlow.
- f: The defined FaaS function.
- fName: The name of the function defined by f.
- ArgsType: The list of parameter types of the defined f function, which is a slice.
- ArgNum: The number of input parameters of the defined f function.
- FuncType: The data type of the defined f function.
- FuncValue: The value of the defined f function (the address of the schedulable function).
11.1.2 Create a New FaaSDesc Object
Below is a constructor function for creating a FaaSDesc object. Its parameters are the FunctionName in KisFlow and the defined FaaS function:
kis-flow/kis/faas.go
// NewFaaSDesc creates a FaaSDesc instance based on the registered FnName and FaaS callback function
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
// The callback function FaaS, function value (function address)
funcValue := reflect.ValueOf(f)
// The type of the callback function FaaS
funcType := funcValue.Type()
// Check if the provided FaaS pointer is a function type
if !isFuncType(funcType) {
return nil, fmt.Errorf("provided FaaS type is %s, not a function", funcType.Name())
}
// Check if the provided FaaS function has exactly one return value of type error
if funcType.NumOut() != 1 || funcType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
return nil, errors.New("function must have exactly one return value of type error")
}
// The list of parameter types of the FaaS function
argsType := make([]reflect.Type, funcType.NumIn())
// Get the name of the FaaS function
fullName := runtime.FuncForPC(funcValue.Pointer()).Name()
// Ensure the parameter list of FaaS func(context.Context, Flow, ...interface{}) error includes context.Context and kis.Flow
// Check if the parameter list includes kis.Flow
containsKisFlow := false
// Check if the parameter list includes context.Context
containsCtx := false
// Iterate over the parameter types of the FaaS function
for i := 0; i < funcType.NumIn(); i++ {
// Get the type of the i-th parameter
paramType := funcType.In(i)
if isFlowType(paramType) {
// Check if the parameter list includes kis.Flow
containsKisFlow = true
} else if isContextType(paramType) {
// Check if the parameter list includes context.Context
containsCtx = true
} else if isSliceType(paramType) {
// Check if the parameter list includes a slice type
// Get the element type of the current slice parameter
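itemType := paramType.Elem()
// If the current parameter is a pointer type, get the type of the structure it points to
if itemType.Kind() == reflect.Ptr {
itemType = itemType.Elem() // Get the type of the structure it points to
}
_ = itemType // Not used yet (avoids a "declared and not used" compile error); section 11.2.4 checks it against the Serialize interface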
} else {
// Other types are not supported...
}
// Append the current parameter type to the argsType list
argsType[i] = paramType
}
if !containsKisFlow {
// If the parameter list does not include kis.Flow, return an error
return nil, errors.New("function parameters must have kis.Flow param, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
}
if !containsCtx {
// If the parameter list does not include context.Context, return an error
return nil, errors.New("function parameters must have context, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
}
// Return the FaaSDesc instance
return &FaaSDesc{
FnName: fnName,
f: f,
fName: fullName,
ArgsType: argsType,
ArgNum: len(argsType),
FuncType: funcType,
FuncValue: funcValue,
}, nil
}
Here, we use reflection to get the related attribute values from the f function and store them in FaaSDesc. To ensure that the provided FaaS function meets the following format:
type FaaS func(context.Context, Flow, ...interface{}) error
we check that the parameter list contains the context.Context and Flow parameters (Flow through its interface, context.Context by its type name). The checking methods are as follows:
kis-flow/kis/faas.go
// isFuncType checks if the provided paramType is a function type
func isFuncType(paramType reflect.Type) bool {
return paramType.Kind() == reflect.Func
}
// isFlowType checks if the provided paramType is of type kis.Flow
func isFlowType(paramType reflect.Type) bool {
var flowInterfaceType = reflect.TypeOf((*Flow)(nil)).Elem()
return paramType.Implements(flowInterfaceType)
}
// isContextType checks if the provided paramType is of type context.Context
func isContextType(paramType reflect.Type) bool {
typeName := paramType.Name()
return strings.Contains(typeName, "Context")
}
// isSliceType checks if the provided paramType is a slice type
func isSliceType(paramType reflect.Type) bool {
return paramType.Kind() == reflect.Slice
}
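Note that isContextType above accepts any type whose name contains "Context". If a stricter check is wanted, one option is to compare against the context.Context interface itself, in the same way isFlowType compares against Flow. The following is a minimal sketch under that assumption; isContextTypeStrict, FakeContext, ctxParam, and fakeParam are hypothetical names used only for illustration and are not part of KisFlow:

```go
package main

import (
	"context"
	"fmt"
	"reflect"
)

// contextType is the reflect.Type of the context.Context interface.
var contextType = reflect.TypeOf((*context.Context)(nil)).Elem()

// isContextTypeStrict (hypothetical) reports whether paramType satisfies
// context.Context, regardless of what the type is called.
func isContextTypeStrict(paramType reflect.Type) bool {
	return paramType.Implements(contextType)
}

// FakeContext is an unrelated type whose name merely contains "Context".
type FakeContext struct{}

var (
	// ctxParam is the type of a real context.Context parameter.
	ctxParam = reflect.TypeOf(func(context.Context) {}).In(0)
	// fakeParam would pass a name-based check but has no Context methods.
	fakeParam = reflect.TypeOf(FakeContext{})
)

func main() {
	fmt.Println(isContextTypeStrict(ctxParam))  // true
	fmt.Println(isContextTypeStrict(fakeParam)) // false
}
```

The trade-off is that an interface-based check rejects look-alike types, while the name-based check in the chapter is shorter and has no dependency on the context package inside the helper.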
In NewFaaSDesc(), we use two boolean variables, containsKisFlow and containsCtx, to check whether the parameter list includes the Context and Flow types. The following code ensures compatibility when the element type of the slice parameter is a structure pointer:
// ... ...
// Get the element type of the current slice parameter
itemType := paramType.Elem()
// If the current parameter is a pointer type, get the type of the structure it points to
if itemType.Kind() == reflect.Ptr {
itemType = itemType.Elem() // Get the type of the structure it points to
}
// ... ...
For example, the developer might define the FaaS function prototype as follows:
func MyFaaSDemo(ctx context.Context, flow Flow, rows []*A) error
or:
func MyFaaSDemo(ctx context.Context, flow Flow, rows []A) error
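To make the pointer handling concrete, here is a small sketch showing that both prototypes resolve to the same underlying structure type once the pointer is unwrapped. The type A and the helper elemStructTypeOf are hypothetical and exist only for this illustration:

```go
package main

import (
	"fmt"
	"reflect"
)

// A is a hypothetical row type.
type A struct{ ID int }

// elemStructTypeOf returns the element type of a slice value, unwrapping one
// level of pointer when the slice holds pointers. It assumes slice is a slice.
func elemStructTypeOf(slice interface{}) reflect.Type {
	itemType := reflect.TypeOf(slice).Elem()
	if itemType.Kind() == reflect.Ptr {
		itemType = itemType.Elem()
	}
	return itemType
}

func main() {
	fmt.Println(elemStructTypeOf([]*A{})) // main.A
	fmt.Println(elemStructTypeOf([]A{}))  // main.A
}
```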
11.1.3 Registering FaaS Functions
Next, we will modify the method for registering FaaS functions in the kisPool module to register a FaaSDesc description. The modified registration method is as follows:
kis-flow/kis/pool.go
// FaaS registers a Function's business logic, indexed and registered by Function Name
func (pool *kisPool) FaaS(fnName string, f FaaS) {
// When registering a FaaS logic callback, create a FaaSDesc description object
faaSDesc, err := NewFaaSDesc(fnName, f)
if err != nil {
panic(err)
}
pool.fnLock.Lock() // Write lock
defer pool.fnLock.Unlock()
if _, ok := pool.fnRouter[fnName]; !ok {
// Register the FaaSDesc description object into fnRouter
pool.fnRouter[fnName] = faaSDesc
} else {
errString := fmt.Sprintf("KisPool FaaS Repeat FuncName=%s", fnName)
panic(errString)
}
log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
}
Now, the key in fnRouter remains the FunctionName, but the value is the FaaSDesc description object for the current FaaS function.
11.1.4 Dispatching FaaSDesc in kisPool
Finally, when scheduling a function, use FaaSDesc to retrieve the function address and parameter list for scheduling. The modified CallFunction() is as follows:
kis-flow/kis/pool.go
// CallFunction dispatches a Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
if funcDesc, ok := pool.fnRouter[fnName]; ok {
// Parameter list for the scheduled function
params := make([]reflect.Value, 0, funcDesc.ArgNum)
for _, argType := range funcDesc.ArgsType {
// If it's a Flow type parameter, pass the value of flow
if isFlowType(argType) {
params = append(params, reflect.ValueOf(flow))
continue
}
// If it's a Context type parameter, pass the value of ctx
if isContextType(argType) {
params = append(params, reflect.ValueOf(ctx))
continue
}
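// If it's a Slice type parameter, pass the value of flow.Input()
if isSliceType(argType) {
// NOTE: value stands for flow.Input() converted to argType. It is not defined at this stage; section 11.2.5 produces it with Serialize.UnMarshal
params = append(params, value)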
continue
}
// If the parameter is neither Flow, Context, nor Slice type, give it the zero value
params = append(params, reflect.Zero(argType))
}
// Invoke the logic of the current function
retValues := funcDesc.FuncValue.Call(params)
// Retrieve the first return value; if nil, return nil
ret := retValues[0].Interface()
if ret == nil {
return nil
}
// If the return value is of error type, return the error
return retValues[0].Interface().(error)
}
log.Logger().ErrorFX(ctx, "FuncName: %s Cannot find in KisPool, Not Added.\n", fnName)
return errors.New("FuncName: " + fnName + " Cannot find in KisPool, Not Added.")
}
The overall scheduling logic of the function is roughly as follows:
First, use fnName to route to the corresponding FaaSDesc from fnRouter. Then iterate over the parameter list of FaaSDesc:
Pass ctx for a Context parameter, flow for a Flow parameter, and the input data for a custom slice parameter. If the parameter is neither Flow, Context, nor Slice type, give it the zero value as shown below:
params = append(params, reflect.Zero(argType))
Finally, execute the function scheduling:
retValues := funcDesc.FuncValue.Call(params)
Take the first return value: if it is nil, return nil; otherwise return it as an error.
In this way, we have successfully established the self-describing scheduling mode for FaaS. With this functionality, what can KisFlow do? In the next section, we can serialize the custom parameter data types passed in when scheduling FaaSDesc to obtain the data types expected by the developer.
11.2 Custom Data Type Serialization for FaaS Parameters
11.2.1 Serialize Interface
First, let's define a data serialization interface. Create a file named serialize.go under kis-flow/kis/ as follows:
kis-flow/kis/serialize.go
// Serialize Data Serialization Interface
type Serialize interface {
// UnMarshal is used to deserialize KisRowArr into a specified type value.
UnMarshal(common.KisRowArr, reflect.Type) (reflect.Value, error)
// Marshal is used to serialize a specified type value into KisRowArr.
Marshal(interface{}) (common.KisRowArr, error)
}
Here, KisRowArr is the data slice that we pass to each function in KisFlow, previously defined in kis-flow/common/data_type.go:
package common
// KisRow represents a row of data
type KisRow interface{}
// KisRowArr represents a batch of data for a single business process
type KisRowArr []KisRow
/*
KisDataMap holds all data carried by the current flow
key : the Function ID where the data resides
value: the corresponding KisRow
*/
type KisDataMap map[string]KisRowArr
The Serialize interface provides two methods:
- UnMarshal: Used to deserialize KisRowArr into a specified type value.
- Marshal: Used to serialize a specified type value into KisRowArr.
KisFlow will provide a default Serialize implementation for each FaaS function, but developers can also customize their own Serialize implementations to perform custom data serialization actions on FaaS parameters.
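As an illustration of what a custom implementation of these two methods could look like, the sketch below encodes rows as "key=value" strings. It is self-contained, so KisRow, KisRowArr, and Serialize are local copies of the definitions shown above, while KV, KVSerialize, and roundTrip are hypothetical names invented for this example:

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// Local stand-ins for the KisFlow definitions shown above.
type KisRow interface{}
type KisRowArr []KisRow

type Serialize interface {
	UnMarshal(KisRowArr, reflect.Type) (reflect.Value, error)
	Marshal(interface{}) (KisRowArr, error)
}

// KV is a hypothetical business row; KVSerialize stores it as "key=value".
type KV struct{ Key, Value string }
type KVSerialize struct{}

// UnMarshal converts "key=value" string rows into a []KV value.
func (KVSerialize) UnMarshal(arr KisRowArr, t reflect.Type) (reflect.Value, error) {
	if t != reflect.TypeOf([]KV{}) {
		return reflect.Value{}, fmt.Errorf("unsupported target type %s", t)
	}
	out := make([]KV, 0, len(arr))
	for _, row := range arr {
		s, ok := row.(string)
		if !ok {
			return reflect.Value{}, fmt.Errorf("row %v is not a string", row)
		}
		parts := strings.SplitN(s, "=", 2)
		if len(parts) != 2 {
			return reflect.Value{}, fmt.Errorf("row %q has no '='", s)
		}
		out = append(out, KV{Key: parts[0], Value: parts[1]})
	}
	return reflect.ValueOf(out), nil
}

// Marshal converts a []KV into "key=value" string rows.
func (KVSerialize) Marshal(i interface{}) (KisRowArr, error) {
	kvs, ok := i.([]KV)
	if !ok {
		return nil, fmt.Errorf("unsupported source type %T", i)
	}
	arr := make(KisRowArr, 0, len(kvs))
	for _, kv := range kvs {
		arr = append(arr, kv.Key+"="+kv.Value)
	}
	return arr, nil
}

// roundTrip marshals in and unmarshals the result through the interface.
func roundTrip(in []KV) ([]KV, error) {
	var s Serialize = KVSerialize{}
	arr, err := s.Marshal(in)
	if err != nil {
		return nil, err
	}
	v, err := s.UnMarshal(arr, reflect.TypeOf(in))
	if err != nil {
		return nil, err
	}
	return v.Interface().([]KV), nil
}

func main() {
	out, err := roundTrip([]KV{{Key: "stu_id", Value: "100"}})
	fmt.Println(out, err) // [{stu_id 100}] <nil>
}
```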
11.2.2 Default Serialization in KisFlow
KisFlow provides a default Serialize implementation, primarily in JSON format. Create a serialize folder under kis-flow/, and then create a file named serialize_default.go under kis-flow/serialize/ with the following code for serialization and deserialization:
kis-flow/serialize/serialize_default.go
package serialize
import (
"encoding/json"
"fmt"
"kis-flow/common"
"reflect"
)
type DefaultSerialize struct{}
// UnMarshal is used to deserialize KisRowArr into a specified type value.
func (f *DefaultSerialize) UnMarshal(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
// Ensure the input type is a slice
if r.Kind() != reflect.Slice {
return reflect.Value{}, fmt.Errorf("r must be a slice")
}
slice := reflect.MakeSlice(r, 0, len(arr))
// Iterate over each element and attempt deserialization
for _, row := range arr {
var elem reflect.Value
var err error
// Attempt to assert as a struct or pointer
elem, err = unMarshalStruct(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
continue
}
// Attempt to directly deserialize a string
elem, err = unMarshalJsonString(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
continue
}
// Attempt to serialize to JSON and then deserialize
elem, err = unMarshalJsonStruct(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
} else {
return reflect.Value{}, fmt.Errorf("failed to decode row: %v", err)
}
}
return slice, nil
}
// Marshal is used to serialize a specified type value into KisRowArr (JSON serialization).
func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
var arr common.KisRowArr
switch reflect.TypeOf(i).Kind() {
case reflect.Slice, reflect.Array:
slice := reflect.ValueOf(i)
for i := 0; i < slice.Len(); i++ {
// Serialize each element to a JSON string and add it to the slice.
jsonBytes, err := json.Marshal(slice.Index(i).Interface())
if err != nil {
return nil, fmt.Errorf("failed to marshal element to JSON: %v", err)
}
arr = append(arr, string(jsonBytes))
}
default:
// If not a slice or array type, serialize the entire structure to a JSON string.
jsonBytes, err := json.Marshal(i)
if err != nil {
return nil, fmt.Errorf("failed to marshal element to JSON: %v", err)
}
arr = append(arr, string(jsonBytes))
}
return arr, nil
}
Some helper functions are defined as follows:
kis-flow/serialize/serialize_default.go
// Attempt to assert as a struct or pointer
func unMarshalStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// Check if row is a struct or struct pointer type
rowType := reflect.TypeOf(row)
if rowType == nil {
return reflect.Value{}, fmt.Errorf("row is nil pointer")
}
if rowType.Kind() != reflect.Struct && rowType.Kind() != reflect.Ptr {
return reflect.Value{}, fmt.Errorf("row must be a struct or struct pointer type")
}
// If row is a pointer type, get its underlying type
if rowType.Kind() == reflect.Ptr {
// Null pointer
if reflect.ValueOf(row).IsNil() {
return reflect.Value{}, fmt.Errorf("row is nil pointer")
}
// Dereference
row = reflect.ValueOf(row).Elem().Interface()
// Get the type after dereferencing
rowType = reflect.TypeOf(row)
}
// Check if row can be asserted to elemType (target type)
if !rowType.AssignableTo(elemType) {
return reflect.Value{}, fmt.Errorf("row type cannot be asserted to elemType")
}
// Convert row to reflect.Value and return
return reflect.ValueOf(row), nil
}
// Attempt to directly deserialize a string (deserialize JSON string to struct)
func unMarshalJsonString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// Check if the source data can be asserted as a string
str, ok := row.(string)
if !ok {
return reflect.Value{}, fmt.Errorf("not a string")
}
// Create a new struct instance to store the deserialized value
elem := reflect.New(elemType).Elem()
// Attempt to deserialize the JSON string into the struct.
if err := json.Unmarshal([]byte(str), elem.Addr().Interface()); err != nil {
return reflect.Value{}, fmt.Errorf("failed to unmarshal string to struct: %v", err)
}
return elem, nil
}
// Attempt to serialize to JSON and then deserialize (convert struct to JSON string, then deserialize JSON string to struct)
func unMarshalJsonStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// Serialize row to JSON string
jsonBytes, err := json.Marshal(row)
if err != nil {
return reflect.Value{}, fmt.Errorf("failed to marshal row to JSON: %v", err)
}
// Create a new struct instance to store the deserialized value
elem := reflect.New(elemType).Interface()
// Deserialize the JSON string into the struct
if err := json.Unmarshal(jsonBytes, elem); err != nil {
return reflect.Value{}, fmt.Errorf("failed to unmarshal JSON to element: %v", err)
}
return reflect.ValueOf(elem).Elem(), nil
}
- UnMarshal(): First checks that the target type is a slice. If it is, it deserializes each row into the element type of that slice. It first tries unMarshalStruct(), then unMarshalJsonString(), and finally unMarshalJsonStruct() if the previous attempts fail.
- Marshal(): Serializes a value of any type into JSON strings stored in a KisRowArr (one string per element for a slice or array).
Note: The current default serialization in KisFlow only implements JSON serialization. Developers can refer to DefaultSerialize{} to implement their own serialization and deserialization for other formats.
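The fallback order can be condensed into a short stand-alone sketch. This is a simplified variant and not KisFlow's code: it merges the last two steps into one JSON path, does not dereference pointers in the first step, and uses the hypothetical names Score, decodeRow, and decodeAll:

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Score is a hypothetical target row type.
type Score struct {
	StuId int `json:"stu_id"`
}

// decodeRow converts one raw row into elemType.
func decodeRow(row interface{}, elemType reflect.Type) (reflect.Value, error) {
	// Step 1: the row already has the target type, so use it directly.
	if rv := reflect.ValueOf(row); rv.IsValid() && rv.Type().AssignableTo(elemType) {
		return rv, nil
	}
	// Steps 2 and 3: a string row is treated as JSON text; any other row is
	// marshaled to JSON first. Both are then decoded into a new elemType.
	var data []byte
	if s, ok := row.(string); ok {
		data = []byte(s)
	} else {
		b, err := json.Marshal(row)
		if err != nil {
			return reflect.Value{}, fmt.Errorf("marshal row: %w", err)
		}
		data = b
	}
	ptr := reflect.New(elemType)
	if err := json.Unmarshal(data, ptr.Interface()); err != nil {
		return reflect.Value{}, fmt.Errorf("unmarshal row: %w", err)
	}
	return ptr.Elem(), nil
}

// decodeAll decodes every row into a Score.
func decodeAll(rows []interface{}) ([]Score, error) {
	out := make([]Score, 0, len(rows))
	for _, row := range rows {
		v, err := decodeRow(row, reflect.TypeOf(Score{}))
		if err != nil {
			return nil, err
		}
		out = append(out, v.Interface().(Score))
	}
	return out, nil
}

func main() {
	rows := []interface{}{Score{StuId: 1}, `{"stu_id":2}`, map[string]int{"stu_id": 3}}
	scores, err := decodeAll(rows)
	fmt.Println(scores, err) // [{1} {2} {3}] <nil>
}
```

Trying the cheapest conversion first means rows that already have the right type skip JSON entirely, and the JSON round trip acts as a general fallback for compatible but differently typed rows.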
11.2.3 Default Serialize Instance
Define a global default serialization instance, defaultSerialize, in the file that defines the Serialize interface.
kis-flow/kis/serialize.go
// defaultSerialize is the default serialization implementation provided by KisFlow (developers can customize)
var defaultSerialize = &serialize.DefaultSerialize{}
Also, provide a method to check if a data type implements the Serialize interface:
kis-flow/kis/serialize.go
// isSerialize checks if the passed paramType implements the Serialize interface
func isSerialize(paramType reflect.Type) bool {
return paramType.Implements(reflect.TypeOf((*Serialize)(nil)).Elem())
}
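One Go detail worth keeping in mind with an Implements-based check such as isSerialize is that method sets differ between a type and a pointer to it. The sketch below demonstrates this with hypothetical types (Codec stands in for Serialize; ValueImpl, PointerImpl, RowV, and RowP are invented for the example):

```go
package main

import (
	"fmt"
	"reflect"
)

// Codec is a hypothetical one-method interface standing in for Serialize.
type Codec interface{ Encode() string }

// ValueImpl implements Codec with a value receiver.
type ValueImpl struct{}

func (ValueImpl) Encode() string { return "value" }

// PointerImpl implements Codec with a pointer receiver.
type PointerImpl struct{}

func (*PointerImpl) Encode() string { return "pointer" }

// RowV and RowP embed the two implementations by value.
type RowV struct{ ValueImpl }
type RowP struct{ PointerImpl }

var codecType = reflect.TypeOf((*Codec)(nil)).Elem()

// implementsCodec reports whether the dynamic type of v implements Codec.
func implementsCodec(v interface{}) bool {
	return reflect.TypeOf(v).Implements(codecType)
}

func main() {
	fmt.Println(implementsCodec(RowV{}))  // true
	fmt.Println(implementsCodec(RowP{}))  // false
	fmt.Println(implementsCodec(&RowP{})) // true
}
```

By the same language rule, and as far as the code shown in this chapter goes, a struct that embeds serialize.DefaultSerialize by value satisfies Serialize only through its pointer type, because the DefaultSerialize methods above use pointer receivers. A custom serializer that should be detected on the struct type itself would therefore need value receivers.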
11.2.4 Implementing the Serialize Interface for FaaSDesc
Next, we will extend FaaSDesc to implement the Serialize interface. When scheduling a FaaSDesc, the input data passed to it will be deserialized into the specific parameter type it declares. The definition is as follows:
kis-flow/kis/faas.go
// FaaSDesc describes the FaaS callback computation business function
type FaaSDesc struct {
// +++++++
Serialize // Serialization implementation for the data input and output of the current function
// +++++++
FnName string // Function name
f interface{} // FaaS function
fName string // Function name
ArgsType []reflect.Type // Collection of function parameter types
ArgNum int // Number of function parameters
FuncType reflect.Type // Function type
FuncValue reflect.Value // Function value (function address)
}
Then, in the constructor method NewFaaSDesc(), add a check for custom parameters. Determine whether the element type of the custom parameter implements the two methods of the Serialize interface. If it does, use that custom implementation; if not, use the default DefaultSerialize{} instance.
kis-flow/kis/faas.go
// NewFaaSDesc creates an FaaSDesc description instance based on the registered FnName and FaaS callback function
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
// ++++++++++
// Input/output serialization instance
var serializeImpl Serialize
// ++++++++++
// ... ...
// ... ...
// Iterate over the parameter types of the FaaS
for i := 0; i < funcType.NumIn(); i++ {
// Get the type of the i-th formal parameter
paramType := funcType.In(i)
if isFlowType(paramType) {
// Check if it contains a parameter of type kis.Flow
containsKisFlow = true
} else if isContextType(paramType) {
// Check if it contains a parameter of type context.Context
containsCtx = true
} else if isSliceType(paramType) {
// Get the element type of the current parameter slice
itemType := paramType.Elem()
// If the current parameter is a pointer type, get the type pointed to by the pointer
if itemType.Kind() == reflect.Ptr {
itemType = itemType.Elem() // Get the type pointed to by the pointer
}
// +++++++++++++++++++++++++++++
// Check if the element type of the slice parameter implements the Serialize interface
if isSerialize(itemType) {
// If the current parameter implements the Serialize interface, use the serialization implementation of the current parameter
serializeImpl = reflect.New(itemType).Interface().(Serialize)
} else {
// If the current parameter does not implement the Serialize interface, use the default serialization implementation
serializeImpl = defaultSerialize // Use global default implementation
}
// +++++++++++++++++++++++++++++++
} else {
// Other types are not supported
}
// Append the current parameter type to the argsType collection
argsType[i] = paramType
}
// ... ...
// ... ...
// Return the FaaSDesc description instance
return &FaaSDesc{
Serialize: serializeImpl,
FnName: fnName,
f: f,
fName: fullName,
ArgsType: argsType,
ArgNum: len(argsType),
FuncType: funcType,
FuncValue: funcValue,
}, nil
}
11.2.5 Completing FaaS Data Serialization During Scheduling
Finally, when scheduling FaaSDesc, if it is a custom slice parameter, deserialize the raw data of flow.Input() into the structure data required by the developer. Implement it as follows:
kis-flow/kis/pool.go
// CallFunction schedules the function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
if funcDesc, ok := pool.fnRouter[fnName]; ok {
// List of parameters for the scheduled function
params := make([]reflect.Value, 0, funcDesc.ArgNum)
for _, argType := range funcDesc.ArgsType {
// If it is a Flow type parameter, pass the value of flow
if isFlowType(argType) {
params = append(params, reflect.ValueOf(flow))
continue
}
// If it is a Context type parameter, pass the value of ctx
if isContextType(argType) {
params = append(params, reflect.ValueOf(ctx))
continue
}
// If it is a Slice type parameter, pass the value of flow.Input()
if isSliceType(argType) {
// +++++++++++++++++++
// Deserialize the raw data in flow.Input() into the data of argType type
value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
if err != nil {
log.Logger().ErrorFX(ctx, "funcDesc.Serialize.UnMarshal err=%v", err)
} else {
params = append(params, value)
continue
}
// +++++++++++++++++++
}
// If the passed parameter is neither Flow type, nor Context type, nor Slice type, give the default zero value
params = append(params, reflect.Zero(argType))
}
// Call the computation logic of the current function
retValues := funcDesc.FuncValue.Call(params)
// Get the first return value; if it is nil, return nil
ret := retValues[0].Interface()
if ret == nil {
return nil
}
// If the return value is of type error, return the error
return retValues[0].Interface().(error)
}
log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)
return errors.New("FuncName: " + fnName + " Can not find in KisPool, Not Added.")
}
This completes the integration of data serialization with the FaaSDesc module. Next, we will write a unit test to test this capability.
11.3 Unit Test for Custom Parameter Serialization
11.3.1 Definition of Flow and Function Configuration Files
For unit testing, we create two Function configurations as follows:
kis-flow/test/load_conf/func/func-avgStuScore.yml
kistype: func
fname: AvgStuScore
fmode: Calculate
source:
name: Student Average Score
must:
- stu_id
kis-flow/test/load_conf/func/func-PrintStuAvgScore.yml
kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
name: Student Average Score
must:
- stu_id
Next, we define a Flow to link the two functions together:
kis-flow/test/load_conf/flow/flow-StuAvg.yml
kistype: flow
status: 1
flow_name: StuAvg
flows:
- fname: AvgStuScore
- fname: PrintStuAvgScore
11.3.2 Definition of Custom Base Data Proto
In the kis-flow/test/ directory, create a proto/ folder and a custom base data proto for future data protocol reuse:
kis-flow/test/proto/stu_score.go
package proto
// Student Scores
type StuScores struct {
StuId int `json:"stu_id"`
Score1 int `json:"score_1"`
Score2 int `json:"score_2"`
Score3 int `json:"score_3"`
}
// Student's Average Score
type StuAvgScore struct {
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}
11.3.3 Define Two FaaS Callback Functions
Define two FaaS functions: one to calculate a student's average score and one to print the student's average score:
kis-flow/test/faas/faas_stu_score_avg.go
package faas
import (
"context"
"kis-flow/kis"
"kis-flow/serialize"
"kis-flow/test/proto"
)
type AvgStuScoreIn struct {
serialize.DefaultSerialize
proto.StuScores
}
type AvgStuScoreOut struct {
serialize.DefaultSerialize
proto.StuAvgScore
}
// AvgStuScore(FaaS) calculates the student's average score
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
for _, row := range rows {
avgScore := proto.StuAvgScore{
StuId: row.StuId,
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
}
// Submit the result data
_ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
}
return nil
}
The AvgStuScore() function is our improved FaaS function, where the third parameter rows []*AvgStuScoreIn is a custom parameter filled in by deserialization. Previously, we used flow.Input() to get the raw data and then traversed it. Although this method still works, it requires developers to manually assert and check types in the FaaS function, which increases development costs. Now, developers can describe a parameter's data through AvgStuScoreIn and use rows to get the already deserialized structures, improving code readability and reducing development costs.
The implementation for printing the average score FaaS is as follows:
kis-flow/test/faas/faas_stu_score_avg_print.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
"kis-flow/serialize"
"kis-flow/test/proto"
)
type PrintStuAvgScoreIn struct {
serialize.DefaultSerialize
proto.StuAvgScore
}
type PrintStuAvgScoreOut struct {
serialize.DefaultSerialize
}
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
for _, row := range rows {
fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
}
return nil
}
Similar to the previous function, we use custom input parameters for logic development.
11.3.4 Unit Test Case
Next, we write a test case for the above Flow:
kis-flow/test/kis_auto_inject_param_test.go
package test
import (
"context"
"kis-flow/common"
"kis-flow/config"
"kis-flow/file"
"kis-flow/flow"
"kis-flow/kis"
"kis-flow/test/faas"
"kis-flow/test/proto"
"testing"
)
func TestAutoInjectParamWithConfig(t *testing.T) {
ctx := context.Background()
kis.Pool().FaaS("AvgStuScore", faas.AvgStuScore)
kis.Pool().FaaS("PrintStuAvgScore", faas.PrintStuAvgScore)
// 1. Load the configuration files and build the Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
panic(err)
}
// 2. Get the Flow
flow1 := kis.Pool().GetFlow("StuAvg")
if flow1 == nil {
panic("flow1 is nil")
}
// 3. Submit raw data
_ = flow1.CommitRow(&faas.AvgStuScoreIn{
StuScores: proto.StuScores{
StuId: 100,
Score1: 1,
Score2: 2,
Score3: 3,
},
})
_ = flow1.CommitRow(faas.AvgStuScoreIn{
StuScores: proto.StuScores{
StuId: 100,
Score1: 1,
Score2: 2,
Score3: 3,
},
})
// Submit raw data (JSON string)
_ = flow1.CommitRow(`{"stu_id":101}`)
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
When submitting raw data, we rely on the default serialization, which supports JSON deserialization. Through CommitRow(), we submit three pieces of data: a structure pointer, a structure value, and a JSON string. All of them are supported.
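The JSON string row carries only stu_id, so after deserialization the three scores keep their zero values and the average comes out as 0. A minimal stand-alone sketch of that behavior follows; Marker, ScoresIn, and avg are hypothetical stand-ins for serialize.DefaultSerialize, faas.AvgStuScoreIn, and the FaaS logic, not the KisFlow types themselves:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StuScores mirrors the proto structure used in this chapter.
type StuScores struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

// Marker is a hypothetical empty stand-in for serialize.DefaultSerialize.
type Marker struct{}

// ScoresIn embeds both, like AvgStuScoreIn does.
type ScoresIn struct {
	Marker
	StuScores
}

// avg decodes one JSON row and returns the student id and average score.
func avg(raw string) (int, float64, error) {
	var in ScoresIn
	if err := json.Unmarshal([]byte(raw), &in); err != nil {
		return 0, 0, err
	}
	return in.StuId, float64(in.Score1+in.Score2+in.Score3) / 3, nil
}

func main() {
	fmt.Println(avg(`{"stu_id":101}`))                                     // 101 0 <nil>
	fmt.Println(avg(`{"stu_id":100,"score_1":1,"score_2":2,"score_3":3}`)) // 100 2 <nil>
}
```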
Navigate to kis-flow/test/ and execute:
$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig
The result is as follows:
$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig
...
...
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
...
...
Add FlowRouter FlowName=StuAvg
context.Background
====> After CommitSrcData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]]
KisFunctionC, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023af80 ThisFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]] inPut:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
context.Background
====> After commitCurData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]]
KisFunctionE, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023b000 ThisFunctionId:func-7f308d00f4fa49488760ff1dfb85dc46 PrevFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]] inPut:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
stuid: [100], avg score: [2]
stuid: [100], avg score: [2]
stuid: [101], avg score: [0]
--- PASS: TestAutoInjectParamWithConfig (0.01s)
PASS
ok kis-flow/test 0.030s
11.4 [V1.0] Source Code
https://github.com/aceld/kis-flow/releases/tag/v1.0
Author: Aceld
GitHub: https://github.com/aceld
KisFlow Open Source Project Address: https://github.com/aceld/kis-flow