A wrapper of streadway/amqp that provides reconnection logic and sane defaults

Overview

go-rabbitmq

Wrapper of streadway/amqp that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful

Supported by Qvault

Deploy

Motivation

Streadway's AMQP library is currently the most robust and well-supported Go client I'm aware of. It's a fantastic option and I recommend starting there and seeing if it fulfills your needs. Their project has made an effort to stay within the scope of the AMQP protocol, as such, no reconnection logic and few ease-of-use abstractions are provided.

The goal with go-rabbitmq is to still provide most all of the nitty-gritty functionality of AMQP, but to make it easier to work with via a higher-level API. Particularly:

  • Automatic reconnection
  • Multithreaded consumers via a handler function
  • Reasonable defaults
  • Flow control handling

⚙️ Installation

Outside of a Go module:

go get github.com/wagslane/go-rabbitmq

🚀 Quick Start Consumer

Default options

consumer, err := rabbitmq.NewConsumer("amqp://user:[email protected]")
if err != nil {
    log.Fatal(err)
}
err = consumer.StartConsuming(
    func(d rabbitmq.Delivery) bool {
        log.Printf("consumed: %v", string(d.Body))
        // true to ACK, false to NACK
        return true
    },
    "my_queue",
    []string{"routing_key1", "routing_key2"}
)
if err != nil {
    log.Fatal(err)
}

With options

consumer, err := rabbitmq.NewConsumer(
    "amqp://user:[email protected]",
    // can pass nothing for no logging
    func(opts *rabbitmq.ConsumerOptions) {
        opts.Logging = true
    },
)
if err != nil {
    log.Fatal(err)
}
err = consumer.StartConsuming(
    func(d rabbitmq.Delivery) bool {
        log.Printf("consumed: %v", string(d.Body))
        // true to ACK, false to NACK
        return true
    },
    "my_queue",
    []string{"routing_key1", "routing_key2"},
    // can pass nothing here for defaults
    func(opts *rabbitmq.ConsumeOptions) {
        opts.QueueDurable = true
        opts.Concurrency = 10
        opts.QOSPrefetch = 100
    },
)
if err != nil {
    log.Fatal(err)
}

🚀 Quick Start Publisher

Default options

publisher, returns, err := rabbitmq.NewPublisher("amqp://user:[email protected]")
if err != nil {
    log.Fatal(err)
}
err = publisher.Publish([]byte("hello, world"), []string{"routing_key"})
if err != nil {
    log.Fatal(err)
}

With options

publisher, returns, err := rabbitmq.NewPublisher(
    "amqp://user:[email protected]",
    // can pass nothing for no logging
    func(opts *rabbitmq.PublisherOptions) {
        opts.Logging = true
    },
)
if err != nil {
    log.Fatal(err)
}
err = publisher.Publish(
    []byte("hello, world"),
    []string{"routing_key"},
    // leave blank for defaults
    func(opts *rabbitmq.PublishOptions) {
        opts.DeliveryMode = rabbitmq.Persistent
        opts.Mandatory = true
        opts.ContentType = "application/json"
    },
)
if err != nil {
    log.Fatal(err)
}

go func() {
    for r := range returns {
        log.Printf("message returned from server: %s", string(r.Body))
    }
}()

💬 Contact

Twitter Follow

Submit an issue (above in the issues tab)

Transient Dependencies

My goal is to keep dependencies limited to 1, github.com/streadway/amqp.

👏 Contributing

I love help! Contribute by forking the repo and opening pull requests. Please ensure that your code passes the existing tests and linting, and write tests to test your changes if applicable.

All pull requests should be submitted to the main branch.

Comments
  • Consumer won't reconnect

    Consumer won't reconnect

    Hi,

    Somehow when network got disconnected for a minutes or so and then network up again, I got a log

    gorabbit: rabbit consumer goroutine closed
    

    After that, consumer stops receiving new message. Why wont it reconnect?

    I'm using version 0.6.2, and my code is (more or less) like this

    rabbitmqConsumer, err := rabbitmq.NewConsumer(config.constructURL(), amqp.Config{}, rabbitmq.WithConsumerOptionsLogging)
    ...
    rabbitmqConsumer.StartConsuming(
      func(message rabbitmq.Delivery) bool {
        return true
      },
      "",
      []string{""},
      func(options *rabbitmq.ConsumeOptions) {
        options.QueueExclusive = true
        options.ConsumerExclusive = true
        options.QueueDurable = true
        options.BindingExchange = &rabbitmq.BindingExchangeOptions{
          Name:    "my_rabbitmq_topic",
          Kind:    "fanout",
          Durable: true,
        }
      },
    )
    
    opened by nicklaros 31
  • Add log level to Printf method of the Logger interface

    Add log level to Printf method of the Logger interface

    Currently it is not possible to define which log-level message has: info or warning or error. Need to extend Printf method with logLevel variable. I can suggest PR.

    It's a breaking change, but don't really see how such feature can be added without breaking change.

    opened by metalrex100 11
  • runs consumer reconnection by any rabbit error

    runs consumer reconnection by any rabbit error

    I think that may be good idea to run consumer reconnection by any error because there is situations when internet connection to rabbit service breaks for a while and after that consumer not working anymore. It might be worth adding some field for set up this behavior to the consumer settings fields, but I'm not sure.

    opened by fortyanov 7
  • Possible data race

    Possible data race

    After updating to the version I get:

      ==================
      WARNING: DATA RACE
      Write at 0x00c000474220 by goroutine 33:
        github.com/wagslane/go-rabbitmq.(*Publisher).startNotifyFlowHandler()
            go/pkg/mod/github.com/wagslane/[email protected]/publish.go:234 +0x70
    
      Previous read at 0x00c000474220 by goroutine 24:
        github.com/wagslane/go-rabbitmq.NewPublisher()
            go/pkg/mod/github.com/wagslane/[email protected]/publish.go:167 +0x3b3
        git.vonroll-infratec.com/go/meas/pkg/queue.NewPublisher()
            ws/go/meas/pkg/queue/rab_pub.go:48 +0x604
        git.vonroll-infratec.com/go/meas/pkg/queue.(*RabbitSuite).SetupTest()
            ws/go/meas/pkg/queue/rabbit_test.go:43 +0x119
        github.com/stretchr/testify/suite.Run.func1()
            go/pkg/mod/github.com/rzajac/[email protected]/suite/suite.go:148 +0x8b4
        testing.tRunner()
            /usr/local/go/src/testing/testing.go:1193 +0x202
    
      Goroutine 33 (running) created at:
        github.com/wagslane/go-rabbitmq.(*Publisher).startNotifyHandlers()
            go/pkg/mod/github.com/wagslane/[email protected]/publish.go:229 +0x164
        github.com/wagslane/go-rabbitmq.NewPublisher.func1()
            go/pkg/mod/github.com/wagslane/[email protected]/publish.go:160 +0x44
    
      Goroutine 24 (running) created at:
        testing.(*T).Run()
            /usr/local/go/src/testing/testing.go:1238 +0x5d7
        github.com/stretchr/testify/suite.runTests()
            go/pkg/mod/github.com/rzajac/[email protected]/suite/suite.go:203 +0xf7
        github.com/stretchr/testify/suite.Run()
            go/pkg/mod/github.com/rzajac/[email protected]/suite/suite.go:176 +0x944
        git.vonroll-infratec.com/go/meas/pkg/queue.TestRabbit()
            ws/go/meas/pkg/queue/rabbit_test.go:19 +0xa4
        testing.tRunner()
            /usr/local/go/src/testing/testing.go:1193 +0x202
      ==================
    

    in my tests.

    opened by rzajac 7
  • Allow consumers to disable requeuing of messages

    Allow consumers to disable requeuing of messages

    Currently, the delivery is either acknowledge or not based on the bool return value of a consumer. But it is also automatically requeued. In some cases I do not want the delivery to be requeued if its body contains for example invalid JSON and it needs to be send to for example a dead letter queue for further introspection.

    I can imagine that a quick solution would be to change the return value of the consumer to something like func(d Delivery) (bool, bool) where the second bool indicates if it should be requeued or not. I can easily create a PR for that behaviour but wanted to check it first.

    opened by robinvdvleuten 5
  • Implementation for graceful shutdown for consumers

    Implementation for graceful shutdown for consumers

    There seems to be missing case while handling SIGINT, SIGTERM signals in the implementation, if the consumers are running in goroutine. One possible implementation can be stop listening to new messages (closing the channel), and provide deadline context to running goroutines.

    Would like to know if there can more elegant way for handling these case, also open for contributing.

    opened by alter123 4
  • How to publish with confirms

    How to publish with confirms

    confirms := make(chan amqp.Confirmation)
    ch.NotifyPublish(confirms)
    go func() {
        for confirm := range confirms {
    	    if confirm.Ack {
    		    // code when messages is confirmed
    		    log.Printf("Confirmed")
    	    } else {
    		    // code when messages is nack-ed
    		    log.Printf("Nacked")
    	    }
        }
    }()
    
    
    opened by ofttryaj 4
  • Add backoff duration option

    Add backoff duration option

    This adds an option to allow specifying the backoff duration.
    Continuously doubling the backoff duration does not seem to be a good service-friendly choice.

    I also thought about adding a "callback" function to allow the user to specify a calculation method of the duration but this looks like not needed overhead. What is your impression?

    opened by 3mb3dw0rk5 4
  • how to support rabbit mq consumer rebalance?

    how to support rabbit mq consumer rebalance?

    I found that mq is very long, then start more consumers don't take effect.

    Then, I restart old consumers, these new consumers begin to consume messages in the very long queue.

    Does rabbitmq support rebalance? And how to support rabbit mq consumer rebalance?

    opened by xj-zh-dev 4
  • Replace logger with a Logger interface to allow for custom loggers.

    Replace logger with a Logger interface to allow for custom loggers.

    Hey, great project! I'm using streadway/amqp a lot and this project basically takes away a lot of pain :)

    In order to test this a bit I needed a custom logger so I swapped the logger struct with a Logger interface. I've also replaced some log.Println() with just log.Printf() and added some logging around Ack and Nack.

    It should be backwards compatible and I even added an example.

    Let me know what you think, I've only tested this in my local setup. Thank you.

    opened by tomarus 4
  • Declare Improvements

    Declare Improvements

    This PR contains breaking changes to the API, but it has some huge improvements:

    • queue, exchange and binding declaration is now simplified and much clearer to use
    • declaration of queues, exchanges and bindings is also possible if the Publisher is used (#43 )
    • updated examples have been added too
    opened by h44z 3
  • AutoReconnect only reconnect the consumer to 1 queue instead of all the queues

    AutoReconnect only reconnect the consumer to 1 queue instead of all the queues

    Sorry I'm new to golang, I just created 1 consumer to consume multiple queues. But when the connection dropped, and the reconnect mechanism started, it successfully reconnect but only to 1 queue. Below is my code for POC

    // main.go
    package main
    
    import (
    	"log"
    	"strings"
    	"sync"
    
    	. "poc-reconnect/worker/east"
    	. "poc-reconnect/worker/west"
    
    	rabbitmq "github.com/wagslane/go-rabbitmq"
    )
    
    var workerList = make(map[string]interface {
    	Execute(wg *sync.WaitGroup, concurrent int)
    })
    
    func main() {
    	amqpAddress := "amqp://guest:[email protected]:5672/poc"
    
    	// initialize consumer
    	consumer, err := rabbitmq.NewConsumer(
    		amqpAddress, rabbitmq.Config{},
    	)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer consumer.Close()
    
    	// Register the workers
    	workerList["west"] = West{Consumer: consumer}
    	workerList["east"] = East{Consumer: consumer}
    
    	// worker concurrent configuration
    	workerConcurrent := map[string]int{
    		"west": 2,
    		"east": 2,
    	}
    
    	wg := sync.WaitGroup{}
    
    	// run all worker
    	for worker, thread := range workerConcurrent {
    		wg.Add(1)
    		go workerList[strings.ToLower(worker)].Execute(&wg, thread)
    	}
    	wg.Wait()
    }
    
    // worker/east/east.go
    package east
    
    import (
    	"log"
    	"sync"
    
    	rabbitmq "github.com/wagslane/go-rabbitmq"
    )
    
    type East struct {
    	Consumer rabbitmq.Consumer
    }
    
    const work_queue = "east_queue"
    
    func (e East) Execute(wg *sync.WaitGroup, concurrent int) {
    	defer wg.Done()
    
    	forever := make(chan bool)
    	// Subscribing to the queue
    	err := e.Consumer.StartConsuming(
    		func(d rabbitmq.Delivery) rabbitmq.Action {
    			log.Printf("Received message from %s with content: %s\n", work_queue, d.Body)
    			// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
    			return rabbitmq.Ack
    		},
    		work_queue,
    		[]string{work_queue},
    		rabbitmq.WithConsumeOptionsConcurrency(concurrent),
    		rabbitmq.WithConsumeOptionsQueueDurable,
    		rabbitmq.WithConsumeOptionsBindingExchangeName("exchange"),
    		rabbitmq.WithConsumeOptionsBindingExchangeKind("direct"),
    		rabbitmq.WithConsumeOptionsBindingExchangeDurable,
    	)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	log.Printf("Subscribing to %s for getting messages on %d goroutines\n", work_queue, concurrent)
    
    	<-forever
    }
    
    // worker/west/west.go
    package west
    
    import (
    	"log"
    	"sync"
    
    	rabbitmq "github.com/wagslane/go-rabbitmq"
    )
    
    type West struct {
    	Consumer rabbitmq.Consumer
    }
    
    const work_queue = "west_queue"
    
    func (w West) Execute(wg *sync.WaitGroup, concurrent int) {
    	defer wg.Done()
    
    	forever := make(chan bool)
    	// Subscribing to the queue
    	err := w.Consumer.StartConsuming(
    		func(d rabbitmq.Delivery) rabbitmq.Action {
    			log.Printf("Received message from %s with content: %s\n", work_queue, d.Body)
    			// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
    			return rabbitmq.Ack
    		},
    		work_queue,
    		[]string{work_queue},
    		rabbitmq.WithConsumeOptionsConcurrency(concurrent),
    		rabbitmq.WithConsumeOptionsQueueDurable,
    		rabbitmq.WithConsumeOptionsBindingExchangeName("exchange"),
    		rabbitmq.WithConsumeOptionsBindingExchangeKind("direct"),
    		rabbitmq.WithConsumeOptionsBindingExchangeDurable,
    	)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	log.Printf("Subscribing to %s for getting messages on %d goroutines\n", work_queue, concurrent)
    
    	<-forever
    }
    

    On the first run, the consumer correctly initialized image image

    But after I force close the connection through the RabbitMQ UI to trigger the reconnect mechanism, it only reconnect to 1 queue image image

    Is this bug or did I do something wrong in my code?

    Thanks

    opened by FR19 0
  • Truncated messages

    Truncated messages

    Hello, Sometimes I receive truncated message. {"position":{"latitude":0.0,"longitude":0.0,"altitude":0.0},"device":{"id":1,"name":"1111","phone":nu It supposed to be like this: {"position":{"latitude":0.0,"longitude":0.0,"altitude":0.0},"device":{"id":1,"name":"1111","phone":null}}

    And sometimes I receive message with extra string too. {"position":{"latitude":0.0,"longitude":0.0,"altitude":0.0},"device":{"id":1,"name":"1111","phone":null}}1111","phone":null}}

    How to solve this issue?

    opened by aditya-achmad 0
  • when server side been maintained, the client will not reconnect success after server side start.

    when server side been maintained, the client will not reconnect success after server side start.

    how to handle when server side been maintained, the client will not reconnect success after server side start.

    2022/08/09 03:07:15 gorabbit ERROR: attempting to reconnect to amqp server after close with error: Exception (501) Reason: "read tcp IP:54358->IP:5075: i/o timeout"
    2022/08/09 03:07:15 gorabbit INFO: waiting 1s seconds to attempt to reconnect to amqp server
    2022/08/09 03:07:16 gorabbit WARN: successfully reconnected to amqp server
    2022/08/09 03:07:16 gorabbit INFO: successful recovery from: Exception (501) Reason: "read tcp IP:54358->IP:5075: i/o timeout"
    2022/08/09 03:07:16 gorabbit INFO: Processing messages on 10 goroutines
    2022/08/10 13:03:52 gorabbit INFO: closing consumer...
    2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
    2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
    2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
    2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
    2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
    2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
    2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
    2022/08/10 13:03:52 gorabbit INFO: amqp channel closed gracefully
    2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
    2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
    2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
    
    opened by busyfree 0
  • Error handling on channel closing while reconnecting.

    Error handling on channel closing while reconnecting.

    Hello, probably here error handling missed.

    It looks like even if it needs to be ignored here it would be good to add some logging in case of error. It is the wrong behavior of Close and the developer needs to know that something not good happened.

    opened by ldmi3i 0
  • Declare process for Publisher | Part 2

    Declare process for Publisher | Part 2

    Split #80 into two parts. This part contains:

    • declaration of queues, exchanges and bindings is also possible if the Publisher is used (#43 )
    • updated examples
    opened by h44z 0
Owner
Lane Wagner
I like Go and Rust, and I tolerate JavaScript and Python. @wagslane on twitter.
Lane Wagner
RabbitMQ Reconnection client

rmqconn RabbitMQ Reconnection for Golang Wrapper over amqp.Connection and amqp.Dial. Allowing to do a reconnection when the connection is broken befor

Babiv Sergey 19 May 26, 2022
A tiny wrapper over amqp exchanges and queues 🚌 ✨

Rabbus ?? ✨ A tiny wrapper over amqp exchanges and queues. In memory retries with exponential backoff for sending messages. Protect producer calls wit

Rafael Jesus 93 May 29, 2022
Golang AMQP wrapper for RabbitMQ with better API

go-rabbitmq Golang AMQP wrapper for RabbitMQ with better API Table of Contents Background Features Usage Installation Connect to RabbitMQ Declare Queu

Hadi Hidayat Hammurabi 7 Sep 6, 2022
Declare AMQP entities like queues, producers, and consumers in a declarative way. Can be used to work with RabbitMQ.

About This package provides an ability to encapsulate creation and configuration of RabbitMQ([AMQP])(https://www.amqp.org) entities like queues, excha

Alex 77 Sep 27, 2022
Cadence is a distributed, scalable, durable, and highly available orchestration engine to execute asynchronous long-running business logic in a scalable and resilient way.

Cadence Visit cadenceworkflow.io to learn about Cadence. This repo contains the source code of the Cadence server. To implement workflows, activities

Uber Open Source 6.3k Sep 24, 2022
Go library to build event driven applications easily using AMQP as a backend.

event Go library to build event driven applications easily using AMQP as a backend. Publisher package main import ( "github.com/creekorful/event" "

Aloïs Micard 0 Dec 5, 2021
golang amqp rabbitmq produce consume

Step 1: Container Run Container docker run -itp 9001:9001 --name go_temp -v /usr/local/project/temp/go_amqp/:/home/ -d golang:1.16.6 Enter Container

null 0 Nov 26, 2021
Tool for collect statistics from AMQP (RabbitMQ) broker. Good for cloud native service calculation.

amqp-statisticator Tool for collect statistics around your AMQP broker. For example RabbitMQ expose a lot information trought the management API, but

Jan Seidl 0 Dec 13, 2021
Gorabbit - Simple library for AMQP Rabbit MQ publish subscribe

gorabbit Rabbit MQ Publish & Subscribe Simple library for AMQP Rabbit MQ publish

Pande Putu Widya Oktapratama 1 Jan 7, 2022
A tiny wrapper around NSQ topic and channel :rocket:

Event Bus NSQ A tiny wrapper around go-nsq topic and channel. Protect nsq calls with gobreaker. Installation go get -u github.com/rafaeljesus/nsq-even

Rafael Jesus 75 Sep 27, 2022
goczmq is a golang wrapper for CZMQ.

goczmq Introduction A golang interface to the CZMQ v4.2 API. Install Dependencies libsodium libzmq czmq For CZMQ master go get github.com/zeromq/goczm

The ZeroMQ project 521 Sep 21, 2022
Golang API wrapper for MangaDex v5's MVP API.

mangodex Golang API wrapper for MangaDex v5's MVP API. Full documentation is found here. This API is still in Open Beta, so testing may not be complet

null 33 Aug 7, 2022
High abstraction wrapper for Golang Rabbit MQ Client

GRMQ Go Rabbit MQ What are the typical use-cases of RabbitMQ broker ? We create

null 5 Jan 12, 2022
redisqueue provides a producer and consumer of a queue that uses Redis streams

redisqueue redisqueue provides a producer and consumer of a queue that uses Redis streams. Features A Producer struct to make enqueuing messages easy.

Robin Joseph 87 Sep 27, 2022
Uniqush is a free and open source software system which provides a unified push service for server side notification to apps on mobile devices.

Homepage Download Blog/News @uniqush Introduction Uniqush (\ˈyü-nə-ku̇sh\ "uni" pronounced as in "unified", and "qush" pronounced as in "cushion") is

Uniqush 1.4k Sep 20, 2022
Package notify provides an implementation of the Gnome DBus Notifications Specification.

go-notify Package notify provides an implementation of the Gnome DBus Notifications Specification. Examples Display a simple notification. ntf := noti

null 62 Sep 27, 2022
⚡️ A lightweight service that will build and store your go projects binaries, Integrated with Github, Gitlab, Bitbucket and Bitbucket Server.

Rabbit A lightweight service that will build and store your go projects binaries. Rabbit is a lightweight service that will build and store your go pr

Ahmed 196 Aug 11, 2022
The Xiaomi message push service is a system-level channel on MIUI and is universal across the platform, which can provide developers with stable, reliable, and efficient push services.

Go-Push-API MiPush、JiPush、UMeng MiPush The Xiaomi message push service is a system-level channel on MIUI and is universal across the platform, which c

houseme 4 Jun 25, 2022
⚡ HTTP/2 Apple Push Notification Service (APNs) push provider for Go — Send push notifications to iOS, tvOS, Safari and OSX apps, using the APNs HTTP/2 protocol.

APNS/2 APNS/2 is a go package designed for simple, flexible and fast Apple Push Notifications on iOS, OSX and Safari using the new HTTP/2 Push provide

Adam Jones 2.7k Sep 22, 2022