Celery Distributed Task Queue in Go

Overview

gocelery

Go Client/Server for Celery Distributed Task Queue

Why?

Having been involved in several projects migrating servers from Python to Go, I have realized that Go can improve the performance of existing Python web applications. Because Celery distributed tasks are often used in such web applications, this library allows you to both implement Celery workers and submit Celery tasks in Go.

You can also use this library as a pure Go distributed task queue.

Go Celery Worker in Action

demo

Supported Brokers/Backends

Now supporting both Redis and AMQP!!

  • Redis (broker/backend)
  • AMQP (broker/backend) - does not allow concurrent use of channels

Celery Configuration

Celery must be configured to use JSON instead of the default pickle encoding, because Go currently has no stable support for decoding pickle objects. Pass the configuration parameters below to use JSON.

Starting from version 4.0, Celery uses message protocol version 2 by default. GoCelery does not yet support message protocol version 2, so you must explicitly set CELERY_TASK_PROTOCOL to 1.

CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,
CELERY_TASK_PROTOCOL=1,

Example

The GoCelery GoDoc has good examples.
Also take a look at the example directory for sample Python code.

GoCelery Worker Example

Run Celery Worker implemented in Go

// create redis connection pool
redisPool := &redis.Pool{
	Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		}
		return c, err
	},
}

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
	gocelery.NewRedisBroker(redisPool),
	&gocelery.RedisCeleryBackend{Pool: redisPool},
	5, // number of workers
)

// task
add := func(a, b int) int {
	return a + b
}

// register task
cli.Register("worker.add", add)

// start workers (non-blocking call)
cli.StartWorker()

// wait for client request
time.Sleep(10 * time.Second)

// stop workers gracefully (blocking call)
cli.StopWorker()

Python Client Example

Submit Task from Python Client

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    ar = add.apply_async((5456, 2878), serializer='json')
    print(ar.get())

Python Worker Example

Run Celery Worker implemented in Python

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y

Start the Python worker from the command line:

celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle

GoCelery Client Example

Submit Task from Go Client

// create redis connection pool
redisPool := &redis.Pool{
	Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		}
		return c, err
	},
}

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
	gocelery.NewRedisBroker(redisPool),
	&gocelery.RedisCeleryBackend{Pool: redisPool},
	1,
)

// prepare arguments
taskName := "worker.add"
argA := rand.Intn(10)
argB := rand.Intn(10)

// run task
asyncResult, err := cli.Delay(taskName, argA, argB)
if err != nil {
	panic(err)
}

// get results from backend with timeout
res, err := asyncResult.Get(10 * time.Second)
if err != nil {
	panic(err)
}

log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))

Sample Celery Task Message

Celery Message Protocol Version 1

{
    "expires": null,
    "utc": true,
    "args": [5456, 2878],
    "chord": null,
    "callbacks": null,
    "errbacks": null,
    "taskset": null,
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "timelimit": [null, null],
    "eta": null,
    "kwargs": {}
}
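
As a rough illustration of how a worker might read the version 1 message above, the following Go sketch decodes it with encoding/json. The TaskMessageV1 struct and the decodeTaskMessage function are names assumed for this example, not gocelery's own types. Fields that the struct does not list are ignored by the decoder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TaskMessageV1 is a hypothetical struct covering a subset of the
// protocol version 1 fields shown in the sample message.
type TaskMessageV1 struct {
	ID      string                 `json:"id"`
	Task    string                 `json:"task"`
	Args    []interface{}          `json:"args"`
	Kwargs  map[string]interface{} `json:"kwargs"`
	Retries int                    `json:"retries"`
	ETA     *string                `json:"eta"`     // nil when the JSON value is null
	Expires *string                `json:"expires"` // nil when the JSON value is null
	UTC     bool                   `json:"utc"`
}

// decodeTaskMessage parses a JSON-encoded version 1 task message.
func decodeTaskMessage(data []byte) (*TaskMessageV1, error) {
	var m TaskMessageV1
	if err := json.Unmarshal(data, &m); err != nil {
		return nil, err
	}
	return &m, nil
}

// sample is the message from the README section above.
const sample = `{
    "expires": null,
    "utc": true,
    "args": [5456, 2878],
    "chord": null,
    "callbacks": null,
    "errbacks": null,
    "taskset": null,
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "timelimit": [null, null],
    "eta": null,
    "kwargs": {}
}`

func main() {
	m, err := decodeTaskMessage([]byte(sample))
	if err != nil {
		panic(err)
	}
	// JSON numbers decode to float64 when the target is interface{}.
	fmt.Println(m.Task, m.Args, m.ETA == nil)
}
```

Using pointer types for eta and expires keeps the distinction between a null value and an empty string.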

Projects

Please let us know if you use gocelery in your project!

Contributing

You are more than welcome to make any contributions. Please create a pull request for any changes.

LICENSE

gocelery is offered under the MIT license.

Issues
  • gocelery client crashes python celery workers (python 3.6, celery 4.2.1, redis broker/backend)

    I'm trying this Go client:

    package main
    
    import (
    	"os"
    	"fmt"
    	"github.com/gocelery/gocelery"
    	"math/rand"
    )
    
    func main() {
    	// initialize celery client
    	url := fmt.Sprintf("amqp://platzi:%[email protected]/platzi",os.Getenv("RMQ_PASS"))
    	cli, _ := gocelery.NewCeleryClient(
    		gocelery.NewAMQPCeleryBroker(url),
    		gocelery.NewAMQPCeleryBackend(url),
    		1,
    	)
    
    	// Prepare the commands
    	taskName := "RegistraComandos.registra"
    	comandos := [3]string{"uno","dos","tres"}
    	
    	i:= 0
    	for i < 1 {
    		_, err := cli.Delay(taskName, comandos[rand.Intn(3)])
    		if err != nil {
    			panic(err)
    		}
    	}
    }
    

    But it crashes the Python worker with this error:

    [2019-03-06 10:23:31,621: CRITICAL/MainProcess] Unrecoverable error: AttributeError("'NoneType' object has no attribute 'tzinfo'",)
    Traceback (most recent call last):
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
        self.blueprint.start(self)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
        step.start(parent)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
        return self.obj.start()
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 317, in start
        blueprint.start(self)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
        step.start(parent)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 593, in start
        c.loop(*c.loop_args())
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/loops.py", line 88, in asynloop
        update_qos()
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/kombu/common.py", line 417, in update
        return self.set(self.value)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/kombu/common.py", line 410, in set
        self.callback(prefetch_count=new_value)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/consumer/tasks.py", line 47, in set_prefetch_count
        apply_global=qos_global,
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/kombu/messaging.py", line 558, in qos
        apply_global)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/channel.py", line 1812, in basic_qos
        wait=spec.Basic.QosOk,
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/abstract_channel.py", line 59, in send_method
        return self.wait(wait, returns_tuple=returns_tuple)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/abstract_channel.py", line 79, in wait
        self.connection.drain_events(timeout=timeout)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/connection.py", line 491, in drain_events
        while not self.blocking_read(timeout):
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/connection.py", line 497, in blocking_read
        return self.on_inbound_frame(frame)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/method_framing.py", line 77, in on_frame
        callback(channel, msg.frame_method, msg.frame_args, msg)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/connection.py", line 501, in on_inbound_method
        method_sig, payload, content,
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/abstract_channel.py", line 128, in dispatch_method
        listener(*args)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/amqp/channel.py", line 1597, in _on_basic_deliver
        fun(msg)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/kombu/messaging.py", line 624, in _receive_callback
        return on_m(message) if on_m else self.receive(decoded, message)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 567, in on_task_received
        callbacks,
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/strategy.py", line 153, in task_message_handler
        body=body, headers=headers, decoded=decoded, utc=utc,
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/worker/request.py", line 140, in __init__
        self.eta = maybe_make_aware(eta, self.tzlocal)
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/utils/time.py", line 327, in maybe_make_aware
        if is_naive(dt):
      File "/home/jmerelo/.pyenv/versions/3.6.1/lib/python3.6/site-packages/celery/utils/time.py", line 282, in is_naive
        return dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None
    AttributeError: 'NoneType' object has no attribute 'tzinfo'
    

    That particular worker works like a charm with Python and also node-celery clients, so far.

    bug 
    opened by JJ 7
  • gocelery worker fails to decode message from python client (python 3.6, celery 4.2.1, redis backend)

    Since I couldn't get the RabbitMQ version to work (see #26 ) I have switched to Redis. Here's the program:

    import os
    from celery import Celery
    from dotenv import load_dotenv
    
    load_dotenv()
    app = Celery('tasks',
                 broker='redis://localhost:6379',
                 backend='redis://localhost:6379')
    
    if __name__ == '__main__':
        ordenes =['tres', 'uno','uno','uno','dos','dos', 'tres']
        for i in ordenes:
            print("Sending", i)
            enviado = app.send_task("tasks.registra", [i], serializer='json')
    

    And the reception, which is pretty much the same, but for redis:

    package main
    // Structure taken from https://github.com/gocelery/gocelery
    
    import (
    	"fmt"
    	"time"
    	"github.com/gocelery/gocelery"
    )
    
    // Celery Task
    var comandos = make(map[string]int)
    func registra(comando string) {
    	fmt.Println( comando )
    	comandos[comando]++
    	fmt.Println( comandos )
    }
    
    func main() {
    	// Create the broker and the backend
    	celeryBroker := gocelery.NewRedisCeleryBroker("redis://")
    	celeryBackend := gocelery.NewRedisCeleryBackend("redis://")
    
    	// Set the number of workers
    	celeryClient, _ := gocelery.NewCeleryClient(celeryBroker, celeryBackend, 1)
    
    	// Register the function
    	celeryClient.Register("tasks.registra", registra)
    
    	// Start the worker
    	fmt.Println( "Arranca el worker" )
    	go celeryClient.StartWorker()
    
    	// Wait, then stop
    	time.Sleep(120 * time.Second)
    	celeryClient.StopWorker()
    }
    

    Messages are received all right, but the worker logs errors:

    Arranca el worker
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:10:55 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    2019/03/06 09:11:24 failed to decode task message
    

    The error is the same with or without the serializer='json' argument. Looking at the source of the error, it seems to be trying to decode some Base64 stuff. But I really have no idea. Any help?

    bug 
    opened by JJ 7
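
One possible explanation for the decode failure above, offered as an assumption rather than a confirmed diagnosis: Celery 4.x defaults to message protocol version 2, in which the task body is a JSON array of args, kwargs and embedded options, not the version 1 object that the README says GoCelery expects. The sketch below uses a simplified, hypothetical envelope (a base64 body plus a body_encoding property, loosely modeled on what kombu writes to Redis) to show why a version 2 body fails to decode into a version 1 struct. All type and function names are invented for illustration.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical, reduced view of the transport-level message.
type envelope struct {
	Body       string `json:"body"`
	Properties struct {
		BodyEncoding string `json:"body_encoding"`
	} `json:"properties"`
}

// taskV1 holds a few fields of a protocol version 1 task body.
type taskV1 struct {
	ID   string        `json:"id"`
	Task string        `json:"task"`
	Args []interface{} `json:"args"`
}

// decodeV1 unwraps the envelope, base64-decodes the body if required,
// and tries to parse the body as a version 1 task object.
func decodeV1(raw []byte) (*taskV1, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return nil, fmt.Errorf("envelope: %w", err)
	}
	body := []byte(env.Body)
	if env.Properties.BodyEncoding == "base64" {
		decoded, err := base64.StdEncoding.DecodeString(env.Body)
		if err != nil {
			return nil, fmt.Errorf("base64: %w", err)
		}
		body = decoded
	}
	var t taskV1
	if err := json.Unmarshal(body, &t); err != nil {
		return nil, fmt.Errorf("task body: %w", err)
	}
	return &t, nil
}

// wrap builds a test envelope around a JSON body.
func wrap(body string) []byte {
	raw, _ := json.Marshal(map[string]interface{}{
		"body":       base64.StdEncoding.EncodeToString([]byte(body)),
		"properties": map[string]string{"body_encoding": "base64"},
	})
	return raw
}

func main() {
	v1 := wrap(`{"id":"abc","task":"tasks.registra","args":["uno"]}`)
	v2 := wrap(`[["uno"],{},{}]`) // version 2 style body: [args, kwargs, embed]

	t, err := decodeV1(v1)
	fmt.Println(t, err)

	_, err = decodeV1(v2)
	fmt.Println(err) // a JSON array cannot be unmarshaled into the struct
}
```

If this assumption holds, setting CELERY_TASK_PROTOCOL to 1 on the Python side, as the Celery Configuration section describes, would be the first thing to try.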
  • Why does gocelery use BLPOP command in redis broker?

    Hi.
    I have a question about redis_broker.go: https://github.com/gocelery/gocelery/blob/17631e11026a63f12751d8b4b0e0e2fe521e745b/redis_broker.go#L49-L83 It uses LPUSH when it sends and BLPOP when it receives.
    I assume this makes the queue LIFO rather than FIFO.
    Why doesn't it use BRPOP, as kombu does?

    https://github.com/celery/kombu/blob/8a974955a80b94eb0c7138a6cba5b53dd3dc911a/kombu/transport/redis.py#L587-L598

    opened by actumn 6
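
The ordering question raised in the issue above can be checked without a Redis server. The sketch below models a Redis list as an in-memory slice (an assumption made for illustration; it does not touch gocelery or Redis) and compares popping from the left with popping from the right after a series of left pushes.

```go
package main

import "fmt"

// list is an in-memory stand-in for a Redis list; index 0 is the head (left).
type list []string

// lpush inserts at the head, like LPUSH.
func (l *list) lpush(v string) { *l = append([]string{v}, *l...) }

// lpop removes from the head, like (B)LPOP on a non-empty list.
func (l *list) lpop() string {
	v := (*l)[0]
	*l = (*l)[1:]
	return v
}

// rpop removes from the tail, like (B)RPOP on a non-empty list.
func (l *list) rpop() string {
	n := len(*l) - 1
	v := (*l)[n]
	*l = (*l)[:n]
	return v
}

// drain pushes three tasks with lpush and pops them all with pop.
func drain(pop func(*list) string) []string {
	var l list
	for _, v := range []string{"t1", "t2", "t3"} {
		l.lpush(v)
	}
	var out []string
	for len(l) > 0 {
		out = append(out, pop(&l))
	}
	return out
}

func main() {
	fmt.Println(drain((*list).lpop)) // [t3 t2 t1]: newest first (LIFO)
	fmt.Println(drain((*list).rpop)) // [t1 t2 t3]: oldest first (FIFO)
}
```

Pushing and popping on the same end yields stack order, while pushing on one end and popping on the other yields queue order.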
  • Task should be an interface, not an anonymous function

    type Task interface {
        Execute() (result interface{}, err error)
    }
    

    This way you can add dependencies without accessing global state.

    For example

    type AddTask struct {
        A     int `json:"a"`
        B     int `json:"b"`
        Cache CachingService
    }
    
    enhancement 
    opened by nathanjordan 6
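
To make the proposal above concrete, here is a minimal sketch of a task type that implements the suggested Task interface and receives its dependency through a struct field. The CachingService interface, its methods and the mapCache type are assumptions made for this example; the issue names CachingService but does not define it.

```go
package main

import "fmt"

// Task is the interface proposed in the issue.
type Task interface {
	Execute() (result interface{}, err error)
}

// CachingService is a hypothetical dependency; its methods are assumed.
type CachingService interface {
	Get(key string) (int, bool)
	Set(key string, value int)
}

// mapCache is a trivial in-memory CachingService used for the demo.
type mapCache map[string]int

func (c mapCache) Get(key string) (int, bool) { v, ok := c[key]; return v, ok }
func (c mapCache) Set(key string, value int)  { c[key] = value }

// AddTask carries its arguments and its dependency as fields,
// so Execute needs no global state.
type AddTask struct {
	A     int            `json:"a"`
	B     int            `json:"b"`
	Cache CachingService `json:"-"`
}

// Execute returns A+B, consulting the injected cache first.
func (t *AddTask) Execute() (interface{}, error) {
	key := fmt.Sprintf("%d+%d", t.A, t.B)
	if v, ok := t.Cache.Get(key); ok {
		return v, nil
	}
	sum := t.A + t.B
	t.Cache.Set(key, sum)
	return sum, nil
}

func main() {
	var task Task = &AddTask{A: 2, B: 3, Cache: mapCache{}}
	res, err := task.Execute()
	fmt.Println(res, err)
}
```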
  • Fix infinite loop bug in deliveryAck

    Fixes the infinite loop bug reported in https://github.com/gocelery/gocelery/issues/117. If RabbitMQ is down, deliveryAck retries indefinitely and consumes a lot of CPU resources.

    opened by Henderake 5
  • running concurrent tests with multiple CeleryClient and multiple tasks fails

    As the log below shows, the TestWorkerClientArgs test reads the registeredTasks map 0xc4200e48d0, which belongs to the TestWorkerClientKwargs test, instead of its own map 0xc4200e5590.

    • Each test has its own new CeleryClient: 0xc4200e4900, 0xc4200e55c0.
    • Both tests pass if run individually.
    === RUN   TestWorkerClientKwargs
    2016/09/30 20:49:41 kwarg client 0xc4200e4900
    2016/09/30 20:49:41 registering task on 0xc4200e48d0 multiply_kwargs
    2016/09/30 20:49:41 registered tasks: 0xc4200e48d0 map[multiply_kwargs:0xc4200fabc0]
    2016/09/30 20:49:41 getting tasks: 0xc4200e48d0 map[multiply_kwargs:0xc4200fabc0]
    2016/09/30 20:49:41 kwarg client 0xc4200a1b00
    2016/09/30 20:49:41 registering task on 0xc4200a1ad0 multiply_kwargs
    2016/09/30 20:49:41 registered tasks: 0xc4200a1ad0 map[multiply_kwargs:0xc4200fadc0]
    2016/09/30 20:49:41 getting tasks: 0xc4200a1ad0 map[multiply_kwargs:0xc4200fadc0]
    --- PASS: TestWorkerClientKwargs (0.02s)
    === RUN   TestWorkerClientArgs
    2016/09/30 20:49:41 arg client 0xc4200e55c0
    2016/09/30 20:49:41 registering task on 0xc4200e5590 multiply
    2016/09/30 20:49:41 registered tasks: 0xc4200e5590 map[multiply:0x487510]
    2016/09/30 20:49:41 getting tasks: 0xc4200e48d0 map[multiply_kwargs:0xc4200fabc0]
    2016/09/30 20:49:41 task multiply_args is not registered
    2016/09/30 20:49:41 async result got
    --- FAIL: TestWorkerClientArgs (5.01s)
        gocelery_test.go:134: failed to get result: 5s timeout getting result for cdd00c82-e593-45a0-955a-4ac10f2b4b09
    
    bug 
    opened by shicky 5
  • compilation fails on message.go: not enough arguments in call to uuid.

    I got the following error when compiling a Go project that references gocelery:

    github.com/gocelery/gocelery/message.go:XX:XX: not enough arguments in call to uuid.Must have (uuid.UUID) want (uuid.UUID, error)

    If you look at the latest code for https://github.com/satori/go.uuid, the uuid.Must function now takes an extra error argument:

    func Must(u UUID, err error) UUID {
    	if err != nil {
    		panic(err)
    	}
    	return u
    }
    
    opened by rawbinz 4
  • Fix bug.  support for task execution without arguments

    Currently, tasks that take no arguments cannot be executed.

    @shared_task
    def test_task():
        print("Hello, World!")
    
    func main() {
    
            cli, _ := gocelery.NewCeleryClient(
                    gocelery.NewRedisCeleryBroker(REDIS_URL),
                    gocelery.NewRedisCeleryBackend(REDIS_URL),
                    1,
            )
    
            // submit the task by name, without arguments
            _, err := cli.Delay("test_task")
            if err != nil {
                    panic(err)
            }
    }
    

    go run main.go

    [2019-06-26 17:37:44,256: ERROR/ForkPoolWorker-2] Task test_task[cd94a9db-cdec-43f4-be87-7474df4cca21] raised unexpected: TypeError('test_task object argument after * must be an iterable, not NoneType')
    Traceback (most recent call last):
      File "/$HOME.local/share/virtualenvs/test-python-repo--NH_gJ4S/lib/python3.7/site-packages/celery/app/trace.py", line 385, in trace_task
        R = retval = fun(*args, **kwargs)
    TypeError: test_task object argument after * must be an iterable, not NoneType
    

    To inspect the serialized message, add the following line at https://github.com/gocelery/gocelery/blob/master/message.go#L171: fmt.Printf(string(jsonData))

    Debug log: {"id":"81af5e46-b6dc-46c3-9057-d21202d9c2aa","task":"vulns.tasks.test_task","args":null,"kwargs":{},"retries":0,"eta":null}

    The args field is null, which the Python worker cannot unpack as an argument list.

    opened by masahiro331 4
  • Monitoring Tool/UI for Gocelery (FlowerUI)

    Hi @sickyoon, do you know of any monitoring UI for gocelery, similar to how Flower UI can monitor Celery? Or do you know whether Flower UI can be configured to serve a gocelery application?

    opened by danielchengml 4
  • Making taskID in AsyncResult a public field.

    https://github.com/gocelery/gocelery/blob/892a6d55c1d7e9aa51a1b613f68e600c6f6a6ffb/gocelery.go#L113

    Hi @sickyoon, great project you have done! I was wondering what the reason was for setting the fields in AsyncResult as private fields especially taskID. I am hoping to retrieve the TaskID immediately after I submit the task to the queue even before the queue's been worked on by the worker. Would it be possible to change it to a public field? eg.

    type AsyncResult struct {
    	TaskID  string
    	backend CeleryBackend
    	result  *ResultMessage
    }
    
    enhancement 
    opened by danielchengml 3
  • Each worker burns one CPU core even if he is only waiting for new messages

    The StartWorker() function runs a loop, and on each iteration it tries to get a new message from the broker.

    As far as I understand Go channels, you normally want the blocking behavior of channel reads and writes explicitly, so that the underlying code only gets executed if there is something on the channel.

    The way you use the select statement breaks this pattern because of the default: case. Whenever there is nothing on the channel, your code does not block. Instead it loops over and over again and burns all the CPU while waiting for tasks.

    To be honest, I'm only a Go beginner, so I'm not able to fix your code myself. But I would appreciate it if you reconsidered that design pattern.

    func (w *CeleryWorker) StartWorker() {
    
        w.stopChannel = make(chan struct{}, 1)
        w.workWG.Add(w.numWorkers)
    
        for i := 0; i < w.numWorkers; i++ {
            go func(workerID int) {
                defer w.workWG.Done()
                for {
                    select {
                    case <-w.stopChannel:
                        return
                    default:
    
                        // process messages
                        taskMessage, err := w.broker.GetTaskMessage()
                        if err != nil || taskMessage == nil {
                            //log.Println("continue")
                            continue
                        }
    
                        //log.Printf("WORKER %d task message received: %v\n", workerID, taskMessage)
    
                        // run task
                        resultMsg, err := w.RunTask(taskMessage)
                        if err != nil {
                            log.Println(err)
                            continue
                        }
                        defer releaseResultMessage(resultMsg)
    
                        // push result to backend
                        err = w.backend.SetResult(taskMessage.ID, resultMsg)
                        if err != nil {
                            log.Println(err)
                            continue
                        }
                    }
                }
            }(i)
        }
    }
    
    

    amqp_broker.go

    
    func (b *AMQPCeleryBroker) GetTaskMessage() (*TaskMessage, error) {
        var taskMessage TaskMessage
        select {
        case delivery := <-b.consumingChannel:
            delivery.Ack(false)
            if err := json.Unmarshal(delivery.Body, &taskMessage); err != nil {
                return nil, err
            }
            return &taskMessage, nil
        default:
            return nil, fmt.Errorf("consumingChannel is empty")
    
        }
    
    }
    
    

    Kind regards Reinhard Luediger

    bug 
    opened by luedigernet 3
  • Race-condition with worker task execution

    Hello,

    This logic in the worker is not thread safe. When the client registers a function with a Task interface instance, that task is essentially a singleton bound to that function-name that will be shared between all worker threads accessing it during execution. As a result, calls to ParseKwargs and making variable assignments on the instance will be dangerous as it'll lead to nondeterministic behavior if multiple threads are concurrently running that task from different calls.

    Without backwards compatibility in mind, the cleanest solution to this would be to rework the CeleryTask interface to simply have a single method with signature RunTask(kwargs map[string]interface{}) (interface{}, error) which will do ParseKwargs and RunTask in one shot. This way we'd restrict the burden of state management to the method's scope.

    That however may not be feasible as that'll break existing client code. Nonetheless, this is a serious issue that I have confirmed during testing.

    opened by keon94 1
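
The single-method design proposed in the issue above can be sketched like this. The RunTask signature is the one the issue suggests; the addTask type and the kwargs keys a and b are assumptions for illustration. Because all per-call state lives in local variables, one registered instance can be shared safely by concurrent workers.

```go
package main

import (
	"fmt"
	"sync"
)

// CeleryTask uses the single-method signature proposed in the issue.
type CeleryTask interface {
	RunTask(kwargs map[string]interface{}) (interface{}, error)
}

// addTask has no fields, so concurrent calls cannot interfere.
type addTask struct{}

// RunTask parses kwargs and computes the result in one step.
func (addTask) RunTask(kwargs map[string]interface{}) (interface{}, error) {
	a, okA := kwargs["a"].(float64)
	b, okB := kwargs["b"].(float64)
	if !okA || !okB {
		return nil, fmt.Errorf("expected numeric kwargs a and b")
	}
	return a + b, nil
}

// runConcurrently calls the shared task from n goroutines and collects
// each result, or -1 if a call failed.
func runConcurrently(task CeleryTask, n int) []float64 {
	results := make([]float64, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			res, err := task.RunTask(map[string]interface{}{
				"a": float64(i),
				"b": float64(i),
			})
			if err != nil {
				results[i] = -1
				return
			}
			results[i] = res.(float64)
		}(i)
	}
	wg.Wait()
	return results
}

func main() {
	results := runConcurrently(addTask{}, 100)
	fmt.Println(results[0], results[1], results[99])
}
```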
  • Interest in a PostgreSQL result backend

    Hi,

    I see that there is Celery/Redis backend support out of the box – but unfortunately I've been using PostgreSQL as a result backend for my own project until I can migrate to Redis.

    I plan to write a lightweight Postgres backend wrapper based off of the Celery one – https://github.com/celery/celery/blob/master/celery/backends/database/models.py . Since the interface provided by gocelery is simple (GetResult/SetResult), I imagine it would be pretty straightforward.

    Do you / folks think there would be interest if I were to open a PR with it here, even as a proof of concept?

    opened by jtcho 3
  • connection.close() does not work.

    Hi, I'm using gocelery to send Celery tasks to the broker (RabbitMQ), with Python workers. However, I sometimes receive the error "channel/connection is not open", which prevents the message from being sent to the broker. My workaround is to check whether the connection is usable when sending to the broker; if it is not, I close the old connection and open a new one.

    func (b *AMQPCeleryBroker) SendCeleryMessage(message *gocelery.CeleryMessage) error {
    	err := b.Send(message)
    	if b.ShouldReconnect(err) {
    		log.Printf("Error %#v, retrying", err)
    		b.Reconnect()
    		return b.Send(message)
    	}
    	return err
    }
    
    func (b *AMQPCeleryBroker) ShouldReconnect(err error) bool {
    	return err == amqp.ErrClosed || err == amqp.ErrChannelMax
    }
    
    func (b *AMQPCeleryBroker) Reconnect() {
    	defer func() {
    		if err := recover(); err != nil {
    			fmt.Println(fmt.Sprintf("[ERROR] %v", err))
    		}
    	}()
    
    	_ = b.Close()
    	conn, channel := NewAMQPConnection(b.host)
    	b.Channel = channel
    	b.connection = conn
    }
    
    

    However, the b.Close() method does not seem to work: the number of connections keeps increasing, which causes a connection leak. Please help. Thanks.

    opened by thanhpd-teko 0
  • Keep getting this error: amqp_backend: failed to acknowledge result message : delivery not initialized

    I've used gocelery for months and it has been working well, but yesterday, when I deployed my app to a new GCP project, I started getting this error:

    amqp_backend: failed to acknowledge result message: delivery not initialized
    

    If I switch to Redis as the message broker, it works like a charm. Is there anything I need to modify? Perhaps the package dependencies?

    RabbitMQ Version: rabbitmq:3.8.19-alpine

    opened by bingdow 0
  • Update StartWorkerWithContext to spawn workers on demand

    Hi! I would like to propose spawning workers on demand (when there are tasks to process) instead of keeping n workers running at all times. That should improve resource utilization and fix the memory leak https://github.com/gocelery/gocelery/pull/167.

    opened by marselester 0
Releases(v1.0)