Asynq: simple, reliable, and efficient distributed task queue in Go

Overview

Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started with.

High-level overview of how Asynq works:

  • Client puts tasks on queues
  • Server pulls tasks off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers

Task queues are used as a mechanism to distribute work across multiple machines.
A system can consist of multiple worker servers and brokers, enabling high availability and horizontal scaling.

Task Queue Diagram

Stability and Compatibility

Important Note: The current major version is zero (v0.x.x) to accommodate rapid development and fast iteration while getting early feedback from users (feedback on APIs is appreciated!). The public API could change without a major version update before the v1.0.0 release.

Status: The library is currently undergoing heavy development with frequent, breaking API changes.

Features

Quickstart

First, make sure you are running a Redis server locally.

$ redis-server

Next, write a package that encapsulates task creation and task handling.

package tasks

import (
    "context"
    "fmt"

    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    TypeEmailDelivery   = "email:deliver"
    TypeImageResize     = "image:resize"
)

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
    payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
    return asynq.NewTask(TypeEmailDelivery, payload)
}

func NewImageResizeTask(src string) *asynq.Task {
    payload := map[string]interface{}{"src": src}
    return asynq.NewTask(TypeImageResize, payload)
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    userID, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    tmplID, err := t.Payload.GetString("template_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
    // Email delivery code ...
    return nil
}

// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
    // ... fields for struct
}

func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    src, err := t.Payload.GetString("src")
    if err != nil {
        return err
    }
    fmt.Printf("Resize image: src = %s\n", src)
    // Image resizing code ...
    return nil
}

func NewImageProcessor() *ImageProcessor {
    // ... initialize fields and return an instance.
    return &ImageProcessor{}
}

In your application code, import the above package and use Client to put tasks on the queue.

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}
    c := asynq.NewClient(r)
    defer c.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    t := tasks.NewEmailDeliveryTask(42, "some:template:id")
    res, err := c.Enqueue(t)
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    t = tasks.NewEmailDeliveryTask(42, "other:template:id")
    res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal("could not schedule task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------

    c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t)
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)

    // ---------------------------------------------------------------------------
    // Example 4: Pass options to tune task processing behavior at enqueue time.
    //            Options passed at enqueue time override default ones, if any.
    // ---------------------------------------------------------------------------

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)
}

Next, start a worker server to process these tasks in the background: create a Server and provide your Handler to process the tasks.

You can optionally use ServeMux to create a handler, just as you would with net/http's Handler.

package main

import (
    "log"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}

    srv := asynq.NewServer(r, asynq.Config{
        // Specify how many concurrent workers to use
        Concurrency: 10,
        // Optionally specify multiple queues with different priority.
        Queues: map[string]int{
            "critical": 6,
            "default":  3,
            "low":      1,
        },
        // See the godoc for other configuration options
    })

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}

For a more detailed walk-through of the library, see our Getting Started Guide.

To learn more about asynq features and APIs, see our Wiki and the godoc.

Web UI

Asynqmon is a web-based tool for monitoring and administering Asynq queues and tasks. Please see the tool's README for details.

Here are a few screenshots of the web UI.

Queues view
Web UI QueuesView

Tasks view
Web UI TasksView

Command Line Tool

Asynq ships with a command line tool to inspect the state of queues and tasks.

Here's an example of running the stats command.

For details on how to use the tool, refer to the tool's README.

Installation

To install the asynq library, run the following command:

go get -u github.com/hibiken/asynq

To install the CLI tool, run the following command:

go get -u github.com/hibiken/asynq/tools/asynq

Requirements

Dependency   Version
Redis        v3.0+
Go           v1.13+

Contributing

We are open to, and grateful for, any contributions (GitHub issues/pull requests, feedback on the Gitter channel, etc.) made by the community. Please see the Contribution Guide before contributing.

Acknowledgements

  • Sidekiq : Many of the design ideas are taken from Sidekiq and its Web UI.
  • RQ : Client APIs are inspired by the RQ library.
  • Cobra : The Asynq CLI is built with Cobra.

License

Asynq is released under the MIT license. See LICENSE.

Issues
  • [FEATURE REQUEST] Instrumentation/Tracing for asynq redis client

    Is your feature request related to a problem? Please describe.

    • I want to add tracing to the underlying redis client instance using dd-trace-go.
    • Is it possible to get access to the underlying redis instance so I can wrap the client with a function like the one below?
    import (
    	"github.com/go-redis/redis/v7"
    	ddRedis "gopkg.in/DataDog/dd-trace-go.v1/contrib/go-redis/redis.v7"
    )
    
    func TraceClient(rClient redis.UniversalClient) {
    	ddRedis.WrapClient(
    		rClient,
    		ddRedis.WithServiceName(config.ServiceName()+"-redis"),
    		ddRedis.WithAnalytics(config.Enabled()),
    	)
    }
    
    

    Describe alternatives you've considered

    • I am also looking into whether the way to go about this is creating a RedisConnOpt implementation that returns a wrapped redis instance with Datadog tracing (a sketch follows below). What do you think?

    https://pkg.go.dev/gopkg.in/DataDog/[email protected]/contrib/go-redis/redis.v7#WrapClient
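    A hedged sketch of that alternative: a custom RedisConnOpt whose MakeRedisClient wraps the underlying client with tracing hooks. It assumes asynq's RedisConnOpt interface (which exposes MakeRedisClient() interface{}) and the dd-trace-go WrapClient call shown above; the service name is illustrative.

    type tracedConnOpt struct {
    	inner asynq.RedisConnOpt
    }

    func (o tracedConnOpt) MakeRedisClient() interface{} {
    	// Add tracing hooks to the underlying client before asynq uses it.
    	c := o.inner.MakeRedisClient().(redis.UniversalClient)
    	ddRedis.WrapClient(c, ddRedis.WithServiceName("myservice-redis"))
    	return c
    }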

    enhancement 
    opened by seanyu4296 24
  • [BUG] The task is always active

    Describe the bug: If the worker shuts down while a task is active, the task stays in the active state forever.

    To Reproduce Steps to reproduce the behavior:

    1. Add a sleep to the task handler:
    func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
    	id, err := t.Payload.GetInt("user_id")
    	if err != nil {
    		return err
    	}
    	fmt.Printf("[%s] - [Start] Send Welcome Email to User %d\n", time.Now().String(), id)
    	time.Sleep(10*time.Second)
    	fmt.Printf("[%s] - [ End ] Send Welcome Email to User %d\n", time.Now().String(), id)
    	return nil
    }
    
    2. Start the worker.
    3. Start the scheduler.
    4. While the task is active, shut down the worker.
    5. The task stays active forever.

    Expected behavior: I think the task should transition to a failed state based on Timeout, MaxRetry, etc.

    Environment (please complete the following information):

    • OS: Windows
    • Version of asynq package: v0.17.2
    bug 
    opened by GoneGo1ng 22
  • [QUESTION] I think stopping the server inside the Run function is not a good idea

    https://github.com/hibiken/asynq/blob/master/background.go#L253

    Normally, a package exposes two methods, Run and Stop, so I can clean up other resources before or after Stop(). Another scenario is a user running multiple background servers in main(), like a Kafka consumer.

    enhancement idea 
    opened by yxlimo 17
  • [FEATURE REQUEST] Processed job's payload and result

    Is your feature request related to a problem? Please describe. Asynq currently removes successfully processed jobs from Redis. That means the information about processed jobs also vanishes, such as:

    • When the client pushed the job onto the queue
    • When the job completed
    • Whether a given job ID was processed successfully

    Describe the solution you'd like It would be nice to keep processed jobs until a given TTL and return a basic struct that contains something like id, queued_at, completed_at. To achieve that, I think the first step is to generate a unique ID for each task and keep them under keys like asynq:{default}:processed:{job_id}.

    Describe alternatives you've considered Python RQ or Laravel Horizon might be good examples to take a look at.

    enhancement 
    opened by unicod3 15
  • [Question] Are there TTLs set for tasks? How long do they stay in Redis?

    Asking because I cannot find this in the wiki, and others might find it useful.

    Specifically, how long do archived tasks stay in Redis so we can keep track of them or retry them manually?

    enhancement 
    opened by seanyu4296 13
  • [FEATURE REQUEST] Web UI of server states (like Sidekiq's UI)

    We could offer a command for serving a comprehensive Web UI for management and monitoring.

    Describe the solution you'd like Maybe we can add a subcommand to asynq, e.g. asynq webui.

    eg. https://raw.githubusercontent.com/mperham/sidekiq/master/examples/web-ui.png

    enhancement 
    opened by zhaolion 11
  • [Proposal] Change Payload to bytes & Add Wrapper around Key-Value pair payload

    Is your feature request related to a problem? Please describe. I'm always frustrated when I need to call a GetX method for every value in the Payload (and I need to make sure I don't make a typo in the payload key string). It'd be nice if asynq supported encoding/decoding protobuf or gob messages directly so that I can load the data into an object with one method call.

    Describe the solution you'd like initial proposal:

    type EmailTaskPayload struct {
        UserID int
    }
    
    // client side.
    func main() {
         c := asynq.NewClient(asynq.RedisClientOpt{Addr: ":6379"})
    
        t := asynq.NewTask("email:welcome", EmailTaskPayload{UserID: 42})
        err := c.Enqueue(t)
        if err != nil {
            log.Fatalf("could not enqueue task: %v", err)
        }
    }
    
    // background workers side
    func main() {
        bg := asynq.NewBackground(r, &asynq.Config{
            Concurrency: 10,
        })
    
        mux := asynq.NewMux()
        mux.HandleFunc("email:welcome", emailHandler)
    
        bg.Run(mux)
    }
    
    func emailHandler(ctx context.Context, t *asynq.Task) error {
        var p EmailTaskPayload
        err := t.ReadPayload(&p)
        if err != nil {
            return err
        }
        fmt.Printf("Send Email to User %d\n", p.UserID)
        return nil
    }
    

    Describe alternatives you've considered None for now.

    Additional context None for now.

    enhancement idea 
    opened by hibiken 11
  • [BUG] The high-priority queue is processed very slowly compared to the lower-priority queue

    Describe the bug The high-priority queue is processed very slowly compared to the lower-priority queue.

    To Reproduce Steps to reproduce the behavior (Code snippets if applicable):

    • Run a long-running task type on the default queue; many small but critical tasks are issued from it and added to the critical queue
    • The default queue holds all of the worker pool's workers
    • The critical queue is stuck, and no tasks are processed

    Expected behavior

    • The critical queue's tasks should be selected for processing first and cleared out quickly

    Screenshots

    • Server set up with 3 queues (screenshot)
    • A critical task being enqueued (screenshot)
    • The critical queue stuck (screenshot)
    • The default queue running on all other workers (screenshot)

    Environment (please complete the following information):

    • OS: Ubuntu 18.4
    • Version of asynq package 0.18.3

    bug 
    opened by duyhungtnn 10
  • add UniqueKey option to extend Unique's behaviour

    Instead of (queue, type, payload), use any custom key. This allows enforcing unique tasks not limited to the same payload and/or type.

    Tested with --redis_cluster flag.

    opened by benjajaja 9
  • [FEATURE REQUEST] Distributed concurrency support per queue / taskType

    Is your feature request related to a problem? Please describe. N/A

    Describe the solution you'd like Is it possible to set concurrency individually for every queue, as supported here?

    Describe alternatives you've considered Currently I wire up the server by calculating the total concurrency and setting max priority, assuming the combination provides effects similar to gocraft/work: I store the maxConcurrency and priority info per queue and use it to initialize the server when starting the adapter (a simple wrapper over asynq).

    type Adapter struct {
    	redisConnOpt  asynq.RedisConnOpt
    	taskConfigMap map[string]taskConfig
    	tcMutex       sync.RWMutex
    }
    
    type taskConfig struct {
    	priority       int
    	maxConcurrency int
    }
    
    func (a *Adapter) server() *asynq.Server {
    	return asynq.NewServer(a.redisConnOpt, asynq.Config{
    		Concurrency:    a.concurrency(),
    		Queues:         a.queuePriorityMap(),
    	})
    }
    
    func (a *Adapter) queuePriorityMap() map[string]int {
    	a.tcMutex.RLock()
    	defer a.tcMutex.RUnlock()
    
    	m := make(map[string]int)
    
    	for tn, cfg := range a.taskConfigMap {
    		if cfg.priority == 0 {
    			m[tn] = 1
    			continue
    		}
    
    		m[tn] = cfg.priority
    	}
    
    	return m
    }
    
    func (a *Adapter) concurrency() int {
    	a.tcMutex.RLock()
    	defer a.tcMutex.RUnlock()
    
    	concurrency := 0
    	for _, cfg := range a.taskConfigMap {
    		if cfg.maxConcurrency == 0 {
    			concurrency++
    			continue
    		}
    
    		concurrency += cfg.maxConcurrency
    	}
    
    	if rtcpu := runtime.NumCPU(); concurrency < rtcpu {
    		return rtcpu
    	}
    
    	return concurrency
    }
    

    Additional context Ref: Priority Sampler from gocraft/work

    enhancement 
    opened by ajatprabha 9
  • [FEATURE REQUEST] Support for ULID in TaskMessage ID field

    Hey there,

    Was wondering if we could add support for ULID in the TaskMessage ID field. The first 10 characters of each ULID string encode timestamp information, which would enable the sorting of tasks by ID.

    There is another library (albeit 3rd party) that implements ULIDs with a similar API to the UUID library, though it would require a little work to make the two interchangeable within asynq.

    • Client.go L:299 calls uuid.New, which takes no arguments, while ulid.New needs a uint64 timestamp and an io.Reader as an entropy source. time.Now() and rand.Reader (or nil) seem like sensible defaults.
    • Base.go L:264 calls uuid.MustParse, which returns a UUID type, while ulid.MustParse returns a ULID type.

    Given the two differences above, it looks like a custom type would have to be implemented to support both, but it shouldn't be too much of a hassle.

    Happy to submit a PR to help implement if needed.

    Cheers.

    enhancement 
    opened by dhh93 9
  • Dynamic periodic task sync interval

    Hello, Ken Hibino!

    Could the periodic task sync interval be made dynamic? Can we control whether the configs are reloaded while the program runs? There may be many tasks, and the tasks don't change frequently; frequent reloading may be unfriendly to the program.

    enhancement 
    opened by tinnkai 0
  • [BUG] The task cannot be resumed after the worker is shut down while the task is running

    Describe the bug When a task is running and I shut down the worker, the task cannot recover normally after restarting the worker: its status in Asynq monitoring stays active, and it cannot be closed.

    To Reproduce Steps to reproduce the behavior (Code snippets if applicable):

    go tasks.TaskWorker()
    

    TaskWorker

    	// start server
    	if err := srv.Start(mux); err != nil {
    		common.LOG.Error(fmt.Sprintf("could not start server: %v", err))
    	}
    
    	quit := make(chan os.Signal, 1)
    	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
    	<-quit // wait for termination signal
    	log.Println("[asynq] Receive exit signal task worker exit")
    	srv.Shutdown()
    
    

    try restarting the asynq worker

    2022/08/03 - 14:29:31.445       INFO    /Users/micheng/code/luban/controller/application/deploy.go:297  2022-08-03 14:29:31 - [task-rpc] - [updated_replicas:1, replicas:2, available_replicas:1, observed_generation:189] waiting...
    2022/08/03 - 14:29:34.577       INFO    /Users/micheng/code/luban/controller/application/deploy.go:297  2022-08-03 14:29:34 - [task-rpc] - [updated_replicas:1, replicas:2, available_replicas:1, observed_generation:189] waiting...
    2022/08/03 - 14:29:37.699       INFO    /Users/micheng/code/luban/controller/application/deploy.go:297  2022-08-03 14:29:37 - [task-rpc] - [updated_replicas:1, replicas:2, available_replicas:1, observed_generation:189] waiting...
    2022/08/03 - 14:29:40.803       INFO    /Users/micheng/code/luban/controller/application/deploy.go:297  2022-08-03 14:29:40 - [task-rpc] - [updated_replicas:1, replicas:2, available_replicas:1, observed_generation:189] waiting...
    [GIN] 2022/08/03 - 14:29:41 | 200 |   26.725875ms |             ::1 | GET      "/task/monitor/api/queues/critical/completed_tasks?page=1&size=100"
    Receive SIGTERM ,exiting gracefully....
    2022/08/03 14:29:43 [asynq] Receive exit signal task scheduler exit
    2022/08/03 14:29:43 [asynq] Receive exit signal task worker exit
    asynq: pid=49878 2022/08/03 06:29:43.051790 INFO: Starting graceful shutdown
    asynq: pid=49878 2022/08/03 06:29:43.051778 INFO: Scheduler shutting down
    2022/08/03 14:29:43 [agent] Receive exit signal web server exit
    2022/08/03 14:29:43 Shutting down server...
    2022/08/03 14:29:43 [agent] Receive exit signal rpc server exit
    

    Environment (please complete the following information):

    • OS: MacOS
    • Version asynq v0.19.0 asynqmon v0.4.0
    bug 
    opened by zijiwork 1
  • [BUG] The cancellation of the task has not taken effect, and the task continues to run

    Describe the bug With maximum retry set to 0, I tried to cancel the task, but the running task is still running and has not been canceled.

    I tried to cancel the task with the CLI tool and the web UI, but both failed. I don't know what to change. Can you give me an example?

    bug 
    opened by dxl3811051 1
  • How to create a queue based on user id

    How can I create a queue based on the user ID with the same task type? For example:

    USER A has a list of tasks 1, 2, 3; USER B has a list of tasks 4, 5, 6; USER C has a list of tasks 7, 8, 9.

    it should be like this

    1. A 1
    2. B 4
    3. C 7

    It's not like this

    1. A 1
    2. A 2
    3. A 3

    The conditions are:

    1. One user can only run one task at a time
    2. Maximum concurrency is 10
    3. Only one task type
    opened by arioki1 0
  • What do Asynq logs signify?

    I have a job queue for video processing and batch SMS sends. It works, but I want to be clearer on what the logs mean. When my server starts I run this in main():

    go sms_workers.BuildWorkerServer()   // Server for SMS job queue
    go video_workers.BuildWorkerServer() // Server for video job queue
    

    Both run this code:

    func BuildWorkerServer() {
    	srv := asynq.NewServer(
    		asynq.RedisClientOpt{Addr: jobQueueAddr},
    		asynq.Config{
    			Concurrency: 10, // total concurrent workers
    			Queues: map[string]int{
    				"critical": 6,
    				"default":  3,
    				"low":      1,
    			},
    		},
    	)
    	mux := asynq.NewServeMux()
    	mux.HandleFunc(video_queue.TypeVideoProcess, video_queue.HandleVideoProcessTask)
    	if err := srv.Run(mux); err != nil {
    		log.Printf("Could not run Job Queue Server: %v", err)
    	}
    }

    I see this in my log:

    asynq: pid=41428 2022/07/16 22:27:11.256248 INFO: Starting processing
    asynq: pid=41428 2022/07/16 22:27:11.256248 INFO: Starting processing
    asynq: pid=41428 2022/07/16 22:27:11.257265 INFO: Send signal TERM or INT to terminate the process
    asynq: pid=41428 2022/07/16 22:27:11.256762 INFO: Send signal TERM or INT to terminate the process
    

    Please advise. I am running each server in its own goroutine, so I'm thinking the PIDs should be different? The goal is concurrency in this scenario. I haven't faced any issues, but I haven't tested at scale (in the interest of money, each process is expensive as it is connected to AWS Rekognition).

    opened by lovgrandma 0
Releases (v0.23.0)
  • v0.23.0(Apr 12, 2022)

    This version adds Task Aggregation feature and includes a few improvements and fixes.

    Task Aggregation

    This is a new feature which allows you to enqueue multiple tasks successively, and have them passed to the Handler together rather than individually. The feature allows you to batch multiple successive operations into one, in order to save on costs, optimize caching, or batch notifications, for example. See the Wiki page for details.
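    A hedged sketch of the flow, using the Group option and GroupAggregator names from the notes below; details may differ from the Wiki page:

    // Client side: tasks enqueued with the same Group option get aggregated.
    _, err := client.Enqueue(task, asynq.Group("user:42:notifications"))

    // Server side: combine the grouped tasks into one task for the Handler.
    srv := asynq.NewServer(redisConnOpt, asynq.Config{
        GroupGracePeriod: 2 * time.Minute, // wait for more tasks before aggregating
        GroupAggregator: asynq.GroupAggregatorFunc(func(group string, tasks []*asynq.Task) *asynq.Task {
            // Merge the payloads into a single batched task (merging logic elided).
            return asynq.NewTask("batched:notifications", nil)
        }),
    })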


    Added

    • Group option is introduced to enqueue tasks in a group.
    • GroupAggregator and related types are introduced for task aggregation feature.
    • GroupGracePeriod, GroupMaxSize, GroupMaxDelay, and GroupAggregator fields are added to Config.
    • Inspector has new methods related to "aggregating tasks".
    • Group field is added to TaskInfo.
    • (CLI): group ls command is added
    • (CLI): task ls supports listing aggregating tasks via --state=aggregating --group=<GROUP> flags
    • Enabled rediss URL parsing support: thanks to @eleboucher

    Fixed

    • Fixed overflow issue with 32-bit systems (For details, see https://github.com/hibiken/asynq/pull/426): Thanks to @assimon!
  • v0.22.1(Feb 20, 2022)

  • v0.22.0(Feb 19, 2022)

    This version improves worker crash recovery by introducing a concept of worker lease for an active task. It also adds an optional field to Config to customize the context passed to the Handler.

    Important Note: Since this version changes the crash-recovery logic, the library upgrade should be done with a clean restart of the process. In other words, please make sure that the process running asynq.Server shuts down cleanly before restarting it with this new version.

    Added

    • BaseContext is introduced in Config to specify a callback hook that provides a base context from which the Handler context is derived (see the sketch after this list)
    • IsOrphaned field is added to TaskInfo to describe a task left in active state with no worker processing it.
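    A minimal sketch of the BaseContext hook; the ctxKey type and value are hypothetical:

    type ctxKey string

    srv := asynq.NewServer(redisConnOpt, asynq.Config{
        // Every Handler context is derived from the context returned here.
        BaseContext: func() context.Context {
            return context.WithValue(context.Background(), ctxKey("app"), "myapp")
        },
    })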

    Changed

    • Server now recovers tasks with an expired lease. Recovered tasks are retried/archived with ErrLeaseExpired error.
  • v0.21.0(Jan 22, 2022)

    Added

    • PeriodicTaskManager is added. Prefer using this over Scheduler, as it has better support for dynamic periodic tasks. See the wiki page for an example of using PeriodicTaskManager, and the sketch after this list.
    • The asynq stats command now supports a --json option, making its output a JSON object
    • Introduced new configuration for DelayedTaskCheckInterval. See godoc for more details.
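    A hedged sketch of PeriodicTaskManager; type and field names are assumed from the v0.21.0 godoc, and provider stands for a user-defined type implementing GetConfigs() ([]*asynq.PeriodicTaskConfig, error):

    // The manager re-reads the provider periodically, enabling dynamic periodic tasks.
    mgr, err := asynq.NewPeriodicTaskManager(asynq.PeriodicTaskManagerOpts{
        RedisConnOpt:               asynq.RedisClientOpt{Addr: "localhost:6379"},
        PeriodicTaskConfigProvider: provider,
        SyncInterval:               10 * time.Second, // how often to re-read the provider
    })
    if err != nil {
        log.Fatal(err)
    }
    if err := mgr.Run(); err != nil {
        log.Fatal(err)
    }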
  • v0.20.0(Dec 19, 2021)

    This release includes changes to support Prometheus integration in the Web UI. The integration is optional, and there shouldn't be any changes to the performance or behavior of the library in this release.

    Added

    • Package x/metrics is added.
    • Tool tools/metrics_exporter binary is added.
    • ProcessedTotal and FailedTotal fields were added to QueueInfo struct.
  • v0.19.1(Dec 12, 2021)

    This release includes a few additions and one fix.

    Added

    • Latency field is added to QueueInfo.
    • EnqueueContext method is added to Client.

    Fixed

    • Fixed an error when a user passes a duration of less than 1s to the Unique option
  • v0.19.0(Nov 6, 2021)

    This release introduces task retention after successful processing. With the Retention option, you can specify how long a task should be retained in the queue as a completed task.
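    A minimal sketch, assuming the Retention option described in these notes:

    // Keep the task around as a "completed" task for 24 hours after success.
    info, err := client.Enqueue(task, asynq.Retention(24*time.Hour))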

    Changed

    • NewTask takes Option as variadic argument
    • Bumped minimum supported go version to 1.14 (i.e. go1.14 or higher is required).

    Added

    • Retention option is added to allow user to specify task retention duration after completion.
    • TaskID option is added to allow user to specify task ID.
    • ErrTaskIDConflict sentinel error value is added.
    • ResultWriter type is added and provided through Task.ResultWriter method.
    • TaskInfo has new fields CompletedAt, Result and Retention.

    Removed

    • Client.SetDefaultOptions is removed. Use NewTask instead to pass default options for tasks.
  • v0.18.6(Oct 3, 2021)

    This release fixes a CLI import issue.

    Changed

    • Updated github.com/go-redis/redis version to v8

    Fixes

    • Fixes the tool import issue described in #325
  • v0.18.5(Sep 1, 2021)

    This release includes one addition to the server config.

    Added

    IsFailure config option is added to Config to determine whether an error returned from the Handler counts as a failure.
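    A hedged sketch of the hook; errNotReady is a hypothetical sentinel error:

    srv := asynq.NewServer(redisConnOpt, asynq.Config{
        // Return false to keep an error from counting as a failure in queue stats.
        IsFailure: func(err error) bool {
            return !errors.Is(err, errNotReady)
        },
    })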

  • v0.18.4(Aug 18, 2021)

  • v0.18.3(Aug 9, 2021)

    This release includes a minor change around the task type name and a critical performance fix for tooling (CLI, Web UI).

    Changed

    • Changed the Queue function to no longer convert the provided queue name to lowercase. Queue names are now case-sensitive.
    • QueueInfo.MemoryUsage is now an approximate usage value.

    Fixed

    • Fixed latency issue around memory usage (see https://github.com/hibiken/asynq/issues/309)
  • v0.18.2(Jul 15, 2021)

  • v0.18.1(Jul 4, 2021)

    This release includes a minor change and a critical fix in the task recovery logic (for tasks left in the active state due to a server crash).

    Changed

    • Changed the task recovery logic to execute when the server starts up; previously it took up to a minute for the recovery logic to execute.

    Fixed

    • Fixed the task recovery logic to execute every minute
  • v0.18.0(Jun 29, 2021)

    Asynq v0.18 includes major API changes and changes to the tooling (Web UI and CLI). The inspeq subpackage is removed, and all types and functions from that package are moved to the asynq package. The upgrade guide is available here.

    Package API changes

    Task

    • NewTask function takes a byte slice as the payload.
    • Task Type and Payload are accessed via method calls (previously these were fields on Task); see the sketch below.
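    A minimal sketch of the new Task API, with the payload as raw bytes (e.g. JSON):

    b, err := json.Marshal(map[string]interface{}{"user_id": 42})
    if err != nil {
        log.Fatal(err)
    }
    t := asynq.NewTask("email:deliver", b)
    fmt.Println(t.Type(), len(t.Payload())) // Type and Payload are now methods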

    Server

    • The Server API has changed. Quiet is renamed to Stop, and Stop is renamed to Shutdown. Note: as a result of this renaming, the behavior of Stop has changed; please update existing code to call Shutdown where it used to call Stop.

    Scheduler

    • Renamed Stop to Shutdown.

    Client

    • Client.Enqueue returns TaskInfo (previously it returned a Result struct)

    Inspector

    • The inspeq package is removed. All types and functions from the package are moved to the asynq package.
    • Inspector.RunTaskByKey is replaced with Inspector.RunTask
    • Inspector.DeleteTaskByKey is replaced with Inspector.DeleteTask
    • Inspector.ArchiveTaskByKey is replaced with Inspector.ArchiveTask
    • WorkerInfo field names have changed.
    • Inspector.CancelActiveTask is renamed to Inspector.CancelProcessing
    • Inspector List methods (e.g. ListActiveTasks) returns slice of TaskInfo

    CLI changes

    • asynq task delete|run|archive commands take a task ID as well as the queue name (previously they required a task key). For example:
    asynq task delete --queue=QUEUE --id=TASK_ID
    

    Web UI (asynqmon) changes

    • Payload now shows "non-printable bytes" if the payload bytes are not human readable (e.g. binary format)

    Redis

    • Requires redis v4.0+ for multiple field/value pair support
    • Internal Redis keys/values have changed (please see the migration guide)
  • v0.17.2(Jun 6, 2021)

  • v0.17.1(Apr 4, 2021)

  • v0.17.0(Mar 24, 2021)

    This release includes one minor update to RedisConnOpt to support redis connection timeouts.

    Added

    • DialTimeout, ReadTimeout, and WriteTimeout options are added to RedisConnOpt types.
  • v0.16.1(Mar 20, 2021)

  • v0.16.0(Mar 11, 2021)

  • v0.15.0(Jan 31, 2021)

    IMPORTANT: All Inspector-related code is moved to the subpackage "github.com/hibiken/asynq/inspeq"

    Changed

    • Inspector-related code is moved to the subpackage "github.com/hibiken/asynq/inspeq".
    • RedisConnOpt interface has changed slightly. If you have been passing RedisClientOpt, RedisFailoverClientOpt, or RedisClusterClientOpt as a pointer, update your code to pass as a value.
    • ErrorMsg field in RetryTask and ArchivedTask was renamed to LastError.

    Added

    • MaxRetry, Retried, LastError fields were added to all task types returned from Inspector.
    • MemoryUsage field was added to QueueStats.
    • DeleteAllPendingTasks, ArchiveAllPendingTasks were added to Inspector
    • DeleteTaskByKey and ArchiveTaskByKey now support deleting/archiving PendingTask.
    • asynq CLI now supports deleting/archiving pending tasks.
  • v0.14.0(Jan 14, 2021)

    IMPORTANT: This release includes a breaking change; please install the latest version of the CLI and run the asynq migrate command.

    Changed

    • Renamed DeadTask to ArchivedTask.
    • Renamed the operation Kill to Archive in Inspector.
    • Print stack trace when Handler panics.
    • Includes a file name and a line number in the error message when recovering from a panic.

    Added

    • DefaultRetryDelayFunc is now a public API, which can be used in the custom RetryDelayFunc.
    • SkipRetry error is added to be used as a return value from Handler.
    • Servers method is added to Inspector
    • CancelActiveTask method is added to Inspector.
    • ListSchedulerEnqueueEvents method is added to Inspector.
    • SchedulerEntries method is added to Inspector.
    • DeleteQueue method is added to Inspector.
  • v0.13.1(Nov 22, 2020)

    Fixed

    • Fixed the shutdown timeout in the processor. The shutdown timeout can be configured via Config.ShutdownTimeout and defaults to 8s if not specified; see the sketch below.
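    A minimal sketch of configuring it, using the field name from the note above:

    srv := asynq.NewServer(redisConnOpt, asynq.Config{
        ShutdownTimeout: 15 * time.Second, // defaults to 8s if unset
    })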
  • v0.13.0(Oct 13, 2020)

    Version 0.13.0 adds the Scheduler type for periodic tasks.

    Added

    • Scheduler type is added to enable periodic tasks. See the godoc for its APIs and the wiki for the getting-started guide

    Changed

    • The Option interface has changed. See the godoc for the new interface. This has no impact as long as you were using the exported functions to create Options (e.g. MaxRetry, Queue, etc.)
  • v0.12.0(Sep 12, 2020)

    Version 0.12.0 adds support for Redis Cluster and includes a number of breaking changes.

    IMPORTANT: If you are upgrading from a previous version, please install the latest version of the CLI go get -u github.com/hibiken/asynq/tools/asynq and run asynq migrate command. No process should be writing to Redis while you run the migration command.

    The semantics of queue have changed

    Previously, we called tasks that were ready to be processed "enqueued tasks", tasks scheduled to be processed in the future "scheduled tasks", and so on. We have changed the semantics of "enqueue" slightly: all tasks that the client pushes to Redis are enqueued to a queue, and within a queue, tasks transition from one state to another. The possible task states are:

    • Pending: task is ready to be processed (previously called "Enqueued")
    • Active: task is currently being processed (previously called "InProgress")
    • Scheduled: task is scheduled to be processed in the future
    • Retry: task failed to be processed and will be retried again in the future
    • Dead: task has exhausted all of its retries and is stored for manual inspection purposes

    This change in semantics is reflected in the new Inspector API and CLI commands.


    Changed

    Client

    Use ProcessIn or ProcessAt option to schedule a task instead of EnqueueIn or EnqueueAt.

    Previously                  | v0.12.0
    client.EnqueueAt(t, task)   | client.Enqueue(task, asynq.ProcessAt(t))
    client.EnqueueIn(d, task)   | client.Enqueue(task, asynq.ProcessIn(d))

    Inspector

    • All Inspector methods are scoped to a queue and take qname (string) as the first argument.
    • EnqueuedTask is renamed to PendingTask, along with its corresponding methods.
    • InProgressTask is renamed to ActiveTask, along with its corresponding methods.
    • The verb "Enqueue" in method names is replaced by "Run" (e.g. EnqueueAllScheduledTasks --> RunAllScheduledTasks).

    CLI

    CLI commands are restructured to use subcommands. Commands are organized into a few management commands. To view details on any command, use asynq help <command> <subcommand>.

    • asynq stats
    • asynq queue [ls inspect history rm pause unpause]
    • asynq task [ls cancel delete kill run delete-all kill-all run-all]
    • asynq server [ls]

    Added

    RedisConnOpt

    • RedisClusterClientOpt is added to connect to Redis Cluster.
    • Username field is added to all RedisConnOpt types in order to authenticate connections when Redis ACLs are used; see the sketch below.
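    A minimal sketch of connecting with a Redis ACL user; the credentials are placeholders:

    r := asynq.RedisClientOpt{
        Addr:     "localhost:6379",
        Username: "asynq-user",
        Password: "secret",
    }
    client := asynq.NewClient(r)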

    Client

    • ProcessIn(d time.Duration) Option and ProcessAt(t time.Time) Option are added to replace EnqueueIn and EnqueueAt functionality.

    Inspector

    • Queues() ([]string, error) method is added to get all queue names.
    • ClusterKeySlot(qname string) (int64, error) method is added to get queue's hash slot in Redis cluster.
    • ClusterNodes(qname string) ([]ClusterNode, error) method is added to get a list of Redis cluster nodes for the given queue.
    • Close() error method is added to close connection with redis.

    Handler

    • GetQueueName(ctx context.Context) (string, bool) helper is added to extract queue name from a context.
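    A minimal sketch of using the helper inside a handler:

    func handle(ctx context.Context, t *asynq.Task) error {
        if qname, ok := asynq.GetQueueName(ctx); ok {
            fmt.Printf("processing task from queue %q\n", qname)
        }
        return nil
    }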
  • v0.11.0(Jul 29, 2020)

    Added

    • Inspector type was added to monitor and mutate state of queues and tasks. See the godoc for details.

    • HealthCheckFunc and HealthCheckInterval fields were added to Config to allow the user to specify a callback function to check the broker connection.

  • v0.10.0(Jul 6, 2020)

    Added Features

    • Automatic recovery of tasks in the event of a worker crash
    • Automatic retry of tasks which exceeded their timeout/deadline

    Version 0.10.0 includes the following API changes:

    • (*Client).Enqueue, (*Client).EnqueueIn, and (*Client).EnqueueAt have changed to return a *Result and an error. The Result struct contains metadata about the task that was enqueued (e.g. ID, Queue, etc.).
    • ErrorHandler signature has changed to func(context.Context, *Task, error).

    Version 0.10.0 includes the following semantics changes:

    • All tasks now require a timeout or deadline. By default, the timeout is set to 1800 seconds (30 minutes) if neither is specified.
    • Tasks that exceed their deadline are automatically retried. In previous versions, the user-provided Handler needed to explicitly return an error when the ctx.Done channel was closed; in the new version, the library takes care of this. To avoid processing a task past its deadline, the Handler should always check the ctx.Done channel and stop processing when the channel is closed, as in the sketch below.
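    A minimal sketch of such a deadline-aware handler; doWork stands for a hypothetical function returning the work's error:

    func handle(ctx context.Context, t *asynq.Task) error {
        done := make(chan error, 1)
        go func() { done <- doWork(t) }()
        select {
        case <-ctx.Done():
            return ctx.Err() // stop processing; the library will retry the task
        case err := <-done:
            return err
        }
    }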

    Other important changes:

    • Please upgrade to the new version of asynq CLI which is compatible with the new version of the library.
    • Encoding schema for messages has changed. Please install the latest CLI and run migrate command if you have tasks enqueued with the previous version of asynq.
  • v0.10.0.rc1(Jul 4, 2020)

    Beta version for v0.10 includes the following API changes:

    • (*Client).Enqueue, (*Client).EnqueueIn, and (*Client).EnqueueAt have changed to return a *Result and an error. The Result struct contains metadata about the task that was enqueued (e.g. ID, Queue, etc.).
    • ErrorHandler signature has changed to func(context.Context, *Task, error). Please use GetRetryCount(ctx) and/or GetMaxRetry(ctx) to get the count values that were part of the argument list in previous versions.

    Beta version for v0.10 includes the following semantics changes:

    • All tasks now require a timeout or deadline. By default, the timeout is set to 1800 seconds (i.e. 30 minutes) if neither of them is specified.
    • Tasks that exceed their deadline are automatically retried. In previous versions, the user-provided Handler needed to explicitly return an error when the ctx.Done channel was closed; in the new version, the library takes care of this. To avoid processing a task past its deadline, the Handler should always check the ctx.Done channel and stop processing when the channel is closed.

    Other important changes:

    • Please upgrade to the new version of asynq CLI which is compatible with the new version of the library.
    • Encoding schema for messages has changed. Please install the latest CLI and run migrate command if you have tasks enqueued with the previous version of asynq.