Simple job queues for Go backed by Redis

Overview

bokchoy


Introduction

Bokchoy is a simple Go library for queueing tasks and processing them in the background with workers. It integrates easily into your web stack and is designed to have a low barrier to entry for newcomers.

It currently only supports Redis (client, sentinel and cluster) with some Lua magic, but internally it relies on a generic broker implementation to extend it.


Motivation

It's relatively easy to build a producer/consumer system in Go since the language provides the building blocks to do it from scratch, but we keep reimplementing the same system everywhere instead of making it reusable.

Bokchoy is a plug-and-play component; it does its job and does it well so that you can focus on your business logic.

Features

  • Lightweight
  • A simple API close to net/http - if you already use net/http then you can learn it pretty quickly
  • Designed with modular/composable APIs - middlewares, queue middlewares
  • Context control - built on the context package, providing value chaining, cancelation and timeouts
  • Highly configurable - tons of options to swap internal parts (broker, logger, timeouts, etc.); if you cannot customize something then an option is missing
  • Extensions - RPC server powered by gRPC, Sentry, etc.

Getting started

First, run a Redis server, of course:

redis-server

Define your producer which will send tasks:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/thoas/bokchoy"
)

func main() {
	ctx := context.Background()

	// define the main engine which will manage queues
	engine, err := bokchoy.NewDefault(ctx, "redis://localhost:6379")
	if err != nil {
		log.Fatal(err)
	}

	payload := map[string]string{
		"data": "hello world",
	}

	task, err := engine.Queue("tasks.message").Publish(ctx, payload)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(task, "has been published")
}

See producer directory for more information and to run it.

Now that we have a producer which can send tasks to our engine, we need a worker to process them in the background:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/thoas/bokchoy"
)

func main() {
	ctx := context.Background()

	engine, err := bokchoy.NewDefault(ctx, "redis://localhost:6379")
	if err != nil {
		log.Fatal(err)
	}

	engine.Queue("tasks.message").HandleFunc(func(r *bokchoy.Request) error {
		fmt.Println("Receive request", r)
		fmt.Println("Payload:", r.Task.Payload)

		return nil
	})

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	go func() {
		for range c {
			log.Print("Received signal, gracefully stopping")
			engine.Stop(ctx)
		}
	}()

	engine.Run(ctx)
}

A worker is defined by handlers. To define a Handler, you have to implement this interface:

type Handler interface {
	Handle(*Request) error
}

You can create your own struct that implements this interface, or use HandlerFunc to turn a plain function into a Handler.
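
For example, here is a minimal sketch of a struct-based handler; MessageHandler is just an illustrative name, and it assumes the queue exposes a Handle method accepting a Handler, mirroring HandleFunc:

type MessageHandler struct {
	prefix string
}

// Handle satisfies the Handler interface.
func (h MessageHandler) Handle(r *bokchoy.Request) error {
	fmt.Println(h.prefix, r.Task.Payload)
	return nil
}

// register the struct-based handler on a queue
engine.Queue("tasks.message").Handle(MessageHandler{prefix: "received:"})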

See worker directory for more information and to run it.

If you want a complete application example, you can read A Tour of Bokchoy, which explains how to use its main features.

Installation

Using Go Modules

go get github.com/thoas/bokchoy

Advanced topics

Delayed tasks

When you publish a task, it will be processed immediately by the worker if it's not already busy. You may want to delay the task in some cases by using the bokchoy.WithCountdown option:

payload := map[string]string{
    "data": "hello world",
}

queue.Publish(ctx, payload, bokchoy.WithCountdown(5*time.Second))

This task will be executed in 5 seconds.

Priority tasks

A task can be published ahead of others by providing a negative countdown:

payload := map[string]string{
    "data": "hello world",
}

queue.Publish(ctx, payload, bokchoy.WithCountdown(-1))

This task will be published and processed immediately.

Custom instantiation

bokchoy.NewDefault allows simple instantiation when you have a basic setup. If you want more control over your setup, for example to use a custom serializer or a custom logger, use bokchoy.New:

bokchoy.New(ctx, bokchoy.Config{
    Broker: bokchoy.BrokerConfig{
        Type: "redis",
        Redis: bokchoy.RedisConfig{
            Type: "client",
            Client: bokchoy.RedisClientConfig{
                Addr: "localhost:6379",
            },
        },
    },
})

Note: bokchoy.NewDefault uses redis.ParseURL internally, so it can handle connection strings like redis://user:password@host:port/db without the need for bokchoy.New.

Custom serializer

By default the task serializer is JSON. You can customize it when initializing the Bokchoy engine; it must implement the Serializer interface.

bokchoy.New(ctx, bokchoy.Config{
    Broker: bokchoy.BrokerConfig{
        Type: "redis",
        Redis: bokchoy.RedisConfig{
            Type: "client",
            Client: bokchoy.RedisClientConfig{
                Addr: "localhost:6379",
            },
        },
    },
}, bokchoy.WithSerializer(MySerializer{}))

This lets you define msgpack or YAML serializers if you want.
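
As a hedged sketch, a YAML serializer could look like the following; the Dumps/Loads method names are assumptions based on the default JSON serializer, so check the Serializer interface in the package for the exact signatures:

import "gopkg.in/yaml.v2"

// yamlSerializer is an illustrative sketch, not the library's own implementation.
type yamlSerializer struct{}

func (yamlSerializer) Dumps(v interface{}) ([]byte, error) {
	return yaml.Marshal(v)
}

func (yamlSerializer) Loads(data []byte, v interface{}) error {
	return yaml.Unmarshal(data, v)
}

It would then be registered with bokchoy.WithSerializer(yamlSerializer{}) as shown above.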

Custom logger

By default the internal logger is disabled; you can provide a more verbose logger with options:

import (
	"context"
	"log"

	"github.com/thoas/bokchoy"
	"github.com/thoas/bokchoy/logging"
)

func main() {
	ctx := context.Background()

	logger, err := logging.NewDevelopmentLogger()
	if err != nil {
		log.Fatal(err)
	}

	defer logger.Sync()

	bokchoy.New(ctx, bokchoy.Config{
		Broker: bokchoy.BrokerConfig{
			Type: "redis",
			Redis: bokchoy.RedisConfig{
				Type: "client",
				Client: bokchoy.RedisClientConfig{
					Addr: "localhost:6379",
				},
			},
		},
	}, bokchoy.WithLogger(logger))
}

The builtin logger is based on zap, but you can easily provide your own implementation if you have a central logging component.

If you don't need that much information, you can enable the Logger middleware.

Worker Concurrency

By default the worker concurrency is set to 1. You can override it based on your server capability; Bokchoy will spawn multiple goroutines to handle your tasks.

engine.Queue("tasks.message").HandleFunc(func(r *bokchoy.Request) error {
    fmt.Println("Receive request", r)
    fmt.Println("Payload:", r.Task.Payload)

    return nil
}, bokchoy.WithConcurrency(5))

You can also set it globally with the bokchoy.WithConcurrency option when initializing the engine.
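
For example, here is a sketch of setting it globally by passing the option to bokchoy.New, reusing the configuration shown in the custom instantiation section:

bokchoy.New(ctx, bokchoy.Config{
    Broker: bokchoy.BrokerConfig{
        Type: "redis",
        Redis: bokchoy.RedisConfig{
            Type: "client",
            Client: bokchoy.RedisClientConfig{
                Addr: "localhost:6379",
            },
        },
    },
}, bokchoy.WithConcurrency(5))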

Retries

If your task handler returns an error, the task will be marked as failed and retried 3 times, with intervals of 60, 120 and 180 seconds.

You can customize this globally on the engine or when publishing a new task by using bokchoy.WithMaxRetries and bokchoy.WithRetryIntervals options.

bokchoy.WithMaxRetries(1)
bokchoy.WithRetryIntervals([]time.Duration{
	180 * time.Second,
})
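
For instance, a sketch of attaching these options to a single task at publish time (payload being any serializable value, as in the earlier examples):

queue.Publish(ctx, payload,
    bokchoy.WithMaxRetries(1),
    bokchoy.WithRetryIntervals([]time.Duration{
        180 * time.Second,
    }))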

Timeout

By default a task will be forced to time out and marked as canceled if its running time exceeds 180 seconds.

You can customize this globally or when publishing a new task by using bokchoy.WithTimeout option:

bokchoy.WithTimeout(5*time.Second)

The worker will regain control and process the next task, but be careful: each task runs in its own goroutine, so you have to cancel your task at some point or it will leak.

Catch events

You can catch events by registering handlers on your queue when your tasks are starting, succeeding, completing or failing.

queue := engine.Queue("tasks.message")
queue.OnStartFunc(func(r *bokchoy.Request) error {
    // we update the context by adding a value
    *r = *r.WithContext(context.WithValue(r.Context(), "foo", "bar"))

    return nil
})

queue.OnCompleteFunc(func(r *bokchoy.Request) error {
    fmt.Println(r.Context().Value("foo"))

    return nil
})

queue.OnSuccessFunc(func(r *bokchoy.Request) error {
    fmt.Println(r.Context().Value("foo"))

    return nil
})

queue.OnFailureFunc(func(r *bokchoy.Request) error {
    fmt.Println(r.Context().Value("foo"))

    return nil
})

Store results

By default, if you don't mutate the task in the handler, its result will always be nil.

You can store a result in your task to keep it for later; for example, you might fetch statistics from a Twitter profile and save them for later use.

queue.HandleFunc(func(r *bokchoy.Request) error {
	r.Task.Result = map[string]string{"result": "wow!"}

	return nil
})

You can store anything as long as your serializer can serialize it.

Keep in mind the default task TTL is 180 seconds; you can override it with the bokchoy.WithTTL option.
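
For example, a sketch of raising the TTL for a single task at publish time (the 24-hour value is only illustrative):

queue.Publish(ctx, payload, bokchoy.WithTTL(24*time.Hour))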

Helpers

Let's define our previous queue:

queue := engine.Queue("tasks.message")

Empty the queue

queue.Empty()

It will remove all waiting tasks from your queue.

Cancel a waiting task

We produce a task without running the worker:

payload := map[string]string{
    "data": "hello world",
}

task, err := queue.Publish(ctx, payload)
if err != nil {
    log.Fatal(err)
}

Then we can cancel it by using its ID:

queue.Cancel(ctx, task.ID)

Retrieve a published task from the queue

queue.Get(ctx, task.ID)

Retrieve statistics from a queue

stats, err := queue.Count(ctx)
if err != nil {
    log.Fatal(err)
}

fmt.Println("Number of waiting tasks:", stats.Direct)
fmt.Println("Number of delayed tasks:", stats.Delayed)
fmt.Println("Number of total tasks:", stats.Total)

Middleware handlers

Bokchoy comes equipped with an optional middleware package providing a suite of standard middlewares. Middlewares have the same API as handlers and are easy to implement. Think of them like net/http middlewares: they share the same purpose of following the lifecycle of a Bokchoy request.

Core middlewares


bokchoy/middleware   Description
Logger               Logs the start and end of each request with the elapsed processing time
Recoverer            Gracefully absorbs panics and prints the stack trace
RequestID            Injects a request ID into the context of each request
Timeout              Signals to the request context when the timeout deadline is reached

See middleware directory for more information.
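
As a sketch of how they could be wired, assuming the engine exposes a Use method to register middlewares (the exact registration API is shown in the middleware directory):

import "github.com/thoas/bokchoy/middleware"

// register core middlewares on the engine; Use and the middleware names
// are assumptions here, see the middleware directory for details.
engine.Use(middleware.RequestID)
engine.Use(middleware.Recoverer)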

FAQs

Are Task IDs unique?

Yes! They are based on ULID.

Is exactly-once execution of tasks guaranteed?

It's guaranteed by the underlying broker; it uses BRPOP/BLPOP from Redis.

If multiple clients are blocked on the same key, the first client to be served is the one that has been waiting the longest (the first that blocked on the key).

Contributing

Don't hesitate ;)

Project history

Bokchoy is highly influenced by the great rq and celery.

Both are great, well-maintained projects, but they are only usable in a Python ecosystem.

Some parts (middlewares mostly) of Bokchoy are heavily inspired or taken from go-chi.

Comments
  • Add RedisClient option to BrokerConfig to use already initialized client

    Adding RedisClient option to BrokerConfig so we can use an already initialized client.

    Actually, we have a central place in our app where we initialize the redis client and we don't want to init another one. It would be great if we could re-use the existing client. Also, the current code for redis.NewClient(), NewFailoverClient() etc. does not include all the redis options, e.g. TLSConfig, OnConnect.

    opened by roman-vynar 6
  • math.Rand doesn't support concurrency

    I am using the Publish method like this

    task, err := c.engine.Queue(queueName).Publish(context.Background(), entry)
    if err != nil {
    	c.errs <- err
    	return
    }
    

    but eventually, I always encounter this error

    panic: runtime error: index out of range [-2]
    
    goroutine 9634 [running]:
    math/rand.(*rngSource).Uint64(...)
            /usr/local/go/src/math/rand/rng.go:249
    math/rand.(*rngSource).Int63(0xc0000af500, 0x290b9eb2ba0ec300)
            /usr/local/go/src/math/rand/rng.go:234 +0x93
    math/rand.(*Rand).Int63(...)
            /usr/local/go/src/math/rand/rand.go:85
    math/rand.read(0xc0030a3116, 0xa, 0xa, 0xc0058956b0, 0xc000098d70, 0xc000098d78, 0x10, 0xc0030a3110, 0xc000161400)
            /usr/local/go/src/math/rand/rand.go:272 +0x64
    math/rand.(*Rand).Read(0xc000098d50, 0xc0030a3116, 0xa, 0xa, 0xc005895720, 0x100e258, 0x10)
            /usr/local/go/src/math/rand/rand.go:264 +0x106
    io.ReadAtLeast(0x143b6a0, 0xc000098d50, 0xc0030a3116, 0xa, 0xa, 0xa, 0x16b8620, 0x4b8f430, 0xed5645a4e)
            /usr/local/go/src/io/io.go:310 +0x87
    io.ReadFull(...)
            /usr/local/go/src/io/io.go:329
    github.com/oklog/ulid.New(0x16e7dd3e8ff, 0x143b6a0, 0xc000098d50, 0xc002ce5a40, 0xc00564c440, 0xc0030a30f0, 0xc0030a30e0)
            /Users/tsatke/go/pkg/mod/github.com/oklog/[email protected]/ulid.go:96 +0xc9
    github.com/oklog/ulid.MustNew(...)
            /Users/tsatke/go/pkg/mod/github.com/oklog/[email protected]/ulid.go:105
    github.com/thoas/bokchoy.id(0xc002ce5a40, 0x1000000010d75c7)
            /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/utils.go:16 +0x53
    github.com/thoas/bokchoy.NewTask(0x13c59ad, 0xd, 0x13596a0, 0xc0030d86c0, 0x0, 0x0, 0x0, 0x10)
            /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/task.go:51 +0x6d
    github.com/thoas/bokchoy.(*Queue).NewTask(0xc00012e000, 0x13596a0, 0xc0030d86c0, 0x0, 0x0, 0x0, 0x1)
            /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/queue.go:427 +0xd3
    github.com/thoas/bokchoy.(*Queue).Publish(0xc00012e000, 0x14411c0, 0xc00001a078, 0x13596a0, 0xc0030d86c0, 0x0, 0x0, 0x0, 0xc004c83f40, 0x0, ...)
            /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/queue.go:445 +0x67
    github.com/TimSatke/crawler/crawler.(*Crawler).Enqueue(0xc000098f30, 0xc002cecd80, 0x29)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:123 +0xd2
    github.com/TimSatke/crawler/crawler.(*Crawler).processReference(0xc000098f30, 0xc0030ca680)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:235 +0x49
    github.com/TimSatke/crawler/crawler.(*Crawler).processAnchorNode(0xc000098f30, 0xc003012c80, 0xc002072e70)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:230 +0x13b
    github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc002072e70)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:199 +0x9e
    github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc002072e00)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
    github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc002059030)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
    github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c16700)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
    github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c16690)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
    github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c16000)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
    github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c15a40)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
    github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c157a0)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
    github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc0065abea0)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
    github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc0065abdc0)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
    github.com/TimSatke/crawler/crawler.(*Crawler).processHTML(0xc000098f30, 0xc003012c80, 0x1b5fdd8, 0xc003e7ae70, 0xc003e7ae70, 0x0)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:193 +0x8f
    github.com/TimSatke/crawler/crawler.(*Crawler).process(0xc000098f30, 0xc007073080, 0x22, 0x0, 0x0)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:184 +0x29d
    github.com/TimSatke/crawler/crawler.(*Crawler).Worker(0xc000098f30, 0xc0005ebc80, 0x20, 0x20)
            /Users/tsatke/Desktop/crawler/crawler/crawler.go:149 +0xb8
    github.com/thoas/bokchoy.HandlerFunc.Handle(0xc000124040, 0xc0005ebc80, 0x100d9e6, 0xc0001f7e68)
            /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/handler.go:8 +0x30
    github.com/thoas/bokchoy.(*consumer).handleTask.func1.2(0xc0005ebc80, 0xc0001dc001, 0xc005227340)
            /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/consumer.go:73 +0x64
    github.com/thoas/bokchoy.HandlerFunc.Handle(0xc005227320, 0xc0005ebc80, 0xc0001f7ec0, 0x12df49e)
            /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/handler.go:8 +0x30
    github.com/thoas/bokchoy.(*consumer).handleRequest.func1(0xc0005ebc80, 0xc005227340, 0xc005227320)
            /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/consumer.go:162 +0x5c
    github.com/thoas/bokchoy.HandlerFunc.Handle(0xc005227340, 0xc0005ebc80, 0x1, 0x143bf60)
            /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/handler.go:8 +0x30
    github.com/thoas/bokchoy.(*consumer).handleTask.func1(0xc000b71c80, 0xc002484900, 0xc000130120, 0xc0005ebc80)
            /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/consumer.go:74 +0x1b8
    created by github.com/thoas/bokchoy.(*consumer).handleTask
            /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/consumer.go:56 +0x13c
    

    Why is that?

    bug 
    opened by tsatke 5
  • ids no longer unique

    071d83d introduced a change that makes IDs predictable and no longer random. This needs to be fixed: either replace ulid with a global counter or generate a random entropy source.

    opened by tsatke 3
  • Task may stop retrying on a specific condition

    I have been actively implementing bokchoy to replace our existing queue library. There is an issue with tasks being retried when there are connectivity issues.

    For example, I create a task which always fails and is retried every 5s. Then I have network connectivity issues to Redis and do a hard stop of my application.

    When a task is retried there is a delay key in Redis containing the task ID:

    Type: Sorted Set (1 Members)
    
    # | Score | Value
    -- | -- | --
    0 | 1566470199 | 01DJWE7F5WWCQ16BMYGHSK4CAX
    

    However, if redis connectivity issues happen to coincide with stopping the app, the delay key is removed and the task remains in the queue forever with Failed status, MaxRetries >= 0 and some ETA.

    To fix this we need either to do some "transaction" on the delay key so it is never lost, or to create a cleanup goroutine to garbage collect Failed tasks having MaxRetries >= 0 and re-queue them.

    It would also be great to have a method to tell a task to be re-processed if the "max_retries" field is removed (no more attempts left) or in some other case like the above. I know I can modify MaxRetries, ETA, etc., but I need to put the task ID into the delay set anyway.

    Thanks!

    bug 
    opened by roman-vynar 3
  • 2 panics when using custom redis broker

    Started using v0.2.0. The example https://github.com/thoas/bokchoy/blob/master/examples/custom-broker/main.go stopped working because there are private fields which have to be initialized. My code:

    broker, err = bokchoy.New(ctx, bokchoy.Config{},
    	bokchoy.WithBroker(&bokchoy.RedisBroker{Client: myRedisClient, ClientType: "client"}),
    

    type RedisBroker requires the following fields to be initialized:

    mu         *sync.Mutex
    queues     map[string]struct{}
    

    and it's only done in newRedisBroker() which is private.

    Without those two fields being initialized there are two panics:

    panic: runtime error: invalid memory address or nil pointer dereference
    [signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x13c7912]
    
    goroutine 79 [running]:
    sync.(*Mutex).Lock(...)
    	/usr/local/Cellar/go/1.13.7/libexec/src/sync/mutex.go:74
    github.com/thoas/bokchoy.(*RedisBroker).consumeDelayed(0xc000545440, 0x1d0ec60, 0xc0000d0038, 0xc0003ceda0, 0x1d, 0x3b9aca00)
    	/Users/weber/gopath/pkg/mod/github.com/thoas/[email protected]/broker_redis.go:183 +0x42
    
    panic: assignment to entry in nil map
    
    goroutine 52 [running]:
    github.com/thoas/bokchoy.(*RedisBroker).consumeDelayed(0xc0005bf0e0, 0x1d0ec20, 0xc0000380c8, 0xc0003882e0, 0x1d, 0x3b9aca00)
    	/Users/weber/gopath/pkg/mod/github.com/thoas/[email protected]/broker_redis.go:228 +0x1fe
    
    

    So this makes it impossible to use a custom broker (a custom redis client in my case).

    Thanks!

    opened by roman-vynar 2
  • Question: Tracer vs logger

    Hello!

    My focus at the moment is on instrumentation; I wrote my logger and my tracer using zap. It is kind of working, I think, but I do not understand the difference. Looking at the Tracer and how it is used in queue.go, it just logs and that's it. I would like to do tracing such as OpenTracing or OpenTelemetry. Do you have any suggestions? I think I can do a mix of middleware/event, but I would like to get a direction from you.

    Thanks a lot

    opened by gianarb 2
  • Restarting Redis causes error retrieving delayed tasks

    I'm running a processor using Run(ctx). Everything works great until I restart the redis server. It's not uncommon for our systems to have hiccups, so I would hope that the server picks up where it left off. Instead it simply stops working and I don't see anything unless I turn on development logging. Then I see the following:

    ERROR	logging/logging.go:123	Received error when retrieving delayed tasks	{"error": "unable to ZPOPBYSCORE PrefixQueue:delay: NOSCRIPT No matching script. Please use EVAL.", "errorVerbose": "NOSCRIPT No matching script. Please use EVAL.\nunable to ZPOPBYSCORE PrefixQueue:delay\ngithub.com/thoas/bokchoy.(*redisBroker).Consume\n\t/userdir/go/pkg/mod/github.com/thoas/[email protected]/broker_redis.go:198\ngithub.com/thoas/bokchoy.(*Queue).consume\n\t/userdir/go/pkg/mod/github.com/thoas/[email protected]/queue.go:308\ngithub.com/thoas/bokchoy.(*Queue).ConsumeDelayed\n\t/userdir/go/pkg/mod/github.com/thoas/[email protected]/queue.go:299\ngithub.com/thoas/bokchoy.(*Queue).consumeDelayedTasks.func1\n\t/userdir/go/pkg/mod/github.com/thoas/[email protected]/queue.go:150\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1337"}
    github.com/thoas/bokchoy/logging.wrapLogger.Error
    	/userdir/go/pkg/mod/github.com/thoas/[email protected]/logging/logging.go:123
    github.com/thoas/bokchoy.loggerTracer.Log
    	/userdir/go/pkg/mod/github.com/thoas/[email protected]/tracer.go:32
    github.com/thoas/bokchoy.(*Queue).consumeDelayedTasks.func1
    	/userdir/go/pkg/mod/github.com/thoas/[email protected]/queue.go:152
    
    bug 
    opened by tsoslow 2
  • Remove redisClient in favour of UniversalClient

    Remove our interface redisClient and swap it for goredis' redis.UniversalClient interface. This fixes the inability to provide custom brokers with goredis v7.

    Couldn't see any reason as to why redisClient was being used. This allows the passing of a redis client as a custom broker, and updates the custom broker example.

    If possible, would we be able to get a new release too? That way the redis v7 and these changes are easy to use :)

    opened by Celant 1
  • Upgrade go-redis to v7

    We have upgraded go-redis in our project and it's impossible to use bokchoy with v6 alongside our other redis-related code on v7. Even if I re-map v6 to v7 in my project's go.mod, it still conflicts.

    I think it's a good idea to update anyway. v7 has some new features.

    opened by roman-vynar 1
  • Delay task when it fails

    Hello! Thanks a lot for your work. I have a question: I would like to delay the execution of a task when something inside the handler fails. Let's assume I am calling an API that rate-limits requests for a day once 1000 calls are reached. When I catch a failed request to that API caused by rate-limiting, I would like to re-enqueue the task with a couple of hours of delay. I saw that tasks have a TTL which I should for sure increase, but what else can I do to delay the next execution? I am also not sure whether modifications to a task inside a handler get stored or not. Thanks!

    opened by gianarb 1
  • How to handle errors?

    This needs fine-tuning in the README:

    • What happens when an error is caught?
    • When a task returns an explicit error?
    • When a task panics?
    documentation 
    opened by thoas 1
  • unexpected error when task is not found in Redis

    Hello,

    When a task is not found in Redis, it returns an error with the message cannot cast id: Attribute error.

    Looking into the code, I see there is actually a check to tell whether the task is not found in Redis: https://github.com/thoas/bokchoy/blob/0421beacfc775ab0c9431da5eab5a3d9a7cb9afb/queue.go#L228

    But I think that check will always be false because of https://github.com/thoas/bokchoy/blob/0421beacfc775ab0c9431da5eab5a3d9a7cb9afb/broker_redis.go#L325

    opened by hurngchunlee 0
  • prevent OnCompleteFunc called before last retry

    Hi @thoas,

    First of all, thank you for creating this useful library for task scheduling.

    I noticed that queue.OnCompleteFunc is triggered before the last retry takes place. After checking the source code, I think the changes in this pull request will prevent that from happening.

    Cheers, Hong

    opened by hurngchunlee 1
  • Redis cluster mode doesn't work properly

    Everything works fine locally with the single/client mode. But when I run the code in production with cluster mode, the consumer never receives anything from the queue.

    bug 
    opened by duke-cliff 1
  • By-pass serializer or pass struct to be serialized

    Hello :wave:,

    It would be great if we could bypass the serializer so we can unmarshal (manually) into a specific struct, and not a generic map. Or, provide the struct as payload so the serializer could use this instance.

    For simplicity, I would prefer the second option. For flexibility, the first one: we could receive a payload that is schema-less (a webhook for example) and, depending on its type (defined by a keyword or composite information), unmarshal into a specific struct.

    If you have further questions, don't hesitate to ask.

    Cheers,

    opened by novln 0
  • Queues created after Engine started

    Hi, how can I process tasks in queues created after the Engine has started? All these tasks have the status "waiting" and the Queue "start" method is unexported.

    opened by gondone666 0
Releases (v0.2.0)
  • v0.2.0 (Dec 4, 2019)

  • v0.1.0 (Jul 16, 2019)

    Initial release

    • Queueing tasks and processing them
    • Delayed tasks
    • Retries
    • Event listeners
    • Middlewares (logger, recover, request-id, timeout, ...)
Owner
Florent Messa (CTO @ulule)