Efficient and reliable background processing for Go

Overview

CurlyQ


CurlyQ provides a simple, easy-to-use interface for performing background processing in Go. It supports scheduled jobs, job deduplication, and configurable concurrent execution out of the box.

Quickstart

package main

import (
	"context"
	"log"

	cq "github.com/mcmathja/curlyq"
)

func main() {
	// Create a new producer
	producer := cq.NewProducer(&cq.ProducerOpts{
		Address: "localhost:6379",
		Queue: "testq",
	})

	// Use the producer to push a job to the queue
	producer.Perform(cq.Job{
		Data: []byte("Some data!"),
	})

	// Create a new consumer
	consumer := cq.NewConsumer(&cq.ConsumerOpts{
		Address: "localhost:6379",
		Queue: "testq",
	})

	// Consume jobs from the queue with the consumer
	consumer.Consume(func(ctx context.Context, job cq.Job) error {
		log.Println(string(job.Data))
		return nil
	})
}

The Basics

CurlyQ exposes three key types: Jobs, Producers, and Consumers.

Jobs

A Job wraps your data. In most cases, that's all you'll ever need to know about it:

job := cq.Job{
	Data: []byte("Some data."),
}

Every Job also exposes an ID field that uniquely identifies it among all jobs in the queue, and an Attempt field representing how many times it has been attempted so far.
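
For example, both fields can be read off any job a handler receives (a minimal sketch; handlers are covered under Consumers below):

consumer.Consume(func(ctx context.Context, job cq.Job) error {
	// job.ID uniquely identifies this job; job.Attempt counts how many
	// times it has been attempted so far.
	log.Printf("job %s, attempt %d", job.ID, job.Attempt)
	return nil
})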

Producers

A Producer pushes jobs onto the queue. Create one by providing it with the address of your Redis instance and a queue name:

producer := cq.NewProducer(&cq.ProducerOpts{
	Address: "my.redis.addr:6379",
	Queue: "queue_name",
})

You can also provide an existing go-redis instance if you would like to configure the queue to run on a more advanced Redis configuration or set up your own retry and timeout logic for network calls:

import "github.com/go-redis/redis/v7"

client := redis.NewClient(&redis.Options{
	Password: "my_password",
	DB: 3,
	MaxRetries: 2,
})

producer := cq.NewProducer(&cq.ProducerOpts{
	Client: client,
	Queue: "queue_name",
})

Running producer.Perform(job) will add a job to the queue to be run asynchronously. You can also schedule a job to be enqueued at a particular time by running producer.PerformAt(time, job), or after a certain wait period by running producer.PerformAfter(duration, job). All of the Perform methods return the ID assigned to the job and an error if one occurred.
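
For example (a minimal sketch, assuming the time and log packages are imported):

// Enqueue immediately
if id, err := producer.Perform(job); err == nil {
	log.Println("enqueued job", id)
}

// Enqueue at a specific time, an hour from now
if id, err := producer.PerformAt(time.Now().Add(time.Hour), job); err == nil {
	log.Println("scheduled job", id)
}

// Enqueue after a ten-second wait
if id, err := producer.PerformAfter(10*time.Second, job); err == nil {
	log.Println("scheduled job", id)
}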

You can deduplicate jobs by pre-assigning them IDs:

job := cq.Job{
	ID: "todays_job",
}

// Enqueue the job
producer.PerformAfter(10 * time.Second, job)

// Does nothing and returns cq.ErrJobAlreadyExists, because a job with the same ID is already on the queue
producer.Perform(job)

Once a job has been acknowledged, its ID becomes available for reuse.

See the documentation for ProducerOpts for more details about available configuration options.

Consumers

A Consumer pulls jobs off the queue and executes them using a provided handler function. Create one with the same basic options as a Producer:

consumer := cq.NewConsumer(&cq.ConsumerOpts{
	Queue: "queue_name",

	// With an address:
	Address: "my.redis.addr:6379",
	// With a preconfigured go-redis client:
	Client: redisClient,
})

You start a consumer by running its Consume method with a handler function:

consumer.Consume(func(ctx context.Context, job cq.Job) error {
	log.Printf("Job %s has been processed!", job.ID)
	return nil
})

If the provided handler function returns nil, the job is considered to have been processed successfully and is removed from the queue. If the handler returns an error or panics, the job is considered to have failed and will be retried or killed based on how many times it has been attempted.
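
For instance, a handler that propagates a failure so the job will be retried might look like this (a minimal sketch; handleData is a hypothetical helper):

consumer.Consume(func(ctx context.Context, job cq.Job) error {
	if err := handleData(job.Data); err != nil {
		// Returning the error marks this attempt as failed; the job will be
		// retried or killed based on how many times it has been attempted.
		return err
	}
	// Returning nil acknowledges the job and removes it from the queue.
	return nil
})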

Consume will continue to process jobs until your application receives an interrupt signal or the consumer encounters a fatal error. Fatal errors only occur when the consumer is unable to communicate with Redis for an essential operation, such as updating the status of a job in flight.

See the documentation for ConsumerOpts for more details about available configuration options.

Issues
  • `go test -v` fails on Windows. Unix-only syscalls on consumer_test.go

    I have to say that I am really impressed. Good job @mcmathja! The package is well written, with documentation and an easy-to-understand code flow, and you follow Go best practices. I have a feeling you will shine in the Go community; keep up the great work. You've got my star :)


    Now, in consumer_test.go you use syscalls that are only available on Unix systems and not on Windows. You have two options:

    1. Add // +build !windows to consumer_test.go so that it is skipped on Windows (not recommended)
    2. Replace all syscall.Kill calls and remove all uses of syscall.SIGUSR1 from your test code.

    Here is the output I got when running go test -v on my Windows 10 machine:

    C:\mygopath\src\github.com\mcmathja\curlyq>go test -v
    # github.com/mcmathja/curlyq [github.com/mcmathja/curlyq.test]
    .\consumer_test.go:404:42: undefined: syscall.SIGUSR1
    .\consumer_test.go:421:5: undefined: syscall.Kill
    .\consumer_test.go:421:36: undefined: syscall.SIGUSR1
    .\consumer_test.go:437:5: undefined: syscall.Kill
    .\consumer_test.go:437:36: undefined: syscall.SIGUSR1
    

    Thanks, Gerasimos Maropoulos. Author of iris

    bug 
    opened by kataras 4
  • Minor fixes

    This PR includes minor fixes; read the commit messages for details.

    opened by kataras 1
  • Use cross-platform signal in tests

    Fixes https://github.com/mcmathja/curlyq/issues/1

    This modifies the tests to use syscall.SIGALRM as the test signal instead of syscall.SIGUSR1. The latter is only available on Unix systems, while the former is available on Windows as well. It also changes the tests to use process.Signal instead of process.Kill for similar reasons.

    opened by mcmathja 0
  • Don't reorder queue when polling

    Currently, we run BRPopLPush against the active jobs list to provide real-time notifications to consumers when new work hits the queue. Unfortunately, this has the side-effect of reordering the queue, so that the last item is moved to the front. In certain circumstances this could prevent jobs in the front of the queue from being executed in a timely fashion - e.g., if there was only one consumer, and for each job it processed it enqueued a new job on the same queue. CurlyQ doesn't provide any explicit guarantees about execution order because multiple jobs may be run concurrently, but I think it's reasonable to guarantee queue order to avoid scenarios like the above.

    The more common pattern is to use BRPopLPush to shift items one at a time off of a queue and onto an inflight list. This is difficult to implement in our case, because we want to support batch job polling and constant-time acks. We could shift one item onto an inflight list when polling, then merge it into a set with other items during getJobs, but that increases complexity on the hot path and generates more keys to keep track of when cleaning up after dead consumers.

    This PR takes a different approach. We keep a separate signal list and block on that instead. If the list is empty when a job is added to the queue, we add a throwaway item to that list. This item gets recycled with each call to BRPopLPush, unblocking all consumers in the process. Then, the first time a consumer detects that there are no more jobs to process during polling, it just removes the throwaway item from the signal list, causing all future checks against the signal list to block.
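
    The mechanism can be sketched with go-redis v7 primitives (illustrative only; the key names and helpers below are hypothetical, not CurlyQ's actual internals):

    // enqueue pushes a job payload onto the queue and, if the signal list is
    // empty, seeds it with a single throwaway item to wake blocked consumers.
    // (A real implementation would make this check-and-push atomic, e.g. with
    // a Lua script.)
    func enqueue(client *redis.Client, payload string) error {
    	if err := client.LPush("jobs", payload).Err(); err != nil {
    		return err
    	}
    	n, err := client.LLen("signal").Result()
    	if err != nil {
    		return err
    	}
    	if n == 0 {
    		return client.LPush("signal", "x").Err()
    	}
    	return nil
    }

    // waitForWork blocks until the signal list is non-empty. Rotating the
    // throwaway item from "signal" back onto "signal" unblocks every waiting
    // consumer without touching the jobs queue, preserving its order.
    func waitForWork(client *redis.Client) error {
    	return client.BRPopLPush("signal", "signal", 0).Err()
    }

    // drainSignal runs when a consumer finds the queue empty: removing the
    // throwaway item makes future waits block again.
    func drainSignal(client *redis.Client) error {
    	return client.LRem("signal", 0, "x").Err()
    }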

    opened by mcmathja 0
  • Notify of duplicates with an error

    This modifies the behavior of the Perform methods to return an error when they try to enqueue or schedule a duplicate job. Previously, no indication that the job already exists was provided.

    In cases where a duplicate is expected, this allows the client to act on that information, e.g. to proactively stop additional attempts to enqueue the same job. In cases where it's not expected, the error would serve as an important indication of a problem with how IDs are being assigned client-side.

    _, err := producer.Perform(job)
    if errors.As(err, &cq.ErrJobAlreadyExists{}) {
    	// Handle duplicate
    }
    
    opened by mcmathja 0
  • Upgrade go-redis to v7

    This updates go-redis to v7, enabling some important features that should be available before stabilizing the API (such as compatibility with sentinel authentication).

    One major change in v7 is that the go-redis client now correctly respects context cancellations, preventing future requests and quitting those that are inflight. Because we need the client to stay alive until cleanup is finished, we need to provide it with a separate context from the main consumer context that isn't cancelled until cleanup is finished. This change allows us to shut down the client when we don't finish in the specified grace period, preventing unexpected changes to the queue after a forced shutdown.
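
    The shape of that change can be sketched as follows (illustrative only; runConsumer, cleanUp, and the grace period are hypothetical, not CurlyQ's actual API):

    // The consumer context governs job processing; the client context stays
    // alive until cleanup finishes or the grace period expires.
    consumerCtx, stopConsumer := context.WithCancel(context.Background())
    clientCtx, stopClient := context.WithCancel(context.Background())

    go runConsumer(consumerCtx)

    // On shutdown: stop processing, then give cleanup a bounded window.
    stopConsumer()
    done := make(chan struct{})
    go func() {
    	cleanUp(clientCtx) // e.g. hand off in-flight jobs in Redis
    	close(done)
    }()
    select {
    case <-done:
    case <-time.After(30 * time.Second): // hypothetical grace period
    }
    // Cancelling the client context quits any in-flight Redis requests,
    // preventing unexpected changes to the queue after a forced shutdown.
    stopClient()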

    opened by mcmathja 0
Releases
  • v1.0.0-rc2 (Apr 30, 2020)

    This is a release candidate for v1.0.0.

    This release includes an upgrade to the internal go-redis version, allowing CurlyQ to accept go-redis clients with more advanced configuration options.

  • v1.0.0-rc1 (Apr 22, 2020)

    This is a release candidate for v1.0.0.

    This release introduces a guarantee that jobs added to the queue will always be consumed in FIFO order. Previously, the mechanism used to detect the presence of new jobs on the queue had the side-effect of rotating jobs from the back of the queue to the front. While unlikely, it was possible for this procedure to repeatedly push the same jobs back from the front of the queue, preventing them from being consumed indefinitely. The logic for detecting new jobs has been modified so that it no longer operates against the queue, eliminating this issue. (Please note that processing order is still not guaranteed, as jobs are executed concurrently once consumed.)

    Other changes:

    • ErrJobAlreadyExists is now returned when trying to enqueue or schedule a job that is already present in Redis. Previously, no error was returned. This API change allows clients to detect if duplicate jobs have been scheduled and handle the situation as needed.
    • The dependency list was pruned of unneeded libraries.
  • v0.4.0 (Feb 10, 2020)

    This release integrates an exponential backoff procedure into all of the consumer's internal processing loops so that they will not attempt to poll Redis in tight loops if they encounter network failures. It also exposes the ability to automatically trigger a critical error and abort after a maximum number of such failures. The default behavior is still for the processing loops to attempt to reconnect indefinitely.

    Other changes:

    • The maximum backoff between job retries is now configurable. The default parameters for retries have been modified so that jobs will now be retried for roughly 2 weeks before being killed.
    • DefaultLogger is now less verbose by default. A LoudLogger type has been added for easy debugging.
    • A race condition was fixed where consumers might fail to register themselves before retrieving jobs in cases of intermittent network failure.
    • Various performance enhancements and increased test coverage.

    This is expected to be the final feature release before v1. Future pre-releases will focus on bug fixes, usability enhancements, and documentation improvements.

  • v0.3.0 (Feb 9, 2020)

    This release adds logic to listen on changes to data stored in Redis so that new jobs can be detected and processed as soon as they are added to the queue. It also addresses several minor issues with the tests and refactors them so that they are easier to follow, particularly when testing concurrent behavior.

  • v0.2.0 (Jan 26, 2020)

    This release implements logic to address failure scenarios related to transient network or Redis errors. Previously, if a consumer failed to advance a job to its next state but then successfully registered a heartbeat, it was possible for it to retain ownership of that job indefinitely. Now, if a consumer encounters an error when trying to retry, kill, or acknowledge a job, it will immediately abort in order to ensure the job can be transferred to another consumer in a timely fashion.

    This release also exposes a configurable logger interface and fixes some concurrency-related bugs.

  • v0.1.0 (Jan 24, 2020)

Owner
James McMath