Tasqueue is a simple, lightweight distributed job/worker implementation in Go

Overview

Tasqueue

Tasqueue is a simple, lightweight distributed job/worker implementation in Go

Concepts

  • tasqueue.Broker is a generic interface that provides methods to enqueue and consume messages from a queue. Currently supported brokers are redis and nats-jetstream.
  • tasqueue.Results is a generic interface that provides methods to store the state/results of task messages. Currently supported result stores are redis and nats-jetstream.
  • tasqueue.Handler is a function type that accepts []byte payloads. Users need to register such handlers with the server. It is upto the handler to decode (if required) the []byte messages and process them in any manner.
  • tasqueue.Task holds the data for a basic unit of work. This data includes the handler name which will process the task, a []byte payload (encoded in any manner, if required). More options described below.

Basic example

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/kalbhor/tasqueue"
	redis_broker "github.com/kalbhor/tasqueue/brokers/redis"
	redis_results "github.com/kalbhor/tasqueue/results/redis"
)

type SumPayload struct {
	Arg1 int `json:"arg1"`
	Arg2 int `json:"arg2"`
}

// SumProcessor prints the sum of two integer arguements.
func SumProcessor(b []byte) error {
	var pl SumPayload
	if err := json.Unmarshal(b, &pl); err != nil {
		return err
	}

	fmt.Println(pl.Arg1 + pl.Arg2)

	return nil
}

func main() {
	// Create a new tasqueue server with redis results & broker.
	srv := tasqueue.NewServer(redis_broker.New(redis_broker.Options{
		Addrs:    []string{"127.0.0.1:6379"},
		Password: "",
		DB:       0,
	}), redis_results.New(redis_results.Options{
		Addrs:    []string{"127.0.0.1:6379"},
		Password: "",
		DB:       0,
	}))

	// Register a handler called "add"
	srv.RegisterHandler("add", SumProcessor)

	// Encode the payload passed to the handler and create a task.
	b, _ := json.Marshal(SumPayload{Arg1: 5, Arg2: 4})
	t, err := tasqueue.NewTask("add", b)
	if err != nil {
		log.Fatal(err)
	}

	ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)

	// Place the task
	srv.AddTask(ctx, t)

	// Start the tasqueue workers. (blocking function)
	srv.Start(ctx)

	fmt.Println("exit..")
}

Features and Options

Task

Features

  • Task chains : A new "chain" of tasks can be formed by using tasqueue.NewChain(tasks ...*Task). Each subsequent task will be placed after the successful execution of current task.

Options

  • Cron based schedule : tasqueue.Schedule("* * * * *")
  • Custom Queue (important to run server on custom queue as well) : tasqueue.CustomQueue("custom-q")
  • Custom value for maximum retries : tasqueue.MaxRetry(5)

Options can be passed while creating a new task : func NewTask(handler string, payload []byte, opts ...Opts)

Server/Worker

Options

  • Custom Queue (important to set on task as well) : tasqueue.CustomQueue("custom-q")
  • Custom concurrency : tasqueue.Concurrency(5)

Options can be passed while starting the server worker. func (s *Server) Start(ctx context.Context, opts ...Opts)

Comments
  • Replacing logrus with logf.

    Replacing logrus with logf.

    • Replaces logrus with logf.
    • Modifies the NewServer API to take a logger from the user.
    • Modifies brokers,results backends to explicitly take a logf logger.

    Closes #15

    opened by iamd3vil 3
  • Configuring concurrency per task

    Configuring concurrency per task

    Currently Concurrency and Queue is part of ServerOpts. This restricts all tasks registered on the server to have the same concurrency. The only way to configure different concurrency is to have a different server per task. I think it would be better if it's possible to define concurrency per task. This applies to the Queue as well. We can have different queues for each task as well. This would make it more efficient than everything in the same queue.

    What are your thoughts on this?

    opened by iamd3vil 3
  • Tasqueue fails to consume from Redis task queue

    Tasqueue fails to consume from Redis task queue

    I ran into this error while running the Redis example. I've logged the GroupMessage for inspecting the details of the group.

    Logs:

    INFO[0000] added handler: add                           
    INFO[0000] starting processor..                         
    INFO[0000] starting processor..                         
    INFO[0000] starting processor..                         
    INFO[0000] starting processor..                         
    INFO[0000] starting processor..                         
    INFO[0000] starting task consumer..                     
    INFO[0000] receiving from consumer..                    
    INFO[0000] receiving from consumer..                    
    INFO[0000] receiving from consumer..                    
    INFO[0000] receiving from consumer..                    
    INFO[0001] getting job : 10f6330e-535e-48d3-81f1-dd63a3b71ac9 
    INFO[0001] getting job : 4362c034-d30d-496b-81d6-42258409b341 
    INFO[0001] getting job : 9c4bee18-9314-4f20-b1ac-4e991149905d 
    {GroupMeta:{UUID:ca2fb538-c350-46af-b578-adde150befc0 Status:successful JobStatus:map[10f6330e-535e-48d3-81f1-dd63a3b71ac9:successful 4362c034-d30d-496b-81d6-42258409b341:successful 9c4bee18-9314-4f20-b1ac-4e991149905d:successful]} Group:0xc00000e2b8}
    ERRO[0001] error consuming from redis queue: redis: nil 
    INFO[0001] tasqueue:tasks: no tasks to consume..        
    INFO[0001] receiving from consumer..                    
    {GroupMeta:{UUID:ca2fb538-c350-46af-b578-adde150befc0 Status:successful JobStatus:map[10f6330e-535e-48d3-81f1-dd63a3b71ac9:successful 4362c034-d30d-496b-81d6-42258409b341:successful 9c4bee18-9314-4f20-b1ac-4e991149905d:successful]} Group:0xc00000e408}
    ERRO[0002] error consuming from redis queue: redis: nil 
    INFO[0002] tasqueue:tasks: no tasks to consume..        
    INFO[0002] receiving from consumer..                    
    {GroupMeta:{UUID:ca2fb538-c350-46af-b578-adde150befc0 Status:successful JobStatus:map[10f6330e-535e-48d3-81f1-dd63a3b71ac9:successful 4362c034-d30d-496b-81d6-42258409b341:successful 9c4bee18-9314-4f20-b1ac-4e991149905d:successful]} Group:0xc00000e4e0}
    
    opened by burntcarrot 2
  • Replacing logrus with logf

    Replacing logrus with logf

    Should we replace logrus with logf (github.com/zerodha/logf) ?

    Also there should be a way to configure logging for both the lib and all the brokers, result backends. Currently we just do INFO messages which users of the lib might not want.

    If you are okay with this, I will send a PR and we can discuss the API.

    opened by iamd3vil 1
  • Concurrency and Queue per Task

    Concurrency and Queue per Task

    This PR removes Concurrency & Queue from ServerOpts and adds them to TaskOpts. This allows configuring concurrency & queue per task.

    Closes #5

    P.S: I updated README.md and example. Also ran an autoformatter on README.md.

    opened by iamd3vil 1
  • Add previous job results to JobCtx in case a job needs the results from the previous job in the chain

    Add previous job results to JobCtx in case a job needs the results from the previous job in the chain

    This PR adds PrevJobResults to Meta which is passed to the next job in a chain.

    Also I have modified the Enqueue method to use added enqueueWithMeta method. This was we can modify the meta and still reuse the function. Let me know if this looks good or any changes you think are needed.

    opened by iamd3vil 1
  •  Add TTL to job, managed by the library

    Add TTL to job, managed by the library

    It would be useful to add a TTL field to any job, past which the job can be "expired". All of this should be broker/store agnostic and managed by the library itself.

    I have jotted below few approaches from the top of my head

    1. Using a "janitor" go-routine periodically checking for stale jobs. This will require additional methods from the store Store must be able to do one of :
    • Provide a method to query jobs based on the ttl field
    • Provide a wildcard scan method to read all jobs (the library will iterate and check each job)
    1. The library can setup a scheduled job to mark a job as expired. Tests will have to be performed to check the reliability and overhead of such a scheduling (since this may spawn a scheduled job for each new job).

    An additional Dequeue method will be required in all brokers for the above methods to work

    There is a wip branch available at feat/ttl.

    enhancement 
    opened by kalbhor 0
Owner
Lakshay Kalbhor
Python & Go programmer
Lakshay Kalbhor
Job worker service that provides an API to run arbitrary Linux processes.

Job Scheduler Summary Prototype job worker service that provides an API to run arbitrary Linux processes. Overview Library The library (Worker) is a r

Renato Guimarães 8 May 26, 2022
Executes jobs in separate GO routines. Provides Timeout, StartTime controls. Provides Cancel all running job before new job is run.

jobExecutor Library to execute jobs in GO routines. Provides for Job Timeout/Deadline (MaxDuration()) Job Start WallClock control (When()) Add a job b

Eswaran SK 0 Jan 10, 2022
A zero-dependencies and lightweight go library for job scheduling

A zero-dependencies and lightweight go library for job scheduling.

null 3 Aug 3, 2022
A programmable, observable and distributed job orchestration system.

?? Overview Odin is a programmable, observable and distributed job orchestration system which allows for the scheduling, management and unattended bac

James McDermott 445 Sep 27, 2022
Machinery is an asynchronous task queue/job queue based on distributed message passing.

Machinery Machinery is an asynchronous task queue/job queue based on distributed message passing. V2 Experiment First Steps Configuration Lock Broker

Richard Knop 6.5k Sep 14, 2022
goInterLock is golang job/task scheduler with distributed locking mechanism (by Using Redis🔒).

goInterLock is golang job/task scheduler with distributed locking mechanism. In distributed system locking is preventing task been executed in every instant that has the scheduler,

Jay Ehsaniara 26 Sep 19, 2022
clockwork - Simple and intuitive job scheduling library in Go.

clockwork A simple and intuitive scheduling library in Go. Inspired by python's schedule and ruby's clockwork libraries. Example use package main imp

null 26 Jul 27, 2022
Simple job queues for Go backed by Redis

bokchoy Introduction Bokchoy is a simple Go library for queueing tasks and processing them in the background with workers. It should be integrated in

Florent Messa 255 Jul 13, 2022
A simple job scheduler backed by Postgres.

A simple job scheduler backed by Postgres used in production at https://operand.ai. Setup needs two environment variables, SECRET and ENDPOINT. The se

Morgan Gallant 12 Sep 10, 2022
You had one job, or more then one, which can be done in steps

Leprechaun Leprechaun is tool where you can schedule your recurring tasks to be performed over and over. In Leprechaun tasks are recipes, lets observe

Strahinja 88 Aug 26, 2022
Job scheduling made easy.

scheduler Job scheduling made easy. Scheduler allows you to schedule recurrent jobs with an easy-to-read syntax. Inspired by the article Rethinking Cr

Carles Cerezo Guzmán 397 Sep 14, 2022
goCron: A Golang Job Scheduling Package.

goCron: A Golang Job Scheduling Package.

辣椒面 3.1k Sep 25, 2022
Run Jobs on a schedule, supports fixed interval, timely, and cron-expression timers; Instrument your processes and expose metrics for each job.

A simple process manager that allows you to specify a Schedule that execute a Job based on a Timer. Schedule manage the state of this job allowing you to start/stop/restart in concurrent safe way. Schedule also instrument this Job and gather metrics and optionally expose them via uber-go/tally scope.

Sherif Abdel-Naby 57 Mar 28, 2022
golang job dispatcher

go-gearman The shardingkey is hashed to the same queue, each of which is bound to a worker.

fengyun.rui 17 Apr 4, 2022
xxl-job 对应的golang客户端

xxl-job-go-client xxl-job 对应的golang客户端 提供Elasticsearch 日志组件,把job执行过程写入elasticsearch方便跟踪查询 func main() { exec := xxl.NewExecutor( xxl.ServerAd

Ronnie 6 Aug 26, 2022
a self terminating concurrent job queue for indeterminate workloads in golang

jobtracker - a self terminating concurrent job queue for indeterminate workloads in golang This library is primarily useful for technically-recursive

maia tillie arson crimew 9 Sep 6, 2022
Cloud-native, enterprise-level cron job platform for Kubernetes

Furiko Furiko is a cloud-native, enterprise-level cron and adhoc job platform for Kubernetes. The main website for documentation and updates is hosted

Furiko 190 Sep 11, 2022
Lightweight, fast and dependency-free Cron expression parser (due checker) for Golang (tested on v1.13 and above)

adhocore/gronx gronx is Golang cron expression parser ported from adhocore/cron-expr. Zero dependency. Very fast because it bails early in case a segm

Jitendra Adhikari 212 Sep 19, 2022
Distributed Task Scheduling System|分布式定时任务调度平台

Crocodile Distributed Task Scheduling System English | 中文 Introduction A distributed task scheduling system based on Golang that supports http request

labulaka521 869 Sep 20, 2022