Concurrency limiting goroutine pool

Overview

workerpool

Build Status Go Report Card codecov License

Concurrency limiting goroutine pool. Limits the concurrency of task execution, not the number of tasks queued. Never blocks submitting tasks, no matter how many tasks are queued.

GoDoc

This implementation builds on ideas from the following:

Installation

To install this package, you need to setup your Go workspace. The simplest way to install the library is to run:

$ go get github.com/gammazero/workerpool

Example

package main

import (
	"fmt"
	"github.com/gammazero/workerpool"
)

func main() {
	wp := workerpool.New(2)
	requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"}

	for _, r := range requests {
		r := r
		wp.Submit(func() {
			fmt.Println("Handling request:", r)
		})
	}

	wp.StopWait()
}

Usage Note

There is no upper limit on the number of tasks queued, other than the limits of system resources. If the number of inbound tasks is too many to even queue for pending processing, then the solution is outside the scope of workerpool, and should be solved by distributing load over multiple systems, and/or storing input for pending processing in intermediate storage such as a file system, distributed message queue, etc.

Real world examples

The list of open source projects using worker pool can be found here

Comments
  • Stop job while running

    Stop job while running

    Is there a way to stop jobs in mid-execution?

    I switched workerpoolxt to use context but the job still runs even though the context has been cancelled... I'm not really sure how to fix this, or if it is even possible.

    I have created a POC that reproduces this issue (code below) which you can also view/run on The Go Playground

    Any help would be greatly appreciated!!

    • Current output
    0 from job a
    1 from job a
    Job 'a' should have stopped here
    2 from job a
    3 from job a
    4 from job a
    [{a context deadline exceeded <nil>} {b <nil> from b}]
    
    • Expected output
    0 from job a
    1 from job a
    Job 'a' should have stopped here
    [{a context deadline exceeded <nil>} {b <nil> from b}]
    

    POC Code:

    package main
    
    import (
    	"context"
    	"fmt"
    	"time"
    
    	"github.com/gammazero/workerpool"
    )
    
    func main() {
    	runner := newRunner(context.Background(), 10)
    
    	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second*3))
    	defer cancel()
    
    	runner.do(job{
    		Name:    "a",
    		Context: ctx,
    		Task: func() jobResult {
    			for i := 0; i < 10000; i++ {
    				time.Sleep(time.Second * 1)
    				fmt.Println(i, "from job a")
    			}
    			return jobResult{Data: "from a"}
    		},
    	})
    
    	runner.do(job{
    		Name: "b",
    		Task: func() jobResult {
    			time.Sleep(time.Duration(time.Second * 6))
    			return jobResult{Data: "from b"}
    		},
    	})
    
    	results := runner.getjobResults()
    	fmt.Println(results)
    }
    
    type runner struct {
    	*workerpool.WorkerPool
    	defaultCtx context.Context
    	kill       chan struct{}
    	result     chan jobResult
    	results    []jobResult
    }
    
    func (r *runner) processResults() {
    	for {
    		select {
    		case res, ok := <-r.result:
    			if !ok {
    				goto Done
    			}
    			r.results = append(r.results, res)
    		}
    	}
    Done:
    	<-r.kill
    }
    
    func newRunner(ctx context.Context, numRunners int) *runner {
    	r := &runner{
    		WorkerPool: workerpool.New(numRunners),
    		kill:       make(chan struct{}),
    		result:     make(chan jobResult),
    		defaultCtx: ctx,
    	}
    	go r.processResults()
    	return r
    }
    
    func (r *runner) do(j job) {
    	r.Submit(r.wrap(&j))
    }
    
    func (r *runner) getjobResults() []jobResult {
    	r.StopWait()
    	close(r.result)
    	r.kill <- struct{}{}
    	return r.results
    }
    
    func (r *runner) wrap(job *job) func() {
    	return func() {
    		if job.Context == nil {
    			job.Context = r.defaultCtx
    		}
    		job.childContext, job.done = context.WithCancel(job.Context)
    		job.result = make(chan jobResult)
    		go job.Run()
    		r.result <- job.getResult()
    	}
    }
    
    type job struct {
    	Name         string
    	Task         func() jobResult
    	Context      context.Context
    	result       chan jobResult
    	childContext context.Context
    	stopped      chan struct{}
    	done         context.CancelFunc
    }
    
    func (j *job) Run() {
    	result := j.Task()
    	result.name = j.Name
    	j.result <- result
    	j.done()
    }
    
    func (j *job) getResult() jobResult {
    	select {
    	case r := <-j.result:
    		return r
    	case <-j.childContext.Done():
    		fmt.Printf("Job '%s' should have stopped here\n", j.Name)
    		switch j.childContext.Err() {
    		default:
    			return jobResult{name: j.Name, Error: j.childContext.Err()}
    		}
    	}
    }
    
    type jobResult struct {
    	name  string
    	Error error
    	Data  interface{}
    }
    
    opened by oze4 20
  • Delay between workers?

    Delay between workers?

    I was looking at your implementation for a worker pool in Go and I really like the way you approached it; in that you can submit at any time and it adds it to the queue. Most implementations seem to operate off of a set of known tasks a priori.

    Secondly, it doesn't delay the submission channel and this is very important because in my case if I lag even just a few milliseconds (or 10s of milliseconds) from the event firing, I will miss the next event. So far so good.

    However, what I am struggling to figure out is how to put a delay in somewhere so that I can set the time between executions for each task without interfering with the incoming events. This delay is especially needed for the first set of workers that get added to the pool because they get fired off simultaneously. I need at least some spacing for the web server to breathe a bit before I lay siege on it. ;)

    Any suggestions on where I can add this delay?

    opened by SGarno 9
  • Should Stop() be renamed or split into Close() and Wait()?

    Should Stop() be renamed or split into Close() and Wait()?

    Stumble upon this lib and it looks good but I have a concern the Stop() method could be named better.

    When I read example code I had impression Stop() would wait for current processes to complete, clear execution tasks queue and quit. But in reality seems it's wait until all tasks completed.

    Should not it be named something like Close() or CloseAndWait() or something?

    May be even better if there are 2 methods Close() to signal there are no more tasks and Wait() to wait for tasks completion.

    opened by trakhimenok 7
  • Submitting a task to a specific worker

    Submitting a task to a specific worker

    A concurrent put to a same dynamodb resource fails in aws-sdk-go so i was thinking if i could redirect put calls to specific workers (1 worker for each table and there can be a lot of tables) i would be able to solve this problem

    opened by kishaningithub 5
  • Race condition testing

    Race condition testing

    FYI

    I noticed the following tests are inconsistent/broken..

    • TestWaitingQueueSizeRace
    • TestStopRace
    # fails when count > ~30
    go test -race -run ^TestWaitingQueueSizeRace$ -count=100
    
    # this fails sometimes (run back to back to back, etc..)
    go test -race -run ^TestWaitingQueueSizeRace$ -count=30
    
    # without `-race` you need a high count, and run over and over
    go test -run ^TestStopRace$ -count=2000
    
    # this fails every time (I have yet to get it to pass)
    go test -race -run ^TestStopRace$ -count=100
    
    # this passes sometimes
    go test -race -run ^TestStopRace$ -count=40
    

    Output:

    • Example of how you need to run over and over to reveal inconsistencies
    MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
    PASS
    ok      github.com/gammazero/workerpool 0.180s
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
    PASS
    ok      github.com/gammazero/workerpool 0.177s
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
    --- FAIL: TestStopRace (0.00s)
        workerpool_test.go:331: Stop should not return in any goroutine
    FAIL
    exit status 1
    FAIL    github.com/gammazero/workerpool 0.172s
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
    PASS
    ok      github.com/gammazero/workerpool 0.168s
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
    --- FAIL: TestStopRace (0.00s)
        workerpool_test.go:331: Stop should not return in any goroutine
    --- FAIL: TestStopRace (0.00s)
        workerpool_test.go:331: Stop should not return in any goroutine
    FAIL
    exit status 1
    FAIL    github.com/gammazero/workerpool 0.169s
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    MY-COMPUTER:workerpool user$
    
    opened by oze4 3
  • Get response from all worker pulls

    Get response from all worker pulls

    Hi,

    I want to run multiple requests and get one response (struct or something) that all done. I mean if I've 10 request and the longest took 10 sec i want after 10 seconds get the all the responses with id or something for each request to know who success and who has failed, is it possible ?

    thanks

    opened by JennyMet 3
  • Possible memory leak?

    Possible memory leak?

    When I use something like this

    package main
    
    import (
    	"fmt"
    	"time"
    	"github.com/gammazero/workerpool"
    )
    
    func main() {
    	wp := workerpool.New(2)
    	z := 0
    	for {
    		r := z
    		wp.Submit(func() {
    			fmt.Println("Handling request:", r)
    			time.Sleep(time.Second * 3)
    		})
    		z++
    	}
    	wp.StopWait()
    }
    

    leak

    opened by DeadNumbers 3
  • What the best way for pause pool?

    What the best way for pause pool?

    something like this

    package main
    
    import (
    	"fmt"
    	"github.com/gammazero/workerpool"
    	"time"
    )
    
    var isPaused = 1
    
    func main() {
    	go func() {
    		time.Sleep(time.Second * 10)
    		isPaused = 0
    	}()
    	wp := workerpool.New(2)
    	requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"}
    
    	for _, r := range requests {
    		r := r
    		wp.Submit(func() {
    			pause()
    			fmt.Println("Handling request:", r)
    		})
    	}
    
    	wp.StopWait()
    }
    
    func pause() int {
    	if isPaused == 1 {
    		time.Sleep(time.Millisecond * 100)
    		return pause()
    	} else {
    		return 0
    	}
    }
    
    opened by DeadNumbers 3
  • Adding testcase for concurrent use of WaitingQueueSize()

    Adding testcase for concurrent use of WaitingQueueSize()

    This adds a test for concurrent usage of WaitingQueueSize(). This currently will fail if run with the go race detector go test -race because the underlying deque used for the queue of waiting tasks is not thread-safe.

    opened by evantorrie 3
  • perf: unify startWorker and worker to spawn less goroutines

    perf: unify startWorker and worker to spawn less goroutines

    This PR is more like a question, more than a fix, as I think worker and startWorker are in split for a reason, which I just probably don't get. Can you pls explain it? I couldn't find the answer in git history and out of curiosity.

    If there is no reason for that, then this PR should decrease the load on the runtime by spawning fewer routines in general and one less per worker in particular. Thanks for the great library anyway.

    opened by Wondertan 2
  • add RunningCount() method to pool

    add RunningCount() method to pool

    This returns the number of currently running jobs, analogously to how WaitingQueueSize() returns the size of the waiting queue.

    To be able to access the new running pool attribute, I took the liberty of refactoring worker() and startWorker() as methods of WorkerPool (as opposed to functions). Since all member accesses are either protected by atomic or channels, this should not be a problem, but please let me know if you think I missed something.

    Naming is of course up for bikeshedding :wink:

    opened by costela 2
  • What is the purpose of

    What is the purpose of "r := r"?

    Hi there,

    I'm using your library on every project that I have. But there is a line in your example that I don't understand. I'm not an expert on Go, maybe it's something very basic. So forgive me if it's a silly question.

    for _, r := range requests {
    		r := r
    		wp.Submit(func() {
    			fmt.Println("Handling request:", r)
    		})
    	}
    

    Why we are re-defining r there at r := r? We already have the value of r. The code doesn't work without that line. It became a mystery for me.

    opened by utkusen 1
  • fix for channel to be closed gracefully

    fix for channel to be closed gracefully

    If you input to closed channel, we face error panic: Input to a closed channel

    It actually happened when taskQueue <- task in Submit function in my project

    I am using Stopped() function for check. This does not guarantee that the channel is not closed.

    and I think most application is N Senders model for channel.

    This fix will be necessary

    opened by San9H0 0
  • Ability configure idle timeout.

    Ability configure idle timeout.

    Currently, idle timeout is hardcoded, but I encountered a use case where it will be useful to change it to a higher number. Any chance you can accept a PR for this @gammazero?

    opened by Wondertan 1
  • [feature request] naming workerpools

    [feature request] naming workerpools

    First, so many thanks for your workerpool module that is exactly what I've been looking for!

    In my projects I tend to use several worker pools and I use them for transient work only, so these are not worker pools kept around all the time a service is up. Instead, I create worker pools as needed for individual requests (and there are some reasons for not sharing certain workers across different service requests).

    When unit testing my service handlers for goroutine leaks (using Gomega's at this time still brand new gleak goroutine leak tester) I noticed that it would be quite nice for diagnosing if workerpools and their workers could be individually identified by some name. The name of a worker could simply be the one of its pool for simplicity. Would you be interested in a PR?

    opened by thediveo 0
  • Why startWorker() and not worker()?

    Why startWorker() and not worker()?

    Hello!

    Why are you using startWorker() and not worker()?

    https://github.com/gammazero/workerpool/blob/85cc841576f67b14360c7298da3bcbbb76757020/workerpool.go#L192-L200

    Maybe you have some examples, how I can pass db connection to worker? Not to specific task. Main idea is: Up some workers with 1 connection per worker and do sql requests(tasks) using worker sql connection.

    opened by Tri0L 2
  • Add blocking submission of tasks until worker becomes available

    Add blocking submission of tasks until worker becomes available

    By creating a blocking submission alternative in worker pool the size of task queue can be regulated/controlled. This could be useful when you want to limit the size of task queue, e.g. to the number of workers.

    This PR includes small changes similar to SubmitWait where the only difference is to close the "done" channel before executing the actual task from the worker, like so:

    if waitForWorker {
        close(doneChan)
        task()
    } else {
        task()
        close(doneChan)
    }
    
    opened by AntonioMorales97 1
Releases(v1.1.3)
  • v1.1.3(Aug 12, 2022)

    What's Changed

    • Fix tests not working with goleak
    • Use sync.WaitGroup to ensure that all workers exit before Stop exits
    • perf: unify startWorker and worker to spawn less goroutines
    • make it work with deque0.2 and go 1.18
    • Task queue does not need to be buffered by

    New Contributors

    • @Wondertan made their first contribution in https://github.com/gammazero/workerpool/pull/59
    • @yulin-li made their first contribution in https://github.com/gammazero/workerpool/pull/64

    Full Changelog: https://github.com/gammazero/workerpool/compare/v1.1.2...v1.1.3

    Source code(tar.gz)
    Source code(zip)
  • v1.1.1(Oct 13, 2020)

  • v1.1.0(Oct 10, 2020)

  • v1.0.0(Jul 19, 2020)

Owner
Andrew Gillis
Andrew Gillis
:speedboat: a limited consumer goroutine or unlimited goroutine pool for easier goroutine handling and cancellation

Package pool Package pool implements a limited consumer goroutine or unlimited goroutine pool for easier goroutine handling and cancellation. Features

Go Playgound 703 Nov 8, 2022
🐜🐜🐜 ants is a high-performance and low-cost goroutine pool in Go, inspired by fasthttp./ ants 是一个高性能且低损耗的 goroutine 池。

A goroutine pool for Go English | ???? 中文 ?? Introduction Library ants implements a goroutine pool with fixed capacity, managing and recycling a massi

Andy Pan 9.5k Nov 22, 2022
🐝 A Highly Performant and easy to use goroutine pool for Go

gohive Package gohive implements a simple and easy to use goroutine pool for Go Features Pool can be created with a specific size as per the requireme

Lovelesh 43 Sep 26, 2022
Lightweight Goroutine pool

grpool Lightweight Goroutine pool Clients can submit jobs. Dispatcher takes job, and sends it to first available worker. When worker is done with proc

Ivan Pusic 718 Nov 18, 2022
Minimalistic and High-performance goroutine worker pool written in Go

pond Minimalistic and High-performance goroutine worker pool written in Go Motivation This library is meant to provide a simple way to limit concurren

Alejandro Durante 668 Nov 23, 2022
A goroutine pool for Go

Tunny is a Golang library for spawning and managing a goroutine pool, allowing you to limit work coming from any number of goroutines with a synchrono

Ashley Jeffs 3.4k Nov 17, 2022
Queue is a Golang library for spawning and managing a Goroutine pool

Queue is a Golang library for spawning and managing a Goroutine pool, Alloowing you to create multiple worker according to limit CPU number of machine.

Bo-Yi Wu 243 Nov 21, 2022
Queue is a Golang library for spawning and managing a Goroutine pool

Queue is a Golang library for spawning and managing a Goroutine pool, Alloowing you to create multiple worker according to limit CPU number of machine.

golang-queue 243 Nov 21, 2022
goroutine pool in golang

goroutine pool in golang

wksw 1 Nov 1, 2021
gpool - a generic context-aware resizable goroutines pool to bound concurrency based on semaphore.

gpool - a generic context-aware resizable goroutines pool to bound concurrency. Installation $ go get github.com/sherifabdlnaby/gpool import "github.c

Sherif Abdel-Naby 86 Oct 31, 2022
Golang Implementation of Worker Pool/ Thread Pool

Golang Implementation of Worker Pool/ Thread Pool

Telkom DEV 1 Jun 18, 2022
Go-miningcore-pool - Miningcore Pool written in GOlang

Go-Miningcore-Pool (COMING SOON) Miningcore Pool written in GOlang 0x01 Configur

miningcore.com 2 Apr 24, 2022
Go-ldap-pool - A simple connection pool for go-ldap

Basic connection pool for go-ldap This little library use the go-ldap library an

Vincent 2 May 9, 2022
Work pool channlege - An url hash retriever worker pool for getting hash digest for a collection of urls

Code challenge The aim of this project is to provide an url hash retriever worke

null 0 Feb 16, 2022
errgroup with goroutine worker limits

neilotoole/errgroup neilotoole/errgroup is a drop-in alternative to Go's wonderful sync/errgroup but limited to N goroutines. This is useful for inter

Neil O'Toole 141 Nov 9, 2022
Waiting group for collecting goroutine information.

在go语言waitGroup和errGroup都是用来控制goroutine的并发的方式,前者只能等待所有goroutine执行完成之后再执行Wait()函数后面的代码并且不

Jarvib Ding 111 Dec 3, 2021
A simple and useful goroutine concurrent library.

Taskgroup A simple and useful goroutine concurrent library. Installation go get github.com/anthhub/taskgroup

Tangqy 4 May 19, 2021
Provides some convenient API, includes Goid(), AllGoid(), and LocalStorage, which is a goroutine's local storage, just like ThreadLocal in other languages.

routine 中文版 routine encapsulates and provides some easy-to-use, high-performance goroutine context access interfaces, which can help you access corout

null 92 Nov 20, 2022
A universal mechanism to manage goroutine lifecycles

A universal mechanism to manage goroutine lifecycles

OK Log 1.2k Nov 16, 2022