ZenQ - A low-latency thread-safe queue in golang implemented using a lock-free ringbuffer

Features

  • Much faster than native channels in both SPSC (single-producer-single-consumer) and MPSC (multi-producer-single-consumer) modes in terms of time/op
  • More resource-efficient in terms of memory_allocation/op and num_allocations/op, as is evident when benchmarking large input batch sizes
  • Handles the case where NUM_WRITER_GOROUTINES > NUM_CPU_CORES much better than native channels
  • Selection from multiple ZenQs just like golang's select{}, with fair selection and no starvation

Benchmarks supporting the above claims are listed below.

Installation

You need Golang 1.18.x or above since this package uses generics

$ go get github.com/alphadose/[email protected]

Usage

  1. Simple Read/Write
package main

import (
	"fmt"

	"github.com/alphadose/zenq"
)

type payload struct {
	alpha int
	beta  string
}

func main() {
	zq := zenq.New[payload]()

	for j := 0; j < 5; j++ {
		go func() {
			for i := 0; i < 20; i++ {
				zq.Write(payload{
					alpha: i,
					beta:  fmt.Sprint(i),
				})
			}
		}()
	}

	for i := 0; i < 100; i++ {
		var data payload = zq.Read()
		fmt.Printf("%+v\n", data)
	}
}

  2. Selection from multiple ZenQs just like golang's native select{}. The selection process is fair, i.e. no single ZenQ gets starved
package main

import (
	"fmt"

	"github.com/alphadose/zenq"
)

type custom1 struct {
	alpha int
	beta  string
}

type custom2 struct {
	gamma int
}

var (
	zq1 = zenq.New[int]()
	zq2 = zenq.New[string]()
	zq3 = zenq.New[custom1]()
	zq4 = zenq.New[*custom2]()
)

func main() {
	go looper(intProducer)
	go looper(stringProducer)
	go looper(custom1Producer)
	go looper(custom2Producer)

	for i := 0; i < 40; i++ {

		// Selection occurs here
		switch data := zenq.Select(zq1, zq2, zq3, zq4).(type) {
		case int:
			fmt.Printf("Received int %d\n", data)
		case string:
			fmt.Printf("Received string %s\n", data)
		case custom1:
			fmt.Printf("Received custom data type number 1 %#v\n", data)
		case *custom2:
			fmt.Printf("Received pointer %#v\n", data)
		}
	}
}

func intProducer(ctr int) { zq1.Write(ctr) }

func stringProducer(ctr int) { zq2.Write(fmt.Sprint(ctr * 10)) }

func custom1Producer(ctr int) { zq3.Write(custom1{alpha: ctr, beta: fmt.Sprint(ctr)}) }

func custom2Producer(ctr int) { zq4.Write(&custom2{gamma: 1 << ctr}) }

func looper(producer func(ctr int)) {
	for i := 0; i < 10; i++ {
		producer(i)
	}
}
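ZenQ's selection internals aren't described in this README. One common way to get the starvation-free behavior described for zenq.Select is to make a non-blocking pass over the candidates starting at a random offset, and block only when every candidate is empty. The sketch below shows that idea with plain Go channels standing in for ZenQs; selectFair is a hypothetical helper, not part of the zenq package, and the blocking fallback relies on reflect.Select, which picks uniformly at random among ready cases.

```go
package main

import (
	"fmt"
	"math/rand"
	"reflect"
)

// selectFair (hypothetical helper) receives one value from whichever channel
// is ready. The non-blocking first pass starts at a random index, so no
// channel is always checked first; only if all channels are empty does it
// block in reflect.Select. chs must be non-empty. ok is false if the chosen
// channel was closed.
func selectFair(chs []chan any) (idx int, v any, ok bool) {
	start := rand.Intn(len(chs))
	for k := 0; k < len(chs); k++ {
		i := (start + k) % len(chs)
		select {
		case v, ok = <-chs[i]:
			return i, v, ok
		default: // empty: try the next channel
		}
	}
	cases := make([]reflect.SelectCase, len(chs))
	for i, ch := range chs {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
	}
	chosen, recv, recvOK := reflect.Select(cases)
	if !recvOK {
		return chosen, nil, false
	}
	return chosen, recv.Interface(), true
}

func main() {
	// Three queues that all have plenty of data ready.
	chs := []chan any{make(chan any, 10), make(chan any, 10), make(chan any, 10)}
	for i := 0; i < 10; i++ {
		chs[0] <- i
		chs[1] <- fmt.Sprint(i)
		chs[2] <- float64(i)
	}
	counts := make([]int, len(chs))
	for n := 0; n < 15; n++ {
		idx, _, _ := selectFair(chs)
		counts[idx]++
	}
	// The split varies between runs, but no channel is systematically skipped.
	fmt.Println("receives per channel:", counts)
}
```

A fixed scan order (always index 0 first) would let a busy first queue starve the others; randomizing the start index is what spreads receives across all ready queues.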

Benchmarks

Benchmarking code is available in the benchmarks/ directory of the repository.

Note that if you run the benchmarks with the --race flag, ZenQ will perform slower because the race detector slows down atomic operations in golang. Without it, ZenQ outperformed golang native channels in the benchmarks below.

Hardware Specs

  • Host: MacBookAir10,1
  • OS: macOS 12.3 21E230 arm64 (kernel 21.4.0)
  • CPU / GPU: Apple M1
  • Memory: 8192MiB

Terminology

  • NUM_WRITERS -> The number of goroutines concurrently writing to ZenQ/Channel
  • INPUT_SIZE -> The number of input payloads to be passed through ZenQ/Channel from producers to consumer
  • The -8 suffix on each benchmark name is the GOMAXPROCS value the benchmark ran with

Results computed with benchstat over 30 runs of each benchmark via go test -benchmem -bench=. benchmarks/simple/*.go

name                                     time/op
_Chan_NumWriters1_InputSize600-8          23.2µs ± 1%
_ZenQ_NumWriters1_InputSize600-8          18.1µs ± 1%
_Chan_NumWriters3_InputSize60000-8        5.52ms ± 3%
_ZenQ_NumWriters3_InputSize60000-8        2.67ms ± 6%
_Chan_NumWriters8_InputSize6000000-8       680ms ± 1%
_ZenQ_NumWriters8_InputSize6000000-8       308ms ± 4%
_Chan_NumWriters100_InputSize6000000-8     1.56s ± 6%
_ZenQ_NumWriters100_InputSize6000000-8     519ms ± 2%
_Chan_NumWriters1000_InputSize7000000-8    1.98s ± 1%
_ZenQ_NumWriters1000_InputSize7000000-8    441ms ±11%
_Chan_Million_Blocking_Writers-8           10.4s ± 3%
_ZenQ_Million_Blocking_Writers-8           8.56s ±24%

name                                     alloc/op
_Chan_NumWriters1_InputSize600-8           0.00B
_ZenQ_NumWriters1_InputSize600-8           0.00B
_Chan_NumWriters3_InputSize60000-8          110B ±68%
_ZenQ_NumWriters3_InputSize60000-8        23.6B ±107%
_Chan_NumWriters8_InputSize6000000-8       585B ±234%
_ZenQ_NumWriters8_InputSize6000000-8       411B ±299%
_Chan_NumWriters100_InputSize6000000-8    44.7kB ±35%
_ZenQ_NumWriters100_InputSize6000000-8    19.7kB ±78%
_Chan_NumWriters1000_InputSize7000000-8    483kB ±10%
_ZenQ_NumWriters1000_InputSize7000000-8  1.13kB ±602%
_Chan_Million_Blocking_Writers-8           553MB ± 0%
_ZenQ_Million_Blocking_Writers-8          95.5MB ± 0%

name                                     allocs/op
_Chan_NumWriters1_InputSize600-8            0.00
_ZenQ_NumWriters1_InputSize600-8            0.00
_Chan_NumWriters3_InputSize60000-8          0.00
_ZenQ_NumWriters3_InputSize60000-8          0.00
_Chan_NumWriters8_InputSize6000000-8       2.20 ±218%
_ZenQ_NumWriters8_InputSize6000000-8       0.90 ±344%
_Chan_NumWriters100_InputSize6000000-8       163 ±18%
_ZenQ_NumWriters100_InputSize6000000-8      47.0 ±79%
_Chan_NumWriters1000_InputSize7000000-8    1.79k ± 6%
_ZenQ_NumWriters1000_InputSize7000000-8    2.00 ±550%
_Chan_Million_Blocking_Writers-8           2.00M ± 0%
_ZenQ_Million_Blocking_Writers-8            995k ± 0%

The above results show that ZenQ is more efficient than channels in all 3 metrics, i.e. time/op, alloc/op and allocs/op, for the following tested cases:

  1. SPSC
  2. MPSC with NUM_WRITER_GOROUTINES < NUM_CPU_CORES
  3. MPSC with NUM_WRITER_GOROUTINES > NUM_CPU_CORES
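The benchmark sources under benchmarks/simple aren't reproduced in this README. As a rough, hypothetical illustration of what the channel side of a case like _Chan_NumWriters3_InputSize60000 might measure, the sketch below splits the input across several writer goroutines feeding one consumer (MPSC) and times it with testing.Benchmark, which can run outside go test once testing.Init has been called. The unbuffered channel, the int payload and the transferViaChan name are assumptions, not the repository's code.

```go
package main

import (
	"fmt"
	"testing"
)

// transferViaChan (hypothetical) has numWriters goroutines split inputSize
// ints between them while a single consumer drains the channel. The
// unbuffered channel and int payload are assumptions for illustration.
func transferViaChan(numWriters, inputSize int) int {
	ch := make(chan int)
	perWriter := inputSize / numWriters
	for w := 0; w < numWriters; w++ {
		go func() {
			for i := 0; i < perWriter; i++ {
				ch <- i
			}
		}()
	}
	received := 0
	for received < perWriter*numWriters {
		<-ch
		received++
	}
	return received
}

func main() {
	// testing.Init registers the test flags that testing.Benchmark reads;
	// it is only needed when benchmarking outside "go test".
	testing.Init()
	res := testing.Benchmark(func(b *testing.B) {
		b.ReportAllocs()
		for n := 0; n < b.N; n++ {
			transferViaChan(3, 60000)
		}
	})
	fmt.Println(res.String(), res.MemString())
}
```

Under go test, the same loop body would sit in a Benchmark function taking *testing.B, and benchstat would summarize repeated runs into the ± figures shown above.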

Cherry on the Cake

In SPSC mode, ZenQ is faster than channels by about 98 seconds for an input size of 6 * 10^8 (600000000)

❯ go run benchmarks/simple/main.go

With Input Batch Size: 60 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 64.875µs
ZenQ Runner completed transfer in: 9µs
====================================================================

With Input Batch Size: 600 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 70.958µs
ZenQ Runner completed transfer in: 44.958µs
====================================================================

With Input Batch Size: 6000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 967.417µs
ZenQ Runner completed transfer in: 518.916µs
====================================================================

With Input Batch Size: 6000000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 1.191589458s
ZenQ Runner completed transfer in: 144.895583ms
====================================================================

With Input Batch Size: 600000000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 1m52.671809708s
ZenQ Runner completed transfer in: 14.356517042s
====================================================================

For a select{}-based transfer experiment, these are the results:

❯ go run benchmarks/selector/main.go

Chan Select Runner completed transfer in: 2m42.313942333s
ZenQ Select Runner completed transfer in: 41.938121583s

Code available in benchmarks/selector/main.go

Issues
  • fatal error: casgstatus: waiting for Gwaiting but is Grunnable

    A fatal error occurs during benchmarking

    fatal error: casgstatus: waiting for Gwaiting but is Grunnable

    Benchmark code: https://github.com/lemon-mint/golang-q-benchmark

    go version go1.18.4 windows/amd64

    goos: windows
    goarch: amd64
    pkg: github.com/lemon-mint/golang-q-benchmark
    cpu: Intel(R) Core(TM) i7-4790K CPU @ 4.00GHz
    BenchmarkZenQ-8   	fatal error: casgstatus: waiting for Gwaiting but is Grunnable
    
    runtime stack:
    runtime.throw({0x930baf?, 0xc000084000?})
    	C:/Program Files/Go/src/runtime/panic.go:992 +0x76
    runtime.casgstatus(0xc0002844e0, 0x4, 0x1)
    	C:/Program Files/Go/src/runtime/proc.go:978 +0x385
    runtime.ready(0xc0002844e0, 0xc0002844e0?, 0x10?)
    	C:/Program Files/Go/src/runtime/proc.go:857 +0x71
    runtime.goready.func1()
    	C:/Program Files/Go/src/runtime/proc.go:372 +0x26
    runtime.systemstack()
    	C:/Program Files/Go/src/runtime/asm_amd64.s:469 +0x4e
    
    goroutine 87 [running]:
    runtime.systemstack_switch()
    	C:/Program Files/Go/src/runtime/asm_amd64.s:436 fp=0xc0003f1df8 sp=0xc0003f1df0 pc=0x8531e0
    runtime.goready(0x85df1e?, 0xc00040fce0?)
    	C:/Program Files/Go/src/runtime/proc.go:371 +0x47 fp=0xc0003f1e28 sp=0xc0003f1df8 pc=0x82a547
    github.com/alphadose/zenq/v2.safe_ready(0x8f1785?)
    	G:/go/pkg/mod/github.com/alphadose/zenq/[email protected]/lib_runtime_linkage.go:176 +0x54 fp=0xc0003f1e50 sp=0xc0003f1e28 pc=0x8efe54
    github.com/alphadose/zenq/v2.(*ThreadParker[...]).Ready(0xc000442860?)
    	G:/go/pkg/mod/github.com/alphadose/zenq/[email protected]/thread_parker.go:64 +0x95 fp=0xc0003f1eb8 sp=0xc0003f1e50 pc=0x8f1855
    github.com/alphadose/zenq/v2.(*ZenQ[...]).Read(0xc000072640)
    	G:/go/pkg/mod/github.com/alphadose/zenq/[email protected]/zenq.go:188 +0xfa fp=0xc0003f1f20 sp=0xc0003f1eb8 pc=0x8f1a3a
    github.com/lemon-mint/golang-q-benchmark.BenchmarkZenQ.func1(0xc00020c020)
    	g:/work/golang-q-benchmark/main_test.go:24 +0xb8 fp=0xc0003f1f80 sp=0xc0003f1f20 pc=0x8f0958
    testing.(*B).RunParallel.func1()
    	C:/Program Files/Go/src/testing/benchmark.go:788 +0xcb fp=0xc0003f1fe0 sp=0xc0003f1f80 pc=0x8ab0cb
    runtime.goexit()
    	C:/Program Files/Go/src/runtime/asm_amd64.s:1571 +0x1 fp=0xc0003f1fe8 sp=0xc0003f1fe0 pc=0x855541
    created by testing.(*B).RunParallel
    	C:/Program Files/Go/src/testing/benchmark.go:781 +0x105
    
    goroutine 1 [chan receive]:
    testing.(*B).doBench(0xc00013c240)
    	C:/Program Files/Go/src/testing/benchmark.go:285 +0x7f
    testing.(*benchContext).processBench(0xc0000040f0, 0x238?)
    	C:/Program Files/Go/src/testing/benchmark.go:589 +0x3aa
    testing.(*B).run(0xc00013c240?)
    	C:/Program Files/Go/src/testing/benchmark.go:276 +0x67
    testing.(*B).Run(0xc00013c000, {0x9280ec?, 0x2f82bb486508?}, 0x932a68)
    	C:/Program Files/Go/src/testing/benchmark.go:677 +0x453
    testing.runBenchmarks.func1(0xc00013c000?)
    	C:/Program Files/Go/src/testing/benchmark.go:550 +0x6e
    testing.(*B).runN(0xc00013c000, 0x1)
    	C:/Program Files/Go/src/testing/benchmark.go:193 +0x102
    testing.runBenchmarks({0x92f7db, 0x28}, 0xa69b60?, {0xa0f3e0, 0x4, 0x40?})
    	C:/Program Files/Go/src/testing/benchmark.go:559 +0x3f2
    testing.(*M).Run(0xc0000701e0)
    	C:/Program Files/Go/src/testing/testing.go:1726 +0x811
    main.main()
    	_testmain.go:53 +0x1aa
    

    opened by lemon-mint 4
  • Running with --race causes a crash although no race conditions were detected

    Most likely due to handling the goroutine pointer (stored as unsafe.Pointer) without

    if race.Enabled {
    	race.Acquire(ptr)
    }

    but internal/race is internal to the Go standard library, so it cannot be imported from a third-party module

    opened by alphadose 0
  • More Benchmarks for your feedback

    Hi, here are some results on a different architecture: Ubuntu 22.04, AMD Ryzen Threadripper 3960X, 128 GB RAM

    Simple

    ~/go/pkg/mod/github.com/alphadose/[email protected]$ go run benchmarks/simple/main.go
    With Input Batch Size: 60 and Num Concurrent Writers: 1
    
    Native Channel Runner completed transfer in: 11.462µs
    ZenQ Runner completed transfer in: 4.739µs
    ====================================================================
    
    With Input Batch Size: 600 and Num Concurrent Writers: 1
    
    Native Channel Runner completed transfer in: 32.522µs
    ZenQ Runner completed transfer in: 28.795µs
    ====================================================================
    
    With Input Batch Size: 6000 and Num Concurrent Writers: 1
    
    Native Channel Runner completed transfer in: 449.323µs
    ZenQ Runner completed transfer in: 254.583µs
    ====================================================================
    
    With Input Batch Size: 6000000 and Num Concurrent Writers: 1
    
    Native Channel Runner completed transfer in: 365.257669ms
    ZenQ Runner completed transfer in: 447.14544ms
    ====================================================================
    
    With Input Batch Size: 600000000 and Num Concurrent Writers: 1
    
    Native Channel Runner completed transfer in: 38.346896012s
    ZenQ Runner completed transfer in: 48.59434176s
    ====================================================================
    
    

    Selector

    Chan Select Runner completed transfer in: 4m32.220766401s
    ZenQ Select Runner did not complete transfer after 30min+ (tested both Ubuntu and Windows)
    
    opened by jdvjdv82 19
Releases(v2.7.1)
  • 2.1.1(May 24, 2022)

    Improve the performance of select by adding direct_send and an optimistic first-pass read approach

    ZenQ Select() is now comparable to channels for small batch sizes, although it falls behind as the batch size grows

    Improving ZenQ's selection performance beyond this would require mutating the goroutine pointer itself, for even faster data transfer during selections

    With Input Batch Size: 60 and Num Concurrent Writers: 4
    
    Chan Select Runner completed transfer in: 55.375µs
    ZenQ Select Runner completed transfer in: 56.125µs
    ====================================================================
    
    With Input Batch Size: 600 and Num Concurrent Writers: 4
    
    Chan Select Runner completed transfer in: 181.334µs
    ZenQ Select Runner completed transfer in: 432.75µs
    ====================================================================
    
    With Input Batch Size: 6000 and Num Concurrent Writers: 4
    
    Chan Select Runner completed transfer in: 936.042µs
    ZenQ Select Runner completed transfer in: 4.410916ms
    ====================================================================
    
    With Input Batch Size: 600000 and Num Concurrent Writers: 4
    
    Chan Select Runner completed transfer in: 140.836375ms
    ZenQ Select Runner completed transfer in: 380.546875ms
    ====================================================================
    
  • 2.0.0(May 24, 2022)

    • Selection added, no polling
    • Queue closing added (every call to Read() now also returns whether the queue is currently exhausted)
    • Performance improved
  • 1.5.0(May 9, 2022)

    name                                     old time/op    new time/op    delta
    _ZenQ_NumWriters1_InputSize600-8           16.5µs ± 1%    17.9µs ± 1%   +8.65%  (p=0.000 n=28+29)
    _ZenQ_NumWriters3_InputSize60000-8         2.85ms ± 0%    2.67ms ± 6%   -6.11%  (p=0.000 n=23+30)
    _ZenQ_NumWriters8_InputSize6000000-8        417ms ± 0%     313ms ± 5%  -24.83%  (p=0.000 n=23+29)
    _ZenQ_NumWriters100_InputSize6000000-8      741ms ± 3%     516ms ± 2%  -30.40%  (p=0.000 n=29+30)
    _ZenQ_NumWriters1000_InputSize7000000-8     1.05s ± 1%     0.45s ± 9%  -57.58%  (p=0.000 n=28+30)
    _ZenQ_Million_Blocking_Writers-8            7.01s ±44%    10.98s ± 4%  +56.54%  (p=0.000 n=30+28)
    
    name                                     old alloc/op   new alloc/op   delta
    _ZenQ_NumWriters1_InputSize600-8            0.00B          0.00B          ~     (all equal)
    _ZenQ_NumWriters3_InputSize60000-8         28.9B ±111%    34.8B ±127%     ~     (p=0.268 n=30+29)
    _ZenQ_NumWriters8_InputSize6000000-8        885B ±163%     671B ±222%     ~     (p=0.208 n=30+30)
    _ZenQ_NumWriters100_InputSize6000000-8     16.2kB ±66%   13.3kB ±100%     ~     (p=0.072 n=30+30)
    _ZenQ_NumWriters1000_InputSize7000000-8    62.4kB ±82%    2.4kB ±210%  -96.20%  (p=0.000 n=30+30)
    _ZenQ_Million_Blocking_Writers-8           95.9MB ± 0%    95.5MB ± 0%   -0.41%  (p=0.000 n=28+30)
    
    name                                     old allocs/op  new allocs/op  delta
    _ZenQ_NumWriters1_InputSize600-8             0.00           0.00          ~     (all equal)
    _ZenQ_NumWriters3_InputSize60000-8           0.00           0.00          ~     (all equal)
    _ZenQ_NumWriters8_InputSize6000000-8        2.07 ±142%     1.40 ±186%     ~     (p=0.081 n=30+30)
    _ZenQ_NumWriters100_InputSize6000000-8       53.5 ±50%     31.8 ±100%  -40.60%  (p=0.000 n=30+30)
    _ZenQ_NumWriters1000_InputSize7000000-8       525 ±39%        6 ±227%  -98.95%  (p=0.000 n=30+30)
    _ZenQ_Million_Blocking_Writers-8            1.00M ± 0%     0.99M ± 0%   -0.41%  (p=0.000 n=28+29)
    
  • 1.4.0(May 6, 2022)

  • 1.3.0(May 4, 2022)

    In previous versions of ZenQ, the tests timed out with a million goroutines; this version handles that case by parking the extra goroutines

    name                                     old time/op    new time/op    delta
    _Chan_NumWriters1_InputSize600-8           23.2µs ± 0%    24.2µs ± 0%   ~     (p=1.000 n=1+1)
    _ZenQ_NumWriters1_InputSize600-8           16.3µs ± 0%    16.7µs ± 0%   ~     (p=1.000 n=1+1)
    _Chan_NumWriters3_InputSize60000-8         5.55ms ± 0%    6.19ms ± 0%   ~     (p=1.000 n=1+1)
    _ZenQ_NumWriters3_InputSize60000-8         2.80ms ± 0%    2.85ms ± 0%   ~     (p=1.000 n=1+1)
    _Chan_NumWriters8_InputSize6000000-8        694ms ± 0%     730ms ± 0%   ~     (p=1.000 n=1+1)
    _ZenQ_NumWriters8_InputSize6000000-8        446ms ± 0%     421ms ± 0%   ~     (p=1.000 n=1+1)
    _Chan_NumWriters100_InputSize6000000-8      1.59s ± 0%     1.60s ± 0%   ~     (p=1.000 n=1+1)
    _ZenQ_NumWriters100_InputSize6000000-8      490ms ± 0%     736ms ± 0%   ~     (p=1.000 n=1+1)
    _Chan_NumWriters1000_InputSize7000000-8     1.97s ± 0%     1.97s ± 0%   ~     (p=1.000 n=1+1)
    _ZenQ_NumWriters1000_InputSize7000000-8     769ms ± 0%    1052ms ± 0%   ~     (p=1.000 n=1+1)
    
    
    name                                     old alloc/op   new alloc/op   delta
    _Chan_NumWriters1_InputSize600-8            0.00B          0.00B        ~     (all equal)
    _ZenQ_NumWriters1_InputSize600-8            0.00B          0.00B        ~     (all equal)
    _Chan_NumWriters3_InputSize60000-8          63.0B ± 0%     96.0B ± 0%   ~     (p=1.000 n=1+1)
    _ZenQ_NumWriters3_InputSize60000-8          44.0B ± 0%      0.0B        ~     (p=1.000 n=1+1)
    _Chan_NumWriters8_InputSize6000000-8       1.76kB ± 0%    0.24kB ± 0%   ~     (p=1.000 n=1+1)
    _ZenQ_NumWriters8_InputSize6000000-8       1.52kB ± 0%    0.31kB ± 0%   ~     (p=1.000 n=1+1)
    _Chan_NumWriters100_InputSize6000000-8     51.2kB ± 0%    35.9kB ± 0%   ~     (p=1.000 n=1+1)
    _ZenQ_NumWriters100_InputSize6000000-8     5.27kB ± 0%   18.94kB ± 0%   ~     (p=1.000 n=1+1)
    _Chan_NumWriters1000_InputSize7000000-8     492kB ± 0%     492kB ± 0%   ~     (p=1.000 n=1+1)
    _ZenQ_NumWriters1000_InputSize7000000-8    16.6kB ± 0%    34.3kB ± 0%   ~     (p=1.000 n=1+1)
    
    name                                     old allocs/op  new allocs/op  delta
    _Chan_NumWriters1_InputSize600-8             0.00           0.00        ~     (all equal)
    _ZenQ_NumWriters1_InputSize600-8             0.00           0.00        ~     (all equal)
    _Chan_NumWriters3_InputSize60000-8           0.00           0.00        ~     (all equal)
    _ZenQ_NumWriters3_InputSize60000-8           0.00           0.00        ~     (all equal)
    _Chan_NumWriters8_InputSize6000000-8         5.00 ± 0%      2.00 ± 0%   ~     (p=1.000 n=1+1)
    _ZenQ_NumWriters8_InputSize6000000-8         3.00 ± 0%      1.00 ± 0%   ~     (p=1.000 n=1+1)
    _Chan_NumWriters100_InputSize6000000-8        163 ± 0%       147 ± 0%   ~     (p=1.000 n=1+1)
    _ZenQ_NumWriters100_InputSize6000000-8       12.0 ± 0%      49.0 ± 0%   ~     (p=1.000 n=1+1)
    _Chan_NumWriters1000_InputSize7000000-8     1.81k ± 0%     1.77k ± 0%   ~     (p=1.000 n=1+1)
    _ZenQ_NumWriters1000_InputSize7000000-8      40.0 ± 0%     357.0 ± 0%   ~     (p=1.000 n=1+1)
    
  • 1.2.0(May 2, 2022)

    Replacing the spinning lastCommittedIndex with slot-based CAS operations gives a large performance gain over channels in the following tested cases:

    1. Single Producer Single Consumer (gain of around 94 seconds with input size 6 * 10^8)
    2. Multi Producer Single Consumer where NUM_WRITER_GOROUTINES < NUM_CPU_CORES
    3. Multi Producer Single Consumer where NUM_WRITER_GOROUTINES > NUM_CPU_CORES as illustrated below
    Benchmark_Chan_NumWriters1000_InputSize7000000-8    1  1984590667 ns/op  497344 B/op   1817 allocs/op
    Benchmark_ZenQ_NumWriters1000_InputSize7000000-8    2   777197480 ns/op    8736 B/op     21 allocs/op
    
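Release 1.2.0 only names the slot-based CAS technique. As a hedged sketch (not ZenQ's actual code), here is a bounded ring buffer in the style of Dmitry Vyukov's bounded MPMC queue: each writer claims a position by CAS on the tail index, then publishes through that slot's own sequence number, so readers check individual slots instead of spinning on a shared lastCommittedIndex. ringQueue, TryWrite and TryRead are hypothetical names; the sketch needs Go 1.19+ for atomic.Uint64, and it yields with runtime.Gosched when the ring is full or empty, whereas ZenQ parks goroutines (release 1.3.0).

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// slot holds one value plus a sequence number: seq == pos means the slot is
// free for the writer of position pos; seq == pos+1 means pos is readable.
type slot[T any] struct {
	seq  atomic.Uint64
	data T
}

// ringQueue (hypothetical) is a bounded queue with per-slot publication.
type ringQueue[T any] struct {
	mask  uint64
	slots []slot[T]
	head  atomic.Uint64 // next position to read
	tail  atomic.Uint64 // next position to write
}

// newRingQueue builds a ring whose size must be a power of two.
func newRingQueue[T any](size uint64) *ringQueue[T] {
	q := &ringQueue[T]{mask: size - 1, slots: make([]slot[T], size)}
	for i := range q.slots {
		q.slots[i].seq.Store(uint64(i))
	}
	return q
}

// TryWrite returns false instead of blocking when the ring is full.
func (q *ringQueue[T]) TryWrite(v T) bool {
	for {
		pos := q.tail.Load()
		s := &q.slots[pos&q.mask]
		seq := s.seq.Load()
		if seq == pos { // free slot: compete with other writers via CAS
			if q.tail.CompareAndSwap(pos, pos+1) {
				s.data = v
				s.seq.Store(pos + 1) // publish to readers
				return true
			}
		} else if seq < pos { // still holds the previous lap's unread value
			return false
		}
		// otherwise another writer already took pos: reload tail and retry
	}
}

// TryRead returns false instead of blocking when the ring is empty.
func (q *ringQueue[T]) TryRead() (T, bool) {
	var zero T
	for {
		pos := q.head.Load()
		s := &q.slots[pos&q.mask]
		seq := s.seq.Load()
		if seq == pos+1 { // value for pos has been published
			if q.head.CompareAndSwap(pos, pos+1) {
				v := s.data
				s.data = zero
				s.seq.Store(pos + q.mask + 1) // free the slot for the next lap
				return v, true
			}
		} else if seq < pos+1 { // nothing published at pos yet
			return zero, false
		}
	}
}

// runDemo: several producers each send 1..perWriter, one consumer sums them.
func runDemo(writers, perWriter int) int {
	q := newRingQueue[int](8)
	var wg sync.WaitGroup
	for w := 0; w < writers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 1; i <= perWriter; i++ {
				for !q.TryWrite(i) {
					runtime.Gosched() // full: yield (ZenQ parks instead)
				}
			}
		}()
	}
	sum := 0
	for n := 0; n < writers*perWriter; n++ {
		v, ok := q.TryRead()
		for !ok {
			runtime.Gosched() // empty: yield and retry
			v, ok = q.TryRead()
		}
		sum += v
	}
	wg.Wait()
	return sum
}

func main() {
	fmt.Println(runDemo(4, 1000)) // 4 * (1+...+1000) = 2002000
}
```

The power-of-two size lets pos&mask replace a modulo, and the per-slot sequence numbers make "full" and "empty" detectable without a separate shared counter.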
Owner
Anish Mukherjee
A wanderer in the world of code
Machinery is an asynchronous task queue/job queue based on distributed message passing.

Machinery Machinery is an asynchronous task queue/job queue based on distributed message passing. V2 Experiment First Steps Configuration Lock Broker

Richard Knop 6.4k Aug 8, 2022
A Multi Consumer per Message Queue with persistence and Queue Stages.

CrimsonQ A Multi Consumer per Message Queue with persistence and Queue Stages. Under Active Development Crimson Queue allows you to have multiple cons

Yousef Wadi 11 Jul 30, 2022
Gue is Golang queue on top of PostgreSQL that uses transaction-level locks.

Vladimir Garvardt 96 Aug 5, 2022
Golang Delay Queue

[gdq] Golang Delay Queue GDQ is a library that leverage db or cache to be setup as a delay queue. For current version, Only redis can adapt to this li

Patrick Maurits Sangian 1 Jan 15, 2022
Asynq: simple, reliable, and efficient distributed task queue in Go

Asynq Overview Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be sc

Ken Hibino 3.8k Aug 1, 2022
RapidMQ is a pure, extremely productive, lightweight and reliable library for managing of the local messages queue

RapidMQ RapidMQ is a pure, extremely productive, lightweight and reliable library for managing of the local messages queue in the Go programming langu

Vadim Shakun 63 Apr 24, 2022
redisqueue provides a producer and consumer of a queue that uses Redis streams

redisqueue redisqueue provides a producer and consumer of a queue that uses Redis streams. Features A Producer struct to make enqueuing messages easy.

Robin Joseph 84 Aug 1, 2022
Uniqush is a free and open source software system which provides a unified push service for server side notification to apps on mobile devices.

Homepage Download Blog/News @uniqush Introduction Uniqush (\ˈyü-nə-ku̇sh\ "uni" pronounced as in "unified", and "qush" pronounced as in "cushion") is

Uniqush 1.4k Jul 31, 2022
A single binary, simple, message queue.

MiniQueue A stupid simple, single binary message queue using HTTP/2. Most messaging workloads don't require enormous amounts of data, endless features

Tom Arrell 120 Aug 3, 2022
dque is a fast, embedded, durable queue for Go

dque - a fast embedded durable queue for Go dque is: persistent -- survives program restarts scalable -- not limited by your RAM, but by your disk spa

Jon Carlson 702 Aug 3, 2022
Queue with NATS Jetstream to remove all the erlangs from cloud

Saf in Persian means Queue. One of the problems, that we face on projects with queues is deploying RabbitMQ on the cloud which brings us many challenges for CPU load, etc. I want to see how NATS with Jetstream can work as the queue to replace RabbitMQ.

Parham Alvani 11 Jan 7, 2022
A fast durable queue for Go

pqueue - a fast durable queue for Go pqueue is thread-safety, serves environments where more durability is required (e.g., outages last longer than me

Linh Tran Tuan 12 Jun 10, 2022
Redis as backend for Queue Package

redis Redis as backend for Queue package Setup start the redis server redis-server start the redis cluster, see the config # server 01 mkdir server01

golang-queue 10 Apr 23, 2022
NSQ as backend for Queue Package

golang-queue 10 Jul 4, 2022
Kudruk helps you to create queue channels and manage them gracefully.

kudruk Channels are widely used as queues. kudruk (means queue in Turkish) helps you to easily create queue with channel and manage the data in the qu

Erhan Yakut 8 Feb 21, 2022
Chanman helps you to create queue channels and manage them gracefully.

chanman Channels are widely used as queues. chanman (Channel Manager) helps you to easily create queue with channel and manage the data in the queue.

Erhan Yakut 9 Oct 16, 2021
A simple persistent directory-backed FIFO queue.

pqueue pqueue is a simple persistent directory-backed FIFO queue. It provides the typical queue interface Enqueue and Dequeue and may store any byte s

Philipp C. Heckel 4 May 7, 2022
Persistent queue in Go based on BBolt

Persistent queue Persistent queue based on bbolt DB. Supposed to be used as embeddable persistent queue to any Go application. Features: messages are

Aleksandr Baryshnikov 2 Jun 30, 2022
A lightweight, distributed and reliable message queue based on Redis

nmq A lightweight, distributed and reliable message queue based on Redis Get Started Download go get github.com/inuggets/nmq Usage import "github.com

Nuggets 2 Nov 22, 2021