Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
9.1 Multi-Replica Capability
If KisFlow needs to be used concurrently by multiple Goroutines while executing flows, it may require creating multiple flows with the same configuration to match multiple concurrent computation flows. Therefore, Flow needs the ability to create replicas. This chapter will implement this capability.
9.1.1 Adding Fork Interface to Flow
First, add a new interface Fork()
to the Flow abstraction layer with the following prototype:
kis-flow/kis/flow.go
type Flow interface {
// Run schedules the Flow, sequentially dispatching and executing the Functions within the Flow
Run(ctx context.Context) error
// Link connects the Functions within the Flow according to the configuration file
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow submits data to be executed to the Function layer of the Flow
CommitRow(row interface{}) error
// Input gets the input source data of the currently executing Function in the Flow
Input() common.KisRowArr
// GetName gets the name of the Flow
GetName() string
// GetThisFunction gets the currently executing Function
GetThisFunction() Function
// GetThisFuncConf gets the configuration of the currently executing Function
GetThisFuncConf() *config.KisFuncConfig
// GetConnector gets the Connector of the currently executing Function
GetConnector() (Connector, error)
// GetConnConf gets the configuration of the Connector of the currently executing Function
GetConnConf() (*config.KisConnConfig, error)
// GetConfig gets the configuration of the current Flow
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName gets the configuration of the Function by name in the current Flow
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// Next performs the Action for the next Function in the Flow
Next(acts ...ActionFunc) error
// GetCacheData gets the cached data of the current Flow
GetCacheData(key string) interface{}
// SetCacheData sets the cached data of the current Flow
SetCacheData(key string, value interface{}, Exp time.Duration)
// GetMetaData gets the temporary data of the current Flow
GetMetaData(key string) interface{}
// SetMetaData sets the temporary data of the current Flow
SetMetaData(key string, value interface{})
// GetFuncParam gets a key-value pair of the default parameters for the currently executing Function in the Flow
GetFuncParam(key string) string
// GetFuncParamAll gets all key-value pairs of the default parameters for the currently executing Function in the Flow
GetFuncParamAll() config.FParam
// +++++++++++++++++++++++++
// Fork gets a deep copy of the Flow
Fork(ctx context.Context) Flow
}
Fork()
will clone a new KisFlow instance based on an existing KisFlow instance, creating a resource-isolated but identically configured KisFlow instance. The implementation method is as follows:
kis-flow/flow/kis_flow.go
// Fork gets a deep copy of the Flow
func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {
config := flow.Conf
// Create a new Flow using the previous configuration
newFlow := NewKisFlow(config)
for _, fp := range flow.Conf.Flows {
if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok {
// The current Function has no Params configured
newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil)
} else {
// The current Function has Params configured
newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params)
}
}
log.Logger().DebugFX(ctx, "=====>Flow Fork, oldFlow.funcParams = %+v\n", flow.funcParams)
log.Logger().DebugFX(ctx, "=====>Flow Fork, newFlow.funcParams = %+v\n", newFlow.GetFuncParamsAllFuncs())
return newFlow
}
In Fork()
, a new KisFlow instance is created based on the configuration information of the existing flow, and the associated Params and other configuration information are copied along with it. Finally, the newly created Functions are linked to the new Flow using Link()
.
To facilitate debugging, a new interface GetFuncParamsAllFuncs()
has been added to the Flow to print the information of all FuncParams
. The implementation method is as follows:
kis-flow/kis/flow.go
type Flow interface {
// ... ...
// ... ...
// GetFuncParamsAllFuncs retrieves all key-value pairs of the FuncParams for all Functions in the Flow
GetFuncParamsAllFuncs() map[string]config.FParam
// ... ...
}
kis-flow/flow/kis_flow_data.go
// GetFuncParamsAllFuncs retrieves all key-value pairs of the FuncParams for all Functions in the Flow
func (flow *KisFlow) GetFuncParamsAllFuncs() map[string]config.FParam {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
return flow.funcParams
}
9.2 Unit Testing
Next, we will test the Fork capability with the following unit test code:
kis-flow/test/kis_fork_test.go
func TestForkFlow(t *testing.T) {
ctx := context.Background()
// 0. Register Function callback businesses
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. Register ConnectorInit and Connector callback businesses
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. Load configuration files and build Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. Get Flow
flow1 := kis.Pool().GetFlow("flowName1")
flow1Clone1 := flow1.Fork(ctx)
// 3. Submit original data
_ = flow1Clone1.CommitRow("This is Data1 from Test")
_ = flow1Clone1.CommitRow("This is Data2 from Test")
_ = flow1Clone1.CommitRow("This is Data3 from Test")
// 4. Execute flow1
if err := flow1Clone1.Run(ctx); err != nil {
panic(err)
}
}
First, we create an instance of the flow flowName1
and then get flowClone1
using Fork()
. After that, we execute the scheduling process of flowClone1
.
Navigate to the kis-flow/test/
directory and execute:
go test -test.v -test.paniconexit0 -test.run TestForkFlow
The result is as follows:
=== RUN TestForkFlow
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode=Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
context.Background
=====>Flow Fork, oldFlow.funcParams = map[func-6b00f430fe494302a384c2ae09eb019c:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-bf9df5fc16684200b78f32985d073012:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2] func-c0f1ae9850174f81b994a2e98fb34109:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]]
=====>Flow Fork, newFlow.funcParams = map[func-6b00f430fe494302a384c2ae09eb019c:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-bf9df5fc16684200b78f32985d073012:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2] func-c0f1ae9850174f81b994a2e98fb34109:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]]
===>FaaS Demo1, row.Data = This is Data1 from Test
===>FaaS Demo1, row.Data = This is Data2 from Test
===>FaaS Demo1, row.Data = This is Data3 from Test
===>FaaS Demo2, row.Data = This is Data1 from Test
===>FaaS Demo2, row.Data = This is Data2 from Test
===>FaaS Demo2, row.Data = This is Data3 from Test
===>FaaS Demo3, row.Data = This is Data1 from Test
===>FaaS Demo3, row.Data = This is Data2 from Test
===>FaaS Demo3, row.Data = This is Data3 from Test
--- PASS: TestForkFlow (0.01s)
PASS
ok command-line-arguments (cached)
Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
To be continued.
8.1 Flow Cache - Data Stream Caching
KisFlow also provides shared caching in stream computing, using a simple local cache for developers to use as needed. For third-party local cache technology dependencies, refer to: https://github.com/patrickmn/go-cache.
8.1.1 go-cache
(1) Installation
go get github.com/patrickmn/go-cache
(2) Usage
import (
"fmt"
"github.com/patrickmn/go-cache"
"time"
)
func main() {
// Create a cache with a default expiration time of 5 minutes, and which
// purges expired items every 10 minutes
c := cache.New(5*time.Minute, 10*time.Minute)
// Set the value of the key "foo" to "bar", with the default expiration time
c.Set("foo", "bar", cache.DefaultExpiration)
// Set the value of the key "baz" to 42, with no expiration time
// (the item won't be removed until it is re-set, or removed using
// c.Delete("baz")
c.Set("baz", 42, cache.NoExpiration)
// Get the string associated with the key "foo" from the cache
foo, found := c.Get("foo")
if found {
fmt.Println(foo)
}
// Since Go is statically typed, and cache values can be anything, type
// assertion is needed when values are being passed to functions that don't
// take arbitrary types, (i.e. interface{}). The simplest way to do this for
// values which will only be used once--e.g. for passing to another
// function--is:
foo, found := c.Get("foo")
if found {
MyFunction(foo.(string))
}
// This gets tedious if the value is used several times in the same function.
// You might do either of the following instead:
if x, found := c.Get("foo"); found {
foo := x.(string)
// ...
}
// or
var foo string
if x, found := c.Get("foo"); found {
foo = x.(string)
}
// ...
// foo can then be passed around freely as a string
// Want performance? Store pointers!
c.Set("foo", &MyStruct, cache.DefaultExpiration)
if x, found := c.Get("foo"); found {
foo := x.(*MyStruct)
// ...
}
}
For detailed reference: https://github.com/patrickmn/go-cache
8.1.2 KisFlow Integration with go-cache
(1) Flow Provides Abstract Interface
Flow provides interfaces for cache operations as follows:
kis-flow/kis/flow.go
type Flow interface {
// Run schedules the Flow, sequentially scheduling and executing Functions within the Flow
Run(ctx context.Context) error
// Link connects Functions within the Flow according to the configuration file
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow submits Flow data to the Function layer about to be executed
CommitRow(row interface{}) error
// Input gets the input source data for the currently executing Function in the Flow
Input() common.KisRowArr
// GetName gets the name of the Flow
GetName() string
// GetThisFunction gets the currently executing Function
GetThisFunction() Function
// GetThisFuncConf gets the configuration of the currently executing Function
GetThisFuncConf() *config.KisFuncConfig
// GetConnector gets the Connector of the currently executing Function
GetConnector() (Connector, error)
// GetConnConf gets the configuration of the Connector for the currently executing Function
GetConnConf() (*config.KisConnConfig, error)
// GetConfig gets the configuration of the current Flow
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName gets the configuration of the Function by its name
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// Next advances the currently executing Function to the next Function with specified Action
Next(acts ...ActionFunc) error
// ++++++++++++++++++++++++++++++++++++++++
// GetCacheData gets the cache data of the current Flow
GetCacheData(key string) interface{}
// SetCacheData sets the cache data of the current Flow
SetCacheData(key string, value interface{}, Exp time.Duration)
}
SetCacheData()
sets the local cache, with Exp as the expiration time. If Exp is 0, it is permanent.
GetCacheData()
reads the local cache.
(2) Providing Constants
Provide some constants related to cache expiration time.
kis-flow/common/const.go
// cache
const (
// DeFaultFlowCacheCleanUp is the default cache cleanup interval for Flow objects in KisFlow, in minutes
DeFaultFlowCacheCleanUp = 5 // in minutes
// DefaultExpiration is the default GoCache time, permanently saved
DefaultExpiration time.Duration = 0
)
(3) Adding and Initializing Members in KisFlow
kis-flow/flow/kis_flow.go
// KisFlow represents the context environment for stream computing
type KisFlow struct {
// ... ...
// ... ...
// Local cache for the flow
cache *cache.Cache // Temporary cache context environment for Flow
}
// NewKisFlow creates a new KisFlow
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
flow := new(KisFlow)
// ... ...
// ... ...
// Initialize local cache
flow.cache = cache.New(cache.NoExpiration, common.DeFaultFlowCacheCleanUp*time.Minute)
return flow
}
(4) Implementing the Interface
Finally, implement the two interfaces for cache read and write operations as follows:
kis-flow/flow/kis_flow_data.go
func (flow *KisFlow) GetCacheData(key string) interface{} {
if data, found := flow.cache.Get(key); found {
return data
}
return nil
}
func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Duration) {
if Exp == common.DefaultExpiration {
flow.cache.Set(key, value, cache.DefaultExpiration)
} else {
flow.cache.Set(key, value, Exp)
}
}
8.2 MetaData Temporary Cache Parameters
MetaData is defined as a map[string]interface{}
structure available at each level of Flow, Function, and Connector to store temporary data. The lifespan of this data is consistent with the lifespan of each instance.
8.2.1 Adding MetaData to Flow
First, add the metaData map[string]interface{}
member and corresponding read-write lock to KisFlow.
kis-flow/flow/kis_flow.go
// KisFlow represents the context environment throughout the entire stream computing
type KisFlow struct {
// ... ...
// ... ...
// +++++++++++++++++++++++++++++++++++++++++++
// metaData for the flow
metaData map[string]interface{} // Custom temporary data for Flow
mLock sync.RWMutex // Read-write lock to manage metaData
}
Also, initialize the metaData
member in the KisFlow constructor as follows:
kis-flow/flow/kis_flow.go
// NewKisFlow creates a KisFlow
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
flow := new(KisFlow)
// ... ...
// ... ...
// ++++++++++++++++++++++++++++++++++++++
// Initialize temporary data
flow.metaData = make(map[string]interface{})
return flow
}
Next, add the read and write interfaces for MetaData to the Flow as follows:
kis-flow/kis/flow.go
type Flow interface {
// Run schedules the Flow, sequentially scheduling and executing Functions within the Flow
Run(ctx context.Context) error
// Link connects Functions within the Flow according to the configuration file
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow submits Flow data to the Function layer about to be executed
CommitRow(row interface{}) error
// Input gets the input source data for the currently executing Function in the Flow
Input() common.KisRowArr
// GetName gets the name of the Flow
GetName() string
// GetThisFunction gets the currently executing Function
GetThisFunction() Function
// GetThisFuncConf gets the configuration of the currently executing Function
GetThisFuncConf() *config.KisFuncConfig
// GetConnector gets the Connector of the currently executing Function
GetConnector() (Connector, error)
// GetConnConf gets the configuration of the Connector for the currently executing Function
GetConnConf() (*config.KisConnConfig, error)
// GetConfig gets the configuration of the current Flow
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName gets the configuration of the Function by its name
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// Next advances the currently executing Function to the next Function with specified Action
Next(acts ...ActionFunc) error
// GetCacheData gets the cache data of the current Flow
GetCacheData(key string) interface{}
// SetCacheData sets the cache data of the current Flow
SetCacheData(key string, value interface{}, Exp time.Duration)
// ++++++++++++++++++++++++++++
// GetMetaData gets the temporary data of the current Flow
GetMetaData(key string) interface{}
// SetMetaData sets the temporary data of the current Flow
SetMetaData(key string, value interface{})
}
Define the GetMetaData()
and SetMetaData()
interfaces for reading and writing respectively. Finally, implement these interfaces as follows:
kis-flow/flow/kis_flow_data.go
// GetMetaData retrieves the temporary data of the current Flow object
func (flow *KisFlow) GetMetaData(key string) interface{} {
flow.mLock.RLock()
defer flow.mLock.RUnlock()
data, ok := flow.metaData[key]
if !ok {
return nil
}
return data
}
// SetMetaData sets the temporary data of the current Flow object
func (flow *KisFlow) SetMetaData(key string, value interface{}) {
flow.mLock.Lock()
defer flow.mLock.Unlock()
flow.metaData[key] = value
}
8.2.2 Adding MetaData to Function
First, add the metaData
member to BaseFunction
as follows:
kis-flow/function/kis_base_function.go
type BaseFunction struct {
// Id, KisFunction instance ID, used to distinguish different instance objects within KisFlow
Id string
Config *config.KisFuncConfig
// flow
flow kis.Flow // Context environment KisFlow
// connector
connector kis.Connector
// ++++++++++++++++++++++++
// Custom temporary data for Function
metaData map[string]interface{}
// Read-write lock to manage metaData
mLock sync.RWMutex
// link
N kis.Function // Next stream computing Function
P kis.Function // Previous stream computing Function
}
In the Function constructor, each specific Function needs a constructor to initialize the metaData
member. The changes are as follows:
kis-flow/function/kis_base_function.go
func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
var f kis.Function
// Factory produces generalized objects
// ++++++++++++++
switch common.KisMode(config.FMode) {
case common.V:
f = NewKisFunctionV() // +++
case common.S:
f = NewKisFunctionS() // +++
case common.L:
f = NewKisFunctionL() // +++
case common.C:
f = NewKisFunctionC() // +++
case common.E:
f = NewKisFunctionE() // +++
default:
// LOG ERROR
return nil
}
// Generate random unique instance ID
f.CreateId()
// Set basic information properties
if err := f.SetConfig(config); err != nil {
panic(err)
}
// Set Flow
if err := f.SetFlow(flow); err != nil {
panic(err)
}
return f
}
Each constructor is as follows:
kis-flow/function/kis_function_c.go
func NewKisFunctionC() kis.Function {
f := new(KisFunctionC)
// Initialize metaData
f.metaData = make(map[string]interface{})
return f
}
kis-flow/function/kis_function_v.go
func NewKisFunctionV() kis.Function {
f := new(KisFunctionV)
// Initialize metaData
f.metaData = make(map[string]interface{})
return f
}
kis-flow/function/kis_function_e.go
func NewKisFunctionE() kis.Function {
f := new(KisFunctionE)
// Initialize metaData
f.metaData = make(map[string]interface{})
return f
}
kis-flow/function/kis_function_s.go
func NewKisFunctionS() kis.Function {
f := new(KisFunctionS)
// Initialize metaData
f.metaData = make(map[string]interface{})
return f
}
kis-flow/function/kis_function_l.go
func NewKisFunctionL() kis.Function {
f := new(KisFunctionL)
// Initialize metaData
f.metaData = make(map[string]interface{})
return f
}
Next, add interfaces to access the metaData member in the Function abstraction layer as follows:
type Function interface {
// Call executes the stream computing logic
Call(ctx context.Context, flow Flow) error
// SetConfig configures the strategy for the current Function instance
SetConfig(s *config.KisFuncConfig) error
// GetConfig retrieves the configuration strategy of the current Function instance
GetConfig() *config.KisFuncConfig
// SetFlow sets the Flow instance that the current Function instance depends on
SetFlow(f Flow) error
// GetFlow retrieves the Flow instance that the current Function instance depends on
GetFlow() Flow
// AddConnector adds a Connector to the current Function instance
AddConnector(conn Connector) error
// GetConnector retrieves the Connector associated with the current Function instance
GetConnector() Connector
// CreateId generates a random instance KisID for the current Function instance
CreateId()
// GetId retrieves the FID of the current Function
GetId() string
// GetPrevId retrieves the FID of the previous Function node of the current Function
GetPrevId() string
// GetNextId retrieves the FID of the next Function node of the current Function
GetNextId() string
// Next returns the next layer computing stream Function, or nil if it is the last layer
Next() Function
// Prev returns the previous layer computing stream Function, or nil if it is the last layer
Prev() Function
// SetN sets the next Function instance
SetN(f Function)
// SetP sets the previous Function instance
SetP(f Function)
// ++++++++++++++++++++++++++++++++++
// GetMetaData retrieves the temporary data of the current Function
GetMetaData(key string) interface{}
// SetMetaData sets the temporary data of the current Function
SetMetaData(key string, value interface{})
}
Implement the above two interfaces in the BaseFunction.
kis-flow/function/kis_base_function.go
// GetMetaData retrieves the temporary data of the current Function
func (base *BaseFunction) GetMetaData(key string) interface{} {
base.mLock.RLock()
defer base.mLock.RUnlock()
data, ok := base.metaData[key]
if !ok {
return nil
}
return data
}
// SetMetaData sets the temporary data of the current Function
func (base *BaseFunction) SetMetaData(key string, value interface{}) {
base.mLock.Lock()
defer base.mLock.Unlock()
base.metaData[key] = value
}
8.2.3 Adding MetaData to Connector
First, add the metaData
member to KisConnector
as follows:
kis-flow/conn/kis_connector.go
type KisConnector struct {
// Connector ID
CId string
// Connector Name
CName string
// Connector Config
Conf *config.KisConnConfig
// Connector Init
onceInit sync.Once
// ++++++++++++++
// Custom temporary data for KisConnector
metaData map[string]interface{}
// Read-write lock to manage metaData
mLock sync.RWMutex
}
// NewKisConnector creates a KisConnector based on the configuration strategy
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
conn := new(KisConnector)
conn.CId = id.KisID(common.KisIdTypeConnector)
conn.CName = config.CName
conn.Conf = config
// +++++++++++++++++++++++++++++++++++
conn.metaData = make(map[string]interface{})
return conn
}
Initialize metaData
in the constructor.
Next, add interfaces to access and set MetaData in the Connector abstraction layer as follows:
kis-flow/kis/connector.go
type Connector interface {
// Init initializes the links of the storage engine associated with the Connector
Init() error
// Call invokes the read and write operations of the external storage logic of the Connector
Call(ctx context.Context, flow Flow, args interface{}) error
// GetId retrieves the ID of the Connector
GetId() string
// GetName retrieves the name of the Connector
GetName() string
// GetConfig retrieves the configuration information of the Connector
GetConfig() *config.KisConnConfig
// GetMetaData retrieves the temporary data of the current Connector
// +++++++++++++++++++++++++++++++
GetMetaData(key string) interface{}
// SetMetaData sets the temporary data of the current Connector
SetMetaData(key string, value interface{})
}
Finally, implement the above two interfaces in KisConnector
as follows:
kis-flow/conn/kis_connector.go
// GetMetaData retrieves the temporary data of the current Connector
func (conn *KisConnector) GetMetaData(key string) interface{} {
conn.mLock.RLock()
defer conn.mLock.RUnlock()
data, ok := conn.metaData[key]
if !ok {
return nil
}
return data
}
// SetMetaData sets the temporary data of the current Connector
func (conn *KisConnector) SetMetaData(key string, value interface{}) {
conn.mLock.Lock()
defer conn.mLock.Unlock()
conn.metaData[key] = value
}
8.3 Configuration File Parameters
KisFlow allows developers to define default parameters (Params) for configuring Flow, Function, Connector, etc., in the configuration file. Here are some examples:
Function:
kistype: func
fname: funcName1
fmode: Verify
source:
name: Official Account Douyin Mall Order Data
must:
- order_id
- user_id
option:
default_params:
default1: funcName1_param1
default2: funcName1_param2
Flow:
kistype: flow
status: 1
flow_name: flowName1
flows:
- fname: funcName1
params:
myKey1: flowValue1-1
myKey2: flowValue1-2
- fname: funcName2
params:
myKey1: flowValue2-1
myKey2: flowValue2-2
- fname: funcName3
params:
myKey1: flowValue3-1
myKey2: flowValue3-2
Connector:
kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
args1: value1
args2: value2
load: null
save:
- funcName2
Developers can provide Params for each defined module. Params provided in Flow will also be added to the Functions.
In the previous steps, we already read these parameters into each module's memory, but we did not expose an interface for developers.
8.3.1 Adding Param Retrieval Interface to Flow
First, we provide an interface for Flow to query Params:
kis-flow/kis/flow.go
type Flow interface {
// ... ...
// ... ...
// GetFuncParam retrieves a key-value pair of the default parameters for the currently executing Function in the Flow
GetFuncParam(key string) string
// GetFuncParamAll retrieves all key-value pairs of the default parameters for the currently executing Function in the Flow
GetFuncParamAll() config.FParam
}
Implementation:
kis-flow/flow/kis_flow_data.go
// GetFuncParam retrieves a key-value pair of the default parameters for the currently executing Function in the Flow
func (flow *KisFlow) GetFuncParam(key string) string {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
if param, ok := flow.funcParams[flow.ThisFunctionId]; ok {
if value, vok := param[key]; vok {
return value
}
}
return ""
}
// GetFuncParamAll retrieves all key-value pairs of the default parameters for the currently executing Function in the Flow
func (flow *KisFlow) GetFuncParamAll() config.FParam {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
param, ok := flow.funcParams[flow.ThisFunctionId]
if !ok {
return nil
}
return param
}
GetFuncParam()
and GetFuncParamAll()
retrieve a single key or all parameters respectively, but both fetch the Params for the currently executing Function.
8.3.2 Unit Testing
We add some parameters to each Function in flowName1
.
kis-flow/test/load_conf/flow-FlowName1.yml
kistype: flow
status: 1
flow_name: flowName1
flows:
- fname: funcName1
params:
myKey1: flowValue1-1
myKey2: flowValue1-2
- fname: funcName2
params:
myKey1: flowValue2-1
myKey2: flowValue2-2
- fname: funcName3
params:
myKey1: flowValue3-1
myKey2: flowValue3-2
Then configure some default custom parameters for each associated Function:
kis-flow/test/load_conf/func/func-FuncName1.yml
kistype: func
fname: funcName1
fmode: Verify
source:
name: Official Account Douyin Mall Order Data
must:
- order_id
- user_id
option:
default_params:
default1: funcName1_param1
default2: funcName1_param2
kis-flow/test/load_conf/func/func-FuncName2.yml
kistype: func
fname: funcName2
fmode: Save
source:
name: User Order Error Rate
must:
- order_id
- user_id
option:
cname: ConnName1
default_params:
default1: funcName2_param1
default2: funcName2_param2
kis-flow/test/load_conf/func/func-FuncName3.yml
kistype: func
fname: funcName3
fmode: Calculate
source:
name: User Order Error Rate
must:
- order_id
- user_id
option:
default_params:
default1: funcName3_param1
default2: funcName3_param2
We also configure some Param parameters for the Connector associated with FuncName2
:
kis-flow/test/load_conf/conn/conn-ConnName1.yml
kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
args1: value1
args2: value2
load: null
save:
- funcName2
To verify that our configuration parameters can be correctly retrieved during the execution of Functions, we modified each Function and Connector business function to print their Params:
kis-flow/test/faas/faas_demo1.go
func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName1Handler ----")
// ++++++++++++++++
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for index, row := range flow.Input() {
// Print data
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
// Calculate result data
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// Commit result data
_ = flow.CommitRow(resultStr)
}
return nil
}
kis-flow/test/faas/faas_demo2.go
func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName2Handler ----")
// ++++++++++++++++
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for index, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
conn, err := flow.GetConnector()
if err != nil {
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error())
return err
}
if conn.Call(ctx, flow, row) != nil {
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
return err
}
// Calculate result data
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// Commit result data
_ = flow.CommitRow(resultStr)
}
return nil
}
kis-flow/test/faas/faas_demo3.go
func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName3Handler ----")
// ++++++++++++++++
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return nil
}
kis-flow/test/caas/caas_demo1.go
func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)
// +++++++++++
fmt.Printf("Params = %+v\n", conn.GetConfig().Params)
fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args)
return nil
}
Finally, we write the unit test cases:
kis-flow/test/kis_params_test.go
package test
import (
"context"
"kis-flow/common"
"kis-flow/file"
"kis-flow/kis"
"kis-flow/test/caas"
"kis-flow/test/faas"
"testing"
)
func TestParams(t *testing.T) {
ctx := context.Background()
// 0. Register Function callback businesses
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. Register ConnectorInit and Connector callback businesses
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. Load configuration files and build Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. Get Flow
flow1 := kis.Pool().GetFlow("flowName1")
// 3. Submit original data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
Navigate to the kis-flow/test/
directory and execute:
go test -test.v -test.paniconexit0 -test.run TestParams
=== RUN TestParams
....
....
---> Call funcName1Handler ----
Params = map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]
...
...
---> Call funcName2Handler ----
Params = map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]
...
...
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
...
...
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
...
...
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
...
...
---> Call funcName3Handler ----
Params = map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2]
...
...
--- PASS: TestParams (0.01s)
PASS
ok kis-flow/test 0.433s
As we can see, we can now correctly retrieve the Params configuration parameters at each level.
8.4 [V0.7] Source Code
https://github.com/aceld/kis-flow/releases/tag/v0.7
Author: Aceld
GitHub: https://github.com/aceld
KisFlow Open Source Project Address: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
Top comments (0)