Flow-based and dataflow programming library for Go (golang)

Overview

GoFlow - Dataflow and Flow-based programming library for Go (golang)

Build Status codecov

Status of this branch (WIP)

Warning: you are currently on v1 branch of GoFlow. v1 is a revisit and refactoring of the original GoFlow code which remained almost unchanged for 7 years. This branch is deep in progress, no stability guaranteed. API also may change.

If your code depends on the old implementation, you can build it using release 0.1.

--

GoFlow is a lean and opinionated implementation of Flow-based programming in Go that aims at designing applications as graphs of components which react to data that flows through the graph.

The main properties of the proposed model are:

  • Concurrent - graph nodes run in parallel.
  • Structural - applications are described as components, their ports and connections between them.
  • Reactive/active - system's behavior is how components react to events or how they handle their lifecycle.
  • Asynchronous/synchronous - there is no determined order in which events happen, unless you demand for such order.
  • Isolated - sharing is done by communication, state is not shared.

Getting started

If you don't have the Go compiler installed, read the official Go install guide.

Use go tool to install the package in your packages tree:

go get github.com/trustmaster/goflow

Then you can use it in import section of your Go programs:

import "github.com/trustmaster/goflow"

Basic Example

Below there is a listing of a simple program running a network of two processes.

Greeter example diagram

This first one generates greetings for given names, the second one prints them on screen. It demonstrates how components and graphs are defined and how they are embedded into the main program.

package main

import (
	"fmt"
	"github.com/trustmaster/goflow"
)

// Greeter sends greetings
type Greeter struct {
	Name           <-chan string // input port
	Res            chan<- string // output port
}

// Process incoming data
func (c *Greeter) Process() {
	// Keep reading incoming packets
	for name := range c.Name {
		greeting := fmt.Sprintf("Hello, %s!", name)
		// Send the greeting to the output port
		c.Res <- greeting
	}
}

// Printer prints its input on screen
type Printer struct {
	Line <-chan string // inport
}

// Process prints a line when it gets it
func (c *Printer) Process() {
	for line := range c.Line {
		fmt.Println(line)
	}
}

// NewGreetingApp defines the app graph
func NewGreetingApp() *goflow.Graph {
	n := goflow.NewGraph()
	// Add processes to the network
	n.Add("greeter", new(Greeter))
	n.Add("printer", new(Printer))
	// Connect them with a channel
	n.Connect("greeter", "Res", "printer", "Line")
	// Our net has 1 inport mapped to greeter.Name
	n.MapInPort("In", "greeter", "Name")
	return n
}

func main() {
	// Create the network
	net := NewGreetingApp()
	// We need a channel to talk to it
	in := make(chan string)
	net.SetInPort("In", in)
	// Run the net
	wait := goflow.Run(net)
	// Now we can send some names and see what happens
	in <- "John"
	in <- "Boris"
	in <- "Hanna"
	// Send end of input
	close(in)
	// Wait until the net has completed its job
	<-wait
}

Looks a bit heavy for such a simple task but FBP is aimed at a bit more complex things than just printing on screen. So in more complex an realistic examples the infractructure pays the price.

You probably have one question left even after reading the comments in code: why do we need to wait for the finish signal? This is because flow-based world is asynchronous and while you expect things to happen in the same sequence as they are in main(), during runtime they don't necessarily follow the same order and the application might terminate before the network has done its job. To avoid this confusion we listen for a signal on network's wait channel which is sent when the network finishes its job.

Terminology

Here are some Flow-based programming terms used in GoFlow:

  • Component - the basic element that processes data. Its structure consists of input and output ports and state fields. Its behavior is the set of event handlers. In OOP terms Component is a Class.
  • Connection - a link between 2 ports in the graph. In Go it is a channel of specific type.
  • Graph - components and connections between them, forming a higher level entity. Graphs may represent composite components or entire applications. In OOP terms Graph is a Class.
  • Network - is a Graph instance running in memory. In OOP terms a Network is an object of Graph class.
  • Port - is a property of a Component or Graph through which it communicates with the outer world. There are input ports (Inports) and output ports (Outports). For GoFlow components it is a channel field.
  • Process - is a Component instance running in memory. In OOP terms a Process is an object of Component class.

More terms can be found in Flow-based Wiki Terms and FBP wiki.

Documentation

Contents

  1. Components
    1. Ports and Events
    2. Process
    3. State
  2. Graphs
    1. Structure definition
    2. Behavior

Package docs

Documentation for the flow package can be accessed using standard godoc tool, e.g.

godoc github.com/trustmaster/goflow

Links

Here are related projects and resources:

TODO

  • Integration with NoFlo-UI/Flowhub (in progress)
  • Distributed networks via TCP/IP and UDP
  • Reflection and monitoring of networks
Comments
  • Add support for the JSON graph definition format

    Add support for the JSON graph definition format

    It would be great if goflow could implement the same JSON format for defining graphs as we use in NoFlo. This would eventually help with utilizing tools like the UI across both systems.

    feature 
    opened by bergie 24
  • Automatic channel creation on network Connect()

    Automatic channel creation on network Connect()

    What annoyed me originally with GoFlow is that you need to create a channel manually every time you connect 2 processes in a graph:

    net.Connect("src", "Out", "dst", "In", make(chan string))
    

    It would be better if GoFlow inferred the channel type using reflection and created it on its own, so the above code could be simplified just to this:

    net.Connect("src", "Out", "dst", "In")
    

    Back in 2012 this was not possible because Go's reflect didn't let you create a bidirectional channel out of a unidirectional. Nowadays it is possible, so we can give it another try.

    enhancement 
    opened by trustmaster 17
  • Feature request: Option to get synchronous execution of processes

    Feature request: Option to get synchronous execution of processes

    Just a placeholder for the idea that @trustmaster came up with during the chat today, to add an option to make a process execute synchronously, thus following the FIFO pattern typical of many FBP systems.

    feature 
    opened by samuell 13
  • Not getting all output, and output order not preserved

    Not getting all output, and output order not preserved

    Hello Vladimir!

    Don't know if this is merely support request, or a possible bug ...

    I'm getting some problems with the following code: https://gist.github.com/samuell/6164115 ... in combination with a few simple components from https://github.com/samuell/blow

    ... and using this file as input: https://gist.github.com/samuell/6164135 (just has to lie in the same folder), which is here of a very simple format, to make it easy to spot errors: One line "AAAA...", and the other line "CCC..." and so on, for 99 lines.

    The program is supposed to simply convert 'A' -> 'T', 'T' -> 'A', 'C' -> 'G' and 'G' -> 'C' (To get the "complementary" DNA base sequence), back and fourth 4 times, which should finally give back the same result as the input file.

    I get very strange results.

    Problem nr 1, I sometimes don't get all the output back. When I run the compiled program and pipes it directly to "wc", I DO get 99 lines, which is the same as the input file:

    [samuel basecompl]$ ./basecompl_blow|wc -l
    99
    

    But if I pipe the output to a separate file, and then counts the lines, I get a varying number of lines of output ... like 63, 73, 88, etc. Seems like the output printing does not have time to finish before the pipe is closed?

    [samuel basecompl]$ ./basecompl_blow > out.txt 
    [samuel basecompl]$ wc -l out.txt 
    68 out.txt
    

    Problem nr 2 is that the output is not in the same order as the input. While the input looks like:

    AAA...
    CCC...
    AAA...
    CCC...
    ... and so on ..
    

    .. the output has changed the order of the lines, in an irregular way. See here for an example: https://gist.github.com/samuell/6164186

    opened by samuell 12
  • TestInternalConnectionIIP test panics randomly: IIP write to a closed channel

    TestInternalConnectionIIP test panics randomly: IIP write to a closed channel

    TestInternalConnectionIIP panics when the IIP happens to be concurrently sent after the graph is already being shut down and the input channel is already closed.

    === RUN   TestInternalConnectionIIP
    --- FAIL: TestInternalConnectionIIP (0.00s)
    panic: send on closed channel
    
    goroutine 142 [running]:
    reflect.chansend(0xc0001ab140, 0xc000126388, 0x0, 0x569c7b)
    	/usr/local/go/src/runtime/chan.go:665 +0x4b
    reflect.Value.send(0x52bf00, 0xc0001ab140, 0x12, 0x52cb00, 0xc000126388, 0x82, 0x0, 0x596920)
    	/usr/local/go/src/reflect/value.go:1524 +0x118
    reflect.Value.Send(0x52bf00, 0xc0001ab140, 0x12, 0x52cb00, 0xc000126388, 0x82)
    	/usr/local/go/src/reflect/value.go:1506 +0x90
    github.com/trustmaster/goflow.(*Graph).sendIIPs.func1(0x52bf00, 0xc0001ab140, 0x12, 0x52cb00, 0xc000126388, 0x82, 0x52bf00)
    	/home/ak/dev/go/src/github.com/trustmaster/goflow/graph_iip.go:78 +0x61
    created by github.com/trustmaster/goflow.(*Graph).sendIIPs
    	/home/ak/dev/go/src/github.com/trustmaster/goflow/graph_iip.go:77 +0xf3
    exit status 2
    FAIL	github.com/trustmaster/goflow	0.008s
    

    The root cause of this issue boils down to the fact that goroutine A (spawned by sendIIPs()) attempts sending to a channel concurrently closed by goroutine B (executing echo component 1: e1). The chan close is triggered by closing the graph in port: close(in) in a goroutine spawned by TestInternalConnectionIIP().

    There is no quick and simple fix for this since it's architecturally inaccurate to just close channels from one of multiple writing goroutines. There is a lot of material around this on the net, for example, good read here.

    Thus, I'd like to open a conversation around this to get understanding of the requirements. For example:

    1. Other than IIPs, do we envision multiple writers to a single input channel in a graph?
    2. Why do we need IIPs to be sent concurrently rather than synchronously at the graph start?
    opened by akokhanovskyi 9
  • What is the current state of this project?

    What is the current state of this project?

    I see that it has many issues that remains open. The last commit was last year. @trustmaster do you have any specific plan for this project?

    Thank you

    opened by claudemirogb 7
  • Array ports

    Array ports

    FBP doesn't allow connecting a single output to multiple inputs, though it allows for explicit replicator components in the network.

    Currently it isn't very easy to write components operating on generic data in Go without writing too many type assertions, so generic components would require a code generator tool.

    Though, an idea of broadcast ports replicating data to multiple subscribers is worth considering.

    feature 
    opened by trustmaster 6
  • Enable godot, drop Capacity from the GraphConfig

    Enable godot, drop Capacity from the GraphConfig

    I decided to enable and fix godot while browsing the code. You will also notice that I got slightly carried away and dropped the Capacity setting from the GraphConfig, which does not feel like a helpful optimization, but rather raises questions as to why all these capacities are created equal. I think Go runtime is more than capable of scaling all containers appropriately.

    To be honest, I'm also not a big fan of the GraphConfig.BufferSize, but let's leave that aside for now.

    I've also tried to slightly improve and add some comments here and there.

    opened by akokhanovskyi 4
  • Races, lots of races

    Races, lots of races

    Output of go test -v -race, I can't even get the tests to finish executing.

    === RUN   TestSingleInput
    --- PASS: TestSingleInput (0.00s)
    === RUN   TestStateLock
    --- PASS: TestStateLock (0.12s)
    === RUN   TestSyncLock
    --- PASS: TestSyncLock (0.03s)
    === RUN   TestInitFinish
    --- PASS: TestInitFinish (0.00s)
    === RUN   TestClose
    --- PASS: TestClose (0.00s)
    === RUN   TestShutdown
    --- PASS: TestShutdown (0.00s)
    === RUN   TestPoolMode
    --- PASS: TestPoolMode (0.00s)
    === RUN   TestStopProc
    ==================
    WARNING: DATA RACE
    Write at 0x00c42023c5d0 by goroutine 178:
      reflect.Value.SetBool()
          /usr/local/go/src/reflect/value.go:1364 +0x60
      github.com/abferm/goflow.RunProc()
          /home/aferm/go/src/github.com/abferm/goflow/component.go:322 +0x18c8
      github.com/abferm/goflow.TestStopProc()
          /home/aferm/go/src/github.com/abferm/goflow/component_test.go:381 +0x2ae
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
    
    Previous read at 0x00c42023c5d0 by goroutine 152:
      reflect.Value.Bool()
          /usr/local/go/src/reflect/value.go:248 +0x52
      github.com/abferm/goflow.RunProc.func5()
          /home/aferm/go/src/github.com/abferm/goflow/component.go:234 +0x14a
      github.com/abferm/goflow.RunProc.func8()
          /home/aferm/go/src/github.com/abferm/goflow/component.go:331 +0x54
    
    Goroutine 178 (running) created at:
      testing.(*T).Run()
          /usr/local/go/src/testing/testing.go:697 +0x543
      testing.runTests.func1()
          /usr/local/go/src/testing/testing.go:882 +0xaa
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
      testing.runTests()
          /usr/local/go/src/testing/testing.go:888 +0x4e0
      testing.(*M).Run()
          /usr/local/go/src/testing/testing.go:822 +0x1c3
      main.main()
          github.com/abferm/goflow/_test/_testmain.go:88 +0x20f
    
    Goroutine 152 (finished) created at:
      github.com/abferm/goflow.RunProc()
          /home/aferm/go/src/github.com/abferm/goflow/component.go:340 +0x192e
      github.com/abferm/goflow.TestStopProc()
          /home/aferm/go/src/github.com/abferm/goflow/component_test.go:361 +0x13e
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
    ==================
    --- FAIL: TestStopProc (0.00s)
    	testing.go:610: race detected during execution of test
    === RUN   TestLooper
    --- PASS: TestLooper (0.00s)
    === RUN   TestFactory
    --- PASS: TestFactory (0.00s)
    === RUN   TestFactoryConnection
    --- PASS: TestFactoryConnection (0.00s)
    === RUN   TestFactorySubgraph
    --- PASS: TestFactorySubgraph (0.00s)
    === RUN   TestRuntimeNetwork
    --- PASS: TestRuntimeNetwork (0.00s)
    === RUN   TestConnection
    --- PASS: TestConnection (0.00s)
    === RUN   TestComposite
    --- PASS: TestComposite (0.00s)
    === RUN   TestMultiOutChannel
    --- PASS: TestMultiOutChannel (0.00s)
    === RUN   TestIIP
    --- PASS: TestIIP (0.00s)
    === RUN   TestStopNet
    ==================
    WARNING: DATA RACE
    Read at 0x00c4200b2c50 by goroutine 120:
      github.com/abferm/goflow.(*Graph).Stop()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:771 +0x57
      github.com/abferm/goflow.TestStopNet()
          /home/aferm/go/src/github.com/abferm/goflow/network_test.go:391 +0x1d6
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
    
    Previous write at 0x00c4200b2c50 by goroutine 78:
      github.com/abferm/goflow.(*Graph).run()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:651 +0x26c
      github.com/abferm/goflow.RunNet.func1()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:931 +0x3c
    
    Goroutine 120 (running) created at:
      testing.(*T).Run()
          /usr/local/go/src/testing/testing.go:697 +0x543
      testing.runTests.func1()
          /usr/local/go/src/testing/testing.go:882 +0xaa
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
      testing.runTests()
          /usr/local/go/src/testing/testing.go:888 +0x4e0
      testing.(*M).Run()
          /usr/local/go/src/testing/testing.go:822 +0x1c3
      main.main()
          github.com/abferm/goflow/_test/_testmain.go:88 +0x20f
    
    Goroutine 78 (running) created at:
      github.com/abferm/goflow.RunNet()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:940 +0xdb
      github.com/abferm/goflow.TestStopNet()
          /home/aferm/go/src/github.com/abferm/goflow/network_test.go:380 +0x123
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
    ==================
    ==================
    WARNING: DATA RACE
    Read at 0x00c42023ccf0 by goroutine 83:
      reflect.Value.Bool()
          /usr/local/go/src/reflect/value.go:248 +0x52
      github.com/abferm/goflow.RunProc.func3()
          /home/aferm/go/src/github.com/abferm/goflow/component.go:181 +0xc3
      github.com/abferm/goflow.RunProc.func7()
          /home/aferm/go/src/github.com/abferm/goflow/component.go:299 +0x375
    
    Previous write at 0x00c42023ccf0 by goroutine 78:
      reflect.Value.SetBool()
          /usr/local/go/src/reflect/value.go:1364 +0x60
      github.com/abferm/goflow.RunProc()
          /home/aferm/go/src/github.com/abferm/goflow/component.go:322 +0x18c8
      github.com/abferm/goflow.(*Graph).run()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:648 +0x106f
      github.com/abferm/goflow.RunNet.func1()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:931 +0x3c
    
    Goroutine 83 (running) created at:
      github.com/abferm/goflow.RunProc()
          /home/aferm/go/src/github.com/abferm/goflow/component.go:317 +0x19de
      github.com/abferm/goflow.(*Graph).run()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:648 +0x106f
      github.com/abferm/goflow.RunNet.func1()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:931 +0x3c
    
    Goroutine 78 (running) created at:
      github.com/abferm/goflow.RunNet()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:940 +0xdb
      github.com/abferm/goflow.TestStopNet()
          /home/aferm/go/src/github.com/abferm/goflow/network_test.go:380 +0x123
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
    ==================
    --- FAIL: TestStopNet (0.00s)
    	testing.go:610: race detected during execution of test
    === RUN   TestReconnection
    ==================
    WARNING: DATA RACE
    Write at 0x00c42012c290 by goroutine 251:
      reflect.Value.Set()
          /usr/local/go/src/reflect/value.go:1355 +0x109
      github.com/abferm/goflow.unsetProcPort()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:465 +0x244
      github.com/abferm/goflow.(*Graph).Disconnect()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:482 +0x1eb
      github.com/abferm/goflow.TestReconnection()
          /home/aferm/go/src/github.com/abferm/goflow/network_test.go:436 +0x9b6
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
    
    Previous read at 0x00c42012c290 by goroutine 257:
      reflect.Value.pointer()
          /usr/local/go/src/reflect/value.go:90 +0x85
      reflect.Select()
          /usr/local/go/src/reflect/value.go:2016 +0x87c
      github.com/abferm/goflow.RunProc.func7()
          /home/aferm/go/src/github.com/abferm/goflow/component.go:295 +0xa4
    
    Goroutine 251 (running) created at:
      testing.(*T).Run()
          /usr/local/go/src/testing/testing.go:697 +0x543
      testing.runTests.func1()
          /usr/local/go/src/testing/testing.go:882 +0xaa
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
      testing.runTests()
          /usr/local/go/src/testing/testing.go:888 +0x4e0
      testing.(*M).Run()
          /usr/local/go/src/testing/testing.go:822 +0x1c3
      main.main()
          github.com/abferm/goflow/_test/_testmain.go:88 +0x20f
    
    Goroutine 257 (running) created at:
      github.com/abferm/goflow.RunProc()
          /home/aferm/go/src/github.com/abferm/goflow/component.go:317 +0x19de
      github.com/abferm/goflow.(*Graph).run()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:648 +0x106f
      github.com/abferm/goflow.RunNet.func1()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:931 +0x3c
    ==================
    ==================
    WARNING: DATA RACE
    Read at 0x00c42012c180 by goroutine 253:
      runtime.mapiternext()
          /usr/local/go/src/runtime/hashmap.go:730 +0x0
      github.com/abferm/goflow.(*Graph).run()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:642 +0x238
      github.com/abferm/goflow.RunNet.func1()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:931 +0x3c
    
    Previous write at 0x00c42012c180 by goroutine 251:
      runtime.mapassign()
          /usr/local/go/src/runtime/hashmap.go:485 +0x0
      github.com/abferm/goflow.(*Graph).Add()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:191 +0x2a0
      github.com/abferm/goflow.TestReconnection()
          /home/aferm/go/src/github.com/abferm/goflow/network_test.go:441 +0xa0f
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
    
    Goroutine 253 (running) created at:
      github.com/abferm/goflow.RunNet()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:940 +0xdb
      github.com/abferm/goflow.TestReconnection()
          /home/aferm/go/src/github.com/abferm/goflow/network_test.go:420 +0x82c
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
    
    Goroutine 251 (running) created at:
      testing.(*T).Run()
          /usr/local/go/src/testing/testing.go:697 +0x543
      testing.runTests.func1()
          /usr/local/go/src/testing/testing.go:882 +0xaa
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
      testing.runTests()
          /usr/local/go/src/testing/testing.go:888 +0x4e0
      testing.(*M).Run()
          /usr/local/go/src/testing/testing.go:822 +0x1c3
      main.main()
          github.com/abferm/goflow/_test/_testmain.go:88 +0x20f
    ==================
    ==================
    WARNING: DATA RACE
    Write at 0x00c420314080 by goroutine 253:
      github.com/abferm/goflow.(*Graph).run()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:651 +0x26c
      github.com/abferm/goflow.RunNet.func1()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:931 +0x3c
    
    Previous read at 0x00c420314080 by goroutine 251:
      github.com/abferm/goflow.(*Graph).RunProc()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:748 +0x4c
      github.com/abferm/goflow.TestReconnection()
          /home/aferm/go/src/github.com/abferm/goflow/network_test.go:451 +0xb50
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
    
    Goroutine 253 (running) created at:
      github.com/abferm/goflow.RunNet()
          /home/aferm/go/src/github.com/abferm/goflow/network.go:940 +0xdb
      github.com/abferm/goflow.TestReconnection()
          /home/aferm/go/src/github.com/abferm/goflow/network_test.go:420 +0x82c
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
    
    Goroutine 251 (running) created at:
      testing.(*T).Run()
          /usr/local/go/src/testing/testing.go:697 +0x543
      testing.runTests.func1()
          /usr/local/go/src/testing/testing.go:882 +0xaa
      testing.tRunner()
          /usr/local/go/src/testing/testing.go:657 +0x107
      testing.runTests()
          /usr/local/go/src/testing/testing.go:888 +0x4e0
      testing.(*M).Run()
          /usr/local/go/src/testing/testing.go:822 +0x1c3
      main.main()
          github.com/abferm/goflow/_test/_testmain.go:88 +0x20f
    ==================
    
    opened by abferm 4
  • Component with multiple inputs not stopped

    Component with multiple inputs not stopped

    I discovered that a component with 2 (or more) input channels is not stopped properly, even though all input channels are closed.

    I also created a test in this gist.

    In the test I create a simple net:

    ------
    | C1 | \    ------
    ------  \-> |    |
                | C2 |
    ------  /-> |    |
    | C1 | /    ------
    ------
    

    After closing both In ports I would expect that the whole net will be stopped, but the C2 component actually keeps on running.

    Am I doing something wrong, or is this a bug?

    opened by lovromazgon 4
  • How to wait for two inputs?

    How to wait for two inputs?

    Hello Vlad!

    Great to see work going on, on GoFlow! :+1:

    I started looking at it again for some recent use cases, but had one question: How would I implement a component that waits for two inputs, before it does its job? The default way seems to be to implement a On<InportName>() function, but so I would need something that does On<Inport1><Inport2>() :) ... but I'm sure there is a better way for this?

    Best // Samuel

    discussion 
    opened by samuell 4
  • Manipulate a network in running

    Manipulate a network in running

    @trustmaster

    Hello, I would like to know how to dynamically manipulate a network in running.

    I tested with a simple code below. However inserting a new process into the network already initialized doesn't change after the network running. I am not sure whether this is a nature of design for flow based programming.

    In addition, I would like to hear whether implementing such dynamic manipulating a network is meaningful or not.

    type SrcTest struct {
    	Out chan<- byte
    }
    
    type FilterTest1 struct {
    	In <-chan byte
    	Out chan<- byte
    }
    
    type FilterTest2 struct {
    	In <-chan byte
    	Out chan<- byte
    }
    
    type SinkTest struct {
    	In <-chan byte
    }
    
    func (s *SrcTest) Process() {
    	for {
    		s.Out <- 'a'
    		time.Sleep(100 * time.Millisecond)
    	}
    }
    
    func (f *FilterTest1) Process() {
    	for in := range f.In {
    		_ = in
    		f.Out <- 'b'
    	}
    }
    
    func (f *FilterTest2) Process() {
    	for in := range f.In {
    		_ = in
    		log.Infof("f2 f2")
    		f.Out <- 'c'
    	}
    }
    
    func (s *SinkTest) Process() {
    	for in := range s.In {
    		log.Infof("recv: %c", in)
    	}
    }
    
    func TestStreamData(t *testing.T) {
    	n := goflow.NewGraph()
    	n.Add("src", new(SrcTest))
    	n.Add("f1", new(FilterTest1))
    	n.Add("sink", new(SinkTest))
    	n.Connect("src", "Out", "f1", "In")
    	n.Connect("f1", "Out", "sink", "In")
    
    	wait := goflow.Run(n)
    	go func() {
    		time.Sleep(2 * time.Second)
    		log.Infof("insert f2")
    
    
    		n.Add("f2", new(FilterTest2))
    		//n.Connect("f1", "Out", "f2", "In")
    		n.Connect("src", "Out", "f2", "In")
    		n.Connect("f2", "Out", "sink", "In")
    
    	}()
    
    	<-wait
    }
    
    opened by brucekim 0
  • Closing Component channels that are not explicitly marked as ports

    Closing Component channels that are not explicitly marked as ports

    Somewhere under the hood, it seems as though goflow is calling close on any node in its graph if that node has a Out channel field, even if that port is not connected in the graph. For example,

    package main
    
    import (
    	"log"
    
    	"github.com/trustmaster/goflow"
    )
    
    type Receiver struct {
    	name string
    	In   <-chan []byte
    	Out  chan<- []byte
    }
    
    func NewReceiver(name string) *Receiver {
    	return &Receiver{name: name}
    }
    
    func (db *Receiver) Process() {
    	for in := range db.In {
    		log.Printf("Processing Receiver for node %s", db.name)
    		db.Out <- in
    	}
    }
    
    type DataBuffer struct {
    	name string
    	In   <-chan []byte
    	Out  chan<- []byte
    }
    
    func NewDataBuffer(name string) *DataBuffer {
    	return &DataBuffer{name: name}
    }
    
    func (db *DataBuffer) Process() {
    	for in := range db.In {
    		log.Printf("Processing for node %s: %v", db.name, in)
    	}
    }
    
    
    func main() {
    	g := goflow.NewGraph()
    
    	g.Add("greeter", new(Receiver))
    	g.Add("printer", new(DataBuffer))
    
    	g.Connect("greeter", "Out", "printer", "In")
    
    	g.MapInPort("In", "greeter", "In")
    
    	in := make(chan []byte)
    
    	g.SetInPort("In", in)
    
    	wait := goflow.Run(g)
    
    	in <- []byte{1, 0}
    
    	close(in)
    
    	<-wait
    }
    

    outputs

    2009/11/10 23:00:00 Processing Receiver for node 
    2009/11/10 23:00:00 Processing for node : [1 0]
    panic: close of nil channel
    

    ~Changing the name of DataBuffer.Out to DataBuffer.LeaveMeAlone~ Making the field private (or removing it) fixes the issue.

    I'm new to goflow (looks neat!) so maybe this is intended behaviour. From a newbie perspective it is unexpected.

    opened by notmgsk 0
  • Added plugins feature that allows separate binaries as processes

    Added plugins feature that allows separate binaries as processes

    Used the loader.go which was not compiled, I had to brutalize it a bit, to get it to work with plugins. Need a conversation about what the intention was of some of the non-working code in it.

    opened by dahvid 2
  • v1 Design proposal

    v1 Design proposal

    7 years after starting this project, after writing and maintaining some FBP apps in production, after collecting feedback on this project (#48), it's time to propose the design for v1.0 of GoFlow library. The main goal is to make the library itself less complicated and make effort from zero to a real app minimal.

    Steps

    • [x] Updated basic Components and Graphs in pure Go
    • [x] Updated component Registry and Factory
    • [ ] FBP DSL parser and loader written in GoFlow

    Components

    First, to reduce complexity, the following features are removed:

    • Sync/Async mode
    • Worker pool
    • State locks
    • Lifecycle management (initializer/shutdowner)

    It is still be possible to implement all that functionality in the components themselves in pure Go code.

    Structure

    The structure definition of the components will be simplified:

    type ExampleComponent struct {
    	Foo <-chan int  // inport
    	Bar chan<- int  // outport
    }
    

    Process

    Reactive port handlers are removed in favour of more generic Process() function, defining the whole behaviour of the component.

    So, long-running ("looper") components can be implemented like this:

    func (c *ExampleComponent) Process() {
    	for f := range c.Foo {
    		// Do something
    		c.Bar <- data
    	}
    }
    

    A short-running ("non-looper") process would just return immediately after reading the input:

    func (c *ExampleComponent) Process() {
    	f := <-c.Foo
    	// Do something
    	c.Bar <- data
    }
    

    Graphs

    API

    The API for working with Graphs remains the same as before.

    Underlying code is to be refactored, but no dramatic changes are expected. Yes, it will still use reflection to connect nodes in the network - because that brings a lot of convenience and run-time capabilities and only affects performance upon application start.

    FBP files

    One of the goals for v1 is to use FBP DSL as primary way to define graphs. So, instead of many lines of Go code, you could write something like this:

    INPORT=Server.Port:Port
    OUTPORT=Logger.Output:Log
    
    Server(http/Server) Req -> Req Handler(app/Handler) Err -> Logger(log/Logger)
    Middleware(http/Middleware) Funcs -> Middleware Server
    

    Support for FBP files is built into runtime, so it can be called like:

    n, err := flow.LoadGraph("app/main.fbp")
    
    port := make(chan int)
    n.SetInPort("Port", port)
    
    wait := flow.Run(n)
    
    port <- 8000
    

    Runtime

    For v1.0, the runtime should be powerful enough just to load and run .fbp graphs.

    Support for FBP Protocol and Flowhub is to be added later. Though, existing work such as compatibility with JSON graph format will be kept to make future integration easier.

    discussion 
    opened by trustmaster 0
  • Feedback needed

    Feedback needed

    Hello fellow Gophers!

    I apologise as this project slipped out of my scope for several years.

    I still have some ideas and plans of maintaining it, but I need some feedback on how it is going to be used by people who actually tried it. So, I would really appreciate your answers to the following questions in this thread or in any other form (e.g. via email, please find the address in my profile).

    Questions:

    1. What kind of application did you use GoFlow for? (E.g. bioinformatics, ETL, web backend, IoT, etc.)
    2. Did you use it for personal, business, or academic purposes?
    3. Do you prefer working with graphs in visual or text form?
    4. Which visual tools have you used and which ones do you prefer?
    5. Do you prefer a Component to have main Loop(), or do you prefer setting up handler functions on each port (OnPortName())?
    6. Do you prefer processes to stay resident in memory or started and stopped on demand?
    7. Have you ever used State Locks, Synchronous Mode, Worker Pool, or other advanced features of GoFlow?
    8. Please tell me what you liked about GoFlow and what you would like to be added or changed.

    Why this is important

    As you might have noticed, this codebase is a bit dated. In fact, it was written in 2011 and didn't change much ever since. My own views on how an FBP library should work have changed over time. So, I think this library deserves a rewrite.

    My views can be similar or different from yours, while I'm not building this library only for myself. That's why feedback is appreciated so much.

    Thank you for participating!

    discussion 
    opened by trustmaster 9
Owner
Vladimir Sibirov
Tech lead, teamwork amplifier, flow-based programming ambassador. Pushing open source code since 2002.
Vladimir Sibirov
Examples on different options for implementing Flow Based Programming

Flow Based Programming This repository contains fragments and ideas related to Flow Based Programming. It shows different ways of implementing differe

Egon Elbre 10 Sep 22, 2022
A simulated-annealing approach to solving a max-flow removal problem

RESISTANCE IS FUTILE How to run: Install the latest version of golang to your computer (1.16?) Run a postgres instance on your computer attatched to p

Anthony Ebiner 10 Aug 26, 2022
A general-purpose Cadence contract for trading NFTs on Flow

NFT Storefront The NFT storefront is a general-purpose Cadence contract for trading NFTs on Flow. NFTStorefront uses modern Cadence run-time type faci

Flow 98 Dec 24, 2022
Advent of Code is an Advent calendar of small programming puzzles for a variety of skill sets and skill levels that can be solved in any programming language you like.

Advent of Code 2021 Advent of Code is an Advent calendar of small programming puzzles for a variety of skill sets and skill levels that can be solved

Kemal Ogun Isik 0 Dec 2, 2021
Zach Howell 0 Jan 4, 2022
Gec is a minimal stack-based programming language

Gec is a minimal stack-based programming language

aiocat 2 Sep 18, 2022
Functional programming library for Go including a lazy list implementation and some of the most usual functions.

functional A functional programming library including a lazy list implementation and some of the most usual functions. import FP "github.com/tcard/fun

Toni Cárdenas 31 May 21, 2022
FreeSWITCH Event Socket library for the Go programming language.

eventsocket FreeSWITCH Event Socket library for the Go programming language. It supports both inbound and outbound event socket connections, acting ei

Alexandre Fiori 110 Dec 11, 2022
A library for parallel programming in Go

pargo A library for parallel programming in Go Package pargo provides functions and data structures for expressing parallel algorithms. While Go is pr

null 180 Nov 28, 2022
Go library for hardware I/O control, in the programming style of Arduino

hwio Introduction hwio is a Go library for interfacing with hardware I/O, particularly on SoC-based boards such as BeagleBone Black, Raspberry Pi and

Mark Stephens 324 Dec 9, 2022
A modern programming language written in Golang.

MangoScript A modern programming language written in Golang. Here is what I want MangoScript to look like: struct Rectangle { width: number he

PlebusSupremus1234 3 Nov 12, 2021
Functional Programming support for golang.(Streaming API)

Funtional Api for Golang Functional Programming support for golang.(Streaming API) The package can only be used with go 1.18. Do not try in lower vers

Tobias Yin 0 Dec 8, 2021
Lithia is an experimental functional programming language with an implicit but strong and dynamic type system.

Lithia is an experimental functional programming language with an implicit but strong and dynamic type system. Lithia is designed around a few core concepts in mind all language features contribute to.

Valentin Knabel 9 Dec 24, 2022
A stack oriented esoteric programming language inspired by poetry and forth

paperStack A stack oriented esoteric programming language inspired by poetry and forth What is paperStack A stack oriented language An esoteric progra

null 0 Nov 14, 2021
Unit tests generator for Go programming language

GoUnit GoUnit is a commandline tool that generates tests stubs based on source function or method signature. There are plugins for Vim Emacs Atom Subl

Max Chechel 66 Jan 1, 2023
Simple interface to libmagic for Go Programming Language

File Magic in Go Introduction Provides simple interface to libmagic for Go Programming Language. Table of Contents Contributing Versioning Author Copy

Krzysztof Wilczyński 12 Dec 22, 2021
The Gorilla Programming Language

Gorilla Programming Language Gorilla is a tiny, dynamically typed, multi-engine programming language It has flexible syntax, a compiler, as well as an

null 29 Apr 16, 2022
Elastic is an Elasticsearch client for the Go programming language.

Elastic is an Elasticsearch client for the Go programming language.

Oliver Eilhard 7.1k Jan 9, 2023
👩🏼‍💻A simple compiled programming language

The language is written in Go and the target language is C. The built-in library is written in C too

paco 28 Nov 29, 2022