Machinery is an asynchronous task queue/job queue based on distributed message passing.

Overview

Machinery



V2 Experiment

Please be advised that V2 is work in progress and breaking changes can and will happen until it is ready.

You can use the current V2 in order to avoid having to import all dependencies for brokers and backends you are not using.

Instead of using the factory, you need to inject broker and backend objects into the server constructor:

import (
  "github.com/RichardKnop/machinery/v2"
  backendsiface "github.com/RichardKnop/machinery/v1/backends/iface"
  brokersiface "github.com/RichardKnop/machinery/v1/brokers/iface"
)

var broker brokersiface.Broker
var backend backendsiface.Backend
server, err := machinery.NewServer(cnf, broker, backend)
if err != nil {
  // do something with the error
}

First Steps

Add the Machinery library to your $GOPATH/src:

go get github.com/RichardKnop/machinery/v1

First, you will need to define some tasks. Look at sample tasks in example/tasks/tasks.go to see a few examples.

Second, you will need to launch a worker process with one of these commands (v2 is recommended since it doesn't import dependencies for all brokers / backends, only those you actually need):

go run example/v2/amqp/main.go worker
go run example/v2/redigo/main.go worker // Redis with redigo driver
go run example/v2/go-redis/main.go worker // Redis with Go Redis driver

go run example/v1/amqp/main.go worker
go run example/v1/redis/main.go worker

Finally, once you have a worker running and waiting for tasks to consume, send some tasks with one of these commands (v2 is recommended since it doesn't import dependencies for all brokers / backends, only those you actually need):

go run example/v2/amqp/main.go send
go run example/v2/redigo/main.go send // Redis with redigo driver
go run example/v2/go-redis/main.go send // Redis with Go Redis driver

go run example/v1/amqp/main.go send
go run example/v1/redis/main.go send

You will be able to see the tasks being processed asynchronously by the worker.

Configuration

The config package has convenience methods for loading configuration from environment variables or a YAML file. For example, load configuration from environment variables:

cnf, err := config.NewFromEnvironment()

Or load from YAML file:

cnf, err := config.NewFromYaml("config.yml", true)

The second boolean flag enables live reloading of the configuration every 10 seconds. Use false to disable live reloading.

Machinery configuration is encapsulated by a Config struct and injected as a dependency to objects that need it.

Lock

Redis

Use a Redis URL in this format:

redis://[password@]host[:port][/db_num]

For example:

  1. redis://localhost:6379, or with password redis://password@localhost:6379
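To make the URL format concrete, here is a minimal sketch that splits such a URL into its parts using Go's standard net/url package. RedisTarget and ParseRedisURL are hypothetical names used only for illustration; this is not how Machinery itself parses the URL.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// RedisTarget holds the pieces of a redis:// URL.
// Hypothetical type for illustration, not part of Machinery.
type RedisTarget struct {
	Password string
	Addr     string
	DB       int
}

// ParseRedisURL splits redis://[password@]host[:port][/db_num] into its parts.
func ParseRedisURL(raw string) (RedisTarget, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return RedisTarget{}, err
	}
	if u.Scheme != "redis" {
		return RedisTarget{}, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	target := RedisTarget{Addr: u.Host}
	if u.User != nil {
		// With no colon in the userinfo part, net/url reports the
		// password-only value as the username.
		target.Password = u.User.Username()
	}
	if db := strings.TrimPrefix(u.Path, "/"); db != "" {
		n, err := strconv.Atoi(db)
		if err != nil {
			return RedisTarget{}, fmt.Errorf("invalid db number %q", db)
		}
		target.DB = n
	}
	return target, nil
}

func main() {
	for _, raw := range []string{
		"redis://localhost:6379",
		"redis://password@localhost:6379/2",
	} {
		target, err := ParseRedisURL(raw)
		fmt.Printf("%+v %v\n", target, err)
	}
}
```

When the database number is omitted, the sketch falls back to database 0.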

Broker

A message broker. Currently supported brokers are:

AMQP

Use AMQP URL in the format:

amqp://[username:password@]host[:port]

For example:

  1. amqp://guest:guest@localhost:5672

AMQP also supports multiple broker URLs. You need to specify the URL separator in the MultipleBrokerSeparator field.

Redis

Use Redis URL in one of these formats:

redis://[password@]host[:port][/db_num]
redis+socket://[password@]/path/to/file.sock[:/db_num]

For example:

  1. redis://localhost:6379, or with password redis://password@localhost:6379
  2. redis+socket://password@/path/to/file.sock:/0

AWS SQS

Use AWS SQS URL in the format:

https://sqs.us-east-2.amazonaws.com/123456789012

See the AWS SQS docs for more information. AWS_REGION must also be configured; otherwise an error is thrown.

To use a manually configured SQS Client:

var sqsClient = sqs.New(session.Must(session.NewSession(&aws.Config{
  Region:         aws.String("YOUR_AWS_REGION"),
  Credentials:    credentials.NewStaticCredentials("YOUR_AWS_ACCESS_KEY", "YOUR_AWS_ACCESS_SECRET", ""),
  HTTPClient:     &http.Client{
    Timeout: time.Second * 120,
  },
})))
var visibilityTimeout = 20
var cnf = &config.Config{
  Broker:          "YOUR_SQS_URL",
  DefaultQueue:    "machinery_tasks",
  ResultBackend:   "YOUR_BACKEND_URL",
  SQS: &config.SQSConfig{
    Client: sqsClient,
    // if VisibilityTimeout is nil default to the overall visibility timeout setting for the queue
    // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
    VisibilityTimeout: &visibilityTimeout,
    WaitTimeSeconds: 30,
  },
}

GCP Pub/Sub

Use GCP Pub/Sub URL in the format:

gcppubsub://YOUR_GCP_PROJECT_ID/YOUR_PUBSUB_SUBSCRIPTION_NAME

To use a manually configured Pub/Sub Client:

pubsubClient, err := pubsub.NewClient(
    context.Background(),
    "YOUR_GCP_PROJECT_ID",
    option.WithServiceAccountFile("YOUR_GCP_SERVICE_ACCOUNT_FILE"),
)

cnf := &config.Config{
  Broker:          "gcppubsub://YOUR_GCP_PROJECT_ID/YOUR_PUBSUB_SUBSCRIPTION_NAME",
  DefaultQueue:    "YOUR_PUBSUB_TOPIC_NAME",
  ResultBackend:   "YOUR_BACKEND_URL",
  GCPPubSub: config.GCPPubSubConfig{
    Client: pubsubClient,
  },
}

DefaultQueue

Default queue name, e.g. machinery_tasks.

ResultBackend

Result backend to use for keeping task states and results.

Currently supported backends are:

Redis

Use Redis URL in one of these formats:

redis://[password@]host[:port][/db_num]
redis+socket://[password@]/path/to/file.sock[:/db_num]

For example:

  1. redis://localhost:6379, or with password redis://password@localhost:6379
  2. redis+socket://password@/path/to/file.sock:/0
  3. cluster redis://host1:port1,host2:port2,host3:port3
  4. cluster with password redis://pass@host1:port1,host2:port2,host3:port3

Memcache

Use Memcache URL in the format:

memcache://host1[:port1][,host2[:port2],...[,hostN[:portN]]]

For example:

  1. memcache://localhost:11211 for a single instance, or
  2. memcache://10.0.0.1:11211,10.0.0.2:11211 for a cluster

AMQP

Use AMQP URL in the format:

amqp://[username:password@]host[:port]

For example:

  1. amqp://guest:guest@localhost:5672

Keep in mind that AMQP is not recommended as a result backend. See Keeping Results.

MongoDB

Use a MongoDB URL in the format:

mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]

For example:

  1. mongodb://localhost:27017/taskresults

See MongoDB docs for more information.

ResultsExpireIn

How long to store task results for in seconds. Defaults to 3600 (1 hour).

AMQP

RabbitMQ related configuration. Not necessary if you are using other broker/backend.

  • Exchange: exchange name, e.g. machinery_exchange
  • ExchangeType: exchange type, e.g. direct
  • QueueBindingArguments: an optional map of additional arguments used when binding to an AMQP queue
  • BindingKey: The queue is bound to the exchange with this key, e.g. machinery_task
  • PrefetchCount: How many tasks to prefetch (set to 1 if you have long running tasks)

DynamoDB

DynamoDB related configuration. Not necessary if you are using other backend.

  • TaskStatesTable: Custom table name for saving task states. The default is task_states. Make sure to create this table in your AWS admin first, using TaskUUID as the table's primary key.
  • GroupMetasTable: Custom table name for saving group metas. The default is group_metas. Make sure to create this table in your AWS admin first, using GroupUUID as the table's primary key.

For example:

dynamodb:
  task_states_table: 'task_states'
  group_metas_table: 'group_metas'

If these tables are not found, a fatal error is thrown.

If you wish to expire the records, you can configure the TTL field in AWS admin for these tables. The TTL field is set based on the ResultsExpireIn value in the Server's config. See https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html for more information.

Redis

Redis related configuration. Not necessary if you are using other backend.

See: config (TODO)

GCPPubSub

GCPPubSub related configuration. Not necessary if you are using another broker.

See: config (TODO)

Custom Logger

You can define a custom logger by implementing the following interface:

type Interface interface {
  Print(...interface{})
  Printf(string, ...interface{})
  Println(...interface{})

  Fatal(...interface{})
  Fatalf(string, ...interface{})
  Fatalln(...interface{})

  Panic(...interface{})
  Panicf(string, ...interface{})
  Panicln(...interface{})
}

Then set the logger in your setup code by calling the Set function exported by the github.com/RichardKnop/machinery/v1/log package:

log.Set(myCustomLogger)

Server

The Machinery library must be instantiated before use. This is done by creating a Server instance. Server is a base object which stores Machinery configuration and registered tasks. E.g.:

import (
  "github.com/RichardKnop/machinery/v1/config"
  "github.com/RichardKnop/machinery/v1"
)

var cnf = &config.Config{
  Broker:        "amqp://guest:guest@localhost:5672/",
  DefaultQueue:  "machinery_tasks",
  ResultBackend: "amqp://guest:guest@localhost:5672/",
  AMQP: &config.AMQPConfig{
    Exchange:     "machinery_exchange",
    ExchangeType: "direct",
    BindingKey:   "machinery_task",
  },
}

server, err := machinery.NewServer(cnf)
if err != nil {
  // do something with the error
}

Workers

In order to consume tasks, you need to have one or more workers running. All you need to run a worker is a Server instance with registered tasks. E.g.:

worker := server.NewWorker("worker_name", 10)
err := worker.Launch()
if err != nil {
  // do something with the error
}

Each worker will only consume registered tasks. For each task on the queue the Worker.Process() method will be run in a goroutine. Use the second parameter of server.NewWorker to limit the number of concurrently running Worker.Process() calls (per worker). Example: 1 will serialize task execution while 0 makes the number of concurrently executed tasks unlimited (default).
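The concurrency limit described above can be pictured as a counting semaphore. The following is a standalone sketch of that general pattern, not Machinery's worker code; runBounded is a hypothetical function that reports the highest number of jobs it saw running at once.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded runs every job in its own goroutine but lets at most
// `concurrency` of them execute at once. A concurrency of 0 means unlimited.
// It returns the peak number of jobs observed running simultaneously.
func runBounded(jobs []func(), concurrency int) int {
	var (
		wg      sync.WaitGroup
		running int32
		peak    int32
		sem     chan struct{}
	)
	if concurrency > 0 {
		sem = make(chan struct{}, concurrency)
	}
	for _, job := range jobs {
		if sem != nil {
			sem <- struct{}{} // block until a slot is free
		}
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			n := atomic.AddInt32(&running, 1)
			// Record a new peak if this goroutine raised it.
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			job()
			atomic.AddInt32(&running, -1)
			if sem != nil {
				<-sem // release the slot
			}
		}(job)
	}
	wg.Wait()
	return int(atomic.LoadInt32(&peak))
}

func main() {
	jobs := make([]func(), 20)
	for i := range jobs {
		jobs[i] = func() { time.Sleep(time.Millisecond) }
	}
	fmt.Println("peak with limit 1:", runBounded(jobs, 1))
	fmt.Println("peak with limit 5:", runBounded(jobs, 5))
	fmt.Println("peak with no limit:", runBounded(jobs, 0))
}
```

With a limit of 1 the peak is always 1, which is the serialized execution mentioned above; with larger limits the observed peak depends on scheduling but never exceeds the limit.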

Tasks

Tasks are a building block of Machinery applications. A task is a function which defines what happens when a worker receives a message.

Each task needs to return an error as its last return value. In addition to the error, tasks can return any number of results.

Examples of valid tasks:

func Add(args ...int64) (int64, error) {
  sum := int64(0)
  for _, arg := range args {
    sum += arg
  }
  return sum, nil
}

func Multiply(args ...int64) (int64, error) {
  sum := int64(1)
  for _, arg := range args {
    sum *= arg
  }
  return sum, nil
}

// You can use context.Context as first argument to tasks, useful for open tracing
func TaskWithContext(ctx context.Context, arg Arg) error {
  // ... use ctx ...
  return nil
}

// Tasks need to return at least error as a minimal requirement
func DummyTask(arg string) error {
  return errors.New(arg)
}

// You can also return multiple results from the task
func DummyTask2(arg1, arg2 string) (string, string, error) {
  return arg1, arg2, nil
}

Registering Tasks

Before your workers can consume a task, you need to register it with the server. This is done by assigning a task a unique name:

server.RegisterTasks(map[string]interface{}{
  "add":      Add,
  "multiply": Multiply,
})

Tasks can also be registered one by one:

server.RegisterTask("add", Add)
server.RegisterTask("multiply", Multiply)

Simply put, when a worker receives a message like this:

{
  "UUID": "48760a1a-8576-4536-973b-da09048c2ac5",
  "Name": "add",
  "RoutingKey": "",
  "ETA": null,
  "GroupUUID": "",
  "GroupTaskCount": 0,
  "Args": [
    {
      "Type": "int64",
      "Value": 1
    },
    {
      "Type": "int64",
      "Value": 1,
    }
  ],
  "Immutable": false,
  "RetryCount": 0,
  "RetryTimeout": 0,
  "OnSuccess": null,
  "OnError": null,
  "ChordCallback": null
}

It will call Add(1, 1). Each task should return an error as well so we can handle failures.
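As a rough illustration of that lookup-and-call step, the sketch below decodes a trimmed-down message, finds the task by name and invokes it through reflection. Message, registry and Dispatch are hypothetical names, only int64 arguments are handled, and Machinery's real worker does considerably more.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Arg and Message keep only the fields this sketch needs.
type Arg struct {
	Type  string
	Value interface{}
}

type Message struct {
	Name string
	Args []Arg
}

func Add(args ...int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}

// registry maps task names to functions, like RegisterTasks above.
var registry = map[string]interface{}{"add": Add}

// Dispatch decodes raw, looks up the named task and calls it via reflection.
// It returns every result except the trailing error.
func Dispatch(raw []byte) ([]interface{}, error) {
	var msg Message
	if err := json.Unmarshal(raw, &msg); err != nil {
		return nil, err
	}
	fn, ok := registry[msg.Name]
	if !ok {
		return nil, fmt.Errorf("task %q is not registered", msg.Name)
	}
	in := make([]reflect.Value, 0, len(msg.Args))
	for _, a := range msg.Args {
		f, ok := a.Value.(float64) // encoding/json decodes numbers as float64
		if a.Type != "int64" || !ok {
			return nil, fmt.Errorf("unsupported argument type %q", a.Type)
		}
		in = append(in, reflect.ValueOf(int64(f)))
	}
	out := reflect.ValueOf(fn).Call(in)
	if last := out[len(out)-1]; !last.IsNil() {
		return nil, last.Interface().(error)
	}
	results := make([]interface{}, 0, len(out)-1)
	for _, v := range out[:len(out)-1] {
		results = append(results, v.Interface())
	}
	return results, nil
}

func main() {
	raw := []byte(`{"Name":"add","Args":[{"Type":"int64","Value":1},{"Type":"int64","Value":1}]}`)
	results, err := Dispatch(raw)
	fmt.Println(results, err) // prints [2] <nil>
}
```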

Ideally, tasks should be idempotent which means there will be no unintended consequences when a task is called multiple times with the same arguments.

Signatures

A signature wraps calling arguments, execution options (such as immutability) and success/error callbacks of a task so it can be sent across the wire to workers. Task signatures are defined by the following types:

// Arg represents a single argument passed to invocation of a task
type Arg struct {
  Type  string
  Value interface{}
}

// Headers represents the headers which should be used to direct the task
type Headers map[string]interface{}

// Signature represents a single task invocation
type Signature struct {
  UUID           string
  Name           string
  RoutingKey     string
  ETA            *time.Time
  GroupUUID      string
  GroupTaskCount int
  Args           []Arg
  Headers        Headers
  Immutable      bool
  RetryCount     int
  RetryTimeout   int
  OnSuccess      []*Signature
  OnError        []*Signature
  ChordCallback  *Signature
}

UUID is a unique ID of a task. You can either set it yourself or it will be automatically generated.

Name is the unique task name by which it is registered against a Server instance.

RoutingKey is used for routing a task to correct queue. If you leave it empty, the default behaviour will be to set it to the default queue's binding key for direct exchange type and to the default queue name for other exchange types.

ETA is a timestamp used for delaying a task. If it is nil, the task will be published for workers to consume immediately. If it is set, the task will be delayed until the ETA timestamp.

GroupUUID, GroupTaskCount are useful for creating groups of tasks.

Args is a list of arguments that will be passed to the task when it is executed by a worker.

Headers is a map of headers that will be used when publishing the task to an AMQP queue.

Immutable is a flag which defines whether a result of the executed task can be modified or not. This is important with OnSuccess callbacks. An immutable task will not pass its result to its success callbacks, while a mutable task will append its result to the args sent to callback tasks. In short, set Immutable to false if you want to pass the result of the first task in a chain to the second task.

RetryCount specifies how many times a failed task should be retried (defaults to 0). Retry attempts are spaced out in time: after each failure, the next attempt is scheduled further into the future.

RetryTimeout specifies how long to wait before resending the task to the queue for a retry attempt. The default behaviour is to use the Fibonacci sequence to increase the timeout after each failed retry attempt.

OnSuccess defines tasks which will be called after the task has executed successfully. It is a slice of task signature structs.

OnError defines tasks which will be called after the task execution fails. The first argument passed to error callbacks will be the error string returned from the failed task.

ChordCallback is used to create a callback to a group of tasks.

Supported Types

Machinery encodes tasks to JSON before sending them to the broker. Task results are also stored in the backend as JSON encoded strings. Therefore only types with a native JSON representation can be supported. Currently supported types are:

  • bool
  • int
  • int8
  • int16
  • int32
  • int64
  • uint
  • uint8
  • uint16
  • uint32
  • uint64
  • float32
  • float64
  • string
  • []bool
  • []int
  • []int8
  • []int16
  • []int32
  • []int64
  • []uint
  • []uint8
  • []uint16
  • []uint32
  • []uint64
  • []float32
  • []float64
  • []string
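The Type string matters because JSON has a single number type, so a decoded value does not remember whether it was an int64 or a float64. The sketch below shows how a type name can drive the conversion back to a Go type. Restore is a hypothetical function covering only a few of the listed types; it is not Machinery's own conversion code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Arg struct {
	Type  string
	Value interface{}
}

// Restore converts a JSON-decoded value back to the Go type named by a.Type.
func Restore(a Arg) (interface{}, error) {
	switch a.Type {
	case "bool":
		if v, ok := a.Value.(bool); ok {
			return v, nil
		}
	case "string":
		if v, ok := a.Value.(string); ok {
			return v, nil
		}
	case "float64":
		if v, ok := a.Value.(float64); ok {
			return v, nil
		}
	case "int64":
		// encoding/json decodes every number into float64 by default.
		if v, ok := a.Value.(float64); ok {
			return int64(v), nil
		}
	case "[]int64":
		if items, ok := a.Value.([]interface{}); ok {
			out := make([]int64, len(items))
			for i, item := range items {
				f, ok := item.(float64)
				if !ok {
					return nil, fmt.Errorf("element %d is not a number", i)
				}
				out[i] = int64(f)
			}
			return out, nil
		}
	default:
		return nil, fmt.Errorf("unsupported type %q", a.Type)
	}
	return nil, fmt.Errorf("value %v does not match type %q", a.Value, a.Type)
}

func main() {
	raw := []byte(`[{"Type":"int64","Value":7},{"Type":"[]int64","Value":[1,2,3]},{"Type":"string","Value":"hi"}]`)
	var args []Arg
	if err := json.Unmarshal(raw, &args); err != nil {
		panic(err)
	}
	for _, a := range args {
		v, err := Restore(a)
		fmt.Printf("%T %v %v\n", v, v, err)
	}
}
```

Because this sketch goes through float64, integers larger than 2^53 would lose precision; a fuller implementation could decode with json.Number instead.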

Sending Tasks

Tasks can be called by passing an instance of Signature to a Server instance. E.g.:

import (
  "github.com/RichardKnop/machinery/v1/tasks"
)

signature := &tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

asyncResult, err := server.SendTask(signature)
if err != nil {
  // failed to send the task
  // do something with the error
}

Delayed Tasks

You can delay a task by setting the ETA timestamp field on the task signature.

// Delay the task by 5 seconds
eta := time.Now().UTC().Add(time.Second * 5)
signature.ETA = &eta

Retry Tasks

You can set the number of retry attempts to make before declaring a task as failed. The Fibonacci sequence will be used to space out retry requests over time. (See RetryTimeout for details.)

// If the task fails, retry it up to 3 times
signature.RetryCount = 3

Alternatively, you can return tasks.ErrRetryTaskLater from your task and specify the duration after which the task should be retried, e.g.:

return tasks.NewErrRetryTaskLater("some error", 4 * time.Hour)
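To give a feel for Fibonacci-spaced retries, here is a small sketch that grows a timeout along the Fibonacci sequence. nextFibonacci and retrySchedule are hypothetical helpers; the starting value and the unit (treated as seconds here) are assumptions, so Machinery's actual schedule may differ.

```go
package main

import "fmt"

// nextFibonacci returns the smallest number in the sequence 1, 2, 3, 5, 8, ...
// that is strictly greater than current.
func nextFibonacci(current int) int {
	a, b := 1, 2
	for a <= current {
		a, b = b, a+b
	}
	return a
}

// retrySchedule returns the wait before each of n retry attempts,
// starting from a timeout of 0.
func retrySchedule(n int) []int {
	waits := make([]int, 0, n)
	timeout := 0
	for i := 0; i < n; i++ {
		timeout = nextFibonacci(timeout)
		waits = append(waits, timeout)
	}
	return waits
}

func main() {
	fmt.Println(retrySchedule(5)) // prints [1 2 3 5 8]
}
```

Compared with a fixed delay, a growing delay gives a struggling downstream service progressively more time to recover between attempts.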

Get Pending Tasks

Tasks currently waiting in the queue to be consumed by workers can be inspected, e.g.:

server.GetBroker().GetPendingTasks("some_queue")

Currently this is only supported by the Redis broker.

Keeping Results

If you configure a result backend, the task states and results will be persisted. Possible states:

const (
	// StatePending - initial state of a task
	StatePending = "PENDING"
	// StateReceived - when task is received by a worker
	StateReceived = "RECEIVED"
	// StateStarted - when the worker starts processing the task
	StateStarted = "STARTED"
	// StateRetry - when failed task has been scheduled for retry
	StateRetry = "RETRY"
	// StateSuccess - when the task is processed successfully
	StateSuccess = "SUCCESS"
	// StateFailure - when processing of the task fails
	StateFailure = "FAILURE"
)

When using AMQP as a result backend, task states will be persisted in separate queues for each task. Although RabbitMQ can scale up to thousands of queues, it is strongly advised to use a better suited result backend (e.g. Memcache) when you are expecting to run a large number of parallel tasks.

// TaskResult represents an actual return value of a processed task
type TaskResult struct {
  Type  string      `bson:"type"`
  Value interface{} `bson:"value"`
}

// TaskState represents a state of a task
type TaskState struct {
  TaskUUID  string        `bson:"_id"`
  State     string        `bson:"state"`
  Results   []*TaskResult `bson:"results"`
  Error     string        `bson:"error"`
}

// GroupMeta stores useful metadata about tasks within the same group
// E.g. UUIDs of all tasks which are used in order to check if all tasks
// completed successfully or not and thus whether to trigger chord callback
type GroupMeta struct {
  GroupUUID      string   `bson:"_id"`
  TaskUUIDs      []string `bson:"task_uuids"`
  ChordTriggered bool     `bson:"chord_triggered"`
  Lock           bool     `bson:"lock"`
}

TaskResult represents a single return value of a processed task; the Results field of TaskState holds a slice of them.

TaskState struct will be serialized and stored every time a task state changes.

GroupMeta stores useful metadata about tasks within the same group. E.g. UUIDs of all tasks which are used in order to check if all tasks completed successfully or not and thus whether to trigger chord callback.

AsyncResult object allows you to check for the state of a task:

taskState := asyncResult.GetState()
fmt.Printf("Current state of %v task is:\n", taskState.TaskUUID)
fmt.Println(taskState.State)

There are a couple of convenience methods to inspect the task status:

asyncResult.GetState().IsCompleted()
asyncResult.GetState().IsSuccess()
asyncResult.GetState().IsFailure()
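One plausible reading of these helpers, given the states listed earlier, is that a task counts as completed once it reaches a terminal state, i.e. SUCCESS or FAILURE. The sketch below encodes that assumption on a stripped-down TaskState; it is not copied from Machinery's source.

```go
package main

import "fmt"

const (
	StatePending  = "PENDING"
	StateReceived = "RECEIVED"
	StateStarted  = "STARTED"
	StateRetry    = "RETRY"
	StateSuccess  = "SUCCESS"
	StateFailure  = "FAILURE"
)

// TaskState keeps only the field needed for this illustration.
type TaskState struct {
	State string
}

// IsSuccess reports whether the task finished without error.
func (s TaskState) IsSuccess() bool { return s.State == StateSuccess }

// IsFailure reports whether the task finished with an error.
func (s TaskState) IsFailure() bool { return s.State == StateFailure }

// IsCompleted reports whether the task reached a terminal state
// (assumption: SUCCESS and FAILURE are the only terminal states).
func (s TaskState) IsCompleted() bool { return s.IsSuccess() || s.IsFailure() }

func main() {
	for _, state := range []string{StatePending, StateRetry, StateSuccess, StateFailure} {
		s := TaskState{State: state}
		fmt.Printf("%-8s completed=%v success=%v failure=%v\n",
			state, s.IsCompleted(), s.IsSuccess(), s.IsFailure())
	}
}
```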

You can also do a synchronous blocking call to wait for a task result:

results, err := asyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
  // getting result of a task failed
  // do something with the error
}
for _, result := range results {
  fmt.Println(result.Interface())
}

Error Handling

When a task returns an error, the default behavior is to first attempt to retry the task if it is retriable; otherwise the error is logged and any error callbacks are eventually called.

To customize this, you can set a custom error handler on the worker, which can do more than just logging after retries fail and error callbacks are triggered:

worker.SetErrorHandler(func (err error) {
  customHandler(err)
})

Workflows

Running a single asynchronous task is fine, but often you will want to design a workflow of tasks to be executed in an orchestrated way. There are a couple of useful functions to help you design workflows.

Groups

Group is a set of tasks which will be executed in parallel, independent of each other. E.g.:

import (
  "github.com/RichardKnop/machinery/v1/tasks"
  "github.com/RichardKnop/machinery/v1"
)

signature1 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

signature2 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 5,
    },
    {
      Type:  "int64",
      Value: 5,
    },
  },
}

group, _ := tasks.NewGroup(&signature1, &signature2)
asyncResults, err := server.SendGroup(group, 0) //The second parameter specifies the number of concurrent sending tasks. 0 means unlimited.
if err != nil {
  // failed to send the group
  // do something with the error
}

SendGroup returns a slice of AsyncResult objects, so you can do a blocking call and wait for the results of the group's tasks:

for _, asyncResult := range asyncResults {
  results, err := asyncResult.Get(time.Duration(time.Millisecond * 5))
  if err != nil {
    // getting result of a task failed
    // do something with the error
  }
  for _, result := range results {
    fmt.Println(result.Interface())
  }
}

Chords

Chord allows you to define a callback to be executed after all tasks in a group have finished processing, e.g.:

import (
  "github.com/RichardKnop/machinery/v1/tasks"
  "github.com/RichardKnop/machinery/v1"
)

signature1 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

signature2 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 5,
    },
    {
      Type:  "int64",
      Value: 5,
    },
  },
}

signature3 := tasks.Signature{
  Name: "multiply",
}

group, _ := tasks.NewGroup(&signature1, &signature2)
chord, _ := tasks.NewChord(group, &signature3)
chordAsyncResult, err := server.SendChord(chord, 0) //The second parameter specifies the number of concurrent sending tasks. 0 means unlimited.
if err != nil {
  // failed to send the chord
  // do something with the error
}

The above example executes task1 and task2 in parallel, aggregates their results and passes them to task3. Therefore what would end up happening is:

multiply(add(1, 1), add(5, 5))

More explicitly:

(1 + 1) * (5 + 5) = 2 * 10 = 20
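The same arithmetic can be reproduced locally with goroutines, which may help when reasoning about what a chord does. This is an in-process simulation only; runChord is a hypothetical function and no broker, backend or Machinery API is involved.

```go
package main

import (
	"fmt"
	"sync"
)

type task func(args ...int64) (int64, error)

func add(args ...int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}

func multiply(args ...int64) (int64, error) {
	product := int64(1)
	for _, arg := range args {
		product *= arg
	}
	return product, nil
}

// runChord calls fn once per argument list in group, all in parallel,
// then passes the collected results (in group order) to callback.
func runChord(fn task, group [][]int64, callback task) (int64, error) {
	results := make([]int64, len(group))
	errs := make([]error, len(group))
	var wg sync.WaitGroup
	for i, args := range group {
		wg.Add(1)
		go func(i int, args []int64) {
			defer wg.Done()
			results[i], errs[i] = fn(args...)
		}(i, args)
	}
	wg.Wait() // the callback only runs once every group task has finished
	for _, err := range errs {
		if err != nil {
			return 0, err
		}
	}
	return callback(results...)
}

func main() {
	out, err := runChord(add, [][]int64{{1, 1}, {5, 5}}, multiply)
	fmt.Println(out, err) // prints 20 <nil>
}
```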

SendChord returns ChordAsyncResult which follows AsyncResult's interface. So you can do a blocking call and wait for the result of the callback:

results, err := chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
  // getting result of a chord failed
  // do something with the error
}
for _, result := range results {
  fmt.Println(result.Interface())
}

Chains

Chain is simply a set of tasks which will be executed one by one, each successful task triggering the next task in the chain. E.g.:

import (
  "github.com/RichardKnop/machinery/v1/tasks"
  "github.com/RichardKnop/machinery/v1"
)

signature1 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

signature2 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 5,
    },
    {
      Type:  "int64",
      Value: 5,
    },
  },
}

signature3 := tasks.Signature{
  Name: "multiply",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 4,
    },
  },
}

chain, _ := tasks.NewChain(&signature1, &signature2, &signature3)
chainAsyncResult, err := server.SendChain(chain)
if err != nil {
  // failed to send the chain
  // do something with the error
}

The above example executes task1, then task2 and then task3. When a task is completed successfully, the result is appended to the end of list of arguments for the next task in the chain. Therefore what would end up happening is:

multiply(4, add(5, 5, add(1, 1)))

More explicitly:

  4 * (5 + 5 + (1 + 1))   # task1: add(1, 1)        returns 2
= 4 * (5 + 5 + 2)         # task2: add(5, 5, 2)     returns 12
= 4 * (12)                # task3: multiply(4, 12)  returns 48
= 48
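The result passing can be simulated in a few lines of Go. In the sketch below, link and runChain are hypothetical names; each link's result is appended to the next link's arguments unless the link is marked immutable, mirroring the Immutable flag on signatures. It runs in-process and does not use Machinery.

```go
package main

import "fmt"

type task func(args ...int64) (int64, error)

func add(args ...int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}

func multiply(args ...int64) (int64, error) {
	product := int64(1)
	for _, arg := range args {
		product *= arg
	}
	return product, nil
}

// link is one step of the chain.
type link struct {
	fn        task
	args      []int64
	immutable bool // when true, the result is not handed to the next link
}

// runChain executes the links one by one and returns the last result.
func runChain(links []link) (int64, error) {
	var result int64
	var carry []int64 // result handed over by the previous link, if any
	for _, l := range links {
		args := append(append([]int64{}, l.args...), carry...)
		r, err := l.fn(args...)
		if err != nil {
			return 0, err // a failed link stops the chain
		}
		result = r
		if l.immutable {
			carry = nil
		} else {
			carry = []int64{r}
		}
	}
	return result, nil
}

func main() {
	out, err := runChain([]link{
		{fn: add, args: []int64{1, 1}},
		{fn: add, args: []int64{5, 5}},
		{fn: multiply, args: []int64{4}},
	})
	fmt.Println(out, err) // prints 48 <nil>
}
```

Marking the first link immutable would drop its result, so the chain would compute 4 * (5 + 5) = 40 instead.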

SendChain returns ChainAsyncResult which follows AsyncResult's interface. So you can do a blocking call and wait for the result of the whole chain:

results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
  // getting result of a chain failed
  // do something with the error
}
for _, result := range results {
  fmt.Println(result.Interface())
}

Periodic Tasks & Workflows

Machinery now supports scheduling periodic tasks and workflows. See the examples below.

Periodic Tasks

import (
  "github.com/RichardKnop/machinery/v1/tasks"
)

signature := &tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

err := server.RegisterPeriodicTask("0 6 * * ?", "periodic-task", signature)
if err != nil {
  // failed to register periodic task
}

Periodic Groups

import (
  "github.com/RichardKnop/machinery/v1/tasks"
  "github.com/RichardKnop/machinery/v1"
)

signature1 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

signature2 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 5,
    },
    {
      Type:  "int64",
      Value: 5,
    },
  },
}

// The third parameter specifies the number of concurrent sending tasks. 0 means unlimited.
err := server.RegisterPeriodicGroup("0 6 * * ?", "periodic-group", 0, &signature1, &signature2)
if err != nil {
  // failed to register periodic group
}

Periodic Chains

import (
  "github.com/RichardKnop/machinery/v1/tasks"
  "github.com/RichardKnop/machinery/v1"
)

signature1 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

signature2 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 5,
    },
    {
      Type:  "int64",
      Value: 5,
    },
  },
}

signature3 := tasks.Signature{
  Name: "multiply",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 4,
    },
  },
}

// The signatures are passed directly; the chain is built on each scheduled run.
err := server.RegisterPeriodicChain("0 6 * * ?", "periodic-chain", &signature1, &signature2, &signature3)
if err != nil {
  // failed to register periodic chain
}

Periodic Chords

import (
  "github.com/RichardKnop/machinery/v1/tasks"
  "github.com/RichardKnop/machinery/v1"
)

signature1 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 1,
    },
    {
      Type:  "int64",
      Value: 1,
    },
  },
}

signature2 := tasks.Signature{
  Name: "add",
  Args: []tasks.Arg{
    {
      Type:  "int64",
      Value: 5,
    },
    {
      Type:  "int64",
      Value: 5,
    },
  },
}

signature3 := tasks.Signature{
  Name: "multiply",
}

// Arguments: send concurrency (0 means unlimited), the callback, then the group's signatures.
err := server.RegisterPeriodicChord("0 6 * * ?", "periodic-chord", 0, &signature3, &signature1, &signature2)
if err != nil {
  // failed to register periodic chord
}

Development

Requirements

  • Go
  • RabbitMQ (optional)
  • Redis
  • Memcached (optional)
  • MongoDB (optional)

On OS X systems, you can install requirements using Homebrew:

brew install go
brew install rabbitmq
brew install redis
brew install memcached
brew install mongodb

Or optionally use the corresponding Docker containers:

docker run -d -p 5672:5672 rabbitmq
docker run -d -p 6379:6379 redis
docker run -d -p 11211:11211 memcached
docker run -d -p 27017:27017 mongo
docker run -d -p 6831:6831/udp -p 16686:16686 jaegertracing/all-in-one:latest

Dependencies

Since Go 1.11, the recommended way to manage dependencies is via modules.

Dependency management has historically been one of Go's slight weaknesses. Go previously recommended the dep tool officially, but that has since been abandoned in favor of modules.

Testing

The easiest (and platform agnostic) way to run tests is via docker-compose:

make ci

This essentially runs the following docker-compose command:

(docker-compose -f docker-compose.test.yml -p machinery_ci up --build -d) && (docker logs -f machinery_sut &) && (docker wait machinery_sut)

An alternative approach is to set up a development environment on your machine.

In order to enable integration tests, you will need to install all required services (RabbitMQ, Redis, Memcache, MongoDB) and export these environment variables:

export AMQP_URL=amqp://guest:guest@localhost:5672/
export REDIS_URL=localhost:6379
export MEMCACHE_URL=localhost:11211
export MONGODB_URL=localhost:27017

To run integration tests against an SQS instance, you will need to create a "test_queue" in SQS and export these environment variables:

export SQS_URL=https://YOUR_SQS_URL
export AWS_ACCESS_KEY_ID=YOUR_AWS_ACCESS_KEY_ID
export AWS_SECRET_ACCESS_KEY=YOUR_AWS_SECRET_ACCESS_KEY
export AWS_DEFAULT_REGION=YOUR_AWS_DEFAULT_REGION

Then just run:

make test

If the environment variables are not exported, make test will only run unit tests.

Comments
  • Redis type issue still persists in v1.9.9

    Upgraded to new version v1.9.9 and got the same error as previous issue #625

    # github.com/RichardKnop/machinery/v1/backends/redis
    ../../../go-code/pkg/mod/github.com/!richard!knop/machinery@v1.9.9/v1/backends/redis/goredis.go:57:43: cannot use b.rclient (type "github.com/go-redis/redis/v8".UniversalClient) as type *"github.com/go-redis/redis/v8".Client in argument to goredis.NewPool: need type assertion
    

    Mods:

    module github.com/...
    
    go 1.15
    
    require (
    	github.com/RichardKnop/machinery v1.9.9
    	github.com/go-redis/redis/v8 v8.4.0 // indirect
    	github.com/gomodule/redigo v2.0.0+incompatible
    )
    
    opened by mrz1836 17
  • Feature Request: Delay task

    Ability to delay the tasks for processing e.g run task in 5 Min.

    Could be implemented by adding a Execute Time to the task signature, and by user a helper method e.g

    func (t *TaskSignature) Delay(d time.Duration) 
    

    When the worker gets the task, it holds onto it until the execution time and then runs it.

    You want a store a time to execute and not just the delay as you want to guarantee that the task is run as close as possible to it, If the worker crashes the delay would start again otherwise.

    enhancement 
    opened by owenhaynes 15
  • add periodic task

    support periodic task

    I add a distributed lock ensures that each scheduled task will only be published once. And the life cycle of a lock is the same as the schedule of a scheduled task

    enhancement 
    opened by KarlTango 10
  •  backend mongo json changes

    @RichardKnop This changes is required to get the backend result into json format. i tested the integration test with go test command and i am getting success .if you have found anything please let me know.

    opened by surendratiwari3 9
  • Lost periodic task

    Hi

    I have periodic task, but sometime it has been lost. Server use redis.

    Task registration. this is one of the attempts: err = machineryServer.RegisterPeriodicGroup("*/5 * * * *", "fiveMinutesGroup", 0, fiveMinutesGroup.Tasks...)

    Example of data: [image: lost]

    If I change the schedule spec to err = machineryServer.RegisterPeriodicGroup("@every 5m", "fiveMinutesGroup", 0, fiveMinutesGroup.Tasks...), it works fine, but not precisely.

    How do I figure out what the problem is? Time on the server is synced by chronyd (for testing I tried disabling chronyd and enabling ntpd, but the result is the same).

    Thanks

    opened by agvol 8
  • fix redis broker quit very slow

    fix redis broker quit very slow

    #439

    1. Using select with a default case is more logical.
    2. Add a NormalTasksPollPeriod param to control the Redis BLPOP timeout; choose a value suitable for your business scenario.
    opened by pavlelee 8
  • inject signature into the context

    inject signature into the context

    Rationale behind this PR:

    I want to:

    1. be able to interrupt/cancel a running job.
    2. let a running job send progress.

    Both of these require the job func to know the task ID so that I can save status to a DB, e.g. Redis.

    I could make it an argument in the task function and send the task ID on Send, but I reckon having the signature in the context makes more sense.

    See example: https://github.com/jackielii/process

    opened by jackielii 8
  • Error when pulling from Github (mongo client)

    Error when pulling from Github (mongo client)

    We've been using Machinery for some time - suddenly this morning all attempts to pull from github (go get -u) are resulting in:

    ../../../github.com/RichardKnop/machinery/v1/backends/mongo/mongodb.go:302:32: cannot use uri (type string) as type *options.ClientOptions in argument to mongo.NewClient

    It's happening during download from Github, not during the Go build.

    Can't see any changes recently in the repo (we successfully deployed yesterday).

    Any ideas?

    opened by seonixx 8
  • feat(tracing): implement trace context propagation using the Opentracing api

    feat(tracing): implement trace context propagation using the Opentracing api

    This PR enables context propagation using newly introduced functions while maintaining backwards compatibility:

    • SendTaskWithContext
    • SendGroupWithContext
    • SendChordWithContext
    • SendChainWithContext

    Essentially, if one of these functions is called, it tries to inject the trace context found in context.Context into the signature's headers before calling the original non-WithContext function. On the worker side, an attempt is made to extract that context again and add it to the task's context. All these actions simply start a new span if no trace context is found. As an extra, the trace span is available in the called function's context if the registered function has context.Context as its first argument, so the trace can travel on.

    To get a good view of what a user should enable to get the benefits of tracing, check example/machinery.go and uncomment the Jaeger code in example/tracers/jaeger.go. Below is a screenshot of the Jaeger UI after a run of the example. Note that I've disabled the long-running task so the graph is prettier. There are also some screenshots of the tags captured by the spans.

    What is missing is a bit more documentation in the readme and some tests that verify the traces are actually propagating.

    [Screenshots: 2018-02-25 16-21-04, 2018-02-25 16-22-05, 2018-02-25 16-22-58]

    opened by rio 8
  • `Connection Reset By Peer` When sending lots of tasks by `SendGroup`

    `Connection Reset By Peer` When sending lots of tasks by `SendGroup`

    I use SendGroup to send my tasks. When I send 10 tasks, it works well. However, if I send 100 tasks, I get read tcp 127.0.0.1:56064->127.0.0.1:56379: read: connection reset by peer, and the tasks are not executed. I do not know how to solve this problem.

    opened by Jolly23 8
  • Secret Information is logged when new worker is launched.

    Secret Information is logged when new worker is launched.

    Problem: When a new worker is launched, the LaunchAsync function is called internally and logs the worker's configuration. This exposes my AMQP URL and ResultBackend URL, which is not acceptable in a production environment.

    Example log:

    INFO: 2019/07/10 10:27:00 worker.go:46 Launching a worker with the following settings:
    INFO: 2019/07/10 10:27:00 worker.go:47 - Broker: amqp://guest:guest@rabbitmq:5672
    INFO: 2019/07/10 10:27:00 worker.go:49 - DefaultQueue: test
    INFO: 2019/07/10 10:27:00 worker.go:53 - ResultBackend: mongodb://mongodb:27017/test
    INFO: 2019/07/10 10:27:00 worker.go:55 - AMQP: broker
    INFO: 2019/07/10 10:27:00 worker.go:56 - Exchange: broker
    INFO: 2019/07/10 10:27:00 worker.go:57 - ExchangeType: direct
    INFO: 2019/07/10 10:27:00 worker.go:58 - BindingKey: test
    INFO: 2019/07/10 10:27:00 worker.go:59 - PrefetchCount: 1

    Possible Solutions:

    1. Redact and log secret values like amqp://gu***:gu***@rabbi***:5672.
    2. Set a log level in the machinery server configuration so that the developer can opt to not Print INFO level logs in Production environments.

    @RichardKnop What is your take on this? We are planning to use Machinery extensively in our organization and are ready to contribute.

    opened by yolossn 7
  • compile error: impossible type assertion

    compile error: impossible type assertion

    I got an error while trying to compile my project with machinery/v1. Here is the log:

    
    external/com_google_cloud_go_pubsub/message.go:42:42: impossible type assertion:
            *psAckHandler does not implement "cloud.google.com/go/internal/pubsub".AckHandler (missing OnAckWithResult method)
    external/com_google_cloud_go_pubsub/message.go:70:27: cannot use ackh (type *psAckHandler) as type "cloud.google.com/go/internal/pubsub".AckHandler in argument to "cloud.google.com/go/internal/pubsub".NewMessage:
            *psAckHandler does not implement "cloud.google.com/go/internal/pubsub".AckHandler (missing OnAckWithResult method)
    compilepkg: error running subcommand external/go_sdk/pkg/tool/linux_amd64/compile: exit status 2
    Target //workflow/cmd:workflow failed to build
    

    I then found that Machinery depends on cloud.google.com/[email protected], but my project depends on v0.105.0. The difference between these two versions is in an interface in cloud.google.com/go/internal/pubsub/message.go:

    type AckHandler interface {
    	// declared in both v0.76.0 and v0.105.0
    	OnAck()
    	OnNack()

    	// declared only in v0.105.0
    	OnAckWithResult() *AckResult
    	OnNackWithResult() *AckResult
    }
    

    What should I do about this error? I would appreciate a reply. Thanks a lot.

    opened by mzychaco 1
  • Failed delivery of delivery tag: 0

    Failed delivery of delivery tag: 0

    Hi, I am using v1 and amqp as a broker.

    While publishing some messages I get the error below.
    EnqueueRequest: SendMessage: Publish message error: Failed delivery of delivery tag: 0

    Please correct me if I am wrong: does delivery tag 0 mean that a single message failed to publish?

    I also tried to reproduce this but could not. Can anyone help me here? Thanks!

    opened by call-stack 0
  • Receiving message will not be processed until visibility timeout when worker starts to quit

    Receiving message will not be processed until visibility timeout when worker starts to quit

    Hi, I'm using v1 and sqs as a broker.

    When a message is being received and the worker starts to shut down, the worker waits for the message to arrive from the broker but does not process it, so the task is not executed until the visibility timeout expires.

    The worker should wait until the message being received has finished processing, and then shut down gracefully.

    The main reason this happens is that the worker stops consuming new tasks by closing b.stopChan before all messages have finished being received. https://github.com/RichardKnop/machinery/blob/master/v1/common/broker.go#L117

    Closing deliveries before task processing has finished also prevents the message being received from being processed. https://github.com/RichardKnop/machinery/blob/master/v1/brokers/sqs/sqs.go#L86

    opened by tomo25 1
  • Prevent more than one instance of task running at the same time

    Prevent more than one instance of task running at the same time

    I am wondering if it's possible in v1 to make sure a task is not run if it's already running. For example, if request 1 pushes taskX and request 2 pushes taskX only a second later, and the taskX pushed by request 1 is still running, can we ensure that the taskX pushed by request 2 is skipped?

    opened by gurpreetg 0
  • Configuration with Redis Sentinel

    Configuration with Redis Sentinel

    Does anyone have an example of a configuration file for a Redis Sentinel connection?

    I'm able to write a task to a Redis Sentinel cluster, but I'm unable to read from the cluster: the worker just stays at waiting for running tasks.

    opened by xlanor 0
Releases(v2.0.11)
Owner
Richard Knop
I'm an experienced software engineer, open source contributor. I have mostly focused on backend programming in Go & Python
A lightweight, distributed and reliable message queue based on Redis

nmq A lightweight, distributed and reliable message queue based on Redis Get Started Download go get github.com/inuggets/nmq Usage import "github.com

Nuggets 2 Nov 22, 2021
A Multi Consumer per Message Queue with persistence and Queue Stages.

CrimsonQ A Multi Consumer per Message Queue with persistence and Queue Stages. Under Active Development Crimson Queue allows you to have multiple cons

Yousef Wadi 11 Jul 30, 2022
Asynq: simple, reliable, and efficient distributed task queue in Go

Asynq Overview Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be sc

Ken Hibino 5k Dec 30, 2022
Cadence is a distributed, scalable, durable, and highly available orchestration engine to execute asynchronous long-running business logic in a scalable and resilient way.

Cadence Visit cadenceworkflow.io to learn about Cadence. This repo contains the source code of the Cadence server. To implement workflows, activities

Uber Open Source 6.5k Jan 9, 2023
Alertmanager go message broker - A simple message broker made to integrate with alertmanager/prometheus

Alertmanager message broker Prerequisites Go 1.16+ SQLite driver About: The alertmanager message broker is a project made to meet some of my needs to

Davi Araújo 0 Dec 27, 2021
A single binary, simple, message queue.

MiniQueue A stupid simple, single binary message queue using HTTP/2. Most messaging workloads don't require enormous amounts of data, endless features

Tom Arrell 125 Nov 9, 2022
KubeMQ is a Kubernetes native message queue broker

KubeMQ Community is the open-source version of KubeMQ, the Kubernetes native message broker. More about KubeMQ

Temur Yunusov 0 Nov 20, 2021
Simple docker container to publish a fixed message to a specified queue. Created to be used with k8s CRON scheduling.

RabbitMQ Publish CRON Simple docker container to publish a fixed message to a specified rabbitmq exchange. Created to be used as part of a Kubernetes

Daniel Emery 0 Dec 20, 2021
Distributed Lab 3: Message Broker in Go

Distributed Lab 3: Message Broker in Go Using the lab sheet There are two ways to use the lab sheet, you can either: create a new repo from this templ

null 0 Oct 29, 2021
Persistent queue in Go based on BBolt

Persistent queue Persistent queue based on bbolt DB. Supposed to be used as embeddable persistent queue to any Go application. Features: messages are

Aleksandr Baryshnikov 2 Jun 30, 2022
🔊Minimalist message bus implementation for internal communication

Bus Bus is a minimalist event/message bus implementation for internal communication. It is heavily inspired from my event_bus package for Elixir la

Mustafa Turan 282 Jan 3, 2023
An n:m message multiplexer written in Go

What is Gollum? Gollum is an n:m multiplexer that gathers messages from different sources and broadcasts them to a set of destinations. Gollum origina

trivago N.V. 926 Dec 23, 2022
A library for scheduling when to dispatch a message to a channel

gosd go-schedulable-dispatcher (gosd), is a library for scheduling when to dispatch a message to a channel. Implementation The implementation provides

Alexander Sniffin 21 Sep 27, 2022
📨 A fast Message/Event Hub using publish/subscribe pattern with support for topics like* rabbitMQ exchanges for Go applications

Hub A fast enough Event Hub for go applications using publish/subscribe with support patterns on topics like rabbitMQ exchanges. Table of Contents

Leandro Lugaresi 125 Dec 17, 2022
Go simple async message bus

message-bus Go simple async message bus. ABOUT Contributors: Rafał Lorenz Want to contribute ? Feel free to send pull requests! Have problems, b

Rafał Lorenz 240 Dec 29, 2022
The Xiaomi message push service is a system-level channel on MIUI and is universal across the platform, which can provide developers with stable, reliable, and efficient push services.

Go-Push-API MiPush、JiPush、UMeng MiPush The Xiaomi message push service is a system-level channel on MIUI and is universal across the platform, which c

houseme 5 Oct 20, 2022
GopherSay allow you to display a message said by a cute random Gopher.

GopherSay About Welcome in GopherSay! GopherSay is inspired by Cowsay program. GopherSay allow you to display a message said by a cute random Gopher.

Aurelie Vache 19 Nov 23, 2022
Golang module/tool for decoding proto buffer without message definition.

Golang module/tool for decoding proto buffer without message definition.

null 42 Nov 11, 2022
Testing message queues with RabbitMQ

Rabbit-MessageQueue Just a repository of RabbitMQ simple usage for queueing messages. You can use this as a sender or a receiver. More information is

Jawady Muhammad Habib 2 Mar 10, 2022