Concurrency limiting goroutine pool

Overview

workerpool

Concurrency limiting goroutine pool. Limits the concurrency of task execution, not the number of tasks queued. Never blocks submitting tasks, no matter how many tasks are queued.
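
To make the idea concrete, here is a minimal sketch of the general technique using only the standard library. It is not workerpool's actual implementation, and the names `Pool`, `NewPool`, and `runDemo` are hypothetical. It assumes a mutex-protected slice as the unbounded queue and a fixed set of worker goroutines that sleep on a `sync.Cond`. `Submit` only appends to the slice, so it returns immediately however long the queue gets, while at most `maxWorkers` tasks execute at once.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Pool is a hypothetical sketch (not workerpool's code): at most maxWorkers
// tasks run at once; waiting tasks sit in an unbounded slice.
type Pool struct {
	mu      sync.Mutex
	cond    *sync.Cond
	queue   []func()
	stopped bool
	wg      sync.WaitGroup
}

// NewPool starts maxWorkers worker goroutines that share one queue.
func NewPool(maxWorkers int) *Pool {
	p := &Pool{}
	p.cond = sync.NewCond(&p.mu)
	for i := 0; i < maxWorkers; i++ {
		p.wg.Add(1)
		go p.worker()
	}
	return p
}

// Submit appends the task and wakes one idle worker. It holds the mutex only
// for the append, so it never waits for a free worker.
func (p *Pool) Submit(task func()) {
	p.mu.Lock()
	p.queue = append(p.queue, task)
	p.mu.Unlock()
	p.cond.Signal()
}

// worker runs queued tasks in FIFO order until the pool is stopped and the
// queue is empty.
func (p *Pool) worker() {
	defer p.wg.Done()
	for {
		p.mu.Lock()
		for len(p.queue) == 0 && !p.stopped {
			p.cond.Wait()
		}
		if len(p.queue) == 0 { // stopped and fully drained
			p.mu.Unlock()
			return
		}
		task := p.queue[0]
		p.queue[0] = nil // drop the reference so the task can be collected
		p.queue = p.queue[1:]
		p.mu.Unlock()
		task()
	}
}

// StopWait marks the pool stopped, lets workers drain the queue, and waits
// for them to exit.
func (p *Pool) StopWait() {
	p.mu.Lock()
	p.stopped = true
	p.mu.Unlock()
	p.cond.Broadcast()
	p.wg.Wait()
}

// runDemo submits n short tasks and reports how many ran and the highest
// number observed running at the same time.
func runDemo(workers, n int) (done, peak int) {
	p := NewPool(workers)
	var mu sync.Mutex
	running := 0
	for i := 0; i < n; i++ {
		p.Submit(func() {
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()
			time.Sleep(10 * time.Millisecond) // simulated work
			mu.Lock()
			running--
			done++
			mu.Unlock()
		})
	}
	p.StopWait()
	return done, peak
}

func main() {
	done, peak := runDemo(2, 10)
	fmt.Println("tasks done:", done, "peak concurrency:", peak)
}
```

Submitting after `StopWait` is not handled in this sketch; a real pool would need to reject or ignore such calls.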

This implementation builds on ideas from the following:

Installation

To install this package, you need to set up your Go workspace. The simplest way to install the library is to run:

$ go get github.com/gammazero/workerpool

Example

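package main

import (
	"fmt"

	"github.com/gammazero/workerpool"
)

func main() {
	// Create a pool that runs at most 2 tasks at the same time.
	wp := workerpool.New(2)
	requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"}

	for _, r := range requests {
		r := r // per-iteration copy for the closure (needed before Go 1.22)
		// Submit queues the task and returns immediately; it never blocks.
		wp.Submit(func() {
			fmt.Println("Handling request:", r)
		})
	}

	// Stop accepting tasks and wait for all queued tasks to finish.
	wp.StopWait()
}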

Usage Note

There is no upper limit on the number of tasks queued, other than the limits of system resources. If the number of inbound tasks is too many to even queue for pending processing, then the solution is outside the scope of workerpool, and should be solved by distributing load over multiple systems, and/or storing input for pending processing in intermediate storage such as a file system, distributed message queue, etc.
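
If you would rather bound memory before system limits are reached, one option is to cap outstanding work on the caller side. The sketch below is a hypothetical wrapper, not part of workerpool; `BoundedSubmitter`, `TrySubmit`, and `demo` are illustrative names. It takes any non-blocking submit function (for example, a pool's `Submit` method) and uses a buffered channel as a counting semaphore, so at most `limit` tasks are queued or running. When every slot is taken, `TrySubmit` returns false immediately, and the caller can store the input elsewhere as the note above suggests.

```go
package main

import (
	"fmt"
	"sync"
)

// BoundedSubmitter is a hypothetical caller-side wrapper (not part of
// workerpool). It caps outstanding tasks (queued or running) at the size of
// the slots channel, which acts as a counting semaphore.
type BoundedSubmitter struct {
	slots  chan struct{}
	submit func(func()) // any non-blocking submit, e.g. a pool's Submit method
}

func NewBoundedSubmitter(limit int, submit func(func())) *BoundedSubmitter {
	return &BoundedSubmitter{slots: make(chan struct{}, limit), submit: submit}
}

// TrySubmit hands the task to submit and returns true if a slot is free;
// otherwise it returns false at once so the caller can shed or store the work.
func (b *BoundedSubmitter) TrySubmit(task func()) bool {
	select {
	case b.slots <- struct{}{}: // slot acquired
	default:
		return false
	}
	b.submit(func() {
		defer func() { <-b.slots }() // release the slot when the task ends
		task()
	})
	return true
}

// demo offers 5 blocking tasks with a limit of 2 and returns how many were
// accepted. The tasks stay blocked until release is closed, so exactly 2 fit.
func demo() int {
	var wg sync.WaitGroup
	release := make(chan struct{})
	// Stand-in for a pool's Submit: run each task in its own goroutine.
	b := NewBoundedSubmitter(2, func(t func()) { go t() })
	accepted := 0
	for i := 0; i < 5; i++ {
		wg.Add(1)
		if b.TrySubmit(func() { defer wg.Done(); <-release }) {
			accepted++
		} else {
			wg.Done() // a rejected task will never run
		}
	}
	close(release)
	wg.Wait()
	return accepted
}

func main() {
	fmt.Println("accepted:", demo()) // accepted: 2
}
```

This deliberately trades the "never blocks" property for a hard cap, so it only makes sense when dropping or diverting excess work is acceptable.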

Real world examples

The list of open source projects using worker pool can be found here

Issues
  • Stop job while running

    Is there a way to stop jobs in mid-execution?

    I switched workerpoolxt to use context but the job still runs even though the context has been cancelled... I'm not really sure how to fix this, or if it is even possible.

    I have created a POC that reproduces this issue (code below), which you can also view/run on the Go Playground.

    Any help would be greatly appreciated!!

    • Current output
    0 from job a
    1 from job a
    Job 'a' should have stopped here
    2 from job a
    3 from job a
    4 from job a
    [{a context deadline exceeded <nil>} {b <nil> from b}]
    
    • Expected output
    0 from job a
    1 from job a
    Job 'a' should have stopped here
    [{a context deadline exceeded <nil>} {b <nil> from b}]
    

    POC Code:

    package main
    
    import (
    	"context"
    	"fmt"
    	"time"
    
    	"github.com/gammazero/workerpool"
    )
    
    func main() {
    	runner := newRunner(context.Background(), 10)
    
    	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Second*3))
    	defer cancel()
    
    	runner.do(job{
    		Name:    "a",
    		Context: ctx,
    		Task: func() jobResult {
    			for i := 0; i < 10000; i++ {
    				time.Sleep(time.Second * 1)
    				fmt.Println(i, "from job a")
    			}
    			return jobResult{Data: "from a"}
    		},
    	})
    
    	runner.do(job{
    		Name: "b",
    		Task: func() jobResult {
    			time.Sleep(time.Duration(time.Second * 6))
    			return jobResult{Data: "from b"}
    		},
    	})
    
    	results := runner.getjobResults()
    	fmt.Println(results)
    }
    
    type runner struct {
    	*workerpool.WorkerPool
    	defaultCtx context.Context
    	kill       chan struct{}
    	result     chan jobResult
    	results    []jobResult
    }
    
    func (r *runner) processResults() {
    	for {
    		select {
    		case res, ok := <-r.result:
    			if !ok {
    				goto Done
    			}
    			r.results = append(r.results, res)
    		}
    	}
    Done:
    	<-r.kill
    }
    
    func newRunner(ctx context.Context, numRunners int) *runner {
    	r := &runner{
    		WorkerPool: workerpool.New(numRunners),
    		kill:       make(chan struct{}),
    		result:     make(chan jobResult),
    		defaultCtx: ctx,
    	}
    	go r.processResults()
    	return r
    }
    
    func (r *runner) do(j job) {
    	r.Submit(r.wrap(&j))
    }
    
    func (r *runner) getjobResults() []jobResult {
    	r.StopWait()
    	close(r.result)
    	r.kill <- struct{}{}
    	return r.results
    }
    
    func (r *runner) wrap(job *job) func() {
    	return func() {
    		if job.Context == nil {
    			job.Context = r.defaultCtx
    		}
    		job.childContext, job.done = context.WithCancel(job.Context)
    		job.result = make(chan jobResult)
    		go job.Run()
    		r.result <- job.getResult()
    	}
    }
    
    type job struct {
    	Name         string
    	Task         func() jobResult
    	Context      context.Context
    	result       chan jobResult
    	childContext context.Context
    	stopped      chan struct{}
    	done         context.CancelFunc
    }
    
    func (j *job) Run() {
    	result := j.Task()
    	result.name = j.Name
    	j.result <- result
    	j.done()
    }
    
    func (j *job) getResult() jobResult {
    	select {
    	case r := <-j.result:
    		return r
    	case <-j.childContext.Done():
    		fmt.Printf("Job '%s' should have stopped here\n", j.Name)
    		switch j.childContext.Err() {
    		default:
    			return jobResult{name: j.Name, Error: j.childContext.Err()}
    		}
    	}
    }
    
    type jobResult struct {
    	name  string
    	Error error
    	Data  interface{}
    }
    
    opened by oze4 20
  • Delay between workers?

    I was looking at your implementation of a worker pool in Go and I really like the way you approached it, in that you can submit at any time and it adds the task to the queue. Most implementations seem to operate on a set of tasks known a priori.

    Secondly, it doesn't block submission, and this is very important: in my case, if I lag even a few milliseconds (or tens of milliseconds) behind the event firing, I will miss the next event. So far so good.

    However, what I am struggling to figure out is how to put a delay in somewhere so that I can set the time between executions for each task without interfering with the incoming events. This delay is especially needed for the first set of workers that get added to the pool, because they get fired off simultaneously. I need at least some spacing for the web server to breathe a bit before I lay siege to it. ;)

    Any suggestions on where I can add this delay?

    opened by SGarno 9
  • Should Stop() be renamed or split into Close() and Wait()?

    Stumbled upon this lib and it looks good, but I have a concern that the Stop() method could be named better.

    When I read the example code, I had the impression that Stop() would wait for currently running tasks to complete, clear the queue of pending tasks, and quit. But in reality it seems to wait until all tasks have completed.

    Shouldn't it be named something like Close() or CloseAndWait()?

    It might be even better to have two methods: Close() to signal that there are no more tasks, and Wait() to wait for task completion.

    opened by trakhimenok 7
  • Submitting a task to a specific worker

    Concurrent puts to the same DynamoDB resource fail in aws-sdk-go, so I was thinking that if I could redirect put calls to specific workers (one worker for each table, and there can be a lot of tables), I would be able to solve this problem.

    opened by kishaningithub 5
  • Get response from all worker pools

    Hi,

    I want to run multiple requests and get a single response (a struct or something) once they are all done. For example, if I have 10 requests and the longest takes 10 seconds, I want to get all the responses after 10 seconds, each with an ID or something, so I know which succeeded and which failed. Is that possible?

    thanks

    opened by JennyMet 3
  • Possible memory leak?

    When I use something like this

    package main
    
    import (
    	"fmt"
    	"time"
    	"github.com/gammazero/workerpool"
    )
    
    func main() {
    	wp := workerpool.New(2)
    	z := 0
    	for {
    		r := z
    		wp.Submit(func() {
    			fmt.Println("Handling request:", r)
    			time.Sleep(time.Second * 3)
    		})
    		z++
    	}
    	wp.StopWait()
    }
    

    opened by DeadNumbers 3
  • What's the best way to pause the pool?

    something like this

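    package main
    
    import (
    	"fmt"
    	"sync/atomic"
    	"time"
    
    	"github.com/gammazero/workerpool"
    )
    
    // isPaused is written and read from different goroutines, so it is
    // accessed atomically (1 = paused, 0 = running) to avoid a data race.
    var isPaused int32 = 1
    
    func main() {
    	// Unpause after 10 seconds.
    	go func() {
    		time.Sleep(time.Second * 10)
    		atomic.StoreInt32(&isPaused, 0)
    	}()
    	wp := workerpool.New(2)
    	requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"}
    
    	for _, r := range requests {
    		r := r
    		wp.Submit(func() {
    			waitWhilePaused()
    			fmt.Println("Handling request:", r)
    		})
    	}
    
    	wp.StopWait()
    }
    
    // waitWhilePaused polls the flag every 100ms until it is cleared. A loop
    // replaces the original recursion so the stack does not grow while paused.
    func waitWhilePaused() {
    	for atomic.LoadInt32(&isPaused) == 1 {
    		time.Sleep(time.Millisecond * 100)
    	}
    }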
    
    opened by DeadNumbers 3
  • Race condition testing

    FYI

    I noticed the following tests are inconsistent/broken:

    • TestWaitingQueueSizeRace
    • TestStopRace
    # fails when count > ~30
    go test -race -run ^TestWaitingQueueSizeRace$ -count=100
    
    # this fails sometimes (run back to back to back, etc..)
    go test -race -run ^TestWaitingQueueSizeRace$ -count=30
    
    # without `-race` you need a high count, and run over and over
    go test -run ^TestStopRace$ -count=2000
    
    # this fails every time (I have yet to get it to pass)
    go test -race -run ^TestStopRace$ -count=100
    
    # this passes sometimes
    go test -race -run ^TestStopRace$ -count=40
    

    Output:

    • Example of how you need to run over and over to reveal inconsistencies
    MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
    PASS
    ok      github.com/gammazero/workerpool 0.180s
    MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
    PASS
    ok      github.com/gammazero/workerpool 0.177s
    MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
    --- FAIL: TestStopRace (0.00s)
        workerpool_test.go:331: Stop should not return in any goroutine
    FAIL
    exit status 1
    FAIL    github.com/gammazero/workerpool 0.172s
    MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
    PASS
    ok      github.com/gammazero/workerpool 0.168s
    MY-COMPUTER:workerpool user$ go test -run ^TestStopRace$ -count=2000
    --- FAIL: TestStopRace (0.00s)
        workerpool_test.go:331: Stop should not return in any goroutine
    --- FAIL: TestStopRace (0.00s)
        workerpool_test.go:331: Stop should not return in any goroutine
    FAIL
    exit status 1
    FAIL    github.com/gammazero/workerpool 0.169s
    
    opened by oze4 3
  • Adding testcase for concurrent use of WaitingQueueSize()

    This adds a test for concurrent usage of WaitingQueueSize(). It currently fails when run with the Go race detector (go test -race) because the underlying deque used for the queue of waiting tasks is not thread-safe.

    opened by evantorrie 3
  • New release with `Pause` method

    Are there any plans to publish a release with the new Pause method?

    opened by oze4 3
  • Document how to handle errors, please

    How do you surface errors in your tasks?

    opened by zapman449 1
  • Possible memory leak

    I use this package in my application and I observe that memory consumption increases over time. I added goleak to one of the existing tests to search for leaks and found the following:

    Goroutine 6 in state select, with github.com/gammazero/workerpool.(*WorkerPool).dispatch on top of the stack:
            goroutine 6 [select]:
            github.com/gammazero/workerpool.(*WorkerPool).dispatch(0x14000108080)
            	/Users/myuser/ws/workerpool/workerpool.go:184 +0x180
            created by github.com/gammazero/workerpool.New
            	/Users/myuser/ws/workerpool/workerpool.go:37 +0x108
            ]
    
    opened by lidortal 8
  • Support changing maxWorkers dynamically

    We really want the ability to change maxWorkers dynamically. Is there any plan to support this feature? I tried to implement a SetMaxWorkers for WorkerPool, but found that the implementation of Pause relies on a static value of maxWorkers, which makes the situation complicated.

    opened by YueHonghui 1
  • Pause a workerpool through keyboard

    Hello dear developer, I'm looking for a way to pause a workerpool by pressing some key ("P", for example) and resume work with the same key. I also want to stop the workerpool completely with another key (maybe "S"). My setup is like your example.

    opened by Numenorean 14
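
For "Submitting a task to a specific worker": rather than pinning tasks to workers, each task can take a per-key lock, so tasks for the same key (say, the same DynamoDB table) never run concurrently while tasks for different keys still share the pool. This is a hypothetical sketch; `keyedMutex` and `maxInFlightPerKey` are illustrative names. A task blocked on its key's lock still holds a worker slot, and the map of locks grows with the number of distinct keys. Another option would be one single-worker pool per table, created with `workerpool.New(1)`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// keyedMutex is a hypothetical helper that hands out one mutex per key (for
// example, per DynamoDB table name). Tasks holding the same key run one at a
// time; tasks for different keys can still run concurrently.
type keyedMutex struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newKeyedMutex() *keyedMutex {
	return &keyedMutex{locks: make(map[string]*sync.Mutex)}
}

// Lock blocks until key's mutex is held and returns the matching unlock.
func (k *keyedMutex) Lock(key string) (unlock func()) {
	k.mu.Lock()
	m, ok := k.locks[key]
	if !ok {
		m = &sync.Mutex{}
		k.locks[key] = m // entries are never removed in this sketch
	}
	k.mu.Unlock()
	m.Lock()
	return m.Unlock
}

// maxInFlightPerKey runs n tasks spread over keys and reports the highest
// number of tasks observed running at once for any single key.
func maxInFlightPerKey(keys []string, n int) int {
	km := newKeyedMutex()
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		inFlight = map[string]int{}
		peak     int
	)
	for i := 0; i < n; i++ {
		key := keys[i%len(keys)]
		wg.Add(1)
		go func() {
			defer wg.Done()
			unlock := km.Lock(key)
			defer unlock()
			mu.Lock()
			inFlight[key]++
			if inFlight[key] > peak {
				peak = inFlight[key]
			}
			mu.Unlock()
			time.Sleep(time.Millisecond) // stand-in for a put call
			mu.Lock()
			inFlight[key]--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("max in flight per key:", maxInFlightPerKey([]string{"table-a", "table-b"}, 20)) // 1
}
```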
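
For the questions about pausing the pool (including pausing from the keyboard), a blocking gate avoids polling a shared flag. This is a hypothetical helper, not workerpool's own `Pause` method: tasks call `Wait` before starting, `Pause` closes the gate, and `Resume` wakes every waiting task through `sync.Cond.Broadcast`. A keyboard handler could call `Pause` and `Resume` in response to key presses; tasks that have already passed `Wait` keep running.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// gate is a hypothetical pause/resume switch (not workerpool's Pause method).
// Tasks call Wait before starting; while paused they block on a sync.Cond
// instead of polling a flag, and Resume wakes all of them.
type gate struct {
	mu     sync.Mutex
	cond   *sync.Cond
	paused bool
}

func newGate() *gate {
	g := &gate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// Pause makes later calls to Wait block.
func (g *gate) Pause() {
	g.mu.Lock()
	g.paused = true
	g.mu.Unlock()
}

// Resume reopens the gate and wakes every blocked Wait.
func (g *gate) Resume() {
	g.mu.Lock()
	g.paused = false
	g.mu.Unlock()
	g.cond.Broadcast()
}

// Wait returns at once if the gate is open; otherwise it blocks until Resume.
func (g *gate) Wait() {
	g.mu.Lock()
	for g.paused {
		g.cond.Wait()
	}
	g.mu.Unlock()
}

// demoGate starts 3 tasks behind a paused gate and counts how many ran
// before and after Resume.
func demoGate() (before, after int) {
	g := newGate()
	g.Pause()
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		count int
	)
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.Wait() // blocks while paused
			mu.Lock()
			count++
			mu.Unlock()
		}()
	}
	time.Sleep(20 * time.Millisecond) // give the tasks time to reach Wait
	mu.Lock()
	before = count
	mu.Unlock()
	g.Resume()
	wg.Wait()
	return before, count
}

func main() {
	before, after := demoGate()
	fmt.Println("before resume:", before, "after resume:", after) // before resume: 0 after resume: 3
}
```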
Releases (v1.1.2)
Owner
Andrew Gillis
:speedboat: a limited consumer goroutine or unlimited goroutine pool for easier goroutine handling and cancellation

Package pool Package pool implements a limited consumer goroutine or unlimited goroutine pool for easier goroutine handling and cancellation. Features

Go Playgound 667 Jan 13, 2022
🐜🐜🐜 ants is a high-performance and low-cost goroutine pool in Go, inspired by fasthttp.

A goroutine pool for Go. Introduction: Library ants implements a goroutine pool with fixed capacity, managing and recycling a massi

Andy Pan 7.5k Jan 23, 2022
🐝 A Highly Performant and easy to use goroutine pool for Go

gohive Package gohive implements a simple and easy to use goroutine pool for Go Features Pool can be created with a specific size as per the requireme

Lovelesh 29 Jan 15, 2022
Lightweight Goroutine pool

grpool Lightweight Goroutine pool Clients can submit jobs. Dispatcher takes job, and sends it to first available worker. When worker is done with proc

Ivan Pusic 670 Jan 7, 2022
Minimalistic and High-performance goroutine worker pool written in Go

pond Minimalistic and High-performance goroutine worker pool written in Go Motivation This library is meant to provide a simple way to limit concurren

Alejandro Durante 434 Jan 14, 2022
A goroutine pool for Go

Tunny is a Golang library for spawning and managing a goroutine pool, allowing you to limit work coming from any number of goroutines with a synchrono

Ashley Jeffs 2.8k Jan 22, 2022
Queue is a Golang library for spawning and managing a Goroutine pool

Queue is a Golang library for spawning and managing a Goroutine pool, allowing you to create multiple workers limited by the number of CPUs on the machine.

Bo-Yi Wu 128 Jan 12, 2022
Queue is a Golang library for spawning and managing a Goroutine pool

Queue is a Golang library for spawning and managing a Goroutine pool, allowing you to create multiple workers limited by the number of CPUs on the machine.

golang-queue 128 Jan 12, 2022
goroutine pool in golang

goroutine pool in golang

wksw 1 Nov 1, 2021
gpool - a generic context-aware resizable goroutines pool to bound concurrency based on semaphore.

gpool - a generic context-aware resizable goroutines pool to bound concurrency. Installation $ go get github.com/sherifabdlnaby/gpool import "github.c

Sherif Abdel-Naby 82 Nov 23, 2021
errgroup with goroutine worker limits

neilotoole/errgroup neilotoole/errgroup is a drop-in alternative to Go's wonderful sync/errgroup but limited to N goroutines. This is useful for inter

Neil O'Toole 108 Jan 12, 2022
Waiting group for collecting goroutine information.

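In Go, waitGroup and errGroup are both ways to control goroutine concurrency. The former can only wait until all goroutines have finished before executing the code after Wait(), and does not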

Jarvib Ding 111 Dec 3, 2021
A simple and useful goroutine concurrent library.

Taskgroup A simple and useful goroutine concurrent library. Installation go get github.com/anthhub/taskgroup

Tangqy 4 May 19, 2021
Provides some convenient API, includes Goid(), AllGoid(), and LocalStorage, which is a goroutine's local storage, just like ThreadLocal in other languages.

routine routine encapsulates and provides some easy-to-use, high-performance goroutine context access interfaces, which can help you access corout

null 38 Jan 17, 2022
A universal mechanism to manage goroutine lifecycles

A universal mechanism to manage goroutine lifecycles

OK Log 1k Jan 9, 2022
A cross goroutine storage tool with very simple implementation and function.

Simple-goroutine-local is a cross goroutine storage tool with very simple implementation and function (the concept is similar to Java ThreadLocal). Ge

null 0 Jan 13, 2022
A sync.WaitGroup with error handling and concurrency control

go-waitgroup How to use An package that allows you to use the constructs of a sync.WaitGroup to create a pool of goroutines and control the concurrenc

Pieter Claerhout 23 Dec 28, 2021
Structured Concurrency in Go

nursery: structured concurrency in Go RunConcurrently( // Job 1 func(context.Context, chan error) { time.Sleep(time.Millisecond * 10)

null 38 Nov 15, 2021
This repository collects common concurrency patterns in Golang

Go Concurrency Patterns This repository collects common concurrency patterns in Golang Materials Concurrency is not parallelism Go Concurrency Pattern

Kha Nguyen 1.3k Jan 20, 2022