Declare AMQP entities like queues, producers, and consumers in a declarative way. Can be used to work with RabbitMQ.

Overview

Build Status codecov Go Report Card Sourcegraph HitCount GoDoc License

About

This package provides an ability to encapsulate creation and configuration of RabbitMQ([AMQP])(https://www.amqp.org) entities like queues, exchanges, producers and consumers in a declarative way with a single config.

Exchanges, queues and producers are going to be initialized in the background.

go-mq supports both sync and async producers.

go-mq has auto reconnects on closed connection or network error. You can configure delay between each connect try using reconnect_delay option.

Install

go get -u github.com/cheshir/go-mq

API

Visit godoc to get information about library API.

For those of us who preferred learn something new on practice there is working examples in example directory.

Configuration

You can configure mq using mq.Config struct directly or by filling it from config file.

Supported configuration tags:

  • yaml
  • json
  • mapstructure

Available options:

dsn: "amqp://login:password@host:port/virtual_host"
reconnect_delay: 5s                     # Interval between connection tries. Check https://golang.org/pkg/time/#ParseDuration for details.
test_mode: false                        # Switches library to use mocked broker. Defaults to false.
exchanges:
  - name: "exchange_name"
    type: "direct"
    options:
      # Available options with default values:
      auto_delete: false
      durable: false
      internal: false
      no_wait: false
queues:
  - name: "queue_name"
    exchange: "exchange_name"
    routing_key: "route"
    # A set of arguments for the binding.
    # The syntax and semantics of these arguments depend on the exchange class.
    binding_options:
      no_wait: false
    # Available options with default values:
    options:
      auto_delete: false
      durable: false
      exclusive: false
      no_wait: false
producers:
  - name: "producer_name"
    buffer_size: 10                      # Declare how many messages we can buffer during fat messages publishing.
    exchange: "exchange_name"
    routing_key: "route"
    sync: false                          # Specify whether producer will worked in sync or async mode.
    # Available options with default values:
    options:
      content_type:  "application/json"
      delivery_mode: 2                   # 1 - non persistent, 2 - persistent.
consumers:
  - name: "consumer_name"
    queue: "queue_name"
    workers: 1                           # Workers count. Defaults to 1.
    prefetch_count: 0                    # Prefetch message count per worker.
    prefetch_size: 0                     # Prefetch message size per worker.
    # Available options with default values:
    options:
      no_ack: false
      no_local: false
      no_wait: false
      exclusive: false

Error handling

All errors are accessible via exported channel:

package main

import (
	"log"

	"github.com/cheshir/go-mq"
)

func main() {
	config := mq.Config{} // Set your configuration.
	queue, _ := mq.New(config)
	// ...

	go handleMQErrors(queue.Error())
	
	// Other logic.
}

func handleMQErrors(errors <-chan error) {
	for err := range errors {
		log.Println(err)
	}
}

If channel is full – new errors will be dropped.

Errors from sync producer won't be accessible from error channel because they returned directly.

Tests

There are some cases that can only be tested with real broker and some cases that can only be tested with mocked broker.

If you are able to run tests with a real broker run them with:

go test -mock-broker=0

Otherwise mock will be used.

Changelog

Check releases page.

How to upgrade

From version 0.x to 1.x

  • GetConsumer() method was renamed to Consumer(). This is done to follow go guideline.

  • GetProducer() method was removed. Use instead AsyncProducer() or SyncProducer() if you want to catch net error by yourself.

Epilogue

Feel free to create issues with bug reports or your wishes.

Comments
  • Priority

    Priority

    If I add information about the queue priority to the config file "options": { "durable": true, "args": { "x-max-priority": 9 } }

    In the console I get error panic: interface conversion: interface {} is map[string]interface {}, not map[interface {}]interface {}

    How will write priority in config file?

    bug 
    opened by joker63 6
  • Enable Mocking/Testing  by exporting brokerIsMocked var ?

    Enable Mocking/Testing by exporting brokerIsMocked var ?

    Current Problem: I'd like to use the go-mq library within a service to connect to a MQ - to test that the service works fine, I want to use the a fake server.

    As I was going through the code, it seems that the variable "brokerIsMocked" must be set to true, to allow connection to faked server. From another package I can't do this - obviously.

    Am I missing something? Is there a another way to connect to a fake server? Like a flag with the mq.Config?

    Kind regards !

    PS: Nice work with the lib - really like it.

    opened by snemitz 5
  • ConnectionState understanding

    ConnectionState understanding

    There are cases when you need to understand what's going on with your MQ connection. In my case: i need to understand, that there's no problem with mq connection from my app side and make signal if there are problems with that.

    There's no method in current version, that allows you to monitor connection state. Of course we should balance between opening everything and nothing. That's why i added few constants to show connection state.

    You may point me, that there's errors channel opened outside MQ.Error(). But here's the question: how to understand that disconnect / connection problems are not an issues anymore?

    opened by doranych 4
  • Unexpected consumer behavior

    Unexpected consumer behavior

    Consumers consume messages without explicit call of Consumer.Consume method.

    i.e. we have 2 binaries one with producer, another one with consumer, based on the same code and same config.

    1. run producer (without consuming)
    2. run consumer
    3. produce several messaged to amqp server
    4. consumer consumes several messages
    5. stop producer
    6. consumer gets messages from nowhere (ofc we produced them before, they just stacked in non existent consumer in producer binary)

    In my case, Rabbit shows us 2 consumers.

    Following this messages are not suppose to be consumed without real Consumer.Consume call

    If you may guide where should i dig, i might be able to fix this

    opened by doranych 4
  • Cluster connection

    Cluster connection

    Suggestion

    Create cluster connection For backward compatibility it's possible to use list of cluster nodes in single string separated by comma

    dsn:  amqp://login:password@host1:port/virtual_host,amqp://login:password@host2:port/virtual_host,amqp://login:password@host3:port/virtual_host
    

    Same scheme is typically used to describe kafka cluster connection

    Discuss

    You always have rabbit cluster even if there's only one node in it. Right? I think it's possible to iterate over cluster nodes before re-connection to the same node

    opened by doranych 3
  • Read all messages in a queue

    Read all messages in a queue

    Is there an established method to grab all the messages in a certain queue? I don't want to reinvent the wheel, and I wasn't sure where else to ask. Thanks

    question 
    opened by t56k 3
  • Test for reconnection are randomly breaking

    Test for reconnection are randomly breaking

    STR Run: go test -race -mock-broker=1

    Expected result:

    PASS
    ok  	github.com/cheshir/go-mq	2.318s
    

    Actual result:

    --- FAIL: TestMq_Reconnect (0.32s)
    	mq_test.go:294: Consumer did not read messages. Produced 2, read 0
    

    Notes: Test works only with mocked broker. Can't reproduce bug with enabled debugger or with -run=Reconnect flag.

    bug help wanted 
    opened by cheshir 3
  • Extra arguments for a queue

    Extra arguments for a queue

    How to pass extra arguments for a queue?

    I tried the following configuration without success:

    queues:
        - name: "uh.traffic.limit_workers"
          exchange: "uh.api.events"
          routing_key: "namespaces_activity"
          options:
            durable: true
            args:
              'x-max-priority': 9
    
    opened by gustavosbarreto 1
Releases(v2.0.0)
Owner
Alex
In love with go.
Alex
An AMQP 0-9-1 Go client maintained by the RabbitMQ team. Originally by @streadway: `streadway/amqp`

Go RabbitMQ Client Library This is a Go AMQP 0.9.1 client maintained by the RabbitMQ core team. It was originally developed by Sean Treadway. Differen

RabbitMQ 671 Jan 1, 2023
A tiny wrapper over amqp exchanges and queues 🚌 ✨

Rabbus ?? ✨ A tiny wrapper over amqp exchanges and queues. In memory retries with exponential backoff for sending messages. Protect producer calls wit

Rafael Jesus 95 Dec 18, 2022
Testing message queues with RabbitMQ

Rabbit-MessageQueue Just a repository of RabbitMQ simple usage for queueing messages. You can use this as a sender or a receiver. More information is

Jawady Muhammad Habib 2 Mar 10, 2022
Golang AMQP wrapper for RabbitMQ with better API

go-rabbitmq Golang AMQP wrapper for RabbitMQ with better API Table of Contents Background Features Usage Installation Connect to RabbitMQ Declare Queu

Hadi Hidayat Hammurabi 9 Dec 21, 2022
golang amqp rabbitmq produce consume

Step 1: Container Run Container docker run -itp 9001:9001 --name go_temp -v /usr/local/project/temp/go_amqp/:/home/ -d golang:1.16.6 Enter Container

null 0 Nov 26, 2021
Tool for collect statistics from AMQP (RabbitMQ) broker. Good for cloud native service calculation.

amqp-statisticator Tool for collect statistics around your AMQP broker. For example RabbitMQ expose a lot information trought the management API, but

Jan Seidl 0 Dec 13, 2021
Apache Kafka Web UI for exploring messages, consumers, configurations and more with a focus on a good UI & UX.

Kowl - Apache Kafka Web UI Kowl (previously known as Kafka Owl) is a web application that helps you to explore messages in your Apache Kafka cluster a

CloudHut 2.9k Jan 3, 2023
Go client to reliable queues based on Redis Cluster Streams

Ami Go client to reliable queues based on Redis Cluster Streams. Consume/produce performance Performance is dependent from: Redis Cluster nodes count;

Andrey Kuzmin 26 Dec 12, 2022
:incoming_envelope: A fast Message/Event Hub using publish/subscribe pattern with support for topics like* rabbitMQ exchanges for Go applications

Hub ?? A fast enough Event Hub for go applications using publish/subscribe with support patterns on topics like rabbitMQ exchanges. Table of Contents

Leandro Lugaresi 125 Dec 17, 2022
A wrapper of streadway/amqp that provides reconnection logic and sane defaults

go-rabbitmq Wrapper of streadway/amqp that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐ Supporte

Lane Wagner 432 Dec 28, 2022
Go library to build event driven applications easily using AMQP as a backend.

event Go library to build event driven applications easily using AMQP as a backend. Publisher package main import ( "github.com/creekorful/event" "

Aloïs Micard 0 Dec 5, 2021
Gorabbit - Simple library for AMQP Rabbit MQ publish subscribe

gorabbit Rabbit MQ Publish & Subscribe Simple library for AMQP Rabbit MQ publish

Pande Putu Widya Oktapratama 2 Oct 4, 2022
Declarative streaming ETL for mundane tasks, written in Go

Benthos is a high performance and resilient stream processor, able to connect various sources and sinks in a range of brokering patterns and perform h

Ashley Jeffs 5.5k Dec 28, 2022
Abstraction layer for simple rabbitMQ connection, messaging and administration

Jazz Abstraction layer for quick and simple rabbitMQ connection, messaging and administration. Inspired by Jazz Jackrabbit and his eternal hatred towa

SOCIFI Ltd. 17 Dec 12, 2022
RabbitMQ wire tap and swiss army knife

rabtap - RabbitMQ wire tap Swiss army knife for RabbitMQ. Tap/Pub/Sub messages, create/delete/bind queues and exchanges, inspect broker. Contents Feat

null 231 Dec 28, 2022
🚀 Golang, Go Fiber, RabbitMQ, MongoDB, Docker, Kubernetes, GitHub Actions and Digital Ocean

Bookings Solução de cadastro de usuários e reservas. Tecnologias Utilizadas Golang MongoDB RabbitMQ Github Actions Docker Hub Docker Kubernetes Digita

Jailton Junior 7 Feb 18, 2022
RabbitMQ Reconnection client

rmqconn RabbitMQ Reconnection for Golang Wrapper over amqp.Connection and amqp.Dial. Allowing to do a reconnection when the connection is broken befor

Babiv Sergey 20 Sep 27, 2022
An easy-to-use CLI client for RabbitMQ.

buneary, pronounced bun-ear-y, is an easy-to-use RabbitMQ command line client for managing exchanges, managing queues and publishing messages to exchanges.

Dominik Braun 56 Sep 3, 2022
A user friendly RabbitMQ library written in Golang.

TurboCookedRabbit A user friendly RabbitMQ library written in Golang to help use streadway/amqp. Based on my work found at CookedRabbit. Work Recently

Tristan (HouseCat) Hyams 103 Jan 6, 2023