Aceld

(Part 1) Golang Framework Hands-on - KisFlow Streaming Computing Framework - Overview

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Twitter: https://twitter.com/Aceld_

Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling

To be continued.

1.1 Why KisFlow is Needed

In many large-scale B2B enterprise projects, business data is in heavy demand, and much of it must be computed in real time as a stream. However, many companies are not yet able to run a data warehouse stack such as Flink + Hadoop/HBase. The demand for real-time computation of business data remains, so most enterprises hand this computational work to business engineers.

However, querying the business database directly can indirectly affect the business service, and periodic calculation through scheduled tasks or scripts is not a good solution either. I have personally worked on a large-scale system with as many as a thousand business data fields that needed to be computed. Early on, due to poor planning, more than 1000 scripts ran on a schedule. They eventually interfered with one another and made the data inaccurate, which frequently caused errors in reported business data. For example, suppose the correct value of a computed business field is 100, but because the legacy code is complex, several scripts each try to patch and correct that value. The conflicting scripts make the value fluctuate for a period of time. Even though one patch eventually sets it right, the business data is wrong in the meantime, which is very frustrating for users.

KisFlow is designed for enterprises that lack the computing capacity of a data warehouse platform but still need real-time data processing. It lets business engineers take on stream computing work and reuse common, general-purpose computational logic.

1.2 Key Capabilities Supported by KisFlow

Stream Computing

  • Distributed batch consumption capabilities (based on upstream ODS consumption configurations: such as Binlog, Kafka, etc.)
  • Stateful Function capability, which allows stateful stream computing nodes to be spliced together and stream computing to be scaled horizontally and vertically.
  • Data stream monitoring and repair capabilities, including consumer service monitoring.
  • Multi-stream splicing and third-party middleware storage plug-in support.

Distributed Task Scheduling

  • Distributed scheduling of timed tasks, log monitoring, and task scheduling status.
  • Visualized scheduling platform.

1.3 KisFlow System Positioning

KisFlow serves as the business upstream computing layer. It interfaces with the data warehouse/other business-side ODS layers upstream and connects to the local business storage data center downstream.

Level: Flowing Computation Layer

Level Explanation: The upstream computing layer for KisFlow, which directly connects to business storage and the ODS (Operational Data Store) layer of data warehouses. The upstream can be MySQL Binlog, logs, interface data, etc., and it supports a passive consumption mode, providing KisFlow with real-time computing capabilities.

Sub-modules:

  • KisFlow: Distributed batch consumer; a KisFlow is composed of multiple KisFunctions.
  • KisConnectors: Computing data stream intermediate state persistence and connectors.
  • KisFunctions: Supports operator expression splicing, connector integration, strategy configuration, Stateful Function mode, and Slink stream splicing.
  • KisConfig: Binding of flow processing policies for KisFunctions, allowing Functions to have fixed independent processing capabilities.
  • KisSource: Interface for connecting to ODS data sources.

Level: Task Scheduling Layer

Level Explanation: Timed task scheduling and execution business logic, including task scheduling platform, executor management, scheduling logs, and user management. Provides KisFlow's timed task, statistics, and aggregation calculation capabilities.

Sub-modules:

  • The task scheduling platform has a visual interface. Includes running reports, scheduling reports, success rate, task management, configuration management, and GLUE IDE as visual management platforms.
  • Executor management KisJobs: Golang SDK, custom business logic, executor automatic registration, task triggering, termination, and removal.
  • Executor scenarios KisScenes: Logical task sets divided according to business needs.
  • Scheduling logs and user management: Collection of task scheduling logs, detailed scheduling, and scheduling process traces.
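
To make the layering more concrete, here is a minimal sketch of how an upstream source, a computing step, and a downstream store could be wired together in Go. The names `Source`, `Connector`, `sliceSource`, `memoryStore`, and `pump` are hypothetical and are not KisFlow's API. They only illustrate the roles that KisSource and KisConnectors play around the computing layer.

```go
package main

import "fmt"

// Source is a hypothetical stand-in for an upstream ODS reader.
// Next returns the next batch of rows, or false when the source is drained.
type Source interface {
    Next() ([]int, bool)
}

// Connector is a hypothetical stand-in for downstream business storage.
type Connector interface {
    Save(rows []int) error
}

// sliceSource serves batches of at most `batch` rows from an in-memory slice.
// A non-positive batch size means "return everything that is left".
type sliceSource struct {
    rows  []int
    batch int
}

func (s *sliceSource) Next() ([]int, bool) {
    if len(s.rows) == 0 {
        return nil, false
    }
    n := s.batch
    if n <= 0 || n > len(s.rows) {
        n = len(s.rows)
    }
    out := s.rows[:n]
    s.rows = s.rows[n:]
    return out, true
}

// memoryStore collects every saved row so the result can be inspected.
type memoryStore struct {
    saved []int
}

func (m *memoryStore) Save(rows []int) error {
    m.saved = append(m.saved, rows...)
    return nil
}

// pump drains src batch by batch, applies compute to each row,
// and hands each computed batch to dst.
func pump(src Source, compute func(int) int, dst Connector) error {
    for {
        batch, ok := src.Next()
        if !ok {
            return nil
        }
        out := make([]int, 0, len(batch))
        for _, row := range batch {
            out = append(out, compute(row))
        }
        if err := dst.Save(out); err != nil {
            return err
        }
    }
}

func main() {
    src := &sliceSource{rows: []int{1, 2, 3, 4, 5}, batch: 2}
    dst := &memoryStore{}
    if err := pump(src, func(v int) int { return v * 10 }, dst); err != nil {
        fmt.Println("err:", err)
        return
    }
    fmt.Println(dst.saved)
}
```

Keeping the source and the store behind small interfaces means the computing step does not need to know whether rows come from a Binlog, a log file, or an in-memory slice.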

KisFlow

  1. A KisFlow can be composed of any combination of KisFunctions, and the length of a KisFlow can be adjusted dynamically.

  2. A KisFunction can be added to a given KisFlow at any time, and the relationships between KisFlows can be adjusted dynamically by adding KisFunction Load and Save nodes, which provide parallel and branching actions.

  3. In terms of programming model, KisFlow shifts development from data-business programming to writing single-purpose computing functions, which brings it close to a FaaS (Function as a Service) system.
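
The first two points can be pictured with a small sketch: a flow is an ordered chain of functions, and the chain can grow after it has been created. The `Flow` and `Function` types and the `Append` and `Run` methods below are hypothetical illustrations written for this article, not KisFlow's real types.

```go
package main

import "fmt"

// Function is a hypothetical single-purpose computing step:
// it receives a batch of rows and returns the batch for the next step.
type Function func(rows []float64) []float64

// Flow is a hypothetical ordered chain of Functions.
type Flow struct {
    names []string
    funcs []Function
}

// Append adds a Function to the end of the chain. It can be called
// at any time, so the length of the Flow is not fixed up front.
func (f *Flow) Append(name string, fn Function) {
    f.names = append(f.names, name)
    f.funcs = append(f.funcs, fn)
}

// Run passes the rows through every Function in order.
func (f *Flow) Run(rows []float64) []float64 {
    for _, fn := range f.funcs {
        rows = fn(rows)
    }
    return rows
}

// double multiplies every row by two.
func double(rows []float64) []float64 {
    out := make([]float64, len(rows))
    for i, v := range rows {
        out[i] = v * 2
    }
    return out
}

// sum collapses the batch into a single row holding the total.
func sum(rows []float64) []float64 {
    total := 0.0
    for _, v := range rows {
        total += v
    }
    return []float64{total}
}

func main() {
    flow := &Flow{}
    flow.Append("Double", double)
    fmt.Println(flow.names, flow.Run([]float64{1, 2, 3}))

    // Extend the same flow later with one more step.
    flow.Append("Sum", sum)
    fmt.Println(flow.names, flow.Run([]float64{1, 2, 3}))
}
```

Each step only sees a batch of rows and returns a batch of rows, which is what keeps an individual function small and reusable across flows.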

1.4 Quick Start with KisFlow

KisFlow Github: https://github.com/aceld/kis-flow

Case Source Code: https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config

Project

├── Makefile
├── conf
│   ├── flow-CalStuAvgScore.yml
│   ├── func-AvgStuScore.yml
│   └── func-PrintStuAvgScore.yml
├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go

Flow

Config

(1) Flow Config

conf/flow-CalStuAvgScore.yml

kistype: flow
status: 1
flow_name: CalStuAvgScore
flows:
    - fname: AvgStuScore
    - fname: PrintStuAvgScore

(2) Function1 Config

conf/func-AvgStuScore.yml

kistype: func
fname: AvgStuScore
fmode: Calculate
source:
    name: StudentScore
    must:
        - stu_id


(3) Function2(Slink) Config

conf/func-PrintStuAvgScore.yml

kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
    name: StudentScore
    must:
        - stu_id
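
The three files above are linked only by name: the flow config lists `fname` entries in execution order, and each function config declares a matching `fname`. A minimal sketch of that kind of name lookup follows. `FlowConfig`, `Step`, and `resolve` are hypothetical names, the config is written by hand instead of being parsed from YAML, and none of this is KisFlow's actual loader.

```go
package main

import "fmt"

// FlowConfig mirrors the fields of conf/flow-CalStuAvgScore.yml.
// It is a hand-written stand-in; no YAML parsing happens here.
type FlowConfig struct {
    FlowName string
    Fnames   []string
}

// Step is a hypothetical function body registered under an fname.
type Step func(input string) string

// resolve returns the steps named by the flow config, in config order,
// and fails on the first fname that has no registered step.
func resolve(cfg FlowConfig, registry map[string]Step) ([]Step, error) {
    steps := make([]Step, 0, len(cfg.Fnames))
    for _, name := range cfg.Fnames {
        step, ok := registry[name]
        if !ok {
            return nil, fmt.Errorf("flow %q: function %q is not registered", cfg.FlowName, name)
        }
        steps = append(steps, step)
    }
    return steps, nil
}

func main() {
    cfg := FlowConfig{
        FlowName: "CalStuAvgScore",
        Fnames:   []string{"AvgStuScore", "PrintStuAvgScore"},
    }
    registry := map[string]Step{
        "AvgStuScore": func(in string) string { return in + " -> averaged" },
    }

    // PrintStuAvgScore is missing, so resolution reports it by name.
    if _, err := resolve(cfg, registry); err != nil {
        fmt.Println("err:", err)
    }

    registry["PrintStuAvgScore"] = func(in string) string { return in + " -> printed" }
    steps, err := resolve(cfg, registry)
    if err != nil {
        fmt.Println("err:", err)
        return
    }
    out := "row"
    for _, step := range steps {
        out = step(out)
    }
    fmt.Println(out)
}
```

Because the link is purely by name, a typo in an `fname` only shows up when the flow is resolved, so failing early with the offending name is a reasonable design choice.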


Main

main.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
)

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flow1 := kis.Pool().GetFlow("CalStuAvgScore")
    if flow1 == nil {
        panic("flow1 is nil")
    }

    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)

    // Run the flow
    if err := flow1.Run(ctx); err != nil {
        fmt.Println("err: ", err)
    }

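}

// Register the FaaS functions under the fname values used in the configs,
// so the flow can resolve them by name. This assumes kis.Pool().FaaS is the
// registration call, consistent with the "Add KisPool FuncName=..." output lines.
func init() {
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}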

Function1

faas_stu_score_avg.go

package main

import (
    "context"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn struct {
    serialize.DefaultSerialize
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
    Score3 int `json:"score_3"`
}

type AvgStuScoreOut struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

// AvgStuScore(FaaS) calculates the average score of students
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
    for _, row := range rows {

        out := AvgStuScoreOut{
            StuId:    row.StuId,
            AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
        }

        // Submit the result row to the next function in the flow
        if err := flow.CommitRow(out); err != nil {
            return err
        }
    }

    return nil
}
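
The rows committed in main.go are JSON strings, while AvgStuScore receives typed structs. The listing does not show how KisFlow performs that conversion. As a rough illustration only, the standard library's encoding/json package can do the same mapping through the struct tags. The `scoreRow` type and the `decodeRow` helper below are hypothetical and do not use the serialize package.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// scoreRow repeats the json tags of AvgStuScoreIn, without the
// embedded serialize.DefaultSerialize, so it compiles on its own.
type scoreRow struct {
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
    Score3 int `json:"score_3"`
}

// decodeRow turns one committed JSON string into a typed row.
func decodeRow(raw string) (scoreRow, error) {
    var row scoreRow
    err := json.Unmarshal([]byte(raw), &row)
    return row, err
}

func main() {
    row, err := decodeRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
    if err != nil {
        fmt.Println("err:", err)
        return
    }
    fmt.Printf("%+v\n", row)
}
```

The json tags are what connect a key such as `score_1` to the Go field `Score1`, which is why the input and output structs in the listings carry them.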

Function2

faas_stu_score_avg_print.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

type PrintStuAvgScoreOut struct {
    serialize.DefaultSerialize
}

// PrintStuAvgScore(FaaS) prints the average score of each student
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {

    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
    }

    return nil
}

Output

Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Add FlowRouter FlowName=CalStuAvgScore
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]
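
The two averages can be checked by hand: (100+90+80)/3 = 90 and (100+70+60)/3 = 230/3 ≈ 76.67. The float64 conversion in AvgStuScore matters here, because dividing two Go ints discards the fractional part. The small sketch below compares both forms. The helpers `avgFloat` and `avgInt` are hypothetical names introduced for the comparison.

```go
package main

import "fmt"

// avgFloat converts the sum to float64 before dividing, as AvgStuScore does.
func avgFloat(a, b, c int) float64 {
    return float64(a+b+c) / 3
}

// avgInt divides ints, so the fractional part is discarded.
func avgInt(a, b, c int) int {
    return (a + b + c) / 3
}

func main() {
    fmt.Println(avgFloat(100, 90, 80), avgInt(100, 90, 80)) // 90 90
    fmt.Println(avgFloat(100, 70, 60), avgInt(100, 70, 60)) // 76.66666666666667 76
}
```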
