Overview

workerpool

Worker pool library with auto-scaling, backpressure, and easy composability of pools into pipelines. Uses Go 1.18 generics.

Notable differences from other worker pool libraries:

  • Each worker runs init/deinit functions (if set) when it starts/stops, and stores the value returned by the init function (e.g. a connection or an external process) for as long as it is active. This way you can easily create a connection pool for a crawler or email sender.
  • You don't submit a function for each job. Instead you pass a handler function at the creation of the pool and then you submit a value (payload) for each job.
  • You can connect worker pools into a pipeline. This way you can increase performance by separating IO-intensive from CPU-intensive tasks (see crawler example), or IO tasks of different parallelizability (e.g. crawling and saving to disk).

backpressure: The pool includes a queue with limited capacity. If the queue is full, job submissions block until there is room in the queue.
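
A minimal sketch that makes this blocking observable by timing Submit(); the handler is deliberately slow so the queue fills up, and the exact queue capacity is an implementation detail:

package main

import (
	"fmt"
	"time"

	"go.mitsakis.org/workerpool"
)

func main() {
	p, _ := workerpool.NewPoolSimple(2, func(job workerpool.Job[int], workerID int) error {
		time.Sleep(100 * time.Millisecond) // slow handler so the queue fills up
		return nil
	})
	for i := 0; i < 50; i++ {
		start := time.Now()
		p.Submit(i) // blocks while the queue is full
		if d := time.Since(start); d > time.Millisecond {
			fmt.Printf("Submit(%d) blocked for %v\n", i, d)
		}
	}
	p.StopAndWait()
}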

auto-scaling: If too many jobs are waiting in the queue, new workers are started (up to the maximum number of workers). If there are more active workers than needed, some workers (and their respective goroutines) are stopped. You can disable auto-scaling for CPU-intensive tasks, as shown in the sketch below.
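
For example, a CPU-bound pool can be pinned to a fixed size with the FixedWorkers() option (also used in the crawler example below); a minimal sketch:

package main

import (
	"runtime"

	"go.mitsakis.org/workerpool"
)

func main() {
	// a fixed-size pool for CPU-bound work: one worker per CPU, no auto-scaling
	p, _ := workerpool.NewPoolSimple(runtime.NumCPU(), func(job workerpool.Job[int], workerID int) error {
		_ = job.Payload * job.Payload // stand-in for a CPU-intensive computation
		return nil
	}, workerpool.FixedWorkers())
	for i := 0; i < 100; i++ {
		p.Submit(i)
	}
	p.StopAndWait()
}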

steady-state behavior: If the rate of job submissions is constant, the number of active workers will quickly become almost constant, and the output rate will be equal to the input (submission) rate.

Installation

Requires Go 1.18 or newer.

go get go.mitsakis.org/workerpool

Documentation

https://pkg.go.dev/go.mitsakis.org/workerpool

Under development. API is subject to change.

Usage

Type Pool[I, O, C any] uses three type parameters:

  • I: input (job payload) type
  • O: output (result) type
  • C: type returned by the workerInit() function (e.g. a connection)
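
For example, pool p1 in the crawler example below uses all three: I is string (the URL to download), O is []byte (the response body), and C is *http.Transport (the per-worker transport).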

You might not need all three type parameters, so for convenience you can create a pool using a constructor that hides some of them. That's why there are four constructors of increasing complexity:

NewPoolSimple(
	maxActiveWorkers int,
	handler func(job Job[I], workerID int) error,
	...)

NewPoolWithInit(
	maxActiveWorkers int,
	handler func(job Job[I], workerID int, connection C) error,
	workerInit func(workerID int) (C, error),
	workerDeinit func(workerID int, connection C) error,
	...)

NewPoolWithResults(
	maxActiveWorkers int,
	handler func(job Job[I], workerID int) (O, error),
	...)

NewPoolWithResultsAndInit(
	maxActiveWorkers int,
	handler func(job Job[I], workerID int, connection C) (O, error),
	workerInit func(workerID int) (C, error),
	workerDeinit func(workerID int, connection C) error,
	...)
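
As an illustration of the init/deinit mechanism, here is a minimal sketch of NewPoolWithInit() with a hypothetical conn type standing in for a real connection (e.g. SMTP or database); each worker builds its own conn when it starts and releases it when it stops:

package main

import (
	"fmt"
	"log"

	"go.mitsakis.org/workerpool"
)

// conn is a hypothetical stand-in for a real per-worker resource.
type conn struct{ id int }

func main() {
	p, err := workerpool.NewPoolWithInit(3,
		func(job workerpool.Job[string], workerID int, c *conn) error {
			fmt.Printf("worker %d handles %q using connection %d\n", workerID, job.Payload, c.id)
			return nil
		},
		func(workerID int) (*conn, error) { // init: runs once when a worker starts
			return &conn{id: workerID}, nil
		},
		func(workerID int, c *conn) error { // deinit: runs once when a worker stops
			return nil
		})
	if err != nil {
		log.Fatal(err)
	}
	for _, msg := range []string{"a", "b", "c"} {
		p.Submit(msg)
	}
	p.StopAndWait()
}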

You can also connect pools of compatible types (the results of p1 have the same type as the inputs of p2) into a pipeline by using the ConnectPools(p1, p2, handleError) function like this:

workerpool.ConnectPools(p1, p2, func(result workerpool.Result[string, []byte]) {
	// log error
})

When two pools are connected, results of p1 that have no error are submitted to p2, while those with an error are handled by the handleError() function.

Simple example

package main

import (
	"fmt"
	"math"

	"go.mitsakis.org/workerpool"
)

func main() {
	p, _ := workerpool.NewPoolSimple(4, func(job workerpool.Job[float64], workerID int) error {
		result := math.Sqrt(job.Payload)
		fmt.Println("result:", result)
		return nil
	})
	for i := 0; i < 100; i++ {
		p.Submit(float64(i))
	}
	p.StopAndWait()
}

Pipeline example

A more complicated example with three pools connected into a pipeline.

package main

import (
	"fmt"
	"math"

	"go.mitsakis.org/workerpool"
)

func main() {
	// stage 1: calculate square root
	p1, _ := workerpool.NewPoolWithResults(10, func(job workerpool.Job[float64], workerID int) (float64, error) {
		return math.Sqrt(job.Payload), nil
	})

	// stage 2: negate number
	p2, _ := workerpool.NewPoolWithResults(10, func(job workerpool.Job[float64], workerID int) (float64, error) {
		return -job.Payload, nil
	})

	// stage 3: convert float to string
	p3, _ := workerpool.NewPoolWithResults(10, func(job workerpool.Job[float64], workerID int) (string, error) {
		return fmt.Sprintf("%.3f", job.Payload), nil
	})

	// connect p1, p2, p3 into a pipeline
	workerpool.ConnectPools(p1, p2, nil)
	workerpool.ConnectPools(p2, p3, nil)

	go func() {
		for i := 0; i < 100; i++ {
			p1.Submit(float64(i))
		}
		p1.StopAndWait()
	}()

	for result := range p3.Results {
		fmt.Println("result:", result.Value)
	}
}
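
The final loop above ignores errors for brevity. Each Result also carries an Error field (see the crawler example below), so if any stage can fail, you would check it when draining the last pool's results; a sketch of the same loop with error checking:

for result := range p3.Results {
	if result.Error != nil {
		fmt.Println("error:", result.Error)
		continue
	}
	fmt.Println("result:", result.Value)
}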

Crawler pipeline example

A real-world example with two pools. The first pool (p1) downloads URLs and the second (p2) processes the downloaded documents. Each worker has its own http.Transport that is reused between requests.

package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"

	"go.mitsakis.org/workerpool"
)

func main() {
	// pool p1 downloads URLs
	p1, _ := workerpool.NewPoolWithResultsAndInit(5, func(job workerpool.Job[string], workerID int, tr *http.Transport) ([]byte, error) {
		client := &http.Client{
			Transport: tr,
			Timeout:   30 * time.Second,
		}
		resp, err := client.Get(job.Payload)
		if err != nil {
			// mark error as retryable
			return nil, workerpool.ErrorWrapRetryable(fmt.Errorf("client.Get failed: %w", err))
		}
		if resp.StatusCode < 200 || resp.StatusCode > 399 {
			return nil, fmt.Errorf("HTTP status code %d", resp.StatusCode)
		}
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return nil, fmt.Errorf("failed to read response body: %w", err)
		}
		return body, nil
	}, func(workerID int) (*http.Transport, error) { // worker init function
		return &http.Transport{}, nil
	}, func(workerID int, tr *http.Transport) error { // worker deinit function
		tr.CloseIdleConnections()
		return nil
	}, workerpool.Retries(2)) // retry twice if error is retryable

	// pool p2 processes the content of the URLs downloaded by p1
	p2, _ := workerpool.NewPoolWithResults(1, func(job workerpool.Job[[]byte], workerID int) (int, error) {
		numOfLines := bytes.Count(job.Payload, []byte("\n"))
		return numOfLines, nil
	}, workerpool.FixedWorkers()) // we use a fixed number of workers (1) because it's a CPU-intensive task

	// connect pools p1, p2 into a pipeline.
	// documents downloaded by p1 are submitted to p2 for further processing.
	workerpool.ConnectPools(p1, p2, func(result workerpool.Result[string, []byte]) {
		if result.Error != nil {
			log.Printf("failed to download URL %v - error: %v", result.Job.Payload, result.Error)
		}
	})

	go func() {
		urls := []string{
			"http://example.com/",
			// add your own URLs
		}
		for _, u := range urls {
			time.Sleep(100 * time.Millisecond)
			log.Println("submitting URL:", u)
			p1.Submit(u)
		}
		p1.StopAndWait()
	}()

	for result := range p2.Results {
		log.Printf("web page has %d lines\n", result.Value)
	}
}
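
Note the retry mechanism: ErrorWrapRetryable() marks the failed request as retryable, and the Retries(2) option makes the pool retry a job whose handler returned a retryable error, up to two times.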

Contributing

Non-code Contributions

Bug reports and ideas to improve the API or the auto-scaling behavior are welcome.

Code Contributions

Bug fixes and improvements to auto-scaling (implementation or tests) are welcome.

Correctness tests (go test -run Correctness) must pass, and auto-scaling behavior tests (go test -run Autoscaling -v -timeout 30m) should not regress.

License

Copyright (C) 2022 Charalampos Mitsakis (go.mitsakis.org/workerpool)

This software is licensed under the terms of the Apache License, Version 2.0.

Issues
  • duplicate debug if

    I read through the code to see if it might fit my needs and stumbled upon this very minor duplication in https://github.com/cmitsakis/workerpool-go/blob/master/main.go#L552

    Concerning the debug statements, it looks like a local struct method could clean things up.

    
    // usage:
    // => w.debug(fmt.Sprintf("[workerpool/worker%d] finished", w.id))

    func (w *worker[I, O, C]) debug(msg string) {
    	if w.pool.loggerDebug != nil {
    		w.pool.loggerDebug.Println(msg) // Println appends the \n itself
    	}
    }
    

    I am not a Go veteran, so I had to look up some of the constructs you are using, and I found some methods hard to interpret/read. Especially the loop func is quite a bit of magic :-) Still, I really like the basic ideas, and having it encapsulated in a tiny self-contained package is great!

    opened by schorsch