A library to help you create pipelines in Golang

Overview

pipeline


Pipeline is a Go library that helps you build pipelines without worrying about channel management and concurrency. It contains common fan-in and fan-out operations as well as useful utility functions for batch processing and scaling.

If you have another common use case you would like to see covered by this package, please open a feature request.

Functions

func Cancel

func Cancel(ctx context.Context, cancel func(interface{}, error), in <-chan interface{}) <-chan interface{}

Cancel passes an interface{} from the in <-chan interface{} directly to the out <-chan interface{} until the Context is canceled. After the context is canceled, everything from in <-chan interface{} is sent to the cancel func instead with the ctx.Err().

package main

import (
	"context"
	"github.com/deliveryhero/pipeline"
	"log"
	"time"
)

func main() {
	// Create a context that lasts for 1 second
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Create a basic pipeline that emits one int every 250ms
	p := pipeline.Delay(ctx, time.Second/4,
		pipeline.Emit(1, 2, 3, 4, 5),
	)

	// If the context is canceled, pass the ints to the cancel func for teardown
	p = pipeline.Cancel(ctx, func(i interface{}, err error) {
		log.Printf("%+v could not be processed, %s", i, err)
	}, p)

	// Otherwise, process the inputs
	for out := range p {
		log.Printf("process: %+v", out)
	}

	// Output
	// process: 1
	// process: 2
	// process: 3
	// process: 4
	// 5 could not be processed, context deadline exceeded
}

func Collect

func Collect(ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan interface{}) <-chan interface{}

Collect collects interface{}s from its in channel and returns []interface{} from its out channel. It will collect up to maxSize inputs from the in <-chan interface{} over up to maxDuration before returning them as []interface{}. That means when maxSize is reached before maxDuration, [maxSize]interface{} will be passed to the out channel. But if maxDuration is reached before maxSize inputs are collected, [< maxSize]interface{} will be passed to the out channel. When the context is canceled, everything in the buffer will be flushed to the out channel.
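
The generated docs include no example for Collect, so here is a minimal sketch of batching a slow stream of ints. The stage layout mirrors the other examples in this README; exact batch boundaries depend on timing.

package main

import (
	"context"
	"github.com/deliveryhero/pipeline"
	"log"
	"time"
)

func main() {
	// Create a context that lasts for 1 second
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Create a basic pipeline that emits one int every 250ms
	p := pipeline.Delay(ctx, time.Second/4,
		pipeline.Emit(1, 2, 3, 4, 5),
	)

	// Collect up to 2 inputs over up to 10 seconds before emitting them
	// as a []interface{}; when the context is canceled, whatever remains
	// in the buffer is flushed downstream
	p = pipeline.Collect(ctx, 2, 10*time.Second, p)

	for batch := range p {
		log.Printf("batch: %+v", batch)
	}
}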

func Delay

func Delay(ctx context.Context, duration time.Duration, in <-chan interface{}) <-chan interface{}

Delay delays reading each input by duration. If the context is canceled, the delay will not be applied.
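
A minimal sketch of Delay on its own (the timing in the comments is illustrative):

package main

import (
	"context"
	"github.com/deliveryhero/pipeline"
	"log"
	"time"
)

func main() {
	// Create a context that lasts for 1 second
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Delay each read by 500ms; once the context is canceled,
	// any remaining inputs pass through without the delay
	for out := range pipeline.Delay(ctx, time.Second/2, pipeline.Emit(1, 2, 3)) {
		log.Printf("out: %+v", out)
	}
}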

func Emit

func Emit(is ...interface{}) <-chan interface{}

Emit fans is ...interface{} out to a <-chan interface{}.

func Merge

func Merge(ins ...<-chan interface{}) <-chan interface{}

Merge fans multiple channels into a single channel.

package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"

	"github.com/deliveryhero/pipeline"
	"github.com/deliveryhero/pipeline/example/db"
)

// SearchResults returns many types of search results at once
type SearchResults struct {
	Advertisements []db.Result `json:"advertisements"`
	Images         []db.Result `json:"images"`
	Products       []db.Result `json:"products"`
	Websites       []db.Result `json:"websites"`
}

func main() {
	r := http.NewServeMux()

	// `GET /search?q=<query>` is an endpoint that merges concurrently fetched
	// search results into a single search response using `pipeline.Merge`
	r.HandleFunc("/search", func(w http.ResponseWriter, r *http.Request) {
		query := r.URL.Query().Get("q")
		if len(query) < 1 {
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		// If the request times out, or we receive an error from our `db`,
		// the context will stop all pending db queries for this request
		ctx, cancel := context.WithCancel(r.Context())
		defer cancel()

		// Fetch all of the different search results concurrently
		var results SearchResults
		for err := range pipeline.Merge(
			db.GetAdvertisements(ctx, query, &results.Advertisements),
			db.GetImages(ctx, query, &results.Images),
			db.GetProducts(ctx, query, &results.Products),
			db.GetWebsites(ctx, query, &results.Websites),
		) {
			// Stop all pending db requests if there's an error
			if err != nil {
				log.Print(err)
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
		}

		// Return the search results
		if bs, err := json.Marshal(&results); err != nil {
			log.Print(err)
			w.WriteHeader(http.StatusInternalServerError)
		} else if _, err := w.Write(bs); err != nil {
			// The first Write already sent an implicit 200 status,
			// so all we can do here is log the error
			log.Print(err)
		}
	})

	// Start the server (the original example stops at registering the
	// handler; the port here is an assumption)
	log.Fatal(http.ListenAndServe(":8080", r))
}

func Process

func Process(ctx context.Context, processor Processor, in <-chan interface{}) <-chan interface{}

Process takes each input from the in <-chan interface{} and calls Processor.Process on it. When Processor.Process returns an interface{}, it will be sent to the output <-chan interface{}. If Processor.Process returns an error, Processor.Cancel will be called with the corresponding input and error message. Finally, if the Context is canceled, all inputs remaining in the in <-chan interface{} will go directly to Processor.Cancel.

package main

import (
	"context"
	"github.com/deliveryhero/pipeline"
	"github.com/deliveryhero/pipeline/example/processors"
	"log"
	"time"
)

func main() {
	// Create a context that times out after 5 seconds
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Create a pipeline that emits 1-6 at a rate of one int per second
	p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))

	// Use the Multiplier to multiply each int by 10
	p = pipeline.Process(ctx, &processors.Multiplier{
		Factor: 10,
	}, p)

	// Finally, lets print the results and see what happened
	for result := range p {
		log.Printf("result: %d\n", result)
	}

	// Output
	// result: 10
	// result: 20
	// result: 30
	// result: 40
	// result: 50
	// error: could not multiply 6, context deadline exceeded
}

func ProcessBatch

func ProcessBatch(ctx context.Context, maxSize int, maxDuration time.Duration, processor Processor, in <-chan interface{}) <-chan interface{}

ProcessBatch collects up to maxSize elements over maxDuration and processes them together as a slice of interface{}s. It passes a []interface{} to the Processor.Process method and expects a []interface{} back. It passes []interface{} batches of inputs to the Processor.Cancel method. If the receiver is backed up, ProcessBatch can hold up to 2x maxSize inputs.

package main

import (
	"context"
	"github.com/deliveryhero/pipeline"
	"github.com/deliveryhero/pipeline/example/processors"
	"log"
	"time"
)

func main() {
	// Create a context that times out after 5 seconds
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Create a pipeline that emits 1-6 at a rate of one int per second
	p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))

	// Use the BatchMultiplier to multiply 2 adjacent numbers together
	p = pipeline.ProcessBatch(ctx, 2, time.Minute, &processors.BatchMultiplier{}, p)

	// Finally, lets print the results and see what happened
	for result := range p {
		log.Printf("result: %d\n", result)
	}

	// Output
	// result: 2
	// result: 12
	// error: could not multiply [5], context deadline exceeded
	// error: could not multiply [6], context deadline exceeded
}

func ProcessBatchConcurrently

func ProcessBatchConcurrently(ctx context.Context, concurrently, maxSize int, maxDuration time.Duration, processor Processor, in <-chan interface{}) <-chan interface{}

ProcessBatchConcurrently fans the in channel out to multiple batch Processors running concurrently, then fans the out channels of the batch Processors back into a single out channel.

package main

import (
	"context"
	"github.com/deliveryhero/pipeline"
	"github.com/deliveryhero/pipeline/example/processors"
	"log"
	"time"
)

func main() {
	// Create a context that times out after 5 seconds
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Create a pipeline that emits 1-9
	p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9)

	// Each batch of 2 numbers takes 4 seconds to pass through the pipe,
	// with 2 concurrent batch Processors
	p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, &processors.Waiter{
		Duration: 4 * time.Second,
	}, p)

	// Finally, lets print the results and see what happened
	for result := range p {
		log.Printf("result: %d\n", result)
	}

	// Output
	// result: 3
	// result: 4
	// result: 1
	// result: 2
	// error: could not process [5 6], process was canceled
	// error: could not process [7 8], process was canceled
	// error: could not process [9], context deadline exceeded
}

func ProcessConcurrently

func ProcessConcurrently(ctx context.Context, concurrently int, p Processor, in <-chan interface{}) <-chan interface{}

ProcessConcurrently fans the in channel out to multiple Processors running concurrently, then fans the out channels of the Processors back into a single out channel.

package main

import (
	"context"
	"github.com/deliveryhero/pipeline"
	"github.com/deliveryhero/pipeline/example/processors"
	"log"
	"time"
)

func main() {
	// Create a context that times out after 5 seconds
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Create a pipeline that emits 1-7
	p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7)

	// Each number takes 2 seconds to pass through the pipe,
	// with 2 concurrent Processors
	p = pipeline.ProcessConcurrently(ctx, 2, &processors.Waiter{
		Duration: 2 * time.Second,
	}, p)

	// Finally, lets print the results and see what happened
	for result := range p {
		log.Printf("result: %d\n", result)
	}

	// Output
	// result: 2
	// result: 1
	// result: 4
	// result: 3
	// error: could not process 6, process was canceled
	// error: could not process 5, process was canceled
	// error: could not process 7, context deadline exceeded
}

func Split

func Split(in <-chan interface{}) <-chan interface{}

Split takes an interface from Collect and splits it back out into individual elements. Useful for batch processing pipelines (input chan -> Collect -> Process -> Split -> Cancel -> output chan).
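
Split has no example in the generated docs, so here is a hedged sketch of the batch pattern described above, reusing the BatchMultiplier processor from the ProcessBatch example (the exact results depend on that processor's behavior):

package main

import (
	"context"
	"github.com/deliveryhero/pipeline"
	"github.com/deliveryhero/pipeline/example/processors"
	"log"
	"time"
)

func main() {
	// Create a context that times out after 5 seconds
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// input chan -> Collect -> Process -> Split -> output chan
	p := pipeline.Emit(1, 2, 3, 4)

	// Collect pairs of ints into []interface{} batches
	p = pipeline.Collect(ctx, 2, time.Minute, p)

	// Process each []interface{} batch as a single unit
	p = pipeline.Process(ctx, &processors.BatchMultiplier{}, p)

	// Split the processed batches back out into individual elements
	p = pipeline.Split(p)

	for result := range p {
		log.Printf("result: %+v", result)
	}
}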

Types

type Processor

type Processor interface { ... }

Processor represents a blocking operation in a pipeline. Implementing Processor will allow you to add business logic to your pipelines without directly managing channels. This simplifies your unit tests and eliminates channel management related bugs.
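
The goreadme output elides the interface body. Judging from the Process examples above, a Processor has a Process method and a Cancel method; the sketch below assumes that method set, so check the interface in your installed version before relying on it.

package main

import (
	"context"
	"github.com/deliveryhero/pipeline"
	"log"
)

// Multiplier is a hypothetical Processor that multiplies ints by a fixed factor
type Multiplier struct {
	Factor int
}

// Process is called once for each input read from the in channel
func (m *Multiplier) Process(ctx context.Context, i interface{}) (interface{}, error) {
	return i.(int) * m.Factor, nil
}

// Cancel is called with any input that could not be processed, along with the reason
func (m *Multiplier) Cancel(i interface{}, err error) {
	log.Printf("error: could not multiply %v, %s", i, err)
}

func main() {
	for out := range pipeline.Process(context.Background(), &Multiplier{Factor: 10}, pipeline.Emit(1, 2, 3)) {
		log.Printf("result: %+v", out)
	}
}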


Readme created from Go doc with goreadme

Comments
  • Error Handling Example

    Is your feature request related to a problem? Please describe. I'm trying to use this wonderful library in one of my projects, but I'm not sure how to handle processor errors correctly. At the moment, if an error occurs, the processor.Cancel method is called, but how does the library intend for the caller to actually handle these errors?

    Describe the solution you'd like There should be a simple way to return an error channel from the processor.

    Describe alternatives you've considered Alternatively, we would have to pass an error channel to all the processors we use in the pipeline and send an error to that channel in the processor.Cancel method.

    Additional context At the moment I've created a special Result type to be used by all the processors. It has Err and Value fields, but the code is very verbose:

    type Result struct {
        Err   error
        Value interface{}
    }
    
    func (p *SomeProcessor) Process(ctx context.Context, ins interface{}) (interface{}, error) {
    	insSlice, err := toSliceOfInterfaces(ins)
    	if err != nil {
    		return nil, merry.Wrap(ErrCannotProcessDepositTxInput, merry.WithCause(err))
    	}
    
    	result := make([]interface{}, 0, len(insSlice))
    
    	for _, in := range insSlice {
    		res, err := ToResult(in)
    		if err != nil {
    			result = append(result, Result{
    				Err:   errors.Wrap(ErrCannotConvertInterfaceToResult),
    				Value: nil,
    			})
    
    			continue
    		}
    
    		foo, err := foo(res.Value)
    		if err != nil {
    			result = append(result, Result{
    				Err:   errors.Wrap(ErrCannotRunFoo),
    				Value: nil,
    			})
    
    			continue
    		}
    
    		bar, err := bar(ctx, foo)
    		if err != nil {
    			result = append(result, Result{
    				Err:   errors.Wrap(ErrCannotRunBar),
    				Value: nil,
    			})
    
    			continue
    		}
    
    		result = append(result, Result{Err: nil, Value: bar})
    	}
    
    	return result, nil
    }
    

    Any help or advice is appreciated.

    duplicate ⭐️ enhancement help wanted 
    opened by screwyprof 2
  • ⭐️ How to make sure that all the processors finished their job?

    Is your feature request related to a problem? Please describe. At the moment when we run a pipeline there is no way to check if the spawned goroutines have done their job or not.

    Describe the solution you'd like It would be great if we had something like a wait group inside the pipeline to be able to wait until all the processors are done.

    Additional context

    I wanted to use the library to implement the following pipeline.

    func RunPipeline() (txinput.TxInput, error) {
    	// create stakeDepositsNum of stakeDepositMessagesRequests.
    	stakeDepositsNum := dp.Amount / uint64(s.maxStakedBalance)
    	stakeDepositMessageRequest := s.createStakeDepositMessageRequest(authInfo, stake.ID, dp)
    	stakeDepositMessagesRequests := s.createStakeDepositMessageRequests(stakeDepositMessageRequest, stakeDepositsNum)
    
    	// 1. Create a pipeline that yields req.StakeDepositMessage stakeDepositsNum times.
    	// 2. Process req.StakeDepositMessages and turn them into DepositMessageResult.
    	// 3. Process DepositMessageResult and turn them into DepositTxInputResult.
    	p := pipeline.Emit(stakeDepositMessagesRequests...)
    	p = pipeline.ProcessConcurrently(ctx, s.numCPU, s.depositMessageProcessor, p)
    	p = pipeline.ProcessConcurrently(ctx, s.numCPU, s.depositTxInputProcessor, p)
    
    	// allocate results
    	txInputs := make([]txinput.TxInput, stakeDepositsNum)
    
    	// if I range over p at this time, I have no guarantee that the pipeline
    	// goroutines have done their jobs; there should be a way to wait for the
    	// pipeline to finish.
    	for v := range p {
    		// collect results
    		_ = v
    	}
    
    	return txInputs, nil
    }
    

    In the meantime I have to use errgroup to do the synchronisation.

    func RunPipeline() (txinput.TxInput, error) {
    	// allocate results
    	txInputs := make([]txinput.TxInput, stakeDepositsNum)
    
    	// init channels and an error group to sync goroutines
    	depositMessageResultCh := make(chan processor.DepositMessageResult)
    	depositTxInputResultCh := make(chan processor.DepositTxInputResult)
    
    	g, gCtx := errgroup.WithContext(ctx)
    
    	// 1. Create a pipeline that yields req.StakeDepositMessage stakeDepositsNum times.
    	pipe := s.repeatFnTimes(gCtx, stakeDepositsNum, s.createStakeDepositMessageRequestFn(authInfo, stake.ID, dp))
    
    	// 2. Process req.StakeDepositMessages and turn them into DepositMessageResult.
    	g.Go(s.processDepositMessageRequests(gCtx, pipe, depositMessageResultCh))
    
    	// 3. Process DepositMessageResult and turn them into DepositTxInputResult.
    	g.Go(s.processDepositMessageResult(gCtx, depositMessageResultCh, depositTxInputResultCh))
    
    	// 4. Collect TxInputs.
    	g.Go(s.collect(gCtx, depositTxInputResultCh, txInputs))
    
    	// wait for goroutines to finish.
    	if err := g.Wait(); err != nil {
    		return nil, merry.Wrap(ErrCannotRunStakingPipeline, merry.WithCause(err))
    	}
    
    	return txInputs, nil
    }
    

    Any advice is really appreciated.

    documentation ⭐️ enhancement 
    opened by screwyprof 1