Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
Download KisFlow Source
$go get github.com/aceld/kis-flow
KisFlow Developer Documentation
Source Code Example
https://github.com/aceld/kis-flow-usage/tree/main/6-flow_in_goroutines
If you need the same Flow to run concurrently in multiple Goroutines, you can use the flow.Fork()
function to clone a Flow instance with isolated memory but the same configuration. Each Flow instance can then be executed in different Goroutines to compute their respective data streams.
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/file"
"github.com/aceld/kis-flow/kis"
"sync"
)
func main() {
ctx := context.Background()
// Get a WaitGroup
var wg sync.WaitGroup
// Load Configuration from file
if err := file.ConfigImportYaml("conf/"); err != nil {
panic(err)
}
// Get the flow
flow1 := kis.Pool().GetFlow("CalStuAvgScore")
if flow1 == nil {
panic("flow1 is nil")
}
// Fork the flow
flowClone1 := flow1.Fork(ctx)
// Add to WaitGroup
wg.Add(2)
// Run Flow1
go func() {
defer wg.Done()
// Submit a string
_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
// Submit a string
_ = flow1.CommitRow(`{"stu_id":1001, "score_1":100, "score_2":70, "score_3":60}`)
// Run the flow
if err := flow1.Run(ctx); err != nil {
fmt.Println("err: ", err)
}
}()
// Run FlowClone1
go func() {
defer wg.Done()
// Submit a string
_ = flowClone1.CommitRow(`{"stu_id":201, "score_1":100, "score_2":90, "score_3":80}`)
// Submit a string
_ = flowClone1.CommitRow(`{"stu_id":2001, "score_1":100, "score_2":70, "score_3":60}`)
if err := flowClone1.Run(ctx); err != nil {
fmt.Println("err: ", err)
}
}()
// Wait for Goroutines to finish
wg.Wait()
fmt.Println("All flows completed.")
return
}
func init() {
// Register functions
kis.Pool().FaaS("VerifyStu", VerifyStu)
kis.Pool().FaaS("AvgStuScore", AvgStuScore)
kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}
In this code snippet, we start two Goroutines to run Flow1 and its clone (FlowClone1) concurrently to calculate the final average scores for students 101, 1001, 201, and 2001.
Author: Aceld
GitHub: https://github.com/aceld
KisFlow Open Source Project Address: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
Top comments (0)