Declare AMQP entities like queues, producers, and consumers in a declarative way. Can be used to work with RabbitMQ.


This package encapsulates the creation and configuration of RabbitMQ (AMQP) entities such as queues, exchanges, producers and consumers in a declarative way, using a single config.

Exchanges, queues and producers are initialized in the background.

go-mq supports both sync and async producers.
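To picture the difference between the two modes, here is a minimal, standard-library-only sketch. It is not go-mq's implementation: publish, syncProduce, asyncProducer and its methods are hypothetical names invented for this illustration. The sync path returns the error to the caller, while the async path buffers messages and reports failures on a channel.

```go
package main

import (
	"errors"
	"fmt"
)

// publish is a hypothetical stand-in for sending one message to the broker.
func publish(msg string) error {
	if msg == "" {
		return errors.New("empty message")
	}
	return nil
}

// syncProduce publishes immediately and hands the error back to the caller.
func syncProduce(msg string) error {
	return publish(msg)
}

// asyncProducer (hypothetical) buffers messages and publishes them from a
// goroutine; failures are reported on errs instead of being returned.
type asyncProducer struct {
	queue chan string
	errs  chan error
	done  chan struct{}
}

func newAsyncProducer(bufferSize int) *asyncProducer {
	p := &asyncProducer{
		queue: make(chan string, bufferSize),
		errs:  make(chan error, bufferSize),
		done:  make(chan struct{}),
	}
	go func() {
		defer close(p.done)
		for msg := range p.queue {
			if err := publish(msg); err != nil {
				p.errs <- err
			}
		}
		close(p.errs)
	}()
	return p
}

// Produce enqueues a message; it blocks only when the buffer is full.
func (p *asyncProducer) Produce(msg string) { p.queue <- msg }

// Close stops accepting messages and waits for the worker to drain the queue.
func (p *asyncProducer) Close() {
	close(p.queue)
	<-p.done
}

func main() {
	fmt.Println("sync error:", syncProduce(""))

	p := newAsyncProducer(10)
	p.Produce("hello")
	p.Produce("")
	p.Close()
	for err := range p.errs {
		fmt.Println("async error:", err)
	}
}
```

The trade-off in this sketch is the usual one: the sync call is simpler to reason about, while the async call keeps the caller from waiting on the network at the cost of indirect error reporting.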

go-mq reconnects automatically when the connection is closed or a network error occurs. You can configure the delay between connection attempts with the reconnect_delay option.


go get -u


Visit godoc to get information about library API.

For those who prefer to learn by practice, there are working examples in the example directory.


You can configure mq by filling the mq.Config struct directly or by loading it from a config file.

Supported configuration tags:

  • yaml
  • json
  • mapstructure
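As a rough illustration of what these tags enable, the sketch below decodes a JSON document into a small struct using only the standard library. exampleConfig and loadConfig are hypothetical and cover just two fields; this is not the real mq.Config definition. The yaml and mapstructure tags are shown for completeness but are only read by their respective third-party decoders.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// exampleConfig is a hypothetical, trimmed-down config struct. Each field
// carries the three tag kinds so any of the decoders can populate it.
type exampleConfig struct {
	DSN      string `json:"dsn" yaml:"dsn" mapstructure:"dsn"`
	TestMode bool   `json:"test_mode" yaml:"test_mode" mapstructure:"test_mode"`
}

// loadConfig decodes JSON bytes into exampleConfig using the json tags.
func loadConfig(data []byte) (exampleConfig, error) {
	var cfg exampleConfig
	err := json.Unmarshal(data, &cfg)
	return cfg, err
}

func main() {
	raw := []byte(`{"dsn": "amqp://login:password@host:5672/virtual_host", "test_mode": true}`)
	cfg, err := loadConfig(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(cfg.DSN, cfg.TestMode)
}
```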

Available options:

dsn: "amqp://login:password@host:port/virtual_host"
reconnect_delay: 5s                     # Interval between connection tries.
test_mode: false                        # Switches library to use mocked broker. Defaults to false.
exchanges:
  - name: "exchange_name"
    type: "direct"
    options:
      # Available options with default values:
      auto_delete: false
      durable: false
      internal: false
      no_wait: false
queues:
  - name: "queue_name"
    exchange: "exchange_name"
    routing_key: "route"
    # A set of arguments for the binding.
    # The syntax and semantics of these arguments depend on the exchange class.
    binding_args:
      no_wait: false
    # Available options with default values:
    options:
      auto_delete: false
      durable: false
      exclusive: false
      no_wait: false
producers:
  - name: "producer_name"
    buffer_size: 10                      # Number of messages that can be buffered during publishing.
    exchange: "exchange_name"
    routing_key: "route"
    sync: false                          # Whether the producer works in sync or async mode.
    # Available options with default values:
    options:
      content_type:  "application/json"
      delivery_mode: 2                   # 1 - non persistent, 2 - persistent.
consumers:
  - name: "consumer_name"
    queue: "queue_name"
    workers: 1                           # Workers count. Defaults to 1.
    prefetch_count: 0                    # Prefetch message count per worker.
    prefetch_size: 0                     # Prefetch message size per worker.
    # Available options with default values:
    options:
      no_ack: false
      no_local: false
      no_wait: false
      exclusive: false

Error handling

All errors are accessible via exported channel:

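package main

import (
	"log"

	"github.com/cheshir/go-mq" // Assumed import path; use the path you installed go-mq from.
)

func main() {
	config := mq.Config{} // Set your configuration.
	queue, err := mq.New(config)
	if err != nil {
		log.Fatal(err)
	}
	// ...

	go handleMQErrors(queue.Error())
	// Other logic.
}

// handleMQErrors logs every error reported by the library.
func handleMQErrors(errors <-chan error) {
	for err := range errors {
		log.Println(err)
	}
}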

If the channel is full, new errors are dropped.

Errors from a sync producer are not sent to the error channel because they are returned directly to the caller.


Some cases can only be tested with a real broker, and others only with a mocked broker.

If you are able to run tests with a real broker, run them with:

go test -mock-broker=0

Otherwise mock will be used.


Check releases page.

How to upgrade

From version 0.x to 1.x

  • The GetConsumer() method was renamed to Consumer() to follow Go naming guidelines.

  • The GetProducer() method was removed. Use AsyncProducer() instead, or SyncProducer() if you want to handle network errors yourself.


Feel free to create issues with bug reports or your wishes.

  • Priority


    If I add information about the queue priority to the config file: "options": { "durable": true, "args": { "x-max-priority": 9 } }

    I get this error in the console: panic: interface conversion: interface {} is map[string]interface {}, not map[interface {}]interface {}

    How should I write the priority in the config file?

    opened by joker63 6
  • Enable Mocking/Testing  by exporting brokerIsMocked var ?

    Current problem: I'd like to use the go-mq library within a service to connect to an MQ. To test that the service works fine, I want to use a fake server.

    As I was going through the code, it seems that the variable "brokerIsMocked" must be set to true to allow a connection to the fake server. From another package I can't do this, obviously.

    Am I missing something? Is there another way to connect to a fake server, like a flag in mq.Config?

    Kind regards !

    PS: Nice work with the lib - really like it.

    opened by snemitz 5
  • ConnectionState understanding

    There are cases when you need to understand what's going on with your MQ connection. In my case, I need to know that there is no problem with the MQ connection on my app's side and to raise a signal if there is one.

    There's no method in the current version that allows you to monitor the connection state. Of course we should balance between exposing everything and nothing. That's why I added a few constants to show the connection state.

    You may point out that there is an error channel exposed via MQ.Error(). But here's the question: how do you know when the disconnect or connection problems are no longer an issue?

    opened by doranych 4
  • Unexpected consumer behavior

    Consumers consume messages without explicit call of Consumer.Consume method.

    i.e. we have 2 binaries one with producer, another one with consumer, based on the same code and same config.

    1. run producer (without consuming)
    2. run consumer
    3. produce several messages to amqp server
    4. consumer consumes several messages
    5. stop producer
    6. consumer gets messages from nowhere (ofc we produced them before, they just stacked in non existent consumer in producer binary)

    In my case, Rabbit shows us 2 consumers.

    Given this, messages are not supposed to be consumed without a real Consumer.Consume call.

    If you can point me to where I should dig, I might be able to fix this.

    opened by doranych 4
  • Cluster connection

    Create a cluster connection. For backward compatibility, it's possible to pass the list of cluster nodes as a single comma-separated string:

    dsn:  amqp://login:password@host1:port/virtual_host,amqp://login:password@host2:port/virtual_host,amqp://login:password@host3:port/virtual_host

    Same scheme is typically used to describe kafka cluster connection


    You always have rabbit cluster even if there's only one node in it. Right? I think it's possible to iterate over cluster nodes before re-connection to the same node

    opened by doranych 3
  • Read all messages in a queue

    Is there an established method to grab all the messages in a certain queue? I don't want to reinvent the wheel, and I wasn't sure where else to ask. Thanks

    opened by t56k 3
  • Test for reconnection are randomly breaking

    STR Run: go test -race -mock-broker=1

    Expected result:

    ok	2.318s

    Actual result:

    --- FAIL: TestMq_Reconnect (0.32s)
    	mq_test.go:294: Consumer did not read messages. Produced 2, read 0

    Notes: Test works only with mocked broker. Can't reproduce bug with enabled debugger or with -run=Reconnect flag.

    bug help wanted 
    opened by cheshir 3
  • Extra arguments for a queue

    How to pass extra arguments for a queue?

    I tried the following configuration without success:

        - name: "uh.traffic.limit_workers"
          exchange: ""
          routing_key: "namespaces_activity"
          options:
            durable: true
            args:
              'x-max-priority': 9
    opened by gustavosbarreto 1
