DEV Community

Aceld

(Part 3) Golang Framework Hands-on - KisFlow Stream Computing Framework - Data Stream

#go

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling

To be continued.


3.1 Data Type Definition

In KisFlow, various types of data can be passed as the data source for the Flow. Moreover, KisFlow supports batch data streaming calculations.

Firstly, we need to define the basic data types supported internally by KisFlow. We will place this definition code in the kis-flow/common/ directory, in the data_type.go file.

kis-flow/common/data_type.go

package common

// KisRow represents a single data row
type KisRow interface{}

// KisRowArr represents a batch of data for a single business operation
type KisRowArr []KisRow

/*
    KisDataMap holds all the data carried by the current Flow,
    key :  Function ID where the data resides
    value: Corresponding KisRowArr
*/
type KisDataMap map[string]KisRowArr

  • KisRow: Represents a single data row, which can be of any data type, such as a string, JSON string, serialized binary data, protobuf, YAML string, etc.
  • KisRowArr: Represents multiple data rows, i.e., a batch of data submitted at once. It is a slice of KisRow values.
  • KisDataMap: Represents all the data carried by the current Flow. It is of type map[string]KisRowArr, where the key is the ID of the Function the data belongs to, and the value is that Function's batch of rows.
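
As a rough illustration of how these three types nest, the sketch below redeclares them locally so that it runs on its own, outside the kis-flow module. The key "func-demo" and the helper countRows are invented for the example and are not part of KisFlow.

```go
package main

import "fmt"

// Local copies of the types from kis-flow/common/data_type.go,
// redeclared here only so the sketch compiles on its own.
type KisRow interface{}
type KisRowArr []KisRow
type KisDataMap map[string]KisRowArr

// countRows is a hypothetical helper: total rows across all Function IDs.
func countRows(data KisDataMap) int {
	total := 0
	for _, rows := range data {
		total += len(rows)
	}
	return total
}

func main() {
	// A batch may mix row types, because KisRow is interface{}.
	batch := KisRowArr{"plain string", `{"order_id": 1}`, []byte{0x01, 0x02}}

	data := make(KisDataMap)
	data["func-demo"] = batch // "func-demo" is an invented Function ID

	fmt.Println(len(data["func-demo"]), countRows(data)) // 3 3
}
```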

3.2 KisFlow Data Stream Handling

In the KisFlow module, we add some members to store data, as follows:

kis-flow/flow/kis_flow.go

// KisFlow is used to traverse the entire context of the streaming computation
type KisFlow struct {
    // Basic information
    Id   string                // Distributed instance ID of the Flow (used by KisFlow to distinguish different instances)
    Name string                // Readable name of the Flow
    Conf *config.KisFlowConfig // Flow configuration strategy

    // Function list
    Funcs          map[string]kis.Function // All managed Function objects of the current flow, key: FunctionID
    FlowHead       kis.Function            // Head of the Function list owned by the current Flow
    FlowTail       kis.Function            // Tail of the Function list owned by the current Flow
    flock          sync.RWMutex            // Lock for managing linked list insertions and reads
    ThisFunction   kis.Function            // KisFunction object currently being executed in the Flow
    ThisFunctionId string                  // ID of the Function currently being executed (Strategy configuration ID)
    PrevFunctionId string                  // ID of the previous layer Function where the execution is currently at (Strategy configuration ID)

    // Function list parameters
    funcParams map[string]config.FParam // Custom fixed configuration parameters of the flow in the current Function, Key: function instance KisID, value: FParam
    fplock     sync.RWMutex             // Lock for managing funcParams reads and writes

    // ++++++++ Data ++++++++++
    buffer common.KisRowArr  // Internal buffer that temporarily holds submitted rows; a single row is interface{}, a batch is []interface{} (also known as KisBatch)
    data   common.KisDataMap // Data sources for streaming computation at various levels
    inPut  common.KisRowArr  // Input data for the current Function's computation
}

  • buffer: Internal buffer that temporarily holds submitted rows until they are committed to data. A single row is an interface{} and a batch is a []interface{}, also known as KisBatch.
  • data: The data held for each layer of the streaming computation, keyed by Function ID.
  • inPut: Input data for the current Function's computation.

These members will be used in the subsequent chapters and are introduced here for understanding purposes.

Since data is of type map, it needs to be initialized in NewKisFlow():

kis-flow/flow/kis_flow.go

// NewKisFlow creates a KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
    flow := new(KisFlow)
    // Instance Id
    flow.Id = id.KisID(common.KisIdTypeFlow)

    // Basic information
    flow.Name = conf.FlowName
    flow.Conf = conf

    // Function list
    flow.Funcs = make(map[string]kis.Function)
    flow.funcParams = make(map[string]config.FParam)

    // ++++++++ Data +++++++
    flow.data = make(common.KisDataMap)

    return flow
}
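
The reason data needs make() is a general Go rule: reading from a nil map is allowed, but writing to one panics at runtime. A minimal sketch of that rule follows; writePanics is a helper invented for this demonstration.

```go
package main

import "fmt"

// writePanics is a hypothetical helper that reports whether
// assigning into m triggers a runtime panic.
func writePanics(m map[string][]interface{}) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	m["k"] = nil
	return false
}

func main() {
	var uninit map[string][]interface{}     // nil map: never passed through make()
	ready := make(map[string][]interface{}) // initialized, as in NewKisFlow()

	fmt.Println(writePanics(uninit), writePanics(ready)) // true false
}
```

The buffer and inPut members are slices, and append() works on a nil slice, which is why NewKisFlow() does not have to initialize them.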

3.2.2 Business Data Submission Interface

When developers are writing business logic for KisFlow, they can submit business source data through the flow instance. Therefore, we need to add an interface for submitting data to the Flow abstraction layer:

kis-flow/kis/flow.go

package kis

import (
    "context"
    "kis-flow/config"
)

type Flow interface {
    // Run schedules the Flow and executes the Functions in the Flow in sequence
    Run(ctx context.Context) error
    // Link connects the Functions in the Flow according to the configuration file
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow  ++++++ Submit Flow data to the upcoming Function layer ++++
    CommitRow(row interface{}) error
}

The new method is CommitRow(row interface{}) error.

Implement this interface in kis-flow/flow/kis_flow_data.go.

kis-flow/flow/kis_flow_data.go

// CommitRow appends one row of data to the Flow's buffer
func (flow *KisFlow) CommitRow(row interface{}) error {
    flow.buffer = append(flow.buffer, row)

    return nil
}

CommitRow() submits Flow data one row at a time. To submit a batch, call it once per row. All submitted rows are held temporarily in the flow.buffer member.
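
Submitting a batch row by row could be wrapped as in the sketch below. Both miniFlow (a stand-in that keeps only the buffer) and commitAll are assumptions made for this example; KisFlow at this stage only offers CommitRow().

```go
package main

import "fmt"

// miniFlow is a stand-in for KisFlow that keeps only the buffer.
type miniFlow struct {
	buffer []interface{}
}

// CommitRow mirrors the method above: append one row to the buffer.
func (f *miniFlow) CommitRow(row interface{}) error {
	f.buffer = append(f.buffer, row)
	return nil
}

// commitAll is a hypothetical convenience wrapper, not part of KisFlow:
// it submits a batch by calling CommitRow once per row.
func commitAll(f *miniFlow, rows ...interface{}) error {
	for _, row := range rows {
		if err := f.CommitRow(row); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	f := &miniFlow{}
	if err := commitAll(f, "row-1", "row-2", "row-3"); err != nil {
		panic(err)
	}
	fmt.Println(len(f.buffer)) // 3
}
```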

3.2.3 KisFlow Internal Data Submission

Now developers can use CommitRow() to submit data to the buffer. Inside KisFlow, however, we also need internal methods that move the buffer into KisFlow's data member, where it serves as the context data for all Functions in the current Flow. We provide two of them here: commitSrcData() for the initial data submission and commitCurData() for intermediate data submission.

A. Initial Data Submission

kis-flow/flow/kis_flow_data.go

// commitSrcData submits the data source of the current Flow, indicating the initial submission of the original data source of the current Flow
// Submits the temporary data buffer of the flow to the flow's data (data is the source data backup for each Function level)
// Clears all previous flow data
func (flow *KisFlow) commitSrcData(ctx context.Context) error {

    // Create batch data
    dataCnt := len(flow.buffer)
    batch := make(common.KisRowArr, 0, dataCnt)

    for _, row := range flow.buffer {
        batch = append(batch, row)
    }

    // Clear all previous data
    flow.clearData(flow.data)

    // Initial submission, record flow original data
    // This is the initial submission and no Function has run yet, so the batch is stored under the virtual ID FunctionIdFirstVirtual
    flow.data[common.FunctionIdFirstVirtual] = batch

    // Clear the buffer
    flow.buffer = flow.buffer[0:0]

    log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

// clearData clears all data of the flow
func (flow *KisFlow) clearData(data common.KisDataMap) {
    for k := range data {
        delete(data, k)
    }
}

In practice, commitSrcData() is executed only once per call to Run(), before any Function executes, and the batch it stores serves as the original data source for the current Flow.

The ultimate goal of commitSrcData() is to move the buffer data into data[FunctionIdFirstVirtual]. Note that FunctionIdFirstVirtual is a virtual Function ID: no real Function owns it, and it acts as the upstream ID of the first Function in the Flow. After the initial submission, flow.buffer is emptied.
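
Copying the rows into a fresh batch before resetting the buffer matters because flow.buffer[0:0] keeps the same backing array. The sketch below contrasts the two approaches on plain slices; commitAliased and commitCopied are names invented for the comparison, not KisFlow functions.

```go
package main

import "fmt"

// commitAliased hands out the buffer itself and then resets its length.
// The returned slice still shares the buffer's backing array.
func commitAliased(buffer *[]interface{}) []interface{} {
	batch := *buffer
	*buffer = (*buffer)[0:0]
	return batch
}

// commitCopied copies the rows first, in the same spirit as commitSrcData().
func commitCopied(buffer *[]interface{}) []interface{} {
	batch := make([]interface{}, 0, len(*buffer))
	batch = append(batch, *buffer...)
	*buffer = (*buffer)[0:0]
	return batch
}

func main() {
	buf := []interface{}{"a", "b"}
	aliased := commitAliased(&buf)
	buf = append(buf, "X")  // reuses the backing array and overwrites slot 0
	fmt.Println(aliased[0]) // X

	buf = []interface{}{"a", "b"}
	copied := commitCopied(&buf)
	buf = append(buf, "X") // the copied batch is unaffected
	fmt.Println(copied[0]) // a
}
```

With the aliased version, the next CommitRow() call would silently corrupt data that was already committed, so the extra copy is what keeps each layer's batch stable.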

B. Intermediate Data Submission

kis-flow/flow/kis_flow_data.go

//commitCurData submits the result data of the current executing Function of the Flow
func (flow *KisFlow) commitCurData(ctx context.Context) error {

    // If this layer's computation produced no result data, there is nothing to commit
    if len(flow.buffer) == 0 {
        return nil
    }

    // Create batch data
    batch := make(common.KisRowArr, 0, len(flow.buffer))

    // Copy every row from the buffer into the batch
    for _, row := range flow.buffer {
        batch = append(batch, row)
    }

    //Submit the buffer data of this layer's computation to the result data of this layer
    flow.data[flow.ThisFunctionId] = batch

    //Clear the buffer
    flow.buffer = flow.buffer[0:0]

    log.Logger().DebugFX(ctx, " ====> After commitCurData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

commitCurData() is executed each time after a Function's computation to submit the result data of the current Function. commitCurData() is executed multiple times during the Flow's streaming computation process.

The ultimate goal of commitCurData() is to submit the buffer data to data[flow.ThisFunctionId]. ThisFunctionId is the ID of the currently executing Function and is also the upstream Function ID for the next layer to execute.

After submission, the data in flow.buffer will be cleared.

3.2.4 Obtaining Source Data of the Executing Function

To obtain the source data for each Function layer, we use the getCurData() method. The source data of the current Function is the result data of the previous Function layer, so we look it up by PrevFunctionId: the data source is data[PrevFunctionId].

kis-flow/flow/kis_flow_data.go

// getCurData gets the input data of the current Function layer of the flow
func (flow *KisFlow) getCurData() (common.KisRowArr, error) {
    if flow.PrevFunctionId == "" {
        return nil, errors.New("flow.PrevFunctionId is not set")
    }

    rows, ok := flow.data[flow.PrevFunctionId]
    if !ok {
        return nil, fmt.Errorf("[%s] is not in flow.data", flow.PrevFunctionId)
    }

    return rows, nil
}
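
One consequence of this lookup, as far as the code shown here goes, is that a layer which commits no rows leaves no entry under its ID, so the layer after it gets an error instead of an empty batch. The sketch below models the lookup on a plain map; lookup, rowArr and the ID "func-A" are stand-ins invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

type rowArr []interface{}

// lookup mirrors getCurData(): the input of the current layer is whatever
// the previous layer stored under its own ID.
func lookup(data map[string]rowArr, prevID string) (rowArr, error) {
	if prevID == "" {
		return nil, errors.New("prevID is not set")
	}
	rows, ok := data[prevID]
	if !ok {
		return nil, fmt.Errorf("[%s] is not in data", prevID)
	}
	return rows, nil
}

func main() {
	data := map[string]rowArr{
		"FunctionIdFirstVirtual": {"src-1", "src-2"},
	}

	// The first Function reads the source data.
	rows, err := lookup(data, "FunctionIdFirstVirtual")
	fmt.Println(len(rows), err) // 2 <nil>

	// "func-A" is an invented ID for a layer that committed nothing,
	// so no entry exists and the next layer's lookup fails.
	_, err = lookup(data, "func-A")
	fmt.Println(err) // [func-A] is not in data
}
```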

3.2.5 Data Stream Chained Scheduling

Now we need to add data stream handling actions in the flow.Run() method.

kis-flow/flow/kis_flow.go

// Run starts the streaming computation of KisFlow, executing the flow starting from the initial Function
func (flow *KisFlow) Run(ctx context.Context) error {

    var fn kis.Function

    fn = flow.FlowHead

    if flow.Conf.Status == int(common.FlowDisable) {
        // Flow is configured to be disabled
        return nil
    }

    // ========= Data Stream Addition ===========
    // No Function has been executed yet, so PrevFunctionId points to the virtual ID FunctionIdFirstVirtual
    flow.PrevFunctionId = common.FunctionIdFirstVirtual

    // Submit the original data of the data stream
    if err := flow.commitSrcData(ctx); err != nil {
        return err
    }
    // ========= Data Stream Addition ===========

    // Streaming chained call
    for fn != nil {

        // ========= Data Stream Addition ===========
        // Record the currently executing Function of the flow
        fid := fn.GetId()
        flow.ThisFunction = fn
        flow.ThisFunctionId = fid

        // Get the source data that the current Function needs to process
        if inputData, err := flow.getCurData(); err != nil {
            log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
            return err
        } else {
            flow.inPut = inputData
        }
        // ========= Data Stream Addition ===========

        if err := fn.Call(ctx, flow); err != nil {
            // Error
            return err
        } else {
            // Success

            // ========= Data Stream Addition ===========
            if err := flow.commitCurData(ctx); err != nil {
                return err
            }

            // Update the cursor: the current Function becomes the previous layer
            flow.PrevFunctionId = flow.ThisFunctionId
            // ========= Data Stream Addition ===========

            fn = fn.Next()
        }
    }

    return nil
}

  • At the start of Run(), initialize PrevFunctionId to FunctionIdFirstVirtual.
  • At the start of Run(), execute commitSrcData() to move the buffer data submitted by the business code into data[FunctionIdFirstVirtual].
  • Inside the loop, before each Function executes, set the ThisFunctionId cursor to the current Function ID.
  • Inside the loop, use getCurData() to get the source data of the current Function and place it in the flow.inPut member.
  • Inside the loop, after each Function executes, use commitCurData() to submit the result data generated by the Function, then set PrevFunctionId to the current Function ID before moving to the next layer.
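
The steps above can be modeled in a few lines without the rest of the framework. The sketch below is a simplified stand-in, not the KisFlow implementation: stage replaces kis.Function, a slice replaces the linked list, and configuration, locks, logging and error handling are left out. All names in it are invented for the example.

```go
package main

import "fmt"

type rowArr []interface{}

// stage is a stand-in for kis.Function: an ID plus a body that reads the
// input rows and commits result rows through the commit callback.
type stage struct {
	id   string
	call func(input rowArr, commit func(row interface{}))
}

// run is a simplified model of KisFlow.Run().
func run(source rowArr, stages []stage) map[string]rowArr {
	const firstVirtual = "FunctionIdFirstVirtual"

	data := map[string]rowArr{firstVirtual: source} // like commitSrcData()
	prevID := firstVirtual

	for _, s := range stages {
		// Like getCurData(), except that a missing entry yields nil
		// here instead of an error.
		input := data[prevID]

		var buffer rowArr
		s.call(input, func(row interface{}) { buffer = append(buffer, row) })

		// Like commitCurData(): store results only if there are any.
		if len(buffer) > 0 {
			data[s.id] = buffer
		}
		prevID = s.id // advance the cursor to the next layer
	}
	return data
}

func main() {
	upper := stage{id: "upper", call: func(in rowArr, commit func(row interface{})) {
		for i := range in {
			commit(fmt.Sprintf("upper-%d", i))
		}
	}}
	lower := stage{id: "lower", call: func(in rowArr, commit func(row interface{})) {
		for _, row := range in {
			fmt.Println("lower got:", row)
		}
	}}

	data := run(rowArr{"a", "b"}, []stage{upper, lower})
	fmt.Println(len(data["upper"]), len(data["lower"])) // 2 0
}
```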

We also need to give developers a method on the Flow interface for reading the input data.

kis-flow/kis/flow.go

package kis

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
)

type Flow interface {
    // Run schedules the Flow and executes the Functions in the Flow in sequence
    Run(ctx context.Context) error
    // Link connects the Functions in the Flow according to the configuration file
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow submits the Flow data to the upcoming Function layer
    CommitRow(row interface{}) error

    // ++++++++++++++++++++++
    // Input gets the input source data of the current executing Function of the flow
    Input() common.KisRowArr
}

Implement as follows:

kis-flow/flow/kis_flow_data.go

// Input gets the input source data of the current executing Function of the flow
func (flow *KisFlow) Input() common.KisRowArr {
    return flow.inPut
}

3.3 Data Stream Processing in KisFunction

Since our Function scheduling module has not been implemented yet, the business calculation logic when executing the Call() method of the Function can only be temporarily hardcoded in the KisFlow framework. In the next chapter, we will open up this part of the calculation logic for developers to register their own business.

Now that the Flow has passed the data to each layer of Function, let's simulate the basic business calculation logic in the Function below.

We temporarily modify the Call() code of the KisFunctionC and KisFunctionE modules, assuming KisFunctionC is the layer directly upstream of KisFunctionE.

kis-flow/function/kis_function_c.go

type KisFunctionC struct {
    BaseFunction
}

func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
    log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow)

    //TODO Call the specific Function execution method
    //Process business data
    for i, row := range flow.Input() {
        fmt.Printf("In KisFunctionC, row = %+v\n", row)

        // Submit the result data of this layer's calculation
        _ = flow.CommitRow("Data From KisFunctionC, index " + " " + fmt.Sprintf("%d", i))
    }

    return nil
}

kis-flow/function/kis_function_e.go

type KisFunctionE struct {
    BaseFunction
}

func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
    log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow)

    // TODO Call the specific Function execution method
    //Process business data
    for _, row := range flow.Input() {
        fmt.Printf("In KisFunctionE, row = %+v\n", row)
    }

    return nil
}

3.4 Data Stream Unit Testing

Now let's simulate a simple business calculation to test whether each layer of Function can receive data and pass the calculation results to the next layer.

kis-flow/test/kis_flow_test.go

func TestNewKisFlowData(t *testing.T) {
    ctx := context.Background()

    // 1. Create 2 KisFunction configuration instances
    source1 := config.KisSource{
        Name: "Public Account Douyin Mall User Order Data",
        Must: []string{"order_id", "user_id"},
    }

    source2 := config.KisSource{
        Name: "User Order Error Rate",
        Must: []string{"order_id", "user_id"},
    }

    myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
    if myFuncConfig1 == nil {
        panic("myFuncConfig1 is nil")
    }

    myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil)
    if myFuncConfig2 == nil {
        panic("myFuncConfig2 is nil")
    }

    // 2. Create a KisFlow configuration instance
    myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)

    // 3. Create a KisFlow object
    flow1 := flow.NewKisFlow(myFlowConfig1)

    // 4. Link Functions to the Flow
    if err := flow1.Link(myFuncConfig1, nil); err != nil {
        panic(err)
    }
    if err := flow1.Link(myFuncConfig2, nil); err != nil {
        panic(err)
    }

    // 5. Submit original data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 6. Execute flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

Here we submit 3 rows of data through flow1.CommitRow(), and each row is a string. The data format and type can be arbitrary, as long as adjacent Function layers agree on them in their business logic.
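
Because a row arrives as interface{}, the consuming Function has to recover the concrete type itself. One way this might look is a type switch, sketched below; the order struct and the describe function are hypothetical and only illustrate the idea.

```go
package main

import "fmt"

// order is a hypothetical row type agreed on by two adjacent Functions.
type order struct {
	OrderID string
	UserID  string
}

// describe shows how a consuming Function might recover the concrete type
// of a row that arrives as interface{}.
func describe(row interface{}) string {
	switch v := row.(type) {
	case string:
		return "string: " + v
	case order:
		return "order: " + v.OrderID + " by " + v.UserID
	default:
		return fmt.Sprintf("unsupported row type %T", v)
	}
}

func main() {
	rows := []interface{}{
		"This is Data1 from Test",
		order{OrderID: "o-1", UserID: "u-9"},
		42,
	}
	for _, row := range rows {
		fmt.Println(describe(row))
	}
}
```

Handling the default case explicitly keeps a mismatch between two layers visible instead of letting a failed type assertion panic.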

Navigate to kis-flow/test/ and execute the command:

go test -test.v -test.paniconexit0 -test.run TestNewKisFlowData

The result is as follows:

=== RUN   TestNewKisFlowData
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-8b607ae6d55048408dae1f4e8f6dca6f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionC, flow = &{Id:flow-8b607ae6d55048408dae1f4e8f6dca6f Name:flowName1 Conf:0xc00015a780 Funcs:map[func-2182fa1a049f4c1c9eeb641f5292f09f:0xc0001381e0 func-f3e7d7868f44448fb532935768ea2ca1:0xc000138190] FlowHead:0xc000138190 FlowTail:0xc0001381e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000138190 ThisFunctionId:func-f3e7d7868f44448fb532935768ea2ca1 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-2182fa1a049f4c1c9eeb641f5292f09f:map[] func-f3e7d7868f44448fb532935768ea2ca1:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}

In KisFunctionC, row = This is Data1 from Test
In KisFunctionC, row = This is Data2 from Test
In KisFunctionC, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-8b607ae6d55048408dae1f4e8f6dca6f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f3e7d7868f44448fb532935768ea2ca1:[Data From KisFunctionC, index  0 Data From KisFunctionC, index  1 Data From KisFunctionC, index  2]]

KisFunctionE, flow = &{Id:flow-8b607ae6d55048408dae1f4e8f6dca6f Name:flowName1 Conf:0xc00015a780 Funcs:map[func-2182fa1a049f4c1c9eeb641f5292f09f:0xc0001381e0 func-f3e7d7868f44448fb532935768ea2ca1:0xc000138190] FlowHead:0xc000138190 FlowTail:0xc0001381e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001381e0 ThisFunctionId:func-2182fa1a049f4c1c9eeb641f5292f09f PrevFunctionId:func-f3e7d7868f44448fb532935768ea2ca1 funcParams:map[func-2182fa1a049f4c1c9eeb641f5292f09f:map[] func-f3e7d7868f44448fb532935768ea2ca1:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f3e7d7868f44448fb532935768ea2ca1:[Data From KisFunctionC, index  0 Data From KisFunctionC, index  1 Data From KisFunctionC, index  2]] inPut:[Data From KisFunctionC, index  0 Data From KisFunctionC, index  1 Data From KisFunctionC, index  2]}

In KisFunctionE, row = Data From KisFunctionC, index  0
In KisFunctionE, row = Data From KisFunctionC, index  1
In KisFunctionE, row = Data From KisFunctionC, index  2
--- PASS: TestNewKisFlowData (0.00s)
PASS
ok      kis-flow/test   0.636s

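The log shows that KisFunctionC received the three rows submitted by the test and that KisFunctionE received the three rows committed by KisFunctionC, which is the expected result.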

Now, the simplest version of the data stream has been implemented. In the next chapter, we will open up the business logic of the Function to developers, rather than writing it in the KisFlow framework.

3.5 [V0.2] Source Code

https://github.com/aceld/kis-flow/releases/tag/v0.2


Author: Aceld
GitHub: https://github.com/aceld


