Minimalistic and High-performance goroutine worker pool written in Go

Overview

pond

Motivation

This library is meant to provide a simple way to limit concurrency when executing some function over a limited resource or service.

Some common scenarios include:

  • Executing queries against a database with a limited number of connections
  • Sending HTTP requests to a rate- or concurrency-limited API

Features:

  • Zero dependencies
  • Create pools with fixed or dynamic size
  • Worker goroutines are only created when needed (backpressure detection) and automatically purged after being idle for some time (configurable)
  • Minimalistic APIs for:
    • Creating worker pools with fixed or dynamic size
    • Submitting tasks to a pool in a fire-and-forget fashion
    • Submitting tasks to a pool and waiting for them to complete
    • Submitting tasks to a pool with a deadline
    • Submitting a group of related tasks and waiting for them to complete
    • Getting the number of running workers (goroutines)
    • Stopping a worker pool
  • Task panics are handled gracefully (configurable panic handler)
  • Supports Non-blocking and Blocking task submission modes (buffered / unbuffered)
  • Very high performance and efficient resource usage under heavy workloads, even outperforming unbounded goroutines in some scenarios (See benchmarks)
  • New (since v1.3.0): configurable pool resizing strategy, with 3 presets for common scenarios: Eager, Balanced and Lazy.
  • New (since v1.5.0): complete pool metrics such as number of running workers, tasks waiting in the queue and more.
  • API reference

How to install

go get -u github.com/alitto/pond

How to use

Worker pool with dynamic size

package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {

	// Create a buffered (non-blocking) pool that can scale up to 100 workers
	// and has a buffer capacity of 1000 tasks
	pool := pond.New(100, 1000)

	// Submit 1000 tasks
	for i := 0; i < 1000; i++ {
		n := i
		pool.Submit(func() {
			fmt.Printf("Running task #%d\n", n)
		})
	}

	// Stop the pool and wait for all submitted tasks to complete
	pool.StopAndWait()
}

Worker pool with fixed size

package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {

	// Create an unbuffered (blocking) pool with a fixed 
	// number of workers
	pool := pond.New(10, 0, pond.MinWorkers(10))

	// Submit 1000 tasks
	for i := 0; i < 1000; i++ {
		n := i
		pool.Submit(func() {
			fmt.Printf("Running task #%d\n", n)
		})
	}

	// Stop the pool and wait for all submitted tasks to complete
	pool.StopAndWait()
}

Submitting groups of related tasks

package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {

	// Create a pool
	pool := pond.New(10, 1000)
	defer pool.StopAndWait()

	// Create a task group
	group := pool.Group()

	// Submit a group of related tasks
	for i := 0; i < 20; i++ {
		n := i
		group.Submit(func() {
			fmt.Printf("Running group task #%d\n", n)
		})
	}

	// Wait for all tasks in the group to complete
	group.Wait()
}

Pool Configuration Options

  • MinWorkers: Specifies the minimum number of worker goroutines that must be running at any given time. These goroutines are started when the pool is created. The default value is 0. Example:
// This will create a pool with 5 running worker goroutines 
pool := pond.New(10, 1000, pond.MinWorkers(5))
  • IdleTimeout: Defines how long to wait before removing idle worker goroutines from the pool. The default value is 5 seconds. Example:
// This will create a pool that will remove workers 100ms after they become idle 
pool := pond.New(10, 1000, pond.IdleTimeout(100 * time.Millisecond))
  • PanicHandler: Configures a custom function to handle panics raised by tasks submitted to the pool. The default handler just writes a message to standard output using fmt.Printf with the following contents: Worker exits from a panic: [panic] \n Stack trace: [stack trace]. Example:
// Custom panic handler function
panicHandler := func(p interface{}) {
	fmt.Printf("Task panicked: %v", p)
}

// This will create a pool that will handle panics using a custom panic handler
pool := pond.New(10, 1000, pond.PanicHandler(panicHandler))
  • Strategy: Configures the strategy used to resize the pool when backpressure is detected. You can create a custom strategy by implementing the pond.ResizingStrategy interface or choose one of the 3 presets:
    • Eager: maximizes responsiveness at the expense of higher resource usage, which can reduce throughput under certain conditions. This strategy is meant for worker pools that will operate at a small percentage of their capacity most of the time and may occasionally receive bursts of tasks. This is the default strategy.
    • Balanced: tries to find a balance between responsiveness and throughput. It's suitable for general purpose worker pools or those that will operate close to 50% of their capacity most of the time.
    • Lazy: maximizes throughput at the expense of responsiveness. This strategy is meant for worker pools that will operate close to their max. capacity most of the time.
// Example: create pools with different resizing strategies
eagerPool := pond.New(10, 1000, pond.Strategy(pond.Eager()))
balancedPool := pond.New(10, 1000, pond.Strategy(pond.Balanced()))
lazyPool := pond.New(10, 1000, pond.Strategy(pond.Lazy()))

Resizing strategies

The following chart illustrates the behaviour of the different pool resizing strategies as the number of submitted tasks increases. Each line represents the number of worker goroutines in the pool (pool size) and the x-axis reflects the number of submitted tasks (cumulative).

Pool resizing strategies behaviour

As the name suggests, the "Eager" strategy always spawns an extra worker when there are no idle workers, which causes the pool to grow almost linearly with the number of submitted tasks. At the other end, the "Lazy" strategy creates one worker every N submitted tasks, where N is the maximum number of available CPUs (GOMAXPROCS). The "Balanced" strategy is a middle ground between the other two: it creates a worker every N/2 submitted tasks.
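The "one worker every N submitted tasks" behaviour described above can be modelled with a small counter. The following is a simplified sketch using only the standard library, not pond's actual implementation: the everyN type and the grow helper are hypothetical, and the Resize signature is an assumption about the shape of pond.ResizingStrategy that should be checked against the API reference.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// everyN is a hypothetical strategy: it approves one new worker per n requests.
type everyN struct {
	n     uint64 // requests per new worker; must be greater than zero
	calls uint64 // requests seen so far, updated atomically
}

// Resize reports whether the pool may start one more worker.
// The signature is an assumption, not taken from the README.
func (s *everyN) Resize(runningWorkers, minWorkers, maxWorkers int) bool {
	if runningWorkers >= maxWorkers {
		return false // never grow past the configured limit
	}
	if runningWorkers < minWorkers {
		return true // always reach the configured minimum
	}
	return atomic.AddUint64(&s.calls, 1)%s.n == 0
}

// grow simulates submitting tasks while no worker is idle
// and returns the resulting pool size.
func grow(s *everyN, tasks, maxWorkers int) int {
	workers := 0
	for i := 0; i < tasks; i++ {
		if s.Resize(workers, 0, maxWorkers) {
			workers++
		}
	}
	return workers
}

func main() {
	// n=1 behaves like an eager policy; larger n grows the pool more slowly.
	for _, n := range []uint64{1, 4, 8} {
		fmt.Printf("n=%d -> %d workers\n", n, grow(&everyN{n: n}, 100, 50))
	}
}
```

With 100 submitted tasks and a limit of 50 workers, the model ends with 50, 25 and 12 workers for n = 1, 4 and 8, which mirrors the relative slopes the chart describes.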

Metrics & monitoring

Each worker pool instance exposes useful metrics that can be queried through the following methods:

  • pool.RunningWorkers() int: Current number of running workers
  • pool.IdleWorkers() int: Current number of idle workers
  • pool.MinWorkers() int: Minimum number of worker goroutines
  • pool.MaxWorkers() int: Maximum number of worker goroutines
  • pool.MaxCapacity() int: Maximum number of tasks that can be waiting in the queue at any given time (queue capacity)
  • pool.SubmittedTasks() uint64: Total number of tasks submitted since the pool was created
  • pool.WaitingTasks() uint64: Current number of tasks in the queue that are waiting to be executed
  • pool.SuccessfulTasks() uint64: Total number of tasks that have successfully completed their execution since the pool was created
  • pool.FailedTasks() uint64: Total number of tasks that completed with a panic since the pool was created
  • pool.CompletedTasks() uint64: Total number of tasks that have completed their execution, either successfully or with a panic, since the pool was created

In our Prometheus example we showcase how to configure collectors for these metrics and expose them to Prometheus.
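The relationship between the successful, failed and completed counters can be illustrated without the library. This is a minimal sketch under the assumption that a task counts as failed exactly when it panics; the counters type and its methods are hypothetical and do not reflect how pond tracks these values internally.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counters is a hypothetical stand-in for the task metrics listed above.
type counters struct {
	successful uint64
	failed     uint64
}

// run executes a task and records whether it returned normally or panicked.
func (c *counters) run(task func()) {
	defer func() {
		if p := recover(); p != nil {
			atomic.AddUint64(&c.failed, 1)
			return
		}
		atomic.AddUint64(&c.successful, 1)
	}()
	task()
}

// completed is the sum of successful and failed tasks.
func (c *counters) completed() uint64 {
	return atomic.LoadUint64(&c.successful) + atomic.LoadUint64(&c.failed)
}

func main() {
	c := &counters{}
	c.run(func() {})
	c.run(func() { panic("boom") })
	c.run(func() {})
	fmt.Println(c.successful, c.failed, c.completed()) // prints "2 1 3"
}
```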

API Reference

Full API reference is available at https://pkg.go.dev/github.com/alitto/pond

Benchmarks

See Benchmarks.

Resources

Here are some of the resources which have served as inspiration when writing this library:

Contribution & Support

Feel free to send a pull request if you consider there's something which can be improved. Also, please open up an issue if you run into a problem when using this library or just have a question.

Comments
  • add support go 1.17.x

    I tried to download the module with go get, but it failed. I suspect that Go 1.17.2, the version I currently use, enforces a restriction that the repository does not satisfy.

    opened by vay3t 11
  • Strange delay with sleeping tasks

    Hey @alitto,

    I'm trying to execute a simple test in order to grasp how pond works and I'm experiencing a strange delay when submitting 100 tasks which sleep 1 second each:

    package main
    
    import (
    	"fmt"
    	"time"
    
    	"github.com/alitto/pond"
    )
    
    func main() {
    	pool := pond.New(100, 1000)
    	defer pool.StopAndWait()
    
    	for i := 0; i < 100; i++ {
    		n := i
    		pool.Submit(func() {
    			time.Sleep(1 * time.Second)
    			fmt.Printf("Task #%d done\n", n)
    		})
    	}
    }
    

    I was expecting a single, 1 second delay at the end and printout of all "Task # done" at the same time, but I'm seeing tasks executing 1 per second instead. What am I doing wrong? Thanks!

    Ivan

    opened by isavcic 8
  • Exceptions may occur when closing the pool

    Assuming that the pool is limited in size and is busy, an exception will occur when the pool is closed and the external task is still sending to the pool.

    Test code:

    func TestStop(t *testing.T) {
    	pool := New(1, 4, MinWorkers(1), IdleTimeout(1*time.Second))
    	// Simulate some users sending tasks to the pool
    	go func() {
    		for i := 0; i < 30; i++ {
    			pool.Submit(func() {
    				fmt.Println("do task")
    				time.Sleep(3 * time.Second)
    			})
    		}
    	}()
    	// Suppose the server is shut down after a period of time
    	time.Sleep(5 * time.Second)
    	pool.StopAndWait()
    }
    

    panic: send on closed channel

    opened by zhu121 7
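The "send on closed channel" panic in this thread comes from a producer sending while the channel is being closed. One common way to avoid it, sketched here with the standard library only, is to guard both the send and the close with the same lock and a stopped flag. The queue type below is hypothetical and is not pond's implementation; it only mirrors the behaviour the v1.6.1 release notes describe for TrySubmit() on a stopped pool.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical task queue that can be stopped safely
// while other goroutines are still submitting.
type queue struct {
	mu      sync.RWMutex
	stopped bool
	tasks   chan func()
}

func newQueue(capacity int) *queue {
	return &queue{tasks: make(chan func(), capacity)}
}

// TrySubmit enqueues a task without blocking. It returns false when the
// queue is full or has been stopped, and never sends on a closed channel.
func (q *queue) TrySubmit(task func()) bool {
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.stopped {
		return false
	}
	select {
	case q.tasks <- task:
		return true
	default:
		return false
	}
}

// Stop closes the channel exactly once, after all in-flight sends finish.
func (q *queue) Stop() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.stopped {
		q.stopped = true
		close(q.tasks)
	}
}

func main() {
	q := newQueue(1)
	fmt.Println(q.TrySubmit(func() {})) // true: there is room in the buffer
	fmt.Println(q.TrySubmit(func() {})) // false: the buffer is full
	q.Stop()
	fmt.Println(q.TrySubmit(func() {})) // false: the queue is stopped
}
```

The non-blocking send matters here: a blocking send performed while holding the read lock could prevent Stop from ever acquiring the write lock.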
  • Handling Concurrency

    Dumb question: can tasks be submitted concurrently to a pool or do I need to use a mutex to control access to it?

    I'm adding tasks concurrently. I'm also debugging by printing out the total tasks as well as the waiting tasks (which I call load) and the numbers are all over the place.

    2021/07/06 15:34:46 Events.go:632: Event processing total: 119 load: 0
    2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
    2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
    2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
    2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
    2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 18446744073709551615
    2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 18446744073709551615
    
    opened by git-blame 5
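The value 18446744073709551615 in the log above is the largest uint64, which is what an unsigned counter shows after being decremented below zero. The sketch below reproduces that arithmetic with the standard library; the decrement helper is hypothetical, and whether this is the exact cause in the thread is an assumption, although the v1.5.1 release notes do mention a fix for a wrap-around of the waitingTasks counter.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// decrement subtracts one from an unsigned counter atomically.
// Adding ^uint64(0) is the documented way to subtract 1 with AddUint64.
func decrement(c *uint64) uint64 {
	return atomic.AddUint64(c, ^uint64(0))
}

func main() {
	var waiting uint64 // starts at zero
	// Decrementing before the matching increment wraps around.
	fmt.Println(decrement(&waiting)) // prints 18446744073709551615
}
```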
  • Deadlock on zero minimum workers

    I experienced a deadlock when using pond in the following way (https://go.dev/play/p/eJLX1vc3C81).

    workerpool := pond.New(10, 10, pond.Strategy(pond.Eager()))
    
    batchWg := &sync.WaitGroup{}
    batch := []string{"message1", "message2"}
    
    for _, _ = range batch {
    	batchWg.Add(1)
    	workerpool.Submit(func() {
    		batchWg.Done()
    	})
    }
    
    batchWg.Wait()
    

    Minimum workers defaults to 0, and the purger can stop idle workers when IdleWorkers() > 0. At the same time, workerpool.Submit can add a task without starting a worker because IdleWorkers() > 0. If the purger manages to signal the worker to stop before the worker consumes the newly submitted task, the pool ends up with a non-empty task channel and no workers to process the tasks.

    Possible solutions:

    1. Increase minimum workers >= 1. This way there will always be an available worker to process pending tasks.
    2. Improve synchronisation between purge and maybeStartWorker to avoid such cases.
    good first issue 
    opened by sermojohn 3
  • Prevent "send on closed channel" panic in purger goroutine

    Changes included

    • Prevent panic thrown in purger goroutine when attempting to stop a worker after tasks channel is closed (reported in https://github.com/alitto/pond/pull/26).
    • Ensure that concurrent calls to StopAndWait() from multiple goroutines all wait until all tasks have completed.
    opened by alitto 3
  • Question: What happens to the pool when a worker panic

    Hello,

    I would like to know what happens to the pool when a worker panics. Also what is the best approach to stop the pool when a worker panics.

    Thanks and congrats for an awesome lib

    opened by Tochemey 3
  • does pond support serial queue?

    I want a list of tasks to submit to the pool, and want the submitted task to complete one after one, can pond support this scenario?

    for example: taskgroup1 have three tasks: task1, task2, task3

    taskgroup2 have two tasks: task1, task2

    taskgroup1 and taskgroup2 are all submitted to the pool, I want taskgroup1 and taskgroup2 run in concurrency. But in taskgroup1 and taskgroup2, the submitted tasks run one after another. i.e. in taskgroup1 when task1 finished, then task2 will run, and when task2 finished, task3 will run. In taskgroup2 when task1 finished, then task2 will run.

    opened by haifenghuang 3
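One way to get the ordering asked for in this thread is to treat each group as a single unit of work that runs its own steps in order, so groups run concurrently while steps within a group stay serial. The sketch below uses plain goroutines from the standard library; runSerialGroups is a hypothetical helper, and with pond the outer goroutine would presumably become one submitted task per group.

```go
package main

import (
	"fmt"
	"sync"
)

// runSerialGroups runs all groups concurrently, but the steps inside each
// group run one after another. It returns, per group, the step names in
// the order they were executed.
func runSerialGroups(groups map[string][]string) map[string][]string {
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	order := make(map[string][]string, len(groups))
	for name, steps := range groups {
		name, steps := name, steps // per-iteration copies for the closure
		wg.Add(1)
		go func() {
			defer wg.Done()
			for _, step := range steps {
				// The real work for this step would run here.
				mu.Lock()
				order[name] = append(order[name], step)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return order
}

func main() {
	order := runSerialGroups(map[string][]string{
		"taskgroup1": {"task1", "task2", "task3"},
		"taskgroup2": {"task1", "task2"},
	})
	fmt.Println(order["taskgroup1"], order["taskgroup2"])
}
```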
  • Access channel concurrently

    When analyzing the code implementation, it is found that adding tasks and removing tasks are multiple goroutines accessing the same channel.

    Some questions:

    1. Will there be an upper limit on the number of goroutines (the number of workers)?
    2. Will there be a significant impact on performance when the number of goroutines is greater?

    Look forward to your reply.

    opened by zhu121 3
  • Passing variables to function in pool

    Hello sir,

    This might be a very nooby question, but say I want to do a simple for loop like so:

    	pool := pond.New(12, 1000)
    	for i := 0; i < 12; i++ {
    		pool.Submit(func() {
    			log.Println(i)
    		})
    	}
    	pool.StopAndWait()
    

    This will output:

    2021/07/09 11:23:26 12
    2021/07/09 11:23:26 12
    2021/07/09 11:23:26 12
    2021/07/09 11:23:26 12
    2021/07/09 11:23:26 12
    2021/07/09 11:23:26 12
    2021/07/09 11:23:26 12
    2021/07/09 11:23:26 12
    2021/07/09 11:23:26 12
    2021/07/09 11:23:26 12
    2021/07/09 11:23:26 12
    2021/07/09 11:23:26 12
    

    How do I pass variables to functions running in a pool?

    p.s.: for comparison, this does seem to work

    	for i := 0; i < 12; i++ {
    		func() {
    			log.Println(i)
    		}()
    	}
    
    opened by hb0nes 3
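The output in this thread is the classic loop variable capture: before Go 1.22, every closure created in the loop shares the same variable i, and the tasks run after the loop has already advanced it to 12. Copying the variable inside the loop body, as the README examples do with n := i, gives each closure its own value. The sketch below shows the pattern with plain goroutines; the collect helper is hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collect starts n closures concurrently and returns the values they
// observed, sorted in ascending order.
func collect(n int) []int {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		out []int
	)
	for i := 0; i < n; i++ {
		v := i // per-iteration copy captured by the closure below
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			out = append(out, v)
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Ints(out)
	return out
}

func main() {
	fmt.Println(collect(12)) // prints [0 1 2 3 4 5 6 7 8 9 10 11]
}
```

The second snippet in the question works because the function literal is called immediately, while i still holds the current value. From Go 1.22 onward each iteration gets its own variable, so the copy is no longer required there, but it remains harmless.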
  • pond.New(30, 100, pond.MinWorkers(30)) creates 60 goroutines not 30

    I'm using version 1.8.1.

    By writing pond.New(30, 100, pond.MinWorkers(30)) I'm expecting to create 30 goroutines at startup and keep a fixed pool of 30 over time. Am I correct?

    Thank you

    opened by hmedkouri 2
Releases(v1.8.2)
  • v1.8.2(Oct 14, 2022)

    Changes included

    • Synchronize read & write of TaskGroupWithContext's err variable https://github.com/alitto/pond/pull/37 (thanks @thekondor)
  • v1.8.1(Aug 28, 2022)

    Changes included

    • Fix for https://github.com/alitto/pond/issues/33
    • Upgrade to go 1.19
    • Extracted counter updates from main worker function to make it simpler and more generic
    • Moved worker function to a separate file
    • Added Makefile with test targets
  • v1.8.0(May 9, 2022)

    • Upgrade go version to 1.18
    • Implement new method in WorkerPool to create a group of tasks associated to a context GroupContext(ctx context.Context)
    • Move TaskGroup to a separate file
    • Move tests related to task groups to a separate file
  • v1.7.2(Apr 10, 2022)

  • v1.7.1(Mar 12, 2022)

    Changes included

    • Prevent "send on closed channel" panic in purger goroutine (https://github.com/alitto/pond/pull/27)
    • Ensure all concurrent calls to StopAndWait() do block until all workers have stopped
    • Simplify logic in submit function
  • v1.7.0(Jan 2, 2022)

    Changes included

    • Added option to specify a parent context on a pool (pond.Context(parentCtx context.Context))
    • Added method to stop the pool and wait until a given deadline is reached (StopAndWaitFor(deadline time.Duration))
    • Migrated from Travis CI to Github Actions
    • Fixed typos in comments
    • Added a few sections to the Readme file
  • v1.6.1(Dec 26, 2021)

    Changes included:

    • Improve handling of tasks submitted to a stopped pool
      • Sending a task using Submit() will panic with ErrSubmitOnStoppedPool if the pool has been stopped.
      • Sending a task using TrySubmit() will return false if the pool has been stopped.
    • Update dependencies in benchmark package
  • v1.6.0(Nov 20, 2021)

  • v1.5.1(Jul 10, 2021)

    Changes included:

    • Prevent the waitingTasks counter from wrapping around (fixes https://github.com/alitto/pond/issues/12)
    • Upgrade version of go to 1.16
  • v1.5.0(Dec 31, 2020)

    Changelog

    • Expose comprehensive metrics through worker pool instances
    • Add getters for all pool configuration options (max capacity, min and max workers and strategy)
    • Upgrade golang runtime to 1.15
    • Fix issue in TrySubmit() that caused tasks not to be queued when using a buffered pool and its max number of workers had been reached.
  • v1.4.0(Jun 6, 2020)

    • Fix issue with resizing strategies not being thread-safe
    • Simplify resizing strategies
    • Several performance improvements
    • Add TrySubmit(task func()) bool method to allow submitting a task in a non-blocking manner
    • Enhance benchmarks to simulate more workload scenarios and different types of tasks (sync/async)
  • v1.3.0(May 26, 2020)

    Changes include:

    • Add ability to configure pool resizing strategy (https://github.com/alitto/pond/pull/3)
      • 3 preset resizing strategies: Eager, Balanced and Lazy.
    • Improve benchmarks
    • Export interface to define custom resizing strategies pond.ResizingStrategy
Owner
Alejandro Durante
Reinventing wheels for a living
🐜🐜🐜 ants is a high-performance and low-cost goroutine pool in Go, inspired by fasthttp.

A goroutine pool for Go English | Chinese Introduction Library ants implements a goroutine pool with fixed capacity, managing and recycling a massi

Andy Pan 9.5k Dec 2, 2022
golang worker pool , Concurrency limiting goroutine pool

golang worker pool (Chinese documentation) Concurrency limiting goroutine pool. Limits the concurrency of task execution, not the number of tasks queued. Never blocks su

xxj 424 Nov 16, 2022
Golang Implementation of Worker Pool/ Thread Pool

Golang Implementation of Worker Pool/ Thread Pool

Telkom DEV 1 Jun 18, 2022
Work pool channlege - An url hash retriever worker pool for getting hash digest for a collection of urls

Code challenge The aim of this project is to provide an url hash retriever worke

null 0 Feb 16, 2022
errgroup with goroutine worker limits

neilotoole/errgroup neilotoole/errgroup is a drop-in alternative to Go's wonderful sync/errgroup but limited to N goroutines. This is useful for inter

Neil O'Toole 141 Nov 9, 2022
Worker-Pool written in GO

go-workerpool Worker-Pool written in GO Installation go get github.com/agungsid/go-workerpool Usage package main type SampleSeeder struct{} func (s

Agung Kurniawan 2 Jun 10, 2022
Worker - A Golang library that provides worker pools

Worker A Golang library that provides worker pools. Usage See *_test.go files. T

Fatih Cetinkaya 2 Apr 15, 2022
🐝 A Highly Performant and easy to use goroutine pool for Go

gohive Package gohive implements a simple and easy to use goroutine pool for Go Features Pool can be created with a specific size as per the requireme

Lovelesh 43 Sep 26, 2022
Queue is a Golang library for spawning and managing a Goroutine pool

Queue is a Golang library for spawning and managing a Goroutine pool, allowing you to create multiple workers according to the number of CPUs available on the machine.

Bo-Yi Wu 244 Nov 23, 2022
Queue is a Golang library for spawning and managing a Goroutine pool

Queue is a Golang library for spawning and managing a Goroutine pool, allowing you to create multiple workers according to the number of CPUs available on the machine.

golang-queue 244 Nov 23, 2022
Worker pool library with auto-scaling, backpressure, and easy composability of pools into pipelines

workerpool Worker pool library with auto-scaling, backpressure, and easy composability of pools into pipelines. Uses Go 1.18 generics. Notable differe

Charalampos Mitsakis 50 Oct 5, 2022
Lightweight Goroutine pool

grpool Lightweight Goroutine pool Clients can submit jobs. Dispatcher takes job, and sends it to first available worker. When worker is done with proc

Ivan Pusic 719 Nov 21, 2022
A goroutine pool for Go

Tunny is a Golang library for spawning and managing a goroutine pool, allowing you to limit work coming from any number of goroutines with a synchrono

Ashley Jeffs 3.4k Nov 28, 2022
Concurrency limiting goroutine pool

workerpool Concurrency limiting goroutine pool. Limits the concurrency of task execution, not the number of tasks queued. Never blocks submitting task

Andrew Gillis 952 Nov 21, 2022
goroutine pool in golang

goroutine pool in golang

wksw 1 Nov 1, 2021
Go-miningcore-pool - Miningcore Pool written in GOlang

Go-Miningcore-Pool (COMING SOON) Miningcore Pool written in GOlang 0x01 Configur

miningcore.com 2 Apr 24, 2022
Go simple async worker pool

worker-pool Go simple async worker pool. ABOUT Worker pool is a software design pattern for achieving concurrency of task execution. Maintain

Rafał Lorenz 85 Sep 26, 2022
Deadly simple worker pool

go-worker-pool Deadly simple worker pool Usage package main import ( "errors" workerpool "github.com/zelenin/go-worker-pool" "log" "time" ) func

Aleksandr Zelenin 1 Dec 10, 2021
Go-async - Worker pool (fan-in/fan-out)

Worker pool (fan-in/fan-out) func main() { pool := worker.NewPool(2) ctx := co

Ilya Tsyganov 1 Aug 26, 2022