DEV Community

Aceld
Aceld

Posted on

(Part 6)Golang Framework Hands-on - KisFlow Stream Computing Framework-Configuration Import and Export

#go

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export

To be continued.


6.1 Configuration Import

Currently, establishing a Flow and Functions requires a series of cumbersome additions each time, which is not very convenient. Next, we can construct the structure of KisFlow and also export the structure of KisFlow to local files by batch reading and writing configuration files. Currently, we will use file format for configuration persistence, but developers can also choose to do persistence with databases or remote configurations in the future.

6.1.1 Creating Configuration Files

First, we create the necessary configuration files for KisFlow business under kis-flow/test/load_conf/. Within kis-flow/test/load_conf/, we create three folders: conn/, flow/, and func/ to respectively store the configuration information for Connector, Flow, and Function.

├── conn
│   └── conn-ConnName1.yml
├── flow
│   └── flow-FlowName1.yml
└── func
    ├── func-FuncName1.yml
    ├── func-FuncName2.yml
    └── func-FuncName3.yml
Enter fullscreen mode Exit fullscreen mode

Create some yml files within these folders. The specific content is as follows:

A. Function

kis-flow/test/load_conf/func/func-FuncNam1.yml

kistype: func
fname: funcName1
fmode: Verify
source:
  name: Public account Douyin mall user order data
  must:
    - order_id
    - user_id
Enter fullscreen mode Exit fullscreen mode

kis-flow/test/load_conf/func/func-FuncNam2.yml

kistype: func
fname: funcName2
fmode: Save
source:
  name: User order error rate
  must:
    - order_id
    - user_id
option:
  cname: ConnName1
Enter fullscreen mode Exit fullscreen mode

kis-flow/test/load_conf/func/func-FuncNam3.yml

kistype: func
fname: funcName3
fmode: Calculate
source:
  name: User order error rate
  must:
    - order_id
    - user_id
Enter fullscreen mode Exit fullscreen mode

B. Connector

kis-flow/test/load_conf/func/func-ConnName1.yml

kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
load: null
save:
  - funcName2
Enter fullscreen mode Exit fullscreen mode

C. Flow

kis-flow/test/load_conf/func/func-FlowName1.yml

kistype: flow
status: 1
flow_name: flowName1
flows:
  - fname: funcName1
  - fname: funcName2
  - fname: funcName3

Enter fullscreen mode Exit fullscreen mode

6.1.2 Configuration Parsing

Create the kis-flow/file/ directory and the kis-flow/file/config_import.go file within it.

First, define an interface allConfig that can hold all configurations:

kis-flow/file/config_import.go

type allConfig struct {
    Flows map[string]*config.KisFlowConfig
    Funcs map[string]*config.KisFuncConfig
    Conns map[string]*config.KisConnConfig
}
Enter fullscreen mode Exit fullscreen mode

Where the key serves as the Name field for each module.
Next, define methods for parsing Flow, Function, and Connector configurations. For YAML parsing, we will use the "gopkg.in/yaml.v3" library.

A. Flow Configuration Parsing

kis-flow/file/config_import.go

// kisTypeFlowConfigure parses Flow configuration files in YAML format
func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
    flow := new(config.KisFlowConfig)
    if ok := yaml.Unmarshal(confData, flow); ok != nil {
        return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
    }

    // Do not load Flow configuration if the status is disabled
    if common.KisOnOff(flow.Status) == common.FlowDisable {
        return nil
    }

    if _, ok := all.Flows[flow.FlowName]; ok {
        return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flow.FlowName))
    }

    // Add the configuration to the collection
    all.Flows[flow.FlowName] = flow

    return nil
}
Enter fullscreen mode Exit fullscreen mode
  • confData: the file's binary data
  • fileName: the file path
  • kistype: the configuration file type

kisTypeFlowConfigure will parse the configuration information into the Flows member of allConfig.
Similarly, the methods for Function and Connector parsing are as follows.

B. Function Configuration Parsing

kis-flow/file/config_import.go

// kisTypeFuncConfigure parses Function configuration files in YAML format
func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
    function := new(config.KisFuncConfig)
    if ok := yaml.Unmarshal(confData, function); ok != nil {
        return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
    }
    if _, ok := all.Funcs[function.FName]; ok {
        return errors.New(fmt.Sprintf("%s set repeat function_id:%s", fileName, function.FName))
    }

    // Add the configuration to the collection
    all.Funcs[function.FName] = function

    return nil
}
Enter fullscreen mode Exit fullscreen mode

C. Connector Configuration Parsing

kis-flow/file/config_import.go

// kisTypeConnConfigure parses Connector configuration files in YAML format
func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
    conn := new(config.KisConnConfig)
    if ok := yaml.Unmarshal(confData, conn); ok != nil {
        return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType))
    }

    if _, ok := all.Conns[conn.CName]; ok {
        return errors.New(fmt.Sprintf("%s set repeat conn_id:%s", fileName, conn.CName))
    }

    // Add the configuration to the collection
    all.Conns[conn.CName] = conn

    return nil
}

Enter fullscreen mode Exit fullscreen mode

6.1.3 Traversing Files

Below is an implementation to traverse all yml and yaml type files under a given path loadPath and parse the configuration information into allConfig according to the kistype category.

kis-flow/file/config_import.go

// parseConfigWalkYaml parses all configuration files in YAML format and stores the configuration information in allConfig
func parseConfigWalkYaml(loadPath string) (*allConfig, error) {

    all := new(allConfig)

    all.Flows = make(map[string]*config.KisFlowConfig)
    all.Funcs = make(map[string]*config.KisFuncConfig)
    all.Conns = make(map[string]*config.KisConnConfig)

    err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error {
        // Validate file extension
        if suffix := path.Ext(filePath); suffix != ".yml" && suffix != ".yaml" {
            return nil
        }

        // Read file content
        confData, err := ioutil.ReadFile(filePath)
        if err != nil {
            return err
        }

        confMap := make(map[string]interface{})

        // Validate YAML format
        if err := yaml.Unmarshal(confData, confMap); err != nil {
            return err
        }

        // Check if kisType exists
        if kisType, ok := confMap["kistype"]; !ok {
            return errors.New(fmt.Sprintf("yaml file %s has no file [kistype]!", filePath))
        } else {
            switch kisType {
            case common.KisIdTypeFlow:
                return kisTypeFlowConfigure(all, confData, filePath, kisType)

            case common.KisIdTypeFunction:
                return kisTypeFuncConfigure(all, confData, filePath, kisType)

            case common.KisIdTypeConnnector:
                return kisTypeConnConfigure(all, confData, filePath, kisType)

            default:
                return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType))
            }
        }
    })

    if err != nil {
        return nil, err
    }

    return all, nil
}

Enter fullscreen mode Exit fullscreen mode

6.1.4 Import Method

Below is a public method ConfigImportYaml that imports files from a given root path.

kis-flow/file/config_import.go

// ConfigImportYaml parses all configuration files in YAML format
func ConfigImportYaml(loadPath string) error {

    all, err := parseConfigWalkYaml(loadPath)
    if err != nil {
        return err
    }

    for flowName, flowConfig := range all.Flows {

        // Build a Flow
        newFlow := flow.NewKisFlow(flowConfig)

        for _, fp := range flowConfig.Flows {
            if err := buildFlow(all, fp, newFlow, flowName); err != nil {
                return err
            }
        }

        // Add the flow to the FlowPool
        kis.Pool().AddFlow(flowName, newFlow)
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

First, it calls parseConfigWalkYaml() to load all configuration information into memory.
Then, it traverses all Flows one by one to construct them. Finally, the flow is added to the Pool. The specific construction process is as follows:

kis-flow/file/config_import.go

func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error {
    // Load Functions that the current Flow depends on
    if funcConfig, ok := all.Funcs[fp.FuncName]; !ok {
        return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName))
    } else {
        // Link Flow to Functions
        if funcConfig.Option.CName != "" {
            // Load Connectors that the current Function depends on
            if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok {
                return errors.New(fmt.Sprintf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName))
            } else {
                // Associate Function Configurations with Connector Configurations
                _ = funcConfig.AddConnConfig(connConf)
            }
        }

        // Link Flow to Functions
        if err := newFlow.Link(funcConfig, fp.Params); err != nil {
            return err
        }
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

6.2 Configure Import Unit Testing

Create a unit test file kis-flow/test/kis_config_import_test.go.

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/file"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
)

func TestConfigImportYmal(t *testing.T) {
    ctx := context.Background()

    // 0. Register Function callbacks
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register ConnectorInit and Connector callbacks
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Load configuration file and build Flow
    if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get Flow
    flow1 := kis.Pool().GetFlow("flowName1")

    // 3. Commit raw data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. Execute flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

Enter fullscreen mode Exit fullscreen mode

First, register the business methods. Then load the configuration via ConfigImportYaml, get the flow instance from the pool, submit data, and run it.

Note that the absolute path to the configuration file is provided here.

Navigate to the kis-flow/test/ directory and execute the command:

go test -test.v -test.paniconexit0 -test.run TestConfigImportYmal    
Enter fullscreen mode Exit fullscreen mode

The expected result is as follows:

=== RUN   TestConfigImportYmal
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1

context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114960 ThisFunctionId:func-37c7070f45144529891d433ae9c4ebfc PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionS, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001149c0 ThisFunctionId:func-5315301ffbbb4ae4be021729ddff1569 PrevFunctionId:func-37c7070f45144529891d433ae9c4ebfc funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}

---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]]

KisFunctionC, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114a20 ThisFunctionId:func-89a6a662729b4a0895e849c40bf29892 PrevFunctionId:func-5315301ffbbb4ae4be021729ddff1569 funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]}

---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 2
--- PASS: TestConfigImportYmal (0.01s)
PASS
ok      kis-flow/test   0.517s

Enter fullscreen mode Exit fullscreen mode

The expected result is consistent with our expectations. Now we can load and build KisFlow through configuration files.

6.3 Configure Export

6.3.1 Implementation of Export

kis-flow/file/config_export.go

package file

import (
    "errors"
    "fmt"
    "gopkg.in/yaml.v3"
    "io/ioutil"
    "kis-flow/common"
    "kis-flow/kis"
)

// ConfigExportYaml exports flow configurations and stores them locally
func ConfigExportYaml(flow kis.Flow, savePath string) error {

    if data, err := yaml.Marshal(flow.GetConfig()); err != nil {
        return err
    } else {
        // Flow
        err := ioutil.WriteFile(savePath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644)
        if err != nil {
            return err
        }

        // Function
        for _, fp := range flow.GetConfig().Flows {
            fConf := flow.GetFuncConfigByName(fp.FuncName)
            if fConf == nil {
                return errors.New(fmt.Sprintf("function name = %s config is nil ", fp.FuncName))
            }

            if fdata, err := yaml.Marshal(fConf); err != nil {
                return err
            } else {
                if err := ioutil.WriteFile(savePath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil {
                    return err
                }
            }

            // Connector
            if fConf.Option.CName != "" {
                cConf, err := fConf.GetConnConfig()
                if err != nil {
                    return err
                }
                if cdata, err := yaml.Marshal(cConf); err != nil {
                    return err
                } else {
                    if err := ioutil.WriteFile(savePath+common.KisIdTypeConnnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil {
                        return err
                    }
                }
            }
        }
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

6.3.2 New Interfaces in Flow

kis-flow/kis/flow.go

package kis

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
)

type Flow interface {
    // Run schedules the Flow, sequentially dispatches and executes Functions in the Flow
    Run(ctx context.Context) error
    // Link connects the Functions in the Flow according to the configuration file
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow submits Flow data to the upcoming Function layer
    CommitRow(row interface{}) error
    // Input gets the input source data for the currently executing Function of the flow
    Input() common.KisRowArr
    // GetName gets the name of the Flow
    GetName() string
    // GetThisFunction gets the currently executing Function of the Flow
    GetThisFunction() Function
    // GetThisFuncConf gets the configuration of the currently executing Function of the Flow
    GetThisFuncConf() *config.KisFuncConfig
    // GetConnector gets the Connector of the currently executing Function of the Flow
    GetConnector() (Connector, error)
    // GetConnConf gets the configuration of the Connector of the currently executing Function of the Flow
    GetConnConf() (*config.KisConnConfig, error)

    // +++++++++++++++++++++++++++++++
    // GetConfig gets the configuration of the current Flow
    GetConfig() *config.KisFlowConfig
    // GetFuncConfigByName gets the configuration of the current Flow by Function name
    GetFuncConfigByName(funcName string) *config.KisFuncConfig
    // +++++++++++++++++++++++++++++++
}
Enter fullscreen mode Exit fullscreen mode

Flow's new interfaces are implemented as follows:

kis-flow/flow/kis_flow.go

func (flow *KisFlow) GetConfig() *config.KisFlowConfig {
    return flow.Conf
}

// GetFuncConfigByName gets the configuration of the current Flow by Function name
func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig {
    if f, ok := flow.Funcs[funcName]; ok {
        return f.GetConfig()
    } else {
        log.Logger().ErrorF("GetFuncConfigByName(): Function %s not found", funcName)
        return nil
    }
}
Enter fullscreen mode Exit fullscreen mode

6.3.3 Fix in KisFlow's Funcs

There was a typo here before.

kis-flow/flow/kis_flow.go

// KisFlow is used to pass the context environment for the entire stream computing
type KisFlow struct {
    // Basic information
    Id   string                // Flow's distributed instance ID (used by KisFlow internally to distinguish different instances)
    Name string                // Flow's readable name
    Conf *config.KisFlowConfig // Flow configuration strategy

    // Function list
    Funcs          map[string]kis.Function // All managed Function objects of the current flow, key: FunctionName
    FlowHead       kis.Function            // Function list head owned by the current Flow
    FlowTail       kis.Function            // Function list tail owned by the current Flow
    flock          sync.RWMutex            // Lock for managing linked list insertion and reading and writing
    ThisFunction   kis.Function            // Currently executing KisFunction object of the Flow
    ThisFunctionId string                  // ID of the currently executing Function
    PrevFunctionId string                  // ID of the Function above the currently executing Function

    // Function list parameters
    funcParams map[string]config.FParam // Custom fixed configuration parameters of the flow in the current Function, Key: KisID of the function instance, value: FParam
    fplock     sync.RWMutex             // Lock for managing funcParams reading and writing

    // Data
    buffer common.KisRowArr  // Internal buffer used to temporarily store input byte data, one data is interface{}, multiple data is []interface{}, i.e., KisBatch
    data   common.KisDataMap // Data sources at various levels of stream computing
    inPut  common.KisRowArr  // Input data for the current Function calculation
}
Enter fullscreen mode Exit fullscreen mode

The Funcs member here, its key meaning, was previously defined as KisID, and now it should be corrected to mean FunctionName.
Below is a simple correction to the code where Funcs member is assigned:

// appendFunc adds Function to Flow, linked list operation
func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) error {
    // ... ... 


    // Adds the detailed hash relationship of Function Name to the flow object
    flow.Funcs[function.GetConfig().FName] = function

    // ... ... 
}

Enter fullscreen mode Exit fullscreen mode

6.3.4 Addition in KisPool

kis-flow/kis/pool.go

// GetFlows gets all the Flows
func (pool *kisPool) GetFlows() []Flow {
    pool.flowLock.RLock() // Read lock
    defer pool.flowLock.RUnlock()

    var flows []Flow

    for _, flow := range pool.flowRouter {
        flows = append(flows, flow)
    }

    return flows
}
Enter fullscreen mode Exit fullscreen mode

KisPool adds a method to get all Flows to support the export module.

6.4 Configuration Export Unit Testing

Create the kis_config_export_test.go file in kis-flow/test/:

package test

import (
    "kis-flow/common"
    "kis-flow/file"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
)

func TestConfigExportYmal(t *testing.T) {
    // 0. Register Function callback business
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register ConnectorInit and Connector callback business
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Load configuration file and build Flow
    if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. Export the configured in-memory KisFlow structure to files
    flows := kis.Pool().GetFlows()
    for _, flow := range flows {
        if err := file.ConfigExportYaml(flow, "/Users/gopath/src/kis-flow/test/export_conf/"); err != nil {
            panic(err)
        }
    }
}

Enter fullscreen mode Exit fullscreen mode

Navigate to kis-flow/test/ and execute:

go test -test.v -test.paniconexit0 -test.run  TestConfigExportYmal 
Enter fullscreen mode Exit fullscreen mode

This will generate exported configurations under kis-flow/test/export_conf/:

├── export_conf
│   ├── conn-ConnName1.yaml
│   ├── flow-flowName1.yaml
│   ├── func-funcName1.yaml
│   ├── func-funcName2.yaml
│   └── func-funcName3.yaml
Enter fullscreen mode Exit fullscreen mode

6.5 [V0.5] Source Code

https://github.com/aceld/kis-flow/releases/tag/v0.5


Author: Aceld
GitHub: https://github.com/aceld

KisFlow Open Source Project Address: https://github.com/aceld/kis-flow

Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export


Top comments (0)