A Golang based high performance, scalable and distributed workflow framework

Overview

Go-Flow

GoFlow lets you programmatically author a distributed workflow as a Directed Acyclic Graph (DAG) of tasks. It executes your tasks on an array of flow workers, distributing the load uniformly across them.

Install It

Install GoFlow

go mod init myflow
go get github.com/s8sg/goflow

Write First Flow

Library to build flows: github.com/s8sg/goflow/flow

Make a flow.go file

package main

import (
	"fmt"
	"github.com/s8sg/goflow"
	flow "github.com/s8sg/goflow/flow"
)

// Workload function
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
	return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}

// DefineWorkflow provides the definition of the workflow
func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
	f.SyncNode().Apply("test", doSomething)
	return nil
}

func main() {
	fs := &goflow.FlowService{
		Port:                8080,
		RedisURL:            "localhost:6379",
		OpenTraceUrl:        "localhost:5775",
		WorkerConcurrency:   5,
	}
	fs.Register("myflow", DefineWorkflow)
	fs.Start()
}

Start() runs an HTTP server that listens on the provided port and also acts as a flow worker that handles the workload.
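
Because a workload is a plain Go function, it can be exercised on its own before it is wired into a flow. The sketch below calls the `doSomething` workload from the example directly, without Redis or a running flow service; passing `nil` for the options is an assumption made for this illustration.

```go
package main

import "fmt"

// doSomething mirrors the workload from the example above: it receives the
// request body and the request options and returns the node's output.
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
	return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
}

func main() {
	// Call the workload directly; no flow service is involved here.
	out, err := doSomething([]byte("hallo"), nil)
	if err != nil {
		fmt.Println("workload failed:", err)
		return
	}
	fmt.Println(string(out)) // prints: you said "hallo"
}
```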

Run It

Start redis

docker run --name redis -p 6379:6379 -d redis

Run the Flow

go build -o goflow
./goflow

Invoke It

curl -d hallo localhost:8080/myflow

Scale It

GoFlow scales horizontally: you can distribute the load by simply adding more instances.

Worker Mode

Alternatively, you can start GoFlow in worker mode. As a worker, GoFlow only handles the workload, so if required you can scale just the workers.

fs := &goflow.FlowService{
    RedisURL:            "localhost:6379",
    OpenTraceUrl:        "localhost:5775",
    WorkerConcurrency:   5,
}
fs.Register("myflow", DefineWorkflow)
fs.StartWorker()

Register Multiple Flow

Register() lets you bind multiple flows to a single flow service. This way a server and/or a worker can be used for more than one flow.

fs.Register("createUser", DefineCreateUserFlow)
fs.Register("deleteUser", DefineDeleteUserFlow)

Execute It

Using Client

Using the client you can request a flow directly. Requests are always asynchronous and are queued for a worker to pick up.

fs := &goflow.FlowService{
    RedisURL: "localhost:6379",
}
fs.Execute("myflow", &goflow.Request{
    Body: []byte("hallo"),
})

Using Redis

For testing, it is helpful to use the redis-cli program to insert jobs onto the Redis queue:

redis-cli -r 100 RPUSH goflow:queue:myflow '{"class":"GoFlow","args":["hallo"]}'

This inserts 100 jobs for the GoFlow worker onto the myflow queue.

Currently, a Redis-queue-based job takes only one argument, as a string.
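
If you generate such jobs from code rather than typing them by hand, the payload can be built with `encoding/json`. The sketch below only reproduces the JSON shape shown in the redis-cli command above; the `job` type and `buildJob` helper are hypothetical names for illustration and are not part of GoFlow.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// job mirrors the JSON shape used in the redis-cli example above.
// It is a hypothetical type for this sketch, not a GoFlow type.
type job struct {
	Class string   `json:"class"`
	Args  []string `json:"args"`
}

// buildJob returns the JSON payload carrying a single string argument.
func buildJob(body string) (string, error) {
	b, err := json.Marshal(job{Class: "GoFlow", Args: []string{body}})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	payload, err := buildJob("hallo")
	if err != nil {
		fmt.Println("could not build job:", err)
		return
	}
	fmt.Println(payload) // prints: {"class":"GoFlow","args":["hallo"]}
}
```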


Creating More Complex DAG

The initial example is a single-vertex DAG. Single-vertex DAGs (referred to as SyncNode) are great for synchronous tasks.

Using GoFlow's DAG construct, one can achieve more complex compositions with multiple vertices connected by edges. A multi-vertex flow is always asynchronous in nature, and its nodes are distributed across the workers.

Below is an example of a simple multi-vertex flow that validates a user's KYC image and marks the user according to the result. This is an asynchronous process consisting of batch jobs.

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("get-kyc-image").Apply("load-profile", loadProfile).
        Apply("get-image-url", getPresignedURLForImage)
    dag.Node("face-detect").Apply("face-detect", detectFace)
    dag.Node("mark-profile").Apply("mark-profile", markProfileBasedOnStatus)
    dag.Edge("get-kyc-image", "face-detect")
    dag.Edge("face-detect", "mark-profile")
    return nil
}
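
To make the role of the edges concrete, here is a minimal, library-independent sketch that derives a valid execution order for the three nodes above from their edges, using Kahn's algorithm (repeatedly releasing nodes whose indegree has dropped to zero). It is not GoFlow's scheduler; `executionOrder` is a hypothetical helper written only for this illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// executionOrder returns the nodes in an order that respects every edge
// (from -> to). It reports an error for unknown nodes or cyclic graphs.
func executionOrder(nodes []string, edges [][2]string) ([]string, error) {
	indegree := make(map[string]int, len(nodes))
	next := make(map[string][]string, len(nodes))
	for _, n := range nodes {
		indegree[n] = 0
	}
	for _, e := range edges {
		from, to := e[0], e[1]
		if _, ok := indegree[from]; !ok {
			return nil, fmt.Errorf("unknown node %q", from)
		}
		if _, ok := indegree[to]; !ok {
			return nil, fmt.Errorf("unknown node %q", to)
		}
		next[from] = append(next[from], to)
		indegree[to]++
	}

	// Start with the nodes that depend on nothing.
	var queue []string
	for _, n := range nodes {
		if indegree[n] == 0 {
			queue = append(queue, n)
		}
	}

	var order []string
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		order = append(order, n)
		// Completing n may release the nodes that depend on it.
		for _, m := range next[n] {
			indegree[m]--
			if indegree[m] == 0 {
				queue = append(queue, m)
			}
		}
	}
	if len(order) != len(nodes) {
		return nil, errors.New("graph has a cycle, so it is not a DAG")
	}
	return order, nil
}

func main() {
	nodes := []string{"get-kyc-image", "face-detect", "mark-profile"}
	edges := [][2]string{
		{"get-kyc-image", "face-detect"},
		{"face-detect", "mark-profile"},
	}
	order, err := executionOrder(nodes, edges)
	if err != nil {
		fmt.Println("invalid graph:", err)
		return
	}
	fmt.Println(order) // prints: [get-kyc-image face-detect mark-profile]
}
```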

Branching

Branching is great for parallelizing independent workloads in separate branches.

Branching can be achieved with simple vertices and edges. GoFlow provides a special operator, Aggregator, to aggregate the results of multiple branches on a converging node.

We extend our earlier example with a new requirement: matching the face against existing data. The two operations run in parallel to reduce time.

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("get-kyc-image").Apply("load-profile", loadProfile).
        Apply("get-image-url", getPresignedURLForImage)
    dag.Node("face-detect").Apply("face-detect", detectFace)
    dag.Node("face-match").Apply("face-match", matchFace)
    // Here mark-profile depends on the results of both face-detect and face-match,
    // so we use an aggregator to create a unified result
    dag.Node("mark-profile", flow.Aggregator(func(responses map[string][]byte) ([]byte, error) {
       status := validateResults(responses["face-detect"],  responses["face-match"])
       return []byte(status), nil
    })).Apply("mark-profile", markProfileBasedOnStatus)
    dag.Edge("get-kyc-image", "face-detect")
    dag.Edge("get-kyc-image", "face-match")
    dag.Edge("face-detect", "mark-profile")
    dag.Edge("face-match", "mark-profile")
    return nil
}
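
The aggregator itself is an ordinary function over a map keyed by the upstream node names. The sketch below shows one way such a function could look, and how it can be checked in isolation. `validateResults` is not defined in this README, so the version here is a hypothetical stand-in, and the assumption that each branch reports the literal string "pass" is made up for the illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// validateResults is a hypothetical stand-in for the helper used above.
// Assumption: each branch reports "pass" when its check succeeded.
func validateResults(detect, match []byte) string {
	if string(detect) == "pass" && string(match) == "pass" {
		return "success"
	}
	return "failure"
}

// aggregate has the same shape as the function passed to flow.Aggregator
// in the example above: it receives the responses of the incoming branches,
// keyed by node name, and returns a single unified result.
func aggregate(responses map[string][]byte) ([]byte, error) {
	detect, ok := responses["face-detect"]
	if !ok {
		return nil, errors.New("missing result from face-detect")
	}
	match, ok := responses["face-match"]
	if !ok {
		return nil, errors.New("missing result from face-match")
	}
	return []byte(validateResults(detect, match)), nil
}

func main() {
	out, err := aggregate(map[string][]byte{
		"face-detect": []byte("pass"),
		"face-match":  []byte("pass"),
	})
	if err != nil {
		fmt.Println("aggregation failed:", err)
		return
	}
	fmt.Println(string(out)) // prints: success
}
```

Returning an error when a branch result is missing, rather than silently treating it as a failure, is a design choice of this sketch.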

Subdag

Subdag allows you to reuse an existing DAG by embedding it into a DAG with wider functionality.

SubDag is available as a GoFlow DAG construct that takes a separate DAG as input and composites it within a vertex, where the vertex's completion depends on the embedded DAG's completion.

func (currentDag *Dag) SubDag(vertex string, dag *Dag)

Say we have a separate flow that needs the same set of steps to validate a user. Building on our earlier example, we can separate the validation process into a subdag and put it in a library that can be shared across different flows.

func KycImageValidationDag() *flow.Dag {
    dag := flow.NewDag()
    dag.Node("verify-url").Apply("verify-image-url", s3DocExists)
    dag.Node("face-detect").Apply("face-detect", detectFace)
    dag.Node("face-match").Apply("face-match", matchFace)
    dag.Node("generate-result", flow.Aggregator(func(responses map[string][]byte) ([]byte, error) {
           // validateResults is assumed here to report whether both checks passed
           ok := validateResults(responses["face-detect"],  responses["face-match"])
           status := "failure"
           if ok {
              status = "success"
           }
           return []byte(status), nil
        })).Apply("generate-result", func(data []byte, option map[string][]string) ([]byte, error) {
           return data, nil
        })
    dag.Edge("verify-url", "face-detect")
    dag.Edge("verify-url", "face-match")
    dag.Edge("face-detect", "generate-result")
    dag.Edge("face-match", "generate-result")
    return dag
}

Our existing flow embeds the library DAG

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("get-image").Apply("load-profile", loadProfile).
        Apply("get-image-url", getPresignedURLForImage)
    dag.SubDag("verify-image", common.KycImageValidationDag())
    dag.Node("mark-profile").Apply("mark-profile", markProfileBasedOnStatus)
    dag.Edge("get-image", "verify-image")
    dag.Edge("verify-image", "mark-profile")
    return nil
}

Conditional Branching

Conditional branching is a great way to choose different execution paths dynamically.

GoFlow provides a DAG component called ConditionalBranch. ConditionalBranch creates a vertex that composites different conditional branches as individual subdags, each identified by a unique key that represents the condition.

func (currentDag *Dag) ConditionalBranch(vertex string, conditions []string, condition sdk.Condition,
    options ...BranchOption) (conditiondags map[string]*Dag)

Condition is a special handler that lets the user dynamically choose one or more execution paths based on the result from an earlier node. It returns a set of condition keys.

The user gets the condition branches as a response, where each branch-specific DAG is mapped to its condition. Each branch can then be further defined using the DAG constructs.

Below is the updated example with a conditional branch, where face-match is called only when face-detect passes.

func KycImageValidationDag() *flow.Dag {
    dag := flow.NewDag()
    dag.Node("verify-url").Apply("verify-image-url", s3DocExists)
    dag.Node("face-detect").Apply("face-detect", detectFace)
    // Here face-match happens only when face-detect succeeds
    branches := dag.ConditionalBranch("handle-face-detect-response", []string{"pass"}, func(response []byte) []string {
        result := ParseFaceDetectResponse(response)
        if result[0] == "pass" { return []string{"pass"}  }
        return []string{}
    })
    // On the pass branch we perform `face-match`
    // If the `pass` condition is not matched, execution continues with the next node, `generate-result`
    branches["pass"].Node("face-match").Apply("face-match", matchFace)
  
    dag.Node("generate-result", generateResult)
    dag.Edge("verify-url", "face-detect")
    dag.Edge("face-detect", "handle-face-detect-response")
    dag.Edge("handle-face-detect-response", "generate-result")
    return dag
}
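
The condition handler is just a function from the previous node's response to a list of condition keys, so the parsing logic can be written and tested separately from the DAG. The sketch below is one possible handler under a stated assumption: that face-detect emits JSON with a `status` field. That response format, the `faceDetectResponse` type and the `chooseBranches` name are all hypothetical, since `ParseFaceDetectResponse` is not defined in this README.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// faceDetectResponse is a hypothetical response format for face-detect.
type faceDetectResponse struct {
	Status string `json:"status"`
}

// chooseBranches has the same shape as the condition handler passed to
// ConditionalBranch above: it inspects the previous node's response and
// returns the keys of the branches that should run.
func chooseBranches(response []byte) []string {
	var r faceDetectResponse
	if err := json.Unmarshal(response, &r); err != nil {
		// An unparseable response selects no branch.
		return []string{}
	}
	if r.Status == "pass" {
		return []string{"pass"}
	}
	return []string{}
}

func main() {
	fmt.Println(chooseBranches([]byte(`{"status":"pass"}`))) // prints: [pass]
	fmt.Println(chooseBranches([]byte(`{"status":"fail"}`))) // prints: []
}
```

Every key the handler returns should be one of the keys declared in the conditions list, `[]string{"pass"}` in the example above.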

You can also have multiple conditional branches in a workflow, with different nodes corresponding to each branch.

Below is the updated example with two conditional branches, where either face-match or create-user is called based on the response from the previous node.

func KycImageValidationDag() *flow.Dag {
    dag := flow.NewDag()
    dag.Node("verify-url").Apply("verify-image-url", s3DocExists)
    dag.Node("face-detect").Apply("face-detect", detectFace)
    // Here face-match happens only when face-detect succeeds;
    // otherwise create-user is called
    branches := dag.ConditionalBranch("handle-face-detect-response", []string{"pass", "fail"}, func(response []byte) []string {
        result := ParseFaceDetectResponse(response)
        if result[0] == "pass" { return []string{"pass"}  }
        return []string{"fail"}
    })
    // On the pass branch we perform `face-match`
    branches["pass"].Node("face-match").Apply("face-match", matchFace)
    // On the fail branch we perform `create-user`
    branches["fail"].Node("create-user").Apply("create-user", createUser)
  
    dag.Node("generate-result", generateResult)
    dag.Edge("verify-url", "face-detect")
    dag.Edge("face-detect", "handle-face-detect-response")
    dag.Edge("handle-face-detect-response", "generate-result")
    return dag
}

Foreach Branching

Foreach branching allows the user to iteratively perform a certain set of tasks for a range of values.

GoFlow provides a DAG component called ForEachBranch. ForEachBranch creates a vertex composed of a subdag that defines the flow within the iteration.

func (currentDag *Dag) ForEachBranch(vertex string, foreach sdk.ForEach, options ...BranchOption) (dag *Dag)

ForEach is a special handler that lets the user dynamically return a set of keys and values. The user-defined DAG is executed once for each item in the returned set.

The user gets the foreach branch as a response and can define the flow using the DAG constructs.

We now update our flow to execute over a set of users that have been listed for possible fraud.

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("get-users").Apply("get-listed-users", getListedUsers)
    verifyDag := dag.ForEachBranch("for-each-user-verify", func(data []byte) map[string][]byte {
       users := ParseUsersList(data)
       forEachSet := make(map[string][]byte)
       for _, user := range users {
           forEachSet[user.id] = []byte(user.GetKycImageUrl())
       }
       return forEachSet
    })
    verifyDag.SubDag("verify-image", KycImageValidationDag())
    verifyDag.Node("mark-profile").Apply("mark-profile", markProfileBasedOnStatus)
    verifyDag.Edge("verify-image", "mark-profile")

    dag.Edge("get-users", "for-each-user-verify")
    return nil
}
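
The foreach handler is also a plain function, from the previous node's output to a map of iteration keys and per-iteration inputs. The sketch below shows one possible handler, assuming the user list arrives as a JSON array with `id` and `kyc_image_url` fields. That format, the `user` type and the `forEachUser` name are hypothetical, since `ParseUsersList` is not defined in this README.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// user is a hypothetical record format for the listed users.
type user struct {
	ID          string `json:"id"`
	KycImageURL string `json:"kyc_image_url"`
}

// forEachUser has the same shape as the handler passed to ForEachBranch
// above: each returned entry becomes one iteration of the embedded DAG,
// with the value as that iteration's input.
func forEachUser(data []byte) map[string][]byte {
	set := make(map[string][]byte)
	var users []user
	if err := json.Unmarshal(data, &users); err != nil {
		// An unparseable list yields no iterations.
		return set
	}
	for _, u := range users {
		set[u.ID] = []byte(u.KycImageURL)
	}
	return set
}

func main() {
	data := []byte(`[
		{"id": "u1", "kyc_image_url": "https://example.com/u1.png"},
		{"id": "u2", "kyc_image_url": "https://example.com/u2.png"}
	]`)
	set := forEachUser(data)

	// Map iteration order is random in Go, so sort the keys before printing.
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s -> %s\n", k, set[k])
	}
}
```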

Comments
  • [QUERY] Examples for conditionalBranch

    Hey @s8sg, I was checking the framework to build an aggregator service that calls multiple services and combines the results received from each of them. I was wondering if I can use this framework for the following use case. Say I have three nodes arranged in series. I want to store the results of the intermediate nodes so that, based on each result, I can decide whether to proceed to the next node. I saw that dag.ConditionalBranch can achieve this by adding a list of conditions. My question is: how would you parse the response of intermediate nodes? How can we check whether the response from the previous node is one of those defined in my condition list? I would appreciate a more detailed example of ConditionalBranch.

    opened by Shradha131 13
  • [bug] when enable tracing support, I find that there are concurrent operations with map

    error message

    fatal error: concurrent map writes
    fatal error: concurrent map writes
    
    goroutine 67 [running]:
    runtime.throw(0x14d49a2, 0x15)
            /usr/local/go/src/runtime/panic.go:1116 +0x72 fp=0xc00038b6e0 sp=0xc00038b6b0 pc=0x10343f2
    runtime.mapassign_faststr(0x1439380, 0xc0000df950, 0x14cdfa3, 0x4, 0x1)
            /usr/local/go/src/runtime/map_faststr.go:291 +0x3de fp=0xc00038b748 sp=0xc00038b6e0 pc=0x1014f5e
    github.com/s8sg/goflow/eventhandler.(*TraceHandler).StartOperationSpan(0xc0002e2b40, 0xc000227090, 0x8, 0xc000376800, 0x14, 0x14cdfa3, 0x4)
            /Users/apple/go/pkg/mod/github.com/s8sg/[email protected]/eventhandler/trace_handler.go:124 +0x223 fp=0xc00038b7d8 sp=0xc00038b748 pc=0x13107d3
    github.com/s8sg/goflow/eventhandler.(*FaasEventHandler).ReportOperationStart(0xc00009c100, 0x14cdfa3, 0x4, 0xc000227090, 0x8, 0xc000376800, 0x14)
            /Users/apple/go/pkg/mod/github.com/s8sg/[email protected]/eventhandler/faas_event_handler.go:66 +0x6b fp=0xc00038b820 sp=0xc00038b7d8 pc=0x130f4bb
    github.com/faasflow/sdk/executor.(*FlowExecutor).executeNode(0xc00038bb98, 0xc0002268d0, 0x5, 0x8, 0xc00019a1a8, 0x1841590, 0x1, 0x0, 0x0)
            /Users/apple/go/pkg/mod/github.com/faasflow/[email protected]/executor/executor.go:350 +0x702 fp=0xc00038b968 sp=0xc00038b820 pc=0x11338f2
    github.com/faasflow/sdk/executor.(*FlowExecutor).Execute(0xc00038bb98, 0xc000380a00, 0x14d1ee9, 0xf, 0x18ac040, 0x203000, 0x203000)
            /Users/apple/go/pkg/mod/github.com/faasflow/[email protected]/executor/executor.go:1266 +0xa8a fp=0xc00038bb00 sp=0xc00038b968 pc=0x113cbba
    github.com/faasflow/runtime/controller/handler.ExecuteFlowHandler(0xc00038bcc8, 0xc0001f21e0, 0x15781e0, 0xc000128790, 0x0, 0x0)
            /Users/apple/go/pkg/mod/github.com/faasflow/[email protected]/controller/handler/execute_flow_handler.go:28 +0x295 fp=0xc00038bc60 sp=0xc00038bb00 pc=0x1388395
    github.com/s8sg/goflow/runtime.(*FlowRuntime).handleNewRequest(0xc000168100, 0xc0001f21e0, 0x6, 0xc0001f21e0)
            /Users/apple/go/pkg/mod/github.com/s8sg/[email protected]/runtime/flow_runtime.go:204 +0xf2 fp=0xc00038bd08 sp=0xc00038bc60 pc=0x13d1bf2
    github.com/s8sg/goflow/runtime.(*FlowRuntime).queueReceiver(0xc000168100, 0xc0000900a0, 0x15, 0xc00037e180, 0x6, 0x6, 0x2554c7bf5, 0x18800a0)
            /Users/apple/go/pkg/mod/github.com/s8sg/[email protected]/runtime/flow_runtime.go:178 +0x37f fp=0xc00038bdd8 sp=0xc00038bd08 pc=0x13d195f
    github.com/s8sg/goflow/runtime.(*FlowRuntime).queueReceiver-fm(0xc0000900a0, 0x15, 0xc00037e180, 0x6, 0x6, 0xc0001803c0, 0x0)
            /Users/apple/go/pkg/mod/github.com/s8sg/[email protected]/runtime/flow_runtime.go:160 +0x5c fp=0xc00038be28 sp=0xc00038bdd8 pc=0x13d391c
    github.com/benmanns/goworker.(*worker).run(0xc000208280, 0xc000066100, 0xc0000a2130)
            /Users/apple/go/pkg/mod/github.com/benmanns/[email protected]/worker.go:154 +0x29f fp=0xc00038bed8 sp=0xc00038be28 pc=0x13852cf
    github.com/benmanns/goworker.(*worker).work.func1(0xc0002260a0, 0xc000208280, 0xc0000a41e0)
            /Users/apple/go/pkg/mod/github.com/benmanns/[email protected]/worker.go:108 +0x12a fp=0xc00038bfc8 sp=0xc00038bed8 pc=0x138708a
    runtime.goexit()
            /usr/local/go/src/runtime/asm_amd64.s:1373 +0x1 fp=0xc00038bfd0 sp=0xc00038bfc8 pc=0x1064091
    created by github.com/benmanns/goworker.(*worker).work
            /Users/apple/go/pkg/mod/github.com/benmanns/[email protected]/worker.go:93 +0x1b4
    
    
    Process finished with exit code 2
    
    

    test code

    package main
    
    import (
    	"fmt"
    	"os"
    	"time"
    
    	"github.com/s8sg/goflow"
    	flow "github.com/s8sg/goflow/flow"
    )
    
    // Workload function
    func doSomething(data []byte, option map[string][]string) ([]byte, error) {
    	fmt.Println("doSomething")
    	time.Sleep(time.Second)
    	return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
    }
    
    // Define provide definition of the workflow
    func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    	f.SyncNode().Apply("test", doSomething).Apply("test", doSomething).Apply("test", doSomething).Apply("test", doSomething)
    	return nil
    }
    
    func main() {
    	os.Setenv("enable_tracing", "true")
    	for i := 0; i < 10; i++ {
    		go func() {
    			time.Sleep(10*time.Second)
    			fmt.Println("start!")
    			fs := &goflow.FlowService{
    				RedisURL: "127.0.0.1:6379",
    			}
    			fs.Execute("myflow", &goflow.Request{
    				Body: []byte("hallo"),
    			})
    		}()
    	}
    	fs := &goflow.FlowService{
    		Port:              8080,
    		RedisURL:          "127.0.0.1:6379",
    		OpenTraceUrl:      "127.0.0.1:5775",
    		WorkerConcurrency: 5,
    	}
    	err := fs.Register("myflow", DefineWorkflow)
    	if err != nil {
    	    panic(err)
    	}
    	err = fs.Start()
    	if err != nil {
    		panic(err)
    	}
    	
    	select {}
    }
    
    

    Maybe TraceHandler's operationSpans map is being written concurrently.

    bug help wanted 
    opened by LinkinStars 4
  • Is there any example to get started with goflow?

    I tried to use your library, but I can't get the "Creating More Complex DAG" examples mentioned in the README.md to work.

    I tried the following, but the functions were never executed:

    package main
    
    import (
    	"fmt"
    	"github.com/faasflow/goflow"
    	flow "github.com/faasflow/lib/goflow"
    	"log"
    )
    
    // Workload function
    func doSomething(data []byte, option map[string][]string) ([]byte, error) {
    	log.Println(fmt.Sprintf("you said \"%s\"", string(data)))
    	return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
    }
    
    // Workload function
    func loadProfile(data []byte, option map[string][]string) ([]byte, error) {
    	log.Println(fmt.Sprintf("load profile \"%s\"", string(data)))
    	return []byte(fmt.Sprintf("load profile \"%s\"", string(data))), nil
    }
    // Workload function
    func getPresignedURLForImage(data []byte, option map[string][]string) ([]byte, error) {
    	log.Println(fmt.Sprintf("image url \"%s\"", string(data)))
    	return []byte(fmt.Sprintf("image url \"%s\"", string(data))), nil
    }
    // Workload function
    func detectFace(data []byte, option map[string][]string) ([]byte, error) {
    	log.Println(fmt.Sprintf("detect face \"%s\"", string(data)))
    	return []byte(fmt.Sprintf("detect face \"%s\"", string(data))), nil
    }
    // Workload function
    func markProfileBasedOnStatus(data []byte, option map[string][]string) ([]byte, error) {
    	log.Println(fmt.Sprintf("mask profile \"%s\"", string(data)))
    	return []byte(fmt.Sprintf("mask profile \"%s\"", string(data))), nil
    }
    
    // Define provide definition of the workflow
    func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    	dag := f.Dag()
    	dag.Node("get-kyc-image").Apply("load-profile", loadProfile).
    		Apply("get-image-url", getPresignedURLForImage).
    		Apply("face-detect", detectFace).
    		Apply("mark-profile", markProfileBasedOnStatus)
    	dag.Edge("get-kyc-image", "face-detect")
    	dag.Edge("face-detect", "mark-profile")
    	return nil
    }
    
    func main() {
    	fs := &goflow.FlowService{
    		Port:                8080,
    		RedisURL:            "localhost:6379",
    		OpenTraceUrl:        "localhost:5775",
    		WorkerConcurrency:   5,
    	}
    	fs.Start("myflow", DefineWorkflow)
    }
    
    question resolved 
    opened by sujit-baniya 4
  • Nice framework

    I have been contemplating ideas around a workflow library for some time, this looks good. I think one question I'd have is, what would a Flow interface look like? We want to define that abstraction potentially in Micro as a way of doing orchestration. Once you have enough services this makes sense.

    opened by asim 2
  • Unit testing and coverage are missing

    Goflow: Unit testing and coverage

    Goal

    Reach 95% unit test coverage, mocking with gomock where appropriate. No tests are present in the current code base.

    opened by giorgiozoppi 0
  • Support for a UI dag builder

    Goflow DAG builder: Provide a UI DAG builder

    Goal

    The goal of this story is to make it easier to build a DAG from a data engineer's perspective. A data engineer should not need to be proficient in Go, so we want to provide a UI where they can draw a DAG. We want to leverage the https://github.com/AlexImb/vue-dag library to achieve this.

    Requirement of the story:

    Functional requirement

    As an end user I want to:

    1. Create a graph by the UI.
    2. Select for each node the workflow code and edit that.
    3. Save/Resume the workflow

    Technical requirements:

    The scope of this story is:

    1. Create a UI that is able to draw/save a graph and schedule its execution.
    2. Provide a REST API that is able to submit a transpiled graph.

    opened by giorgiozoppi 0
  • abstract the queue mechanism to support other mechanisms.

    Goflow Queue Abstraction.

    Goal

    We want to support distributed queues other than Redis:

    • support for RedPanda
    • support for RabbitMQ

    So we might want to abstract the queue mechanism of this workflow manager to allow this extensibility.

    opened by giorgiozoppi 0
  • Pause, Resume and Stop don't work fine

    Expected: Pause pauses the specified workflow, and Resume resumes it. Stop stops an active workflow and sets its status to Finished.

    Actual:

    1. After running Pause, the workflow keeps executing until the end.
    ...
    2022/09/09 17:12:45 [request `my_test_flow-1662714755`] intermediate result from node 0_1_Node-1 to 0_2_Node-2 stored as 0_1_Node-1--0_2_Node-2
    2022/09/09 17:12:45 [request `my_test_flow-1662714755`] performing request for Node 0_2_Node-2, indegree count is 1
    2022/09/09 17:12:46 [request `my_test_flow-1662714755`] request submitted for Node 0_2_Node-2
    2022/09/09 17:12:46 [request `my_test_flow-1662714755`] partial request received
    2022/09/09 17:12:46 [request `my_test_flow-1662714755`] intermediate result from Node 0_1_Node-1 to Node 0_2_Node-2 retrieved from 0_1_Node-1--0_2_Node-2
    2022/09/09 17:12:46 [request `my_test_flow-1662714755`] executing node 0_2_Node-2
    Test Node: O_O "TestData"
    2022/09/09 17:12:47 Pausing request my_test_flow-1662714755 of flow my_test_flow
    Node End
    2022/09/09 17:12:56 [request `my_test_flow-1662714755`] completed execution of node 0_2_Node-2
    2022/09/09 17:12:56 [request `my_test_flow-1662714755`] intermediate result from node 0_2_Node-2 to 0_3_Node-3 stored as 0_2_Node-2--0_3_Node-3
    2022/09/09 17:12:56 [request `my_test_flow-1662714755`] performing request for Node 0_3_Node-3, indegree count is 1
    2022/09/09 17:12:56 [request `my_test_flow-1662714755`] Request is paused, storing partial state for node: 0_3_Node-3
    2022/09/09 17:12:56 [request `my_test_flow-1662714755`] request submitted for Node 0_3_Node-3
    2022/09/09 17:12:57 [request `my_test_flow-1662714755`] partial request received
    2022/09/09 17:12:57 [request `my_test_flow-1662714755`] intermediate result from Node 0_2_Node-2 to Node 0_3_Node-3 retrieved from 0_2_Node-2--0_3_Node-3
    2022/09/09 17:12:57 [request `my_test_flow-1662714755`] executing node 0_3_Node-3
    Test Node: O_O "O_O "TestData""
    Node End
    2022/09/09 17:13:07 [request `my_test_flow-1662714755`] completed execution of node 0_3_Node-3
    2022/09/09 17:13:07 [request `my_test_flow-1662714755`] intermediate result from node 0_3_Node-3 to 0_4_Node-4 stored as 0_3_Node-3--0_4_Node-4
    2022/09/09 17:13:07 [request `my_test_flow-1662714755`] performing request for Node 0_4_Node-4, indegree count is 1
    2022/09/09 17:13:07 [request `my_test_flow-1662714755`] Request is paused, storing partial state for node: 0_4_Node-4
    2022/09/09 17:13:07 [request `my_test_flow-1662714755`] request submitted for Node 0_4_Node-4
    2022/09/09 17:13:08 [request `my_test_flow-1662714755`] partial request received
    2022/09/09 17:13:08 [request `my_test_flow-1662714755`] intermediate result from Node 0_3_Node-3 to Node 0_4_Node-4 retrieved from 0_3_Node-3--0_4_Node-4
    2022/09/09 17:13:08 [request `my_test_flow-1662714755`] executing node 0_4_Node-4
    ...
    
    2. Panic when executing Stop:
    2022/09/09 17:15:03 Pausing request my_test_flow for flow my_test_flow-1662714890
    Node End
    2022/09/09 17:15:11 [request `my_test_flow-1662714890`] completed execution of node 0_2_Node-2
    2022/09/09 17:15:11 [request `my_test_flow-1662714890`] failed to obtain pipeline state, error failed to get key core.my_test_flow.my_test_flow-1662714890.request-state, nil
    2022/09/09 17:15:11 [request `my_test_flow-1662714890`] pipeline is not active
    panic: [request `my_test_flow-1662714890`] Pipeline is not active
    
    goroutine 30 [running]:
    github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).findNextNodeToExecute(0xc0002c7c78)
            /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/sdk/executor/executor.go:609 +0x614
    github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).Execute(0xc0002c7c78, 0xc0002c7c58)
            /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/sdk/executor/executor.go:1266 +0x9c8
    github.com/s8sg/goflow/core/runtime/controller/handler.PartialExecuteFlowHandler(0xc0002c7dd0, 0xc00028ecc0?, {0x14771f8, 0xc0002a6410})
            /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/runtime/controller/handler/partial_execute_flow_handler.go:25 +0xe5
    github.com/s8sg/goflow/runtime.(*FlowRuntime).handlePartialRequest(0xc00012af70, 0xc00028ecc0)
            /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:457 +0x1c6
    github.com/s8sg/goflow/runtime.(*FlowRuntime).handleRequest(0xc0002e6380?, 0xc0002e21e0?, {0xc0002a43b0?, 0x13260a0?})
            /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:414 +0xc8
    github.com/s8sg/goflow/runtime.(*FlowRuntime).Consume(0xc00012af70, {0x1473a98, 0xc0002320e0})
            /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:394 +0x2ae
    github.com/adjust/rmq/v4.(*redisQueue).consumerConsume(0xc0001d4000, {0x14701a0, 0xc00012af70})
            /Users/dli/workspace/lab/gowf-test/vendor/github.com/adjust/rmq/v4/queue.go:282 +0x9f
    created by github.com/adjust/rmq/v4.(*redisQueue).AddConsumer
            /Users/dli/workspace/lab/gowf-test/vendor/github.com/adjust/rmq/v4/queue.go:260 +0xe5
    
    

    Here is the test code:

    package main
    
    import (
    	"flag"
    	"fmt"
    	"time"
    
    	flow "github.com/s8sg/goflow/flow/v1"
    	goflow "github.com/s8sg/goflow/v1"
    )
    
    const (
    	flowName = "my_test_flow"
    )
    
    var (
    	server = flag.Bool("s", false, "start server")
    	op     = flag.String("o", "exec", "operation")
    	rid    = flag.String("i", "", "request id")
    )
    
    func workload(data []byte, option map[string][]string) ([]byte, error) {
    	fmt.Printf("Test Node: %s\n", string(data))
    	time.Sleep(10 * time.Second)
    	fmt.Println("Node End")
    	return []byte(fmt.Sprintf("O_O \"%s\"", string(data))), nil
    }
    
    func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
    	dag := workflow.Dag()
    	dag.Node("Node-1", workload)
    	dag.Node("Node-2", workload)
    	dag.Node("Node-3", workload)
    	dag.Node("Node-4", workload)
    	dag.Node("Node-5", workload)
    	dag.Node("Node-6", workload)
    	dag.Node("Node-7", workload)
    	dag.Node("Node-8", workload)
    	dag.Edge("Node-1", "Node-2")
    	dag.Edge("Node-2", "Node-3")
    	dag.Edge("Node-3", "Node-4")
    	dag.Edge("Node-4", "Node-5")
    	dag.Edge("Node-5", "Node-6")
    	dag.Edge("Node-6", "Node-7")
    	dag.Edge("Node-7", "Node-8")
    	return nil
    }
    
    func startServer() {
    	fs := &goflow.FlowService{
    		Port:              8088,
    		RedisURL:          "127.0.0.1:6379",
    		WorkerConcurrency: 2,
    		RetryCount:        1,
    		DebugEnabled:      true,
    	}
    	err := fs.Register(flowName, DefineWorkflow)
    	if err != nil {
    		panic(err)
    	}
    
    	err = fs.Start()
    	if err != nil {
    		panic(err)
    	}
    }
    
    func ExecOp(op string) string {
    	fs := &goflow.FlowService{
    		RedisURL: "127.0.0.1:6379",
    	}
    
    	reqId := *rid
    	if op == "exec" {
    		reqId = fmt.Sprintf("%s-%d", flowName, time.Now().Unix())
    	}
    	if len(reqId) == 0 {
    		panic("request id is empty")
    	}
    
    	switch op {
    	case "exec":
    		err := fs.Execute(flowName, &goflow.Request{
    			Body:      []byte("TestData"),
    			RequestId: reqId,
    		})
    		if err != nil {
    			panic(err)
    		}
    		return reqId
    
    	case "pause":
    		err := fs.Pause(flowName, reqId)
    		if err != nil {
    			panic(err)
    		}
    	case "resume":
    		err := fs.Resume(flowName, reqId)
    		if err != nil {
    			panic(err)
    		}
    	case "stop":
    		err := fs.Stop(flowName, reqId)
    		if err != nil {
    			panic(err)
    		}
    	}
    
    	return reqId
    }
    
    func main() {
    	flag.Parse()
    
    	if *server {
    		startServer()
    		return
    	}
    
    	reqId := ExecOp(*op)
    	fmt.Println(reqId)
    }
    

    Start the server:

    ./wf -s
    

    Client:

    ./wf -o exec 
    ./wf -o pause -i [request_id]
    ./wf -o resume -i [request_id]
    ./wf -o stop -i [request_id]
    
    opened by danli001 0
Releases(v0.1.0)
Owner
Vanu
I'm in love creating software and thrilled to be able to work on it full time and then some.

PANTHEON.tech 18 Oct 18, 2022
A high performance online bookstore system.

HPOB 高性能网上书店 A high performance online bookstore system. Introduction 介绍 一个基于Gin、gorm、viper、zap等库的web服务器,实现了网上书店相关接口。 Summary 概要 使用go语言编写的,基于gin、gorm、

邹永赫 2 Apr 27, 2022
High-performance GitHub webhook events toolset for Go :rocket:

githubevents GitHub webhook events toolset for Go githubevents is a webhook events toolset for the Go programming language inspired by octokit/webhook

Christian Bargmann 35 Nov 7, 2022
Grafana Tempo is a high volume, minimal dependency distributed tracing backend.

Grafana Tempo is an open source, easy-to-use and high-scale distributed tracing backend. Tempo is cost-efficient, requiring only object storage to ope

Grafana Labs 2.6k Nov 28, 2022
An open-source, distributed, cloud-native CD (Continuous Delivery) product designed for developersAn open-source, distributed, cloud-native CD (Continuous Delivery) product designed for developers

Developer-oriented Continuous Delivery Product ⁣ English | 简体中文 Table of Contents Zadig Table of Contents What is Zadig Quick start How to use? How to

null 0 Oct 19, 2021
FaaSNet: Scalable and Fast Provisioning of Custom Serverless Container Runtimes at Alibaba Cloud Function Compute (USENIX ATC'21)

FaaSNet FaaSNet is the first system that provides an end-to-end, integrated solution for FaaS-optimized container runtime provisioning. FaaSNet uses l

LeapLab @ CS_GMU 39 Nov 17, 2022
Tigris is a modern, scalable backend for building real-time websites and apps.

Tigris Data Getting started These instructions will get you through setting up Tigris Data locally as Docker containers. Prerequisites Make sure that

Tigris Data Inc 383 Nov 21, 2022
Resilient, scalable Brainf*ck, in the spirit of modern systems design

Brainf*ck-as-a-Service A little BF interpreter, inspired by modern systems design trends. How to run it? docker-compose up -d bash hello.sh # Should p

Serge Zaitsev 145 Nov 22, 2022