Glow is an easy-to-use distributed computation system written in Go, similar to Hadoop MapReduce, Spark, Flink, Storm, etc. I am also working on another similar pure-Go system, which is more flexible and more performant.





Glow provides a library to easily compute in parallel threads or distribute work across clusters of machines. It is written in pure Go.



$ go get github.com/chrislusf/glow
$ go get github.com/chrislusf/glow/flow

One minute tutorial

Simple Start

Here is a simple full example:

package main

import (
	"strings"

	"github.com/chrislusf/glow/flow"
)

func main() {
	flow.New().TextFile(
		"/etc/passwd", 3,
	).Filter(func(line string) bool {
		return !strings.HasPrefix(line, "#")
	}).Map(func(line string, ch chan string) {
		for _, token := range strings.Split(line, ":") {
			ch <- token
		}
	}).Map(func(key string) int {
		return 1
	}).Reduce(func(x int, y int) int {
		return x + y
	}).Map(func(x int) {
		println("count:", x)
	}).Run()
}

Try it.

  $ ./word_count

It will read the input text file, '/etc/passwd', in 3 goroutines, filter/map/map the lines, then reduce them to one number in one goroutine (not exactly one goroutine, but let's skip the details for now) and print it out.

This is useful already, saving lots of idiomatic but repetitive code on channels, sync.WaitGroup, etc., to fully utilize more CPU cores.

However, there is one more thing! It can run across a Glow cluster, which can span multiple servers/racks/data centers!

Scale it out

To set up the Glow cluster, we do not need experts on Zookeeper/HDFS/Mesos/YARN etc. Just build or download one binary file.

Setup the cluster

  # Fetch and install via go, or just download it from somewhere.
  $ go get github.com/chrislusf/glow
  # Run a script from the root directory of the repo to start a test cluster.
  $ etc/

Glow Master and Glow Agent run very efficiently. They take about 6.5 MB and 5.5 MB of memory respectively in my environments. I would recommend setting up agents on any server you can find. You can tap into the computing power whenever you need to.

Start the driver program

To leap from one computer to clusters of computers, add this line to the import list:

	_ "github.com/chrislusf/glow/driver"

And put this line as the first statement in the main() function:

	flag.Parse()
This will "steroidize" the code to run in cluster mode!

$ ./word_count -glow -glow.leader="localhost:8930"

The word_count program becomes a driver program, dividing the execution into a directed acyclic graph (DAG) and sending tasks to agents.

Visualize the flow

To understand how each executor works, you can visualize the flow by generating a dot file of it and rendering that to a png file via the "dot" command provided by graphviz.

$ ./word_count -glow -glow.flow.plot >
$ dot -Tpng -otestSelfJoin.png

Glow Hello World Execution Plan

Read More

  1. Wiki page:
  2. Mailing list: the glow-user-discussion Google Group
  3. Examples:

Docker container

Docker is not required. But if you like docker, here are instructions.

# Cross compile artefact for docker
$ GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build .
# build container
$ docker build -t glow .

See examples/ directory for docker-compose setups.


Start using it! Report or fix any issue you see, and add any feature you want.

Fork it, code it, and send pull requests. Better to first discuss the feature you want on the glow-user-discussion mailing list.


  • Read size invalid argument - expected data input?


    I have Glow running on 1 machine just fine, but when trying to simulate the glow cluster system on my local machine, via:

    glow master --address
    glow agent --dir="/Users/andrew/Desktop/GlowFolder" --port=8931 --master="" --memory=4096 --clean.restart --cpu.level=4

    And starting the app via: myapp -glow -glow.leader=""

    1. If I don't have my executable in the Desktop/GlowFolder, I get an issue saying Failed to start command ./myapp under /Users/andrewt/Desktop/GlowFolder: fork/exec ./myapp: no such file or directory. I thought the --dir flag was just for temp documents; do I need to copy the app binary to that folder as well?

    2. Read size: If I run from a folder containing myapp's binary, then I can run, but the glow agent outputs the following error: 2017/03/21 09:41:24 Read size from -ct-0-ds-0-shard-4 offset 1054782852: read /Users/andrew/Desktop/GlowFolder/-ct-0-ds-0-shard-4-8931.dat: invalid argument How is read size determined and expected?

    opened by andrewrt 7
  • centos6.4 word count in Distributed Mode  run error


    Distributed Mode conf:

    ./glow master
    ./glow agent --dir data --max.executors=16 --memory=2048 --master="localhost:8930" --port 8931
    ./glow agent --dir data1 --max.executors=16 --memory=2048 --master="localhost:8930" --port 8932
    go run word_count.go -glow -glow.leader="localhost:8930" -glow.related.files="passwd"

    word_count.go:

    package main

    import (
    	"flag"
    	"fmt"
    	"strings"
    	"strconv"
    	"time"
    	"sync"
    	"encoding/gob"

    	_ ""
    	""
    )

    type WordCountResult struct {
    	Addr string
    	Info MemInfo
    }

    type MemInfo struct {
    	Addr  string
    	Size  int
    	Count int
    }

    func init() {
    	gob.Register(MemInfo{})
    }

    func goStart(wg *sync.WaitGroup, fn func()) {
    	wg.Add(1)
    	go func() {
    		defer wg.Done()
    		fn()
    	}()
    }

    func testWordCount1() {
    	flowOut1 := make(chan WordCountResult)
    	f1 := flow.New().TextFile(
    		"passwd", 2,
    	).Map(func(line string, ch chan MemInfo) {
    		words := strings.Split(line, ":")
    		if s, err := strconv.ParseInt(words[1], 16, 0); err == nil {
    			ch <- MemInfo{words[0], int(s), 1}
    		}
    	}).Map(func(ws MemInfo) (string, MemInfo) {
    		return ws.Addr, ws
    	}).ReduceByKey(func(x MemInfo, y MemInfo) MemInfo {
    		return MemInfo{x.Addr, x.Size + y.Size, x.Count + y.Count}
    	})
    	// ... (wiring of f1 into flowOut1 elided in the original post)

    	startTime := time.Now().UnixNano()
    	var wg sync.WaitGroup
    	goStart(&wg, func() {
    		// ...
    	})
    	goStart(&wg, func() {
    		for t := range flowOut1 {
    			fmt.Printf("%s size:%-8d count:%-8d\n",
    				/* ... */)
    		}
    	})
    	// ...
    	endTime := time.Now().UnixNano()
    	fmt.Printf("UseTime:%d\n", (endTime-startTime)/1000000)
    }

    func main() {
    	flag.Parse()
    	testWordCount1()
    }

    Console output:

    [::1]:8931>2016/02/28 10:34:06 receive error:read tcp [::1]:46225->[::1]:8931: read: connection reset by peer

    passwd (input data):

    0x001b8aa0:00000012 0x001b8aa0:00000012 0x001b8aa0:00000020 0x001b8aa0:00000012 0x001b8aa0:00000400 0x001b8aa0:00000096 0x001b8aa0:00000064 0x001b8aa0:00000012 0x001b8aa0:00000020 0x001b8aa0:00000008 0x001b8aa0:00000012 0x001b8aa0:00000020 0x001b8aa0:00000016 0x001b8aa0:00000012 0x001b8aa0:00000020 0x001b8aa0:00000016 0x001b8aa0:00000021 0x76fb9640:00000008 0x001b8aa0:00000020 0x001b8aa0:00000020 0x001b8aa0:00000020 0x001b8aa0:00000032 0x001b8aa0:00000016 0x001b8aa0:00000012 0x001b8aa0:00000020 0x001b8ab8:00000512 0x76fb9640:00000008 0x76fb9640:00000008 0x76fb9640:00000008 0x76fb9640:00000008 0x7688b540:00000057 0x76fb9640:00000008 .......

    opened by adeagle 7
  • Windows: undefined: syscall.SIGINFO (reopened)



    I'm reopening this issue as glow/flow didn't install properly this time either. A regression might have been introduced.

    As glow was installed (first step go get was successful), I tried installing only flow ( go get ), but this produced the same result as before.

    Then, I manually uninstalled glow and reinstalled it with success. But this time, the second step yields:

    > go get
    C:\Go\src\\chrislusf\glow\flow\signal_handling.go:21: undefined: syscall.SIGINFO
    C:\Go\src\\chrislusf\glow\flow\signal_handling.go:29: undefined: syscall.SIGINFO
    C:\Go\src\\chrislusf\glow\flow\signal_handling_windows.go:11: OnInterrupt redeclared in this block
    C:\Go\src\\chrislusf\glow\flow\signal_handling_windows.go:26: OnInterrupt.func1 redeclared in this block
    previous declaration at C:\Go\src\\chrislusf\glow\flow\signal_handling.go:26
    previous declaration at C:\Go\src\\chrislusf\glow\flow\signal_handling.go:11
    C:\Go\src\\chrislusf\glow\flow\signal_handling_windows.go:29: undefined: syscall.SIGINFO

    Hope this is useful.

    opened by jgranduel 7
  • managing clusters and all that


    just use consul

    It incorporates a lot of what you need and is written in Go. I use it for microservices, and for this project it's great because it will tell you the address of all the things you depend on too, like the file server or mongodb or any other data source or sink.

    opened by joeblew99 7
  • Make test compile


    Minor changes in test code to make them compile and green. data_store_test:TestReadOffsets is deactivated as it is constantly failing with deadlocks. I don't think the test setup is correct. Please correct me if I'm wrong but I couldn't find anything writing to the store for the test.

    opened by alpe 5
  • Is there a means of teeing the flow?


    This isn't a great example but I'm looking for a way to do something like this:

    stream := flow.New().Source(source, parts).Map(mapper)
    flow_a, flow_b := stream.Tee() // <-- this is what i really want
    go flow_a.Filter(a_filter).

    basically, I want to be able to filter the same stream multiple ways without iterating the stream multiple times.

    opened by nicerobot 3
  • Consider reduce the number of Travis CI builds


    Right now, the Travis CI builds for 12 environments, based on a combination of OS and Go versions. The config shows:


    os:
    • linux
    • osx
    • windows
    • freebsd

    go:
    • 1.5
    • 1.6
    • tip

    Maybe we can reduce the build combinations to Linux only, and only build on the previous major Go version + tip; that reduces the builds from 12 to 2.

    The benefit is that the build time will be reduced dramatically. The downside is that we lose testing coverage on some OSes. IMHO, this trade-off favors reduced test time, mainly because the extra OSes do not seem to have much presence for Glow's intended use cases, i.e., parallel batch processing.
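The reduced matrix described above could look like this minimal .travis.yml sketch (illustrative, not the repo's actual config):

```yaml
language: go
os:
  - linux
go:
  - "1.6"   # the previous major Go version at the time
  - tip
```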


    opened by justicezyx 3
  • TeamMaster#findServers creates incorrect copy of the ComputeRequests


    opened by radike 3
  • [WIP] Dockerize mongodb source example


    Provides docker-compose example as quick start to use the example.

    Don't merge, yet.


    • [ ] Seed mongoDB with test data

    @chrislusf can you provide me the example data?

    opened by alpe 3
  • http server address and port should be configurable


    A random port is selected, which makes things very hard when access is restricted via firewalls or in a container environment where ports are exposed. The remote address which is used with the heartbeat can be a local one; then it's impossible for any client outside to connect. I had these problems when I tried to connect to a local docker cluster from the host system.

    opened by alpe 3
  • Installation steps


    There's no "go get" command to show how to install, but I guessed with:

    go get -u

    which returned no errors, but when I did a go run on the first sample, I got this:

    $ go run glow1.go 
    ../src/ cannot find package "" in any of:
        /usr/local/go/src/ (from $GOROOT)
        /home/cecil/Workspace/src/ (from $GOPATH)

    Looks like there may be dependencies? Could you update the README to provide install instructions?

    opened by mandolyte 3
  • glow run block when read big file data to mysql


    flow.New().Source(func(chVipDataGlow chan [][]string) {
    	defer close(chVipDataGlow)
    	file, err := os.Open(fileName)
    	fileInfo, err := os.Stat(fileName)
    	if err != nil || fileInfo.Size() == 0 {
    		// ...
    	}
    	defer func() {
    		// ...
    	}()
    	if err != nil {
    		log.Info(cc).Msgf("enterprise %d, failed to open SFTP file: %v, path: %s", enterpriseId, err, fileName)
    		// ...
    	}
    	buf := bufio.NewReader(file)
    	num := 2000
    	// var tmp = make([][]string, 0, num
    	var tmp [][]string
    	var i = 0
    	for {
    		b, err := buf.ReadString('\n')
    		if err != nil && err != io.EOF {
    			log.Info(cc).Msgf("error while reading data: %+v\n", err)
    			// ...
    		}
    		cell := strings.Split(b, ",")
    		if i == 1 && cell[0] == "数据更新时间" { // "data update time" header row
    			// ...
    		}
    		if len(cell) <= 1 {
    			fileCountNumbers += 1
    			// ...
    		}
    		tmp = append(tmp, cell)
    		if len(tmp) == num {
    			fileCountNumbers += num
    			chVipDataGlow <- tmp
    			tmp = [][]string{}
    		}
    		if err != nil || err == io.EOF {
    			log.Info(cc).Msgf("finished reading data or got an error: %+v", err)
    			// ...
    		}
    	}
    	if len(tmp) > 0 {
    		fileCountNumbers += len(tmp)
    		chVipDataGlow <- tmp
    		tmp = [][]string{}
    	}
    }, 10).Map(func(params [][]string) [][]string {
    	defer func() {
    		if errs := recover(); errs != nil {
    			log.Error(cc).Msgf("database insert failed with a panic: %+v", errs)
    			errDefer = append(errDefer, params...)
    		}
    	}()
    	if len(params) > 0 {
    		sucn, ern, _, errData := c.readDataToTableOneByOne(cc, params, enterpriseId)
    		errNums += ern
    		successNums += sucn
    		return errData
    	}
    	return params
    }).Map(func(errItems [][]string) {
    	// ...


    opened by huanmengerkong 2
  • Has it been used in the commercial production environment so far?


    I am interested in this project. The project is surprising: it turns out that golang has an implementation similar to Hadoop and Flink. Has it been used in a commercial production environment so far? Is there a company using, supporting, and developing it? As we know, Hadoop and Flink are very mature. I want to know whether Glow can provide the same stable functionality, or whether this is a toy project based on your interest?

    opened by CodingByteFly 0
  • Doing partial reduceByKey in Flow created in func init()



    I apologize if I have missed something obvious, but I am using glow to map and reduce time series. I would like to do a reduce or reduceByKey on every time slice (for instance, a reduceByKey on all events received in the last minute).

    Right now, I am setting the code up to be distributed and, following the tutorial, have put my flows in the func init() section so that they are "statically" instantiated (only once, the same way) on every node.

    The data is coming from an unlimited stream (i.e, not from a bounded file). So I have something like this:

    func init() {
        // ... some more maps and filters
    }

    // letsDoIt uses mapRecordsToMetadata to map and reduce all events for a given key during a time slice
    func letsDoIt(streamedEvents chan []string) chan GroupedEventsByKeyChan {
        out := make(chan GroupedEventsByKeyChan)
        go func() {
            for evt := range streamedEvents {
                mapRecordsToMetadataInput <- evt
            }
        }()
        go func() {
            for evt := range mapRecordsToMetadataOutput {
                out <- evt
            }
        }()
        return out
    }
    I have simplified it a bit, but hopefully this is enough to get the idea. Now, reduceByKey blocks until I close the mapRecordsToMetadataInput channel (which makes sense). However, if I do this, I can't really use my flow mapRecordsToMetadata anymore (is there a way to replace the input channel and restart it?).

    Conceptually, I would "close" my input flow (mapRecordsToMetadataInput) at every "time slice" where I want the aggregate to run (i.e. every 30 seconds) so that my reduceByKey would run on that interval of inputs.

    My only option seems to be to keep the "map" operations in the init() section (i.e. mapRecordsToMetadataInput) and put the reduceByKey() operation in a dynamic flow, recreating the dynamic flow every 30 seconds in my case.

    Something like this:

    func init() {
        // ... some more maps and filters
        // Removed the Reduce By Key
    }

    func letsDoIt(streamedEvents chan []string) chan GroupedEventsByKeyChan {
        out := make(chan GroupedEventsByKeyChan)
        go func() {
            for evt := range streamedEvents {
                mapRecordsToMetadataInput <- evt
            }
        }()
        go func() {
            nextInterval := time.Now().Add(30 * time.Second)
            for {
                reduceFlow := flow.New()
                reduceInChan := make(chan EventsByKeychan)
                for evt := range mapRecordsToMetadataOutput {
                    reduceInChan <- evt
                    if evt.Time.After(nextInterval) {
                        // flush and reduce for that interval
                        nextInterval = nextInterval.Add(30 * time.Second)
                    }
                }
                // ... feed reduceInChan into a fresh ReduceByKey on reduceFlow
            }
        }()
        return out
    }

    Is this the "right", canonical way to proceed? Does it scale? Or are we missing a small feature that would allow us to "flush" our static flows at fixed intervals or on demand, so that we can operate on streaming use cases in a more streamlined fashion?

    opened by jdelamar 3
  • Fix the timing out flakiness revealed in dataset_map_test.go


    Frequent test timeouts happen in Travis CI. The error message usually looks like:

    cogroup testing start...
    panic: test timed out after 1m30s

    Find the root cause and eliminate the flakiness.

    opened by justicezyx 2
  • Add unit tests for moderately complex APIs across the code base


    The lack of unit tests greatly slows down the pace of understanding, discussing, and modifying the code. I think it blocks my work of integrating Glow with Kubernetes: I have little idea of how the code is supposed to work, and there is virtually no check on the correctness of changes. It also greatly increases the effort to understand the code base.

    I think it makes sense to add unit tests to cover the relatively obvious/common use cases, which should be quick to finish and bring a lot of benefit.

    I plan to go through the code base and add tests along the way. Hopefully it will also accelerate my ramp-up with the code base.

    Any thoughts, comments, or objections?

    opened by justicezyx 8
Chris Lu SeaweedFS the distributed file system and object store for billions of small files ...
Chris Lu
*DEPRECATED* Please use (

Redsync.go This package is being replaced with I will continue to maintain this package for a while so that its users do

Mahmud Ridwan 302 Nov 20, 2022
Distributed lock manager. Warning: very hard to use it properly. Not because it's broken, but because distributed systems are hard. If in doubt, do not use this.

What Dlock is a distributed lock manager [1]. It is designed after flock utility but for multiple machines. When client disconnects, all his locks are

Sergey Shepelev 25 Dec 24, 2019
Dkron - Distributed, fault tolerant job scheduling system

Dkron - Distributed, fault tolerant job scheduling system for cloud native environments Website: Dkron is a distributed cron service,

Distributed Works 3.4k Dec 28, 2022
Distributed-Services - Distributed Systems with Golang to consequently build a fully-fletched distributed service

Distributed-Services This project is essentially a result of my attempt to under

Hamza Yusuff 6 Jun 1, 2022
Easy to use Raft library to make your app distributed, highly available and fault-tolerant

An easy to use customizable library to make your Go application Distributed, Highly available, Fault Tolerant etc... using Hashicorp's Raft library wh

Richard Bertok 60 Nov 16, 2022
Ultra performant API Gateway with middlewares

The KrakenD framework An open framework to assemble ultra performance API Gateways with middlewares; core service of the KrakenD API Gateway. Looking

Devops Faith - Open source for DevOps 5.4k Dec 29, 2022
Distributed reliable key-value store for the most critical data of a distributed system

etcd Note: The main branch may be in an unstable or even broken state during development. For stable versions, see releases. etcd is a distributed rel

etcd-io 42.2k Dec 30, 2022
💡 A Distributed and High-Performance Monitoring System. The next generation of Open-Falcon

Nightingale introduction: Nightingale (夜莺) is a distributed, highly available operations monitoring system. Its biggest feature is hybrid-cloud support: it covers both traditional physical machine/VM scenarios and K8S container scenarios. Nightingale is also more than monitoring; it includes some CMDB capabilities and automated-operations capabilities, and many companies build their own operations platforms on top of it. These open-source modules are also part of the commercial version, so the reliability is ...

DiDi 5.7k Jan 5, 2023
A distributed and coördination-free log management system

OK Log is archived I hoped to find the opportunity to continue developing OK Log after the spike of its creation. Unfortunately, despite effort, no su

OK Log 3k Dec 26, 2022
JuiceFS is a distributed POSIX file system built on top of Redis and S3.

JuiceFS is a high-performance POSIX file system released under GNU Affero General Public License v3.0. It is specially optimized for the cloud-native

Juicedata, Inc 7.2k Jan 4, 2023
Distributed-system - Practicing and learning the foundations of DS with Go

Distributed-System For practicing and learning the foundations of distributed sy

Ian Armstrong 1 May 4, 2022
BlobStore is a highly reliable,highly available and ultra-large scale distributed storage system

BlobStore Overview Documents Build BlobStore Deploy BlobStore Manage BlobStore License Overview BlobStore is a highly reliable,highly available and ul

CubeFS 14 Oct 10, 2022
A distributed system for embedding-based retrieval

Overview Vearch is a scalable distributed system for efficient similarity search of deep learning vectors. Architecture Data Model space, documents, v

vector search infrastructure for AI applications 1.5k Dec 30, 2022
a dynamic configuration framework used in distributed system

go-archaius This is a light weight configuration management framework which helps to manage configurations in distributed system The main objective of

null 205 Dec 9, 2022
Verifiable credential system on Cosmos with IBC for Distributed Identities

CertX This is a project designed to demonstrate the use of IBC between different zones in the Cosmos ecosystem for privacy preserving credential manag

bwty 6 Mar 29, 2022
A distributed MySQL binlog storage system built on Raft

What is kingbus? 中文 Kingbus is a distributed MySQL binlog store based on raft. Kingbus can act as a slave to the real master and as a master to the sl

Fei Chen 856 Dec 31, 2022
A distributed key-value storage system developed by Alibaba Group

Product Overview Tair is fast-access memory (MDB)/persistent (LDB) storage service. Using a high-performance and high-availability distributed cluster

Alibaba 2k Dec 31, 2022