Go client for reliable queues based on Redis Cluster Streams

Overview

Ami

Ami is a Go client for reliable queues based on Redis Cluster Streams.


Consume/produce performance

Performance depends on:

  • the number of Redis Cluster nodes;
  • ping RTT from client to Redis Cluster master nodes;
  • network speed between nodes;
  • message sizes;
  • Ami configuration.

For example, a 10-node Redis Cluster with half of the nodes in another datacenter (50 ms ping), 1 master/1 slave, and the message "{}" gave:

	$ go run examples/performance/main.go
	Produced 1000000 in 3.423883 sec, rps 292066.022156
	Consumed 151000 in 1.049238 sec, rps 143913.931722
	Acked 151000 in 0.973587 sec, rps 155096.612263

Producer example

	// errorLogger receives errors reported by Ami through the ErrorNotifier option.
	type errorLogger struct{}

	func (l *errorLogger) AmiError(err error) {
		println("Got error from Ami:", err.Error())
	}

	pr, err := ami.NewProducer(
		ami.ProducerOptions{
			ErrorNotifier:     &errorLogger{},
			Name:              "ruthie",
			PendingBufferSize: 10000000,
			PipeBufferSize:    50000,
			PipePeriod:        time.Millisecond,
			ShardsCount:       10,
		},
		&redis.ClusterOptions{
			Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"},
			ReadTimeout:  time.Second * 60,
			WriteTimeout: time.Second * 60,
		},
	)
	if err != nil {
		panic(err)
	}

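	// Queue 10000 messages for sending.
	for i := 0; i < 10000; i++ {
		pr.Send("{}")
	}

	// Close the producer once all messages have been handed over.
	pr.Close()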

Consumer example

	// errorLogger receives errors reported by Ami through the ErrorNotifier option.
	type errorLogger struct{}

	func (l *errorLogger) AmiError(err error) {
		println("Got error from Ami:", err.Error())
	}

	cn, err := ami.NewConsumer(
		ami.ConsumerOptions{
			Consumer:          "alice",
			ErrorNotifier:     &errorLogger{},
			Name:              "ruthie",
			PendingBufferSize: 10000000,
			PipeBufferSize:    50000,
			PipePeriod:        time.Millisecond,
			PrefetchCount:     100,
			ShardsCount:       10,
		},
		&redis.ClusterOptions{
			Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"},
			ReadTimeout:  time.Second * 60,
			WriteTimeout: time.Second * 60,
		},
	)
	if err != nil {
		panic(err)
	}

	c := cn.Start()

	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()
		// Receive messages until the channel is closed.
		for m := range c {
			println("Got", m.Body, "ID", m.ID)
			cn.Ack(m)
		}
	}()

	time.Sleep(time.Second)

	cn.Stop()
	wg.Wait()

	cn.Close()

Releases (2.6.3)

Owner: Andrey Kuzmin