Simple, efficient background processing for Golang backed by RabbitMQ and Redis

Overview

CircleCI Go Report Card GitHub godoc for Joker666/cogman

logo

Table of Contents

How to use:

First add it to $GOPATH

go get github.com/Joker666/cogman

Then add configuration for rabbitmq for messaging, redis as backend, optionally mongodb as backend for re-enqueuing feature.
Start the server to consume the tasks and start the client session to send tasks to server. Start server and client.
Write task handlers and register them. Send the tasks to process. And voila!, you have set up the simplest background processing job server.

You should see something like this when everything is up and running List of services

Motivation

In python world you have Celery, in Ruby world you have Resque, SideKiq, in C# Hangfire. All of them had one thing in common, simple interface to get started. When building products in Golang, this was apparent that, there is no library with a simple interface. We have Machinery, which is an excellent library, but has a steep learning curve. Also it has tonnes of features backed in that you do not need and but is required anyway.

Also the way it handled processing of future tasks with RabbitMQ's Dead Letter Exchange, we were not very fond of it. So we decided to make our own job processing library. This is a opinionated library as it uses RabbitMQ as the message broker and Redis and optionally MongoDB backend for more features. This setup has worked great for us in production and under stress and I believe this can work for large tasks as well

Requirements

  • Go
  • RabbitMQ
  • Redis
  • MongoDB (optional)

Features

  • Task Priority
  • Persistence
  • Queue type
  • Retries
  • Multiple consumer & server
  • Concurrency
  • Redis and Mongo log
  • Re-enqueue recovered task
  • Handle Reconnection
  • UI
  • Rest API

Examples

Setup

Config

Cogman api config example.

cfg := &config.Config{
    ConnectionTimeout: time.Minute * 10, // default value 10 minutes
    RequestTimeout   : time.Second * 5,  // default value 5 second


    AmqpURI : "amqp://localhost:5672",                  // required
    RedisURI: "redis://localhost:6379/0",               // required
    MongoURI: "mongodb://root:[email protected]:27017/", // optional

    RedisTTL: time.Hour * 24 * 7,  // optional. default value 1 week
    MongoTTL: time.Hour * 24 * 30, // optional. default value 1 month

    HighPriorityQueueCount: 2,     // optional. default value 1
    LowPriorityQueueCount : 4,     // optional. default value 1
    StartRestServer:        true   // optional. default value false
}

Client & Server also has individual config file to use them separately.

Client/Server

This Cogman api call will start a client and a server.

if err := cogman.StartBackground(cfg); err != nil {
    log.Fatal(err)
}

Instead, if you want you can initiate Client & Server individually:

// Client
client, err := cogman.NewSession(cfg)
if err != nil {
    log.Fatal(err)
}

if err := client.Connect(); err != nil {
    log.Fatal(err)
}

// Server
server, err := cogman.NewServer(cfg)
if err != nil {
    log.Fatal(err)
}

go func() {
    defer server.Stop()
    if err = server.Start(); err != nil {
        log.Fatal(err)
    }
}()

Task

Tasks are grouped by two priority level. Based on that it will be assigned to a queue.

type Task struct {
    TaskID         string       // unique. ID should be assigned by Cogman.
    Name           string       // required. And Task name must be registered with a task handler
    OriginalTaskID string       // a retry task will carry it's parents ID.
    PrimaryKey     string       // optional. Client can set any key to trace a task.
    Retry          int          // default value 0.
    Prefetch       int          // optional. Number of task fetch from queue by consumer at a time.
    Payload        []byte       // required
    Priority       TaskPriority // required. High or Low
    Status         Status       // current task status
    FailError      string       // default empty. If Status is failed, it must have a value.
    Duration       *float64     // task execution time.
    CreatedAt      time.Time    // create time.
    UpdatedAt      time.Time    // last update time.
}

Worker/Task Handler

Any struct can be passed as a handler it implements below interface:

type Handler interface {
	Do(ctx context.Context, payload []byte) error
}

A function type HandlerFunc also can pass as handler.

type HandlerFunc func(ctx context.Context, payload []byte) error

func (h HandlerFunc) Do(ctx context.Context, payload []byte) error {
	return h(ctx, payload)
}

Register The Handlers

// Register task handler from Server side
server.Register(taskName, handler)
server.Register(taskName, handlerFunc)

Send Task

Sending task using Cogman API:

if err := cogman.SendTask(*task, handler); err != nil {
	log.Fatal(err)
}

// If a task handler is already registered, you can pass nil.
if err := cogman.SendTask(*task, nil); err != nil {
	log.Fatal(err)
}

Sending task using Cogman Client/Server:

// Sending task from client
if err := client.SendTask(task); err != nil {
    return err
}

Queue

Cogman queue type:

- High_Priority_Queue [default Queue]  
- Low_Priority_Queue  [lazy Queue]

There are two types queues that Cogman maintains. Default & Lazy queue. High priority tasks would be pushed to default queue and low priority task would be pushed to lazy queue. The number of each type of queues can be set by client/server through configuration. Queue won't be lost after any sort of connection interruption.

Re-Connection

Cogman Client & Server both handles reconnection. If the client loses connection, it can still take tasks, and those will be processed immediate after Cogman client gets back the connection. After Server reconnects, it will start to consume tasks without losing any task.

Re-Enqueue

Re-enqueue feature to recover all the initiated task those are lost for connection error. If client somehow loses the amqp connection, Cogman can still take the task in offline. All offline task will be re-queue after connection re-established. Cogman fetches all the offline tasks from mongo logs, and re-initiate them. Mongo connection required here. For re-enqueuing, task retry count would not change.

Feature Comparison

Comparison among the other job/task process runner.

Feature Cogman Machinery
Backend redis/mongo redis
Priorities
Re-Enqueue
Concurrency
Re-Connection
Delayed jobs
Concurrent client/server
Re-try
Persistence
UI
Rest API
Chain
Chords
Groups

Contribution

Want to contribute? Great!

To fix a bug or enhance an existing module, follow these steps:

  • Fork the repo
  • Create a new branch (git checkout -b improve-feature)
  • Make the appropriate changes in the files
  • Add changes to reflect the changes made
  • Commit your changes (git commit -am 'Improve feature')
  • Push to the branch (git push origin improve-feature)
  • Create a Pull Request

Bug / Feature Request

If you find a bug, kindly open an issue here.
If you'd like to request/add a new function, feel free to do so by opening an issue here.

License

MIT © MD Ahad Hasan

Releases(v1.0.0)
Owner
Hasan
Senior Backend/DevOps Engineer
Hasan
Simple job queues for Go backed by Redis

bokchoy Introduction Bokchoy is a simple Go library for queueing tasks and processing them in the background with workers. It should be integrated in

Florent Messa 255 Jul 13, 2022
A persistent and flexible background jobs library for go.

Jobs Development Status Jobs is no longer being actively developed. I will still try my best to respond to issues and pull requests, but in general yo

Alex Browne 489 Jul 23, 2022
Tiny library to handle background jobs.

bgjob Tiny library to handle background jobs. Use PostgreSQL to organize job queues. Highly inspired by gue Features Durable job storage At-least-ones

null 1 Nov 16, 2021
Redisq is a queue-over-redis that provides simple way to works with queues stored in Redis.

Redisq is a queue-over-redis that provides simple way to works with queues stored in Redis.

null 0 Feb 10, 2022
A simple job scheduler backed by Postgres.

A simple job scheduler backed by Postgres used in production at https://operand.ai. Setup needs two environment variables, SECRET and ENDPOINT. The se

Morgan Gallant 10 Jul 26, 2022
goInterLock is golang job/task scheduler with distributed locking mechanism (by Using Redis🔒).

goInterLock is golang job/task scheduler with distributed locking mechanism. In distributed system locking is preventing task been executed in every instant that has the scheduler,

Jay Ehsaniara 23 Jul 18, 2022
基于 Redis 和 Cron 的定时任务队列

RTask RTask 是 Golang 一款基于 Redis 和 Cron 的定时任务队列。 快速上手 您需要使用 Go Module 导入 RTask 工具包。 go get -u github.com/avtion/rtask 使用教程 package main import ( "con

Avtion 2 Oct 27, 2021
A simple Cron library for go that can execute closures or functions at varying intervals, from once a second to once a year on a specific date and time. Primarily for web applications and long running daemons.

Cron.go This is a simple library to handle scheduled tasks. Tasks can be run in a minimum delay of once a second--for which Cron isn't actually design

Robert K 210 Jul 27, 2022
clockwork - Simple and intuitive job scheduling library in Go.

clockwork A simple and intuitive scheduling library in Go. Inspired by python's schedule and ruby's clockwork libraries. Example use package main imp

null 26 Jul 27, 2022
Lightweight, fast and dependency-free Cron expression parser (due checker) for Golang (tested on v1.13 and above)

adhocore/gronx gronx is Golang cron expression parser ported from adhocore/cron-expr. Zero dependency. Very fast because it bails early in case a segm

Jitendra Adhikari 207 Jul 25, 2022
Simple, zero-dependency scheduling library for Go

go-quartz Simple, zero-dependency scheduling library for Go. About Inspired by the Quartz Java scheduler. Library building blocks Job interface. Any t

Eugene R. 911 Aug 1, 2022
Task Timer (tt) is a dead simple TUI task timer

tasktimer Task Timer (tt) is a dead simple TUI task timer Usage To get started, just run tt: tt You'll be presented with something like this: You can

Carlos Alexandro Becker 238 Jul 30, 2022
Gotask - A simple task queue is stripped when the program is written to achieve the task delivery function

gotask The simple task queue is stripped when the program is written to achieve

SaiRson 4 Feb 14, 2022
Tasqueue is a simple, lightweight distributed job/worker implementation in Go

Tasqueue Tasqueue is a simple, lightweight distributed job/worker implementation in Go Concepts tasqueue.Broker is a generic interface that provides m

Lakshay Kalbhor 241 Jul 25, 2022
A lightweight job scheduler based on priority queue with timeout, retry, replica, context cancellation and easy semantics for job chaining. Build for golang web apps.

Table of Contents Introduction What is RIO? Concern An asynchronous job processor Easy management of these goroutines and chaining them Introduction W

Supratim Samanta 49 May 3, 2022
Run Jobs on a schedule, supports fixed interval, timely, and cron-expression timers; Instrument your processes and expose metrics for each job.

A simple process manager that allows you to specify a Schedule that execute a Job based on a Timer. Schedule manage the state of this job allowing you to start/stop/restart in concurrent safe way. Schedule also instrument this Job and gather metrics and optionally expose them via uber-go/tally scope.

Sherif Abdel-Naby 57 Mar 28, 2022
Marshmallow provides a flexible and performant JSON unmarshalling in Go. It specializes in dealing with unstructured struct - when some fields are known and some aren't, with zero performance overhead nor extra coding needed.

Marshmallow Marshmallow package provides a simple API to perform flexible and performant JSON unmarshalling in Go. Marshmallow specializes in dealing

PerimeterX 184 Aug 8, 2022
Reminder is a Golang package to allow users to schedule alerts.

Reminder is a Golang package to allow users to schedule alerts. It has 4 parts: Scheduler Repeater Notifier Reminder A scheduler takes in a t

null 16 Jan 17, 2022
goCron: A Golang Job Scheduling Package.

goCron: A Golang Job Scheduling Package.

辣椒面 3k Aug 4, 2022