Queue
Queue is a Golang library for spawning and managing a Goroutine pool, allowing you to create multiple workers according to the number of CPUs available on the machine.
Features
- Support buffered channel queue.
- Support NSQ (A realtime distributed messaging platform) as backend.
- Support NATS (Connective Technology for Adaptive Edge & Distributed Systems) as backend.
Installation
Install the stable version:
go get github.com/appleboy/queue
Install the latest version:
go get github.com/appleboy/queue@master
Usage
The first step is to create a new job type that implements the QueuedMessage interface:
// job is the example payload placed on the queue. Both fields are exported so
// that encoding/json includes them when Bytes serializes the value.
type job struct {
	// Name is set by the enqueue step of this walkthrough.
	Name string
	// Message is the text the worker's run function sends to the results channel.
	Message string
}
// Bytes returns the JSON encoding of the job.
// It panics if json.Marshal reports an error, because the method signature
// has no error return.
func (j *job) Bytes() []byte {
	encoded, marshalErr := json.Marshal(j)
	if marshalErr != nil {
		panic(marshalErr)
	}
	return encoded
}
The second step is to create a new worker. Using the buffered channel as an example, you can use the stop
channel to terminate a job immediately after shutting down the queue service, if needed.
// define the worker
w := simple.NewWorker(
simple.WithQueueNum(taskN),
simple.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
return err
}
}
rets <- v.Message
return nil
}),
)
Or use NSQ as the backend; see the worker example:
// define the worker
w := nsq.NewWorker(
nsq.WithAddr("127.0.0.1:4150"),
nsq.WithTopic("example"),
nsq.WithChannel("foobar"),
// concurrent job number
nsq.WithMaxInFlight(10),
nsq.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
return err
}
}
rets <- v.Message
return nil
}),
)
Or use NATS as the backend; see the worker example:
w := nats.NewWorker(
nats.WithAddr("127.0.0.1:4222"),
nats.WithSubj("example"),
nats.WithQueue("foobar"),
nats.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
return err
}
}
rets <- v.Message
return nil
}),
)
The third step is to create a queue, initialize multiple workers, and receive all job messages:
// define the queue: five workers, all driven by the worker w defined above
q, err := queue.NewQueue(
	queue.WithWorkerCount(5),
	queue.WithWorker(w),
)
if err != nil {
	log.Fatal(err)
}

// start the five workers
q.Start()

// assign tasks in queue
for i := 0; i < taskN; i++ {
	go func(i int) {
		// Do not drop the enqueue result: a job that was never queued would
		// presumably never produce a message on rets, leaving the receive
		// loop below blocked forever.
		if err := q.Queue(&job{
			Name:    "foobar",
			Message: fmt.Sprintf("handle the job: %d", i+1),
		}); err != nil {
			log.Fatal(err)
		}
	}(i)
}

// wait until all tasks are done: one message is expected per queued job
for i := 0; i < taskN; i++ {
	fmt.Println("message:", <-rets)
	time.Sleep(50 * time.Millisecond)
}

// shut down the service and notify all the workers
q.Shutdown()
// wait until all jobs are complete
q.Wait()
The full example code is shown below, or try it in the playground.
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/appleboy/queue"
"github.com/appleboy/queue/simple"
)
// job is the example payload placed on the queue: Name identifies the
// recipient and Message carries the text. Both are exported so that
// encoding/json includes them when Bytes serializes the value.
type job struct {
	Name, Message string
}
// Bytes returns the JSON encoding of the job.
// It panics if json.Marshal reports an error, because the method signature
// has no error return.
func (j *job) Bytes() []byte {
	encoded, marshalErr := json.Marshal(j)
	if marshalErr != nil {
		panic(marshalErr)
	}
	return encoded
}
// main runs the example end to end: it builds a buffered-channel worker,
// starts a queue with five workers, enqueues taskN jobs concurrently, prints
// one result per job, and then shuts the queue down.
func main() {
	taskN := 100
	// Capacity taskN: at most taskN results are ever sent, so the run
	// function never blocks on the send.
	rets := make(chan string, taskN)

	// define the worker
	w := simple.NewWorker(
		simple.WithQueueNum(taskN),
		simple.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
			v, ok := m.(*job)
			if !ok {
				// Not a *job value: decode the message bytes as JSON into a job.
				if err := json.Unmarshal(m.Bytes(), &v); err != nil {
					return err
				}
			}

			rets <- "Hi, " + v.Name + ", " + v.Message
			return nil
		}),
	)

	// define the queue
	q, err := queue.NewQueue(
		queue.WithWorkerCount(5),
		queue.WithWorker(w),
	)
	if err != nil {
		log.Fatal(err)
	}

	// start the five workers
	q.Start()

	// assign tasks in queue
	for i := 0; i < taskN; i++ {
		go func(i int) {
			// Do not drop the enqueue result: a job that was never queued
			// would presumably never produce a message on rets, leaving the
			// receive loop below blocked forever.
			if err := q.Queue(&job{
				Name:    "foobar",
				Message: fmt.Sprintf("handle the job: %d", i+1),
			}); err != nil {
				log.Fatal(err)
			}
		}(i)
	}

	// wait until all tasks are done: one message is expected per queued job
	for i := 0; i < taskN; i++ {
		fmt.Println("message:", <-rets)
		time.Sleep(50 * time.Millisecond)
	}

	// shut down the service and notify all the workers
	q.Shutdown()
	// wait until all jobs are complete
	q.Wait()
}