Asynq: simple, reliable, and efficient distributed task queue in Go

Overview

Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started.

High-level overview of how Asynq works:

  • Client puts task on a queue
  • Server pulls task off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers

Task queues are used as a mechanism to distribute work across multiple machines.
A system can consist of multiple worker servers and brokers, allowing for high availability and horizontal scaling.

Task Queue Diagram

Stability and Compatibility

Important Note: Current major version is zero (v0.x.x) to accommodate rapid development and fast iteration while getting early feedback from users (feedback on APIs is appreciated!). The public API could change without a major version update before the v1.0.0 release.

Status: The library is currently undergoing heavy development with frequent, breaking API changes.

Features

Quickstart

First, make sure you are running a Redis server locally.

$ redis-server

Next, write a package that encapsulates task creation and task handling.

package tasks

import (
    "context"
    "fmt"

    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    TypeEmailDelivery   = "email:deliver"
    TypeImageResize     = "image:resize"
)

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
    payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
    return asynq.NewTask(TypeEmailDelivery, payload)
}

func NewImageResizeTask(src string) *asynq.Task {
    payload := map[string]interface{}{"src": src}
    return asynq.NewTask(TypeImageResize, payload)
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it matches the asynq.HandlerFunc signature.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    userID, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    tmplID, err := t.Payload.GetString("template_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
    // Email delivery code ...
    return nil
}

// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
    // ... fields for struct
}

func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    src, err := t.Payload.GetString("src")
    if err != nil {
        return err
    }
    fmt.Printf("Resize image: src = %s\n", src)
    // Image resizing code ...
    return nil
}

func NewImageProcessor() *ImageProcessor {
    // ... initialize fields if needed and return an instance.
    return &ImageProcessor{}
}

In your application code, import the above package and use Client to put tasks on the queue.

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}
    c := asynq.NewClient(r)
    defer c.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    t := tasks.NewEmailDeliveryTask(42, "some:template:id")
    res, err := c.Enqueue(t)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    t = tasks.NewEmailDeliveryTask(42, "other:template:id")
    res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatalf("could not schedule task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------

    c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)

    // ---------------------------------------------------------------------------
    // Example 4: Pass options to tune task processing behavior at enqueue time.
    //            Options passed at enqueue time override default ones, if any.
    // ---------------------------------------------------------------------------

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)
}

Next, start a worker server to process these tasks in the background.
To start the background workers, use Server and provide your Handler to process the tasks.

You can optionally use ServeMux to create a handler, just as you would with "net/http" Handler.

package main

import (
    "log"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}

    srv := asynq.NewServer(r, asynq.Config{
        // Specify how many concurrent workers to use
        Concurrency: 10,
        // Optionally specify multiple queues with different priority.
        Queues: map[string]int{
            "critical": 6,
            "default":  3,
            "low":      1,
        },
        // See the godoc for other configuration options
    })

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}
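
If you need cross-cutting behavior such as logging or metrics around every task, ServeMux also accepts middleware. Below is a minimal sketch; it assumes the ServeMux.Use / MiddlewareFunc API and that "context" and "time" are added to the import list of the program above.

// loggingMiddleware wraps a handler and logs each task's type and processing time.
func loggingMiddleware(h asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        start := time.Now()
        log.Printf("start processing %q", t.Type)
        err := h.ProcessTask(ctx, t)
        log.Printf("finished %q in %v", t.Type, time.Since(start))
        return err
    })
}

Register it on the mux before calling srv.Run(mux):

mux.Use(loggingMiddleware)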

For a more detailed walk-through of the library, see our Getting Started Guide.

To learn more about asynq features and APIs, see our Wiki and godoc.

Web UI

Asynqmon is a web-based tool for monitoring and administering Asynq queues and tasks. Please see the tool's README for details.

Here are a few screenshots of the web UI.

Queues view

Tasks view

Command Line Tool

Asynq ships with a command line tool to inspect the state of queues and tasks.

Here's an example of running the stats command.

For details on how to use the tool, refer to the tool's README.

Installation

To install the asynq library, run the following command:

go get -u github.com/hibiken/asynq

To install the CLI tool, run the following command:

go get -u github.com/hibiken/asynq/tools/asynq

Requirements

Dependency | Version
Redis      | v3.0+
Go         | v1.13+

Contributing

We are open to, and grateful for, any contributions (GitHub issues/pull requests, feedback on the Gitter channel, etc.) made by the community. Please see the Contribution Guide before contributing.

Acknowledgements

  • Sidekiq : Many of the design ideas are taken from sidekiq and its Web UI
  • RQ : Client APIs are inspired by the rq library.
  • Cobra : Asynq CLI is built with cobra

License

Asynq is released under the MIT license. See LICENSE.

Comments
  • [FEATURE REQUEST] Instrumentation/Tracing for asynq redis client

    [FEATURE REQUEST] Instrumentation/Tracing for asynq redis client

    Is your feature request related to a problem? Please describe.

    • I want to add tracing to the underlying redis client instance using dd-trace-go.
    • Is it possible to get access to the underlying redis instance so I can wrap the client with a function like the one below?
    import (
    	"github.com/go-redis/redis/v7"
    	ddRedis "gopkg.in/DataDog/dd-trace-go.v1/contrib/go-redis/redis.v7"
    )
    
    func TraceClient(rClient redis.UniversalClient) {
    	ddRedis.WrapClient(
    		rClient,
    		ddRedis.WithServiceName(config.ServiceName()+"-redis"),
    		ddRedis.WithAnalytics(config.Enabled()),
    	)
    }
    
    

    Describe alternatives you've considered

    • I am also looking into whether the way to go about this is creating a RedisConnOpt implementation that returns a wrapped redis instance with Datadog tracing. What do you think?

    https://pkg.go.dev/gopkg.in/DataDog/[email protected]/contrib/go-redis/redis.v7#WrapClient

    enhancement 
    opened by seanyu4296 24
  • [BUG] The task is always active

    [BUG] The task is always active

    Describe the bug When a task is active and the worker shuts down, the task stays active forever.

    To Reproduce Steps to reproduce the behavior:

    1. Set task sleep times.
    func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    	id, err := t.Payload.GetInt("user_id")
    	if err != nil {
    		return err
    	}
    	fmt.Printf("[%s] - [Start] Send Welcome Email to User %d\n", time.Now().String(), id)
    	time.Sleep(10*time.Second)
    	fmt.Printf("[%s] - [ End ] Send Welcome Email to User %d\n", time.Now().String(), id)
    	return nil
    }
    
    2. Start the worker.
    3. Start the scheduler.
    4. When the task is active, shut down the worker.
    5. The task will always be active.

    Expected behavior I think the task should change the state to failed based on Timeout, MaxRetry, etc.

    Screenshots If applicable, add screenshots to help explain your problem.

    Environment (please complete the following information):

    • OS: Windows
    • Version of asynq package: v0.17.2
    bug 
    opened by GoneGo1ng 22
  • [QUESTION] i think set stop in run function is not a good idea

    [QUESTION] i think set stop in run function is not a good idea

    https://github.com/hibiken/asynq/blob/master/background.go#L253

    Normally, packages have two methods, Run and Stop, so I can clean up other resources before or after Stop(). Another scenario is when a user runs multiple background servers in main(), like a Kafka consumer.

    enhancement idea 
    opened by yxlimo 17
  • [FEATURE REQUEST] Processed job's payload and result

    [FEATURE REQUEST] Processed job's payload and result

    Is your feature request related to a problem? Please describe. Asynq currently removes successfully processed jobs from Redis. That means information about the processed jobs also vanishes, such as:

    • When the client pushed the job into the queue
    • When the job completed
    • Whether a given job ID was processed successfully

    Describe the solution you'd like It would be nice to keep processed jobs until a given TTL and return a basic struct that contains something like id, queued_at, completed_at. I think to achieve that, the first thing is to generate a unique id for each task and keep them like asynq:{default}:processed:{job_id}

    Describe alternatives you've considered Python RQ or Laravel Horizon might be good examples to take a look at.

    enhancement 
    opened by unicod3 15
  • [Question] Are there TTL set for tasks? How long does it stay in Redis?

    [Question] Are there TTL set for tasks? How long does it stay in Redis?

    Asking because I cannot find this in the wiki, and others might find it useful.

    Specifically, How long do archived tasks stay in Redis so we can keep track or retry manually?

    enhancement 
    opened by seanyu4296 13
  • [FEATURE REQUEST] WebUI of server states (like sidekiq's UI)

    [FEATURE REQUEST] WebUI of server states (like sidekiq's UI)

    We could offer a cmd for serving a comprehensive Web UI for management and monitoring

    Describe the solution you'd like Maybe we can add a subcommand to asynq, e.g. asynq webui.

    eg. https://raw.githubusercontent.com/mperham/sidekiq/master/examples/web-ui.png

    enhancement 
    opened by zhaolion 11
  • [Proposal] Change Payload to bytes & Add Wrapper around Key-Value pair payload

    [Proposal] Change Payload to bytes & Add Wrapper around Key-Value pair payload

    Is your feature request related to a problem? Please describe. I'm always frustrated when I need to call a GetX method for every value in Payload to get all the data (and I need to make sure I don't make a typo in the payload key string). It'd be nice if asynq supported encoding/decoding protobuf or gob messages directly so that I can load the data into an object with one method call.

    Describe the solution you'd like initial proposal:

    type EmailTaskPayload struct {
        UserID int
    }
    
    // client side.
    func main() {
         c := asynq.NewClient(asynq.RedisClientOpt{Addr: ":6379"})
    
        t := asynq.NewTask("email:welcome", EmailTaskPayload{UserID: 42})
        err := c.Enqueue(t)
        if err != nil {
            log.Fatalf("could not enqueue task: %v", err)
        }
    }
    
    // background workers side
    func main() {
        bg := asynq.NewBackground(r, &asynq.Config{
            Concurrency: 10,
        })
    
        mux := asynq.NewMux()
        mux.HandleFunc("email:welcome", emailHandler)
    
        bg.Run(mux)
    }
    
    func emailHandler(ctx context.Context, t *asynq.Task) error {
        var p EmailTaskPayload
        err := t.ReadPayload(&p)
        if err != nil {
            return err
        }
        fmt.Printf("Send Email to User %d\n", p.UserID)
        return nil
    }
    

    Describe alternatives you've considered None for now.

    Additional context None for now.

    enhancement idea 
    opened by hibiken 11
  • [BUG] The high priority queue is process very slow compare to lower queue

    [BUG] The high priority queue is process very slow compare to lower queue

    Describe the bug The high-priority queue is processed very slowly compared to the lower-priority queue

    To Reproduce Steps to reproduce the behavior (Code snippets if applicable):

    • Run a long-running task on the default queue. Many small but critical tasks will be issued from it and added to the critical queue
    • The default queue holds all of the worker pool's workers the whole time
    • The critical queue is stuck, and no task is processed

    Expected behavior

    • The critical queue's tasks need to be selected for processing first and cleared out quickly

    Screenshots

    • Setup server with 3 queue Screen Shot 2021-08-20 at 09 38 28

    • Enqueue critical task like that Screen Shot 2021-08-20 at 09 41 06

    • The critical queue stuck Screen Shot 2021-08-20 at 09 37 49

    • The default queue is running on all other workers Screen Shot 2021-08-20 at 09 37 42

    Environment (please complete the following information):

    • OS: Ubuntu 18.4
    • Version of asynq package 0.18.3

    Screen Shot 2021-08-20 at 09 47 04

    bug 
    opened by duyhungtnn 10
  • add UniqueKey option to extend Unique's behaviour

    add UniqueKey option to extend Unique's behaviour

    Instead of (queue, type, payload), use any custom key. This allows enforcing unique tasks that are not limited to the same payload and/or type.

    Tested with --redis_cluster flag.

    opened by benjajaja 9
  • [FEATURE REQUEST] Distributed concurrency support per queue / taskType

    [FEATURE REQUEST] Distributed concurrency support per queue / taskType

    Is your feature request related to a problem? Please describe. N/A

    Describe the solution you'd like Is it possible to set concurrency individually for every queue, as is supported here?

    Describe alternatives you've considered Currently I wire up the server by calculating a total concurrency and setting a max priority, assuming that the combination provides a similar effect as in gocraft/work: storing the maxConcurrency and priority info per queue and using it to initialize the server when starting the adapter (a simple wrapper over asynq).

    type Adapter struct {
    	taskConfigMap map[string]taskConfig
    	tcMutex       sync.RWMutex
    }
    
    type taskConfig struct {
    	priority       int
    	maxConcurrency int
    }
    
    func (a *Adapter) server() *asynq.Server {
    	return asynq.NewServer(a.redisConnOpt, asynq.Config{
    		Concurrency:    a.concurrency(),
    		Queues:         a.queuePriorityMap(),
    	})
    }
    
    func (a *Adapter) queuePriorityMap() map[string]int {
    	a.tcMutex.RLock()
    	defer a.tcMutex.RUnlock()
    
    	m := make(map[string]int)
    
    	for tn, cfg := range a.taskConfigMap {
    		if cfg.priority == 0 {
    			m[tn] = 1
    			continue
    		}
    
    		m[tn] = cfg.priority
    	}
    
    	return m
    }
    
    func (a *Adapter) concurrency() int {
    	a.tcMutex.RLock()
    	defer a.tcMutex.RUnlock()
    
    	concurrency := 0
    	for _, cfg := range a.taskConfigMap {
    		if cfg.maxConcurrency == 0 {
    			concurrency++
    			continue
    		}
    
    		concurrency += cfg.maxConcurrency
    	}
    
    	if rtcpu := runtime.NumCPU(); concurrency < rtcpu {
    		return rtcpu
    	}
    
    	return concurrency
    }
    

    Additional context Ref: Priority Sampler from gocraft/work

    enhancement 
    opened by ajatprabha 9
  • [FEATURE REQUEST] Support for ULID in TaskMessage ID field

    [FEATURE REQUEST] Support for ULID in TaskMessage ID field

    Hey there,

    Was wondering if we could add support for ULID in the TaskMessage ID field. The first 10 characters of each ULID string encode timestamp information, which would enable the sorting of tasks by ID.

    There is another library (albeit 3rd party) that implements ULIDs with a similar API to the UUID library, though it would require a little work to make the two interchangeable within asynq.

    • Client.go L:299 calls uuid.New, which takes no arguments, while ulid.New needs a uint64 timestamp and an io.Reader as an entropy source. time.Now() and rand.Reader (or nil) seem like sensible defaults.
    • Base.go L:264 calls uuid.MustParse, which returns a UUID type, while ulid.MustParse returns a ULID type.

    Given the two differences above, it looks like a custom type would have to be implemented to support both, but it shouldn't be too much of a hassle.

    Happy to submit a PR to help implement if needed.

    Cheers.

    enhancement 
    opened by dhh93 9
  • Add interface for Inspector methods

    Add interface for Inspector methods

    This is a code structure change, not a functional one. There is a base.Broker interface that RDB implements, so I wanted to do the same for the Inspector.

    This PR adds the interface base.QueueInspector and moves some types to the base package in order to avoid circular dependencies. As expected, RDB already implements both base.QueueInspector and base.Broker.

    Why? To make the CLI and UI work with different broker implementations. I would like to add another broker to this library, backed by MongoDB. base.Broker works fine and I have implemented basic features, but it needs this new interface to add support in the CLI and UI.

    All tests are passing.

    opened by vladfr 0
  • Add TaskStateProber to get task state changed events

    Add TaskStateProber to get task state changed events

    (server | client).SetTaskStateProber(asynq.TaskStateProber{
      // Probers to specify what kind of state or data wanted, default = {*: task}
      // {state = pending | retry | completed | archived | * : data-key = next | task | result }
      // Probers: map[string]string{} 
    
      // Handler process the state changed event
      Handler: func(out map[string]interface{}) {
        // default out contains {state, task: asynq.TaskInfo}
      },
    })
    
    opened by mindon 0
  • [FEATURE REQUEST] Create new task in CLI or asynqmon

    [FEATURE REQUEST] Create new task in CLI or asynqmon

    Is your feature request related to a problem? Please describe. I'm always frustrated when trying to simulate task creation locally. There's no feature for creating a new task in the asynq CLI tool, nor in asynqmon.

    Describe the solution you'd like Provide task creation via the CLI or asynqmon to make it easier to debug locally. An example CLI command could look like this:

    asynq task enqueue --queue=example-queue --payload="{\"type\":\"UPDATE_VIEWER_COUNT\",\"stream_id\":83929120323,\"exec_at\":1663659030,\"interval\":300,\"exp_at\":1663662630}" 
    

    Describe alternatives you've considered

    Additional context

    enhancement 
    opened by isdzulqor 0
  • [FEATURE REQUEST] Update Redis GO library to support Redis 7

    [FEATURE REQUEST] Update Redis GO library to support Redis 7

    Is your feature request related to a problem? Please describe. Steps to reproduce

    1. Creating a redis cluster with redis 7. link
    2. Following the steps in wiki link
    3. Run the enqueue code below.
    client := asynq.NewClient(asynq.RedisClusterClientOpt{
    		Addrs: []string{"localhost:7000", "localhost:7001", "localhost:7002", "localhost:7003", "localhost:7004", "localhost:7005"},
    	})
    	defer client.Close()
    
    	// ------------------------------------------------------
    	// Example 1: Enqueue task to be processed immediately.
    	//            Use (*Client).Enqueue method.
    	// ------------------------------------------------------
    
    	task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
    	if err != nil {
    		log.Fatalf("could not create task: %v", err)
    	}
    	queues := []string{"q_1", "q_2", "q_3", "q_4"}
    	for i := 0; i < 10000; i++ {
    		que := queues[i%4]
    		info, err := client.Enqueue(task, asynq.Queue(que))
    		if err != nil {
    			log.Fatalf("could not enqueue task: %v %s", err, que)
    		}
    
    		log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
    	}
    

    Describe the solution you'd like This seems to be related to the redis/v8 library, which results in the error below (link). Upgrading to redis/v9 might solve this?

    error: SADD failed: got 4 elements in cluster info address, expected 2 or 3
    

    Additional context Following the steps above to figure out how to use asynq

    enhancement 
    opened by abit2 1
  • [FEATURE REQUEST] Update Task

    [FEATURE REQUEST] Update Task

    Is your feature request related to a problem? Please describe. I have enqueued a delayed task:

    info, err := client.Enqueue(task, asynq.ProcessAt(firstTime), asynq.TaskID(taskID))
    

    But after some time, I want to update the time at which that task will be processed (it can be earlier or later). The task that is created has a task ID (or I can get it from info.ID), so I want to be able to update it in the future.

    I can delete the task and enqueue it again:

    err := inspector.DeleteTask(queueName, taskID)
    if err != nil {
        return err
    }
    info, err := client.Enqueue(task, asynq.ProcessAt(secondTime), asynq.TaskID(taskID))
    if err != nil {
        return err
    }
    

    but if somehow I can't enqueue after deleting the task, the task will be lost. This is a problem if there is a check for whether the task exists before updating, because then retrying doesn't work.

    _, err := inspector.GetTaskInfo(queueName, taskID)
    if err != nil {
        if err.Error() == fmt.Sprintf("asynq: %s", asynq.ErrTaskNotFound) {
            // task doesn't exist, so doesn't need to update
            return nil
        }
        return err
    }
    err = inspector.DeleteTask(queueName, taskID)
    if err != nil {
        return err
    }
    info, err := client.Enqueue(task, asynq.ProcessAt(secondTime), asynq.TaskID(taskID))
    if err != nil {
        return err
    }
    

    Another approach I can think of is to create an UpdateTask task, which will do:

    err := inspector.DeleteTask(queueName, taskID)
    if err != nil && err.Error() != fmt.Sprintf("asynq: %s", asynq.ErrTaskNotFound) { // if task is not found, it can be because retrying delete again
        return err
    }
    info, err := client.Enqueue(task, asynq.ProcessAt(secondTime), asynq.TaskID(taskID))
    if err != nil {
        return err
    }
    

    with the UpdateTask enqueued like this:

    taskInfo, err := r.inspector.GetTaskInfo(queueName, taskID)
    if err != nil {
        return err
    }
    info, err := client.Enqueue(asynq.NewTask(UPDATE_TASK_TYPE, taskInfo.Payload), asynq.ProcessAt(secondTime), asynq.TaskID(taskID))
    if err != nil {
        return err
    }
    

    So the retry just needs to start again from Delete, without checking whether the task exists.

    Besides that, I can also save the task payload somewhere else so I can refer to it.

    I think the library should be able to do atomic update.

    Let me know if there are missing docs that I haven't read to fulfill my requirements.

    Describe the solution you'd like

    info, err := client.UpdateTask(task, asynq.ProcessAt(firstTime), asynq.TaskID(taskID))
    if err != nil {
        if err == asynq.ErrTaskNotFound {
            // can do something else if task doesn't exist at the first place
            return nil
        }
        return err
    }
    

    It can also return an error if the task doesn't exist, so I know whether it was updated or not (I don't want to enqueue a non-existent task).

    Describe alternatives you've considered

    • Have option like asynq.UpdateIfExist(true).
    • Using transaction and pipeline so DeleteTask and Enqueue can be 1 atomic operation.
    • Support workflow that can handle this problem.

    Additional context

    enhancement designing 
    opened by endrawanandika 0
Releases(v0.23.0)
  • v0.23.0(Apr 12, 2022)

    This version adds the Task Aggregation feature and includes a few improvements and fixes.

    Task Aggregation

    This is a new feature which allows you to enqueue multiple tasks successively, and have them passed to the Handler together rather than individually. The feature allows you to batch multiple successive operations into one, in order to save on costs, optimize caching, or batch notifications, for example. See the Wiki page for details.
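
    A rough sketch of how the new options might be combined (a sketch based on the additions listed below; imports and error handling omitted):

    srv := asynq.NewServer(redisConnOpt, asynq.Config{
        Queues: map[string]int{"default": 1},
        // Wait up to 2 minutes and aggregate at most 10 tasks per group.
        GroupGracePeriod: 2 * time.Minute,
        GroupMaxSize:     10,
        // GroupAggregator combines the tasks in a group into one task.
        GroupAggregator: asynq.GroupAggregatorFunc(func(group string, tasks []*asynq.Task) *asynq.Task {
            var payloads [][]byte
            for _, t := range tasks {
                payloads = append(payloads, t.Payload())
            }
            combined, _ := json.Marshal(payloads)
            return asynq.NewTask("aggregated:"+group, combined)
        }),
    })

    // Client side: enqueue tasks into a named group.
    client.Enqueue(task, asynq.Group("notifications"))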


    Added

    • Group option is introduced to enqueue a task in a group.
    • GroupAggregator and related types are introduced for task aggregation feature.
    • GroupGracePeriod, GroupMaxSize, GroupMaxDelay, and GroupAggregator fields are added to Config.
    • Inspector has new methods related to "aggregating tasks".
    • Group field is added to TaskInfo.
    • (CLI): group ls command is added
    • (CLI): task ls supports listing aggregating tasks via --state=aggregating --group=<GROUP> flags
    • Enable rediss url parsing support: Thanks to @eleboucher

    Fixed

    • Fixed overflow issue with 32-bit systems (For details, see https://github.com/hibiken/asynq/pull/426): Thanks to @assimon!
    Source code(tar.gz)
    Source code(zip)
  • v0.22.1(Feb 20, 2022)

  • v0.22.0(Feb 19, 2022)

    This version improves worker crash recovery by introducing a concept of worker lease for an active task. It also adds an optional field to Config to customize the context passed to the Handler.

    Important Note: Since this version changes the logic of crash recovery, the library update should be done with a clean restart of the process. In other words, please make sure that the process running asynq.Server shuts down cleanly before restarting the process with this new version.
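
    As a rough illustration of the new context hook (a sketch; appCtx is a hypothetical application-wide context, e.g. one cancelled on SIGTERM):

    srv := asynq.NewServer(redisConnOpt, asynq.Config{
        // BaseContext returns the base context from which each Handler's context is derived.
        BaseContext: func() context.Context { return appCtx },
    })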

    Added

    • BaseContext is introduced in Config to specify a callback hook that provides a base context from which each Handler's context is derived
    • IsOrphaned field is added to TaskInfo to describe a task left in active state with no worker processing it.

    Changed

    • Server now recovers tasks with an expired lease. Recovered tasks are retried/archived with ErrLeaseExpired error.
    Source code(tar.gz)
    Source code(zip)
  • v0.21.0(Jan 22, 2022)

    Added

    • PeriodicTaskManager is added. Prefer using this over Scheduler as it has better support for dynamic periodic tasks. See the wiki page for an example of using PeriodicTaskManager
    • The asynq stats command now supports a --json option, making its output a JSON object
    • Introduced new configuration for DelayedTaskCheckInterval. See godoc for more details.
    Source code(tar.gz)
    Source code(zip)
  • v0.20.0(Dec 19, 2021)

    This release includes changes to support Prometheus integration in the Web UI. The integration is optional; there shouldn't be any changes in the performance or behavior of the library in this release.

    Added

    • Package x/metrics is added.
    • Tool tools/metrics_exporter binary is added.
    • ProcessedTotal and FailedTotal fields were added to QueueInfo struct.
    Source code(tar.gz)
    Source code(zip)
  • v0.19.1(Dec 12, 2021)

    This release includes a few additions and one fix.

    Added

    • Latency field is added to QueueInfo.
    • EnqueueContext method is added to Client (see the sketch below).
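
    A sketch of the new EnqueueContext method, which lets you bound how long an enqueue call may block (error handling as in the Quickstart):

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    info, err := client.EnqueueContext(ctx, task)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)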

    Fixed

    • Fixed an error when a user passes a duration of less than 1s to the Unique option
    Source code(tar.gz)
    Source code(zip)
  • v0.19.0(Nov 6, 2021)

    This release includes an introduction of task retention after successful processing. With the Retention option, you can specify how long the task should be retained in the queue as a completed task.
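
    For instance (a sketch using the Retention option described here):

    // Keep the task in the queue as a completed task for 24 hours after it succeeds.
    info, err := client.Enqueue(task, asynq.Retention(24*time.Hour))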

    Changed

    • NewTask takes Option as variadic argument
    • Bumped minimum supported go version to 1.14 (i.e. go1.14 or higher is required).

    Added

    • Retention option is added to allow user to specify task retention duration after completion.
    • TaskID option is added to allow user to specify task ID.
    • ErrTaskIDConflict sentinel error value is added.
    • ResultWriter type is added and provided through Task.ResultWriter method.
    • TaskInfo has new fields CompletedAt, Result and Retention.

    Removed

    • Client.SetDefaultOptions is removed. Use NewTask instead to pass default options for tasks.
    Source code(tar.gz)
    Source code(zip)
  • v0.18.6(Oct 3, 2021)

    This release fixes a CLI import issue.

    Changed

    • Updated github.com/go-redis/redis version to v8

    Fixes

    • Fixes tool import issue described in #325
    Source code(tar.gz)
    Source code(zip)
  • v0.18.5(Sep 1, 2021)

    This release includes one addition to the server config.

    Added

    IsFailure config option is added to Config to determine whether an error returned from the Handler counts as a failure (see the sketch below).
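
    A brief sketch of how this option might be used (assumes IsFailure has the shape func(error) bool; the policy shown is a hypothetical example):

    srv := asynq.NewServer(redisConnOpt, asynq.Config{
        // Do not count context cancellation as a task failure.
        IsFailure: func(err error) bool {
            return !errors.Is(err, context.Canceled)
        },
    })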

    Source code(tar.gz)
    Source code(zip)
  • v0.18.4(Aug 18, 2021)

  • v0.18.3(Aug 9, 2021)

    This release includes a minor change around task typenames and a critical performance fix for tooling (CLI, Web UI).

    Changed

    • Changed the Queue function to not convert the provided queue name to lowercase. Queue names are now case-sensitive.
    • QueueInfo.MemoryUsage is now an approximate usage value.

    Fixed

    • Fixed latency issue around memory usage (see https://github.com/hibiken/asynq/issues/309)
    Source code(tar.gz)
    Source code(zip)
  • v0.18.2(Jul 15, 2021)

  • v0.18.1(Jul 4, 2021)

    This release includes a minor change and a critical fix in the task recovery logic (tasks left in the active state due to a server crash).

    Changed

    • Changed to execute the task recovery logic when the server starts up; previously it needed to wait up to a minute for the task recovery logic to execute.

    Fixed

    • Fixed task recovering logic to execute every minute
    Source code(tar.gz)
    Source code(zip)
  • v0.18.0(Jun 29, 2021)

    Asynq 0.18 includes major API changes and changes to the tooling (WebUI and CLI). The inspeq sub package is removed, and all types and functions from the package are moved to the asynq package. The upgrade guide is available here

    Package API changes

    Task

    • The NewTask function takes a byte slice ([]byte) as the payload.
    • Task Type and Payload are accessed by method calls (previously these were fields on Task); see the sketch below.
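
    A minimal sketch of the new Task API based on the notes above (JSON is just one way to build the byte payload):

    payload, err := json.Marshal(map[string]interface{}{"user_id": 42})
    if err != nil {
        log.Fatal(err)
    }
    task := asynq.NewTask("email:welcome", payload)
    fmt.Println(task.Type())    // "email:welcome"
    fmt.Println(task.Payload()) // raw JSON bytes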

    Server

    • Server API has changed. Renamed Quiet to Stop. Renamed Stop to Shutdown. Note: As a result of this renaming, the behavior of Stop has changed. Please update the existing code to call Shutdown where it used to call Stop.

    Scheduler

    • Renamed Stop to Shutdown.

    Client

    • Client.Enqueue returns TaskInfo (Previously returned Result struct)

    Inspector

    • The inspeq package is removed. All types and functions from the package are moved to the asynq package.
    • Inspector.RunTaskByKey is replaced with Inspector.RunTask
    • Inspector.DeleteTaskByKey is replaced with Inspector.DeleteTask
    • Inspector.ArchiveTaskByKey is replaced with Inspector.ArchiveTask
    • WorkerInfo field names have changed.
    • Inspector.CancelActiveTask is renamed to Inspector.CancelProcessing
    • Inspector list methods (e.g. ListActiveTasks) return a slice of TaskInfo

    CLI changes

    • asynq task delete|run|archive commands take a task ID as well as the queue name (previously they required a task key). For example:
    asynq task delete --queue=QUEUE --id=TASK_ID
    

    Web UI (asynqmon) changes

    • Payload now shows "non-printable bytes" if the payload bytes are not human readable (e.g. binary format)

    Redis

    • Requires redis v4.0+ for multiple field/value pair support
    • Internal redis keys/values have changed (please see the migration guide)
    Source code(tar.gz)
    Source code(zip)
  • v0.17.2(Jun 6, 2021)

  • v0.17.1(Apr 4, 2021)

  • v0.17.0(Mar 24, 2021)

    This release includes one minor update to RedisConnOpt to support redis connection timeouts.

    Added

    • DialTimeout, ReadTimeout, and WriteTimeout options are added to RedisConnOpt types.
    Source code(tar.gz)
    Source code(zip)
  • v0.16.1(Mar 20, 2021)

  • v0.16.0(Mar 11, 2021)

  • v0.15.0(Jan 31, 2021)

    IMPORTANT: All Inspector-related code is moved to the subpackage "github.com/hibiken/asynq/inspeq"

    Changed

    • Inspector-related code is moved to the subpackage "github.com/hibiken/asynq/inspeq".
    • RedisConnOpt interface has changed slightly. If you have been passing RedisClientOpt, RedisFailoverClientOpt, or RedisClusterClientOpt as a pointer, update your code to pass as a value.
    • ErrorMsg field in RetryTask and ArchivedTask was renamed to LastError.

    Added

    • MaxRetry, Retried, LastError fields were added to all task types returned from Inspector.
    • MemoryUsage field was added to QueueStats.
    • DeleteAllPendingTasks, ArchiveAllPendingTasks were added to Inspector
    • DeleteTaskByKey and ArchiveTaskByKey now support deleting/archiving PendingTask.
    • asynq CLI now supports deleting/archiving pending tasks.
    Source code(tar.gz)
    Source code(zip)
  • v0.14.0(Jan 14, 2021)

    IMPORTANT: This release includes a breaking change; please install the latest version of the CLI and run the asynq migrate command.

    Changed

    • Renamed DeadTask to ArchivedTask.
    • Renamed the operation Kill to Archive in Inspector.
    • Print stack trace when Handler panics.
    • Includes a file name and a line number in the error message when recovering from a panic.

    Added

    • DefaultRetryDelayFunc is now a public API, which can be used in the custom RetryDelayFunc.
    • SkipRetry error is added to be used as a return value from Handler.
    • Servers method is added to Inspector
    • CancelActiveTask method is added to Inspector.
    • ListSchedulerEnqueueEvents method is added to Inspector.
    • SchedulerEntries method is added to Inspector.
    • DeleteQueue method is added to Inspector.
    Source code(tar.gz)
    Source code(zip)
  • v0.13.1(Nov 22, 2020)

    Fixed

    • Fixed shutdown timeout in processor. Shutdown timeout can be configured in Config.ShutdownTimeout and defaults to 8s if not specified (see the sketch below).
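
    For example (a sketch of configuring the timeout mentioned above):

    srv := asynq.NewServer(redisConnOpt, asynq.Config{
        Concurrency: 10,
        // Give in-flight tasks up to 30 seconds to finish during shutdown.
        ShutdownTimeout: 30 * time.Second,
    })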
    Source code(tar.gz)
    Source code(zip)
  • v0.13.0(Oct 13, 2020)

    Version 0.13.0 adds the Scheduler type for periodic tasks.

    Added

    • Scheduler type is added to enable periodic tasks. See the godoc for its APIs and the wiki for the getting-started guide.

    Changed

    • The interface Option has changed. See the godoc for the new interface. This would have no impact as long as you were using the exported functions to create Option (e.g. MaxRetry, Queue, etc)
    Source code(tar.gz)
    Source code(zip)
  • v0.12.0(Sep 12, 2020)

    Version 0.12.0 adds support for Redis Cluster and includes a number of breaking changes.

    IMPORTANT: If you are upgrading from a previous version, please install the latest version of the CLI go get -u github.com/hibiken/asynq/tools/asynq and run asynq migrate command. No process should be writing to Redis while you run the migration command.

    The semantics of queue have changed

    Previously, we called tasks that are ready to be processed "Enqueued tasks", and other tasks that are scheduled to be processed in the future "Scheduled tasks", etc. We changed the semantics of "Enqueue" slightly: all tasks that the client pushes to Redis are enqueued to a queue. Within a queue, tasks will transition from one state to another. The possible task states are:

    • Pending: task is ready to be processed (previously called "Enqueued")
    • Active: task is currently being processed (previously called "InProgress")
    • Scheduled: task is scheduled to be processed in the future
    • Retry: task failed to be processed and will be retried again in the future
    • Dead: task has exhausted all of its retries and is stored for manual inspection purposes

    This change in semantics is reflected in the new Inspector API and CLI commands.


    Changed

    Client

    Use ProcessIn or ProcessAt option to schedule a task instead of EnqueueIn or EnqueueAt.

    | Previously                | v0.12.0                                  |
    |---------------------------|------------------------------------------|
    | client.EnqueueAt(t, task) | client.Enqueue(task, asynq.ProcessAt(t)) |
    | client.EnqueueIn(d, task) | client.Enqueue(task, asynq.ProcessIn(d)) |

    Inspector

    • All Inspector methods are scoped to a queue, and the methods take qname (string) as the first argument.
    • EnqueuedTask is renamed to PendingTask, along with its corresponding methods.
    • InProgressTask is renamed to ActiveTask, along with its corresponding methods.
    • The verb "Enqueue" in method names is replaced by "Run" (e.g. EnqueueAllScheduledTasks --> RunAllScheduledTasks)

    CLI

    CLI commands are restructured to use subcommands and are organized into a few management commands. To view details on any command, use asynq help <command> <subcommand>.

    • asynq stats
    • asynq queue [ls inspect history rm pause unpause]
    • asynq task [ls cancel delete kill run delete-all kill-all run-all]
    • asynq server [ls]

    Added

    RedisConnOpt

    • RedisClusterClientOpt is added to connect to Redis Cluster.
    • Username field is added to all RedisConnOpt types in order to authenticate connection when Redis ACLs are used.

    Client

    • ProcessIn(d time.Duration) Option and ProcessAt(t time.Time) Option are added to replace EnqueueIn and EnqueueAt functionality.

    Inspector

    • Queues() ([]string, error) method is added to get all queue names.
    • ClusterKeySlot(qname string) (int64, error) method is added to get queue's hash slot in Redis cluster.
    • ClusterNodes(qname string) ([]ClusterNode, error) method is added to get a list of Redis cluster nodes for the given queue.
    • Close() error method is added to close connection with redis.

    Handler

    • GetQueueName(ctx context.Context) (string, bool) helper is added to extract the queue name from a context (see the sketch below).
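
    A short sketch of using this helper inside a handler (t.Type was a field on Task at the time of this release):

    func handleTask(ctx context.Context, t *asynq.Task) error {
        if qname, ok := asynq.GetQueueName(ctx); ok {
            log.Printf("processing %q from queue %q", t.Type, qname)
        }
        // ... process the task ...
        return nil
    }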
    Source code(tar.gz)
    Source code(zip)
  • v0.11.0(Jul 29, 2020)

    Added

    • Inspector type was added to monitor and mutate state of queues and tasks. See the godoc for details.

    • HealthCheckFunc and HealthCheckInterval fields were added to Config to allow user to specify a callback function to check for broker connection.

    Source code(tar.gz)
    Source code(zip)
  • v0.10.0(Jul 6, 2020)

    Added Features

    • Automatic recovery of tasks in the event of a worker crash
    • Automatic retry of tasks that exceeded their timeout/deadline

    Version 0.10.0 includes the following API changes:

    • (*Client).Enqueue, (*Client).EnqueueIn, and (*Client).EnqueueAt have changed to return a *Result and an error. A Result struct contains metadata about the task that was enqueued (e.g. ID, Queue, etc.).
    • ErrorHandler signature has changed to func(context.Context, *Task, error).

    Version 0.10.0 includes the following semantics changes:

    • All tasks now require a timeout or deadline. By default, the timeout is set to 1800 seconds (30 minutes) if neither is specified.
    • Tasks that exceed their deadline are automatically retried. In previous versions, a user-provided Handler needed to explicitly return an error when the ctx.Done channel was closed. In the new version, this is taken care of by the library. To avoid processing a task after its deadline has been exceeded, a Handler should still check the ctx.Done channel and stop processing when the channel is closed (see the sketch below).
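
    A minimal sketch of the recommended ctx check inside a Handler:

    func handleTask(ctx context.Context, t *asynq.Task) error {
        select {
        case <-ctx.Done():
            // Deadline or timeout exceeded; return so the library can retry the task.
            return ctx.Err()
        default:
        }
        // ... do the work, re-checking ctx periodically during long operations ...
        return nil
    }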

    Other important changes:

    • Please upgrade to the new version of asynq CLI which is compatible with the new version of the library.
    • Encoding schema for messages has changed. Please install the latest CLI and run migrate command if you have tasks enqueued with the previous version of asynq.
    Source code(tar.gz)
    Source code(zip)
  • v0.10.0.rc1(Jul 4, 2020)

    Beta version for v0.10 includes the following API changes:

    • (*Client).Enqueue, (*Client).EnqueueIn, and (*Client).EnqueueAt have changed to return a *Result and an error. A Result struct contains metadata about the task that was enqueued (e.g. ID, Queue, etc.).
    • ErrorHandler signature has changed to func(context.Context, *Task, error). Please use GetRetryCount(ctx) and/or GetMaxRetry(ctx) to get the count values that were part of the argument list in previous versions.

    Beta version for v0.10 includes the following semantics changes:

    • All tasks now require a timeout or deadline. By default, the timeout is set to 1800 seconds (i.e. 30 minutes) if neither is specified.
    • Tasks that exceed their deadline are automatically retried. In previous versions, a user-provided Handler needed to explicitly return an error when the ctx.Done channel was closed. In the new version, this is taken care of by the library. To avoid processing a task after its deadline has been exceeded, a Handler should still check the ctx.Done channel and stop processing when the channel is closed.

    Other important changes:

    • Please upgrade to the new version of asynq CLI which is compatible with the new version of the library.
    • Encoding schema for messages has changed. Please install the latest CLI and run migrate command if you have tasks enqueued with the previous version of asynq.
    Source code(tar.gz)
    Source code(zip)