A lightweight transactional message bus on top of RabbitMQ

Overview

grabbit

A lightweight transactional message bus on top of RabbitMQ supporting:

  1. Supported Messaging Styles
    • One Way (Fire and forget)
    • Publish/Subscribe
    • Async Command/Reply
    • Blocking Command/Reply (RPC)
  2. Transactional message processing
  3. Message Orchestration via the Saga pattern
  4. At-least-once reliable messaging via Transactional Outbox and Publisher Confirms
  5. Retry and backoffs
  6. Structured logging
  7. Reporting Metrics via Prometheus
  8. Distributed Tracing via OpenTracing
  9. Extensible serialization with default support for gob, protobuf and avro

Stable release

The v1.x branch contains the latest stable releases of grabbit. Track that branch to get point and minor release updates.

Supported transactional resources

  1. MySQL > 8.0 (InnoDB)

Basic Usage

  • For a complete sample application see the vacation booking sample app in the examples directory

The following outlines the basic usage of grabbit. For a complete view of how to use grabbit, including how to write sagas and handle dead-lettering, refer to the grabbit/tests package.

import (
  "context"
  "fmt"
  "time"

  "github.com/wework/grabbit/gbus"
  "github.com/wework/grabbit/gbus/builder"
)

Define a message

type SomeMessage struct{}

func (SomeMessage) SchemaName() string {
    return "some.unique.namespace.somemessage"
}
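The handlers and calls later in this section refer to SomeCommand and SomeEvent, which are not defined here. A minimal sketch, assuming they follow the same pattern as SomeMessage. The `message` interface is a local stand-in written for this example rather than a grabbit type, and the schema names are hypothetical.

```go
package main

import "fmt"

// message is a local stand-in (an assumption for this sketch) for the
// contract shown above: a type that reports a unique schema name.
type message interface {
	SchemaName() string
}

// SomeCommand is a hypothetical command type.
type SomeCommand struct{}

func (SomeCommand) SchemaName() string { return "some.unique.namespace.somecommand" }

// SomeEvent is a hypothetical event type.
type SomeEvent struct{}

func (SomeEvent) SchemaName() string { return "some.unique.namespace.someevent" }

// schemaNames collects the schema name of each message, in order.
func schemaNames(msgs ...message) []string {
	names := make([]string, 0, len(msgs))
	for _, m := range msgs {
		names = append(names, m.SchemaName())
	}
	return names
}

func main() {
	for _, name := range schemaNames(SomeCommand{}, SomeEvent{}) {
		fmt.Println(name)
	}
}
```

Giving every message type its own schema name keeps the names unique across services, which is what the example name above suggests they are meant to be.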

Creating a transactional GBus instance

gb := builder.
    New().
    Bus("connection string to RabbitMQ").
    Txnl("mysql", "connection string to mysql").
    WithConfirms().
    Build("name of your service")

Register a command handler

handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
    cmd, ok := message.Payload.(*SomeCommand)
    if ok {
        fmt.Printf("handler invoked with message %v", cmd)
        return nil
    }

    return fmt.Errorf("failed to handle message")
}

gb.HandleMessage(SomeCommand{}, handler)

Register an event handler

eventHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
    evt, ok := message.Payload.(*SomeEvent)
    if ok {
        fmt.Printf("handler invoked with event %v", evt)
        return nil
    }

    return fmt.Errorf("failed to handle event")
}

gb.HandleEvent("name of exchange", "name of topic", SomeEvent{}, eventHandler)

Start the bus

gb.Start()
defer gb.Shutdown()

Send a command

gb.Send(context.Background(), "name of service you are sending the command to", gbus.NewBusMessage(SomeCommand{}))

Publish an event

gb.Publish(context.Background(), "name of exchange", "name of topic", gbus.NewBusMessage(SomeEvent{}))

RPC style call

request := gbus.NewBusMessage(SomeRPCRequest{})
reply := gbus.NewBusMessage(SomeRPCReply{})
timeOut := 2 * time.Second

reply, e := gb.RPC(context.Background(), "name of service you are sending the request to", request, reply, timeOut)

if e != nil {
  fmt.Printf("rpc call failed with error %v", e)
} else {
  fmt.Printf("rpc call returned with reply %v", reply)
}

Testing

  1. Ensure that the dependencies are installed: go get -v -t -d ./...
  2. Start the required services first: docker-compose up -V -d
  3. Then run the tests: go test ./...
Comments
  • Optimistic locking violation when saving saga instances should retry handler until success

    When a saga instance gets invoked and persisted to the saga store, an optimistic locking error may occur due to concurrent invocations. Currently, when this happens, the general retry strategy retries the message until it succeeds or until the configured max retry count is exceeded and the message gets rejected.

    In highly concurrent sagas this may cause unnecessary operational effort due to failed messages. In case of concurrency violations we should retry the message until success.

    bug 
    opened by rhinof 7
  • first commit towards supporting persistent timeouts

    Timeouts are no longer passed as bus messages, which avoids the need to deal with serialization. The Timeout interface changed so that handling a timeout now accepts a transaction and a bus instance instead of an invocation and a bus message. #32

    opened by rhinof 3
  • added metrics for transactional outbox

    The following metrics were added:

    • outbox_total_records: reports the total number of records currently in the outbox
    • outbox_pending_delivery: reports the number of records currently in the outbox that are pending delivery
    • outbox_pending_removal: reports the number of records currently in the outbox that were sent and are pending removal

    #36

    opened by rhinof 2
  • Add generic handler metrics with the message type as a label

    Is your feature request related to a problem? Please describe.
    The handler metrics that grabbit exposes are per handler, which makes creating generic dashboards difficult.

    Describe the solution you'd like
    I'd like generic metrics with the message type as a label. For the result counters, the result would be a label as well.

    enhancement 
    opened by danielwitz 2
  • Allow configuring max retries

    retries

    Provide a way to externally configure the maximum number of times a handler is retried before the message is declared poison and rejected

    Closes #35

    opened by avigailberger 2
  • migrations fail when building a Bus for test db

    Describe the bug
    Building a Bus fails in the migrations step, for the test db ONLY, with this error:

    migrator: error while running migrations: error executing golang migration: Error 1060: Duplicate column name 'started_by_request_of_svc'.

    This error happens in sagaStoreAddSagaCreatorDetails and causes the test to run forever (it doesn't even panic). Note: when building a bus for the app itself there is no error. The errors occur in CircleCI too.

    Table when running test: grabbit_testinvoicematching_sagas.
    Table when running app: grabbit_invoicematchingweb_sagas.

    If I delete the test DB entirely between tests, the migration works the first time we build the bus. If I don't delete the test db between tests, I get that error from the second test onwards.

    To Reproduce
    Steps to reproduce the behavior:

    1. Build a test db twice...
    2. Or: simply run 2 tests that use the bus builder in IM.
    opened by avigailberger 1
  • 👌 feat(message:deduplication) implementing the feature #33

    A way to manage message de-duplication in grabbit.

    Implementation details:

    • Each instance sets its own policy: None (the default), Reject (reject duplicates) or Ack (ack duplicates)
    • Each service can set a duration to store each message in the duplicates table
    • Once a message is received from grabbit we check if the message-id is in our table
    • If it's in the table we invoke the policy
    • If it's not in the table we add the message, using our internal transaction, to be stored in the db
    • Once the processing is complete and the internal tx is committed, the message-id is also committed into the DB.
    • In the background we have an additional scheduler running which deletes messages that are older than what is set in the policy (currently hardcoded to run every minute, so that each iteration deletes at most about a minute's worth of messages)

    This PR closes issue #33

    enhancement 
    opened by vladshub 3
  • Allow sagas to override correlation logic.

    By default, grabbit correlates a message to the correct saga instance when a handler replies to an incoming message. There are many cases in which you would like to interact with a saga instance and send it messages not in the context of replying to a specific message originating from the saga.

    This is an improvement and should provide a better API than the one developed for #172

    enhancement 
    opened by rhinof 1
  • Fetching saga instances from the saga store should be optimized

    The current implementation of the MySQL saga store, when fetching saga instances by saga type, fetches all saved instances in one round trip. This may cause a performance issue when a large number of saga instances needs to be fetched.

    In order to prevent a potential performance hit, the fetching logic should be optimized to include paging and potentially parallelize the fetching using a set of goroutines
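A minimal sketch of the paging idea, assuming an in-memory slice in place of the saga store. `fetchPage` and `forEachPage` are hypothetical helpers. In a SQL-backed store the page fetch would typically be a query with LIMIT and OFFSET (or a keyset condition). The parallel fetching suggested above is left out.

```go
package main

import "fmt"

// fetchPage is a stand-in for one paged store query: it returns at most
// limit saga IDs starting at offset.
func fetchPage(all []string, offset, limit int) []string {
	if offset >= len(all) {
		return nil
	}
	end := offset + limit
	if end > len(all) {
		end = len(all)
	}
	return all[offset:end]
}

// forEachPage walks the store one page at a time, calling visit per page,
// and returns the number of pages visited. Only one page is held at a time.
func forEachPage(all []string, pageSize int, visit func(page []string)) int {
	if pageSize <= 0 {
		return 0
	}
	pages := 0
	for offset := 0; ; offset += pageSize {
		page := fetchPage(all, offset, pageSize)
		if len(page) == 0 {
			return pages
		}
		visit(page)
		pages++
	}
}

func main() {
	sagaIDs := []string{"s1", "s2", "s3", "s4", "s5"}
	pages := forEachPage(sagaIDs, 2, func(page []string) {
		fmt.Println(page)
	})
	fmt.Println("pages:", pages)
}
```

Bounding the page size caps how much is loaded per round trip, which is the concern the issue raises.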

    tech debt 
    opened by rhinof 0
  • Invoking the RPC interface from a handler fails

    opened by rhinof 0
Releases(v1.1.8)
  • v1.1.8(Dec 2, 2019)

    Point release containing additional functionality and bug fixes

    Improvements

    • Added metric that records the execution time of the entire saga, from its creation until deletion #224

    Fixed Issues

    • Headers get placed on messages even if no value for the header is set #221
  • v1.1.7(Nov 2, 2019)

  • v1.1.6(Oct 20, 2019)

    Point release containing additional functionality and bug fixes

    Improvements

    • Transactional outbox configuration #37
    • Transactional outbox metrics #202
    • The ability for producers to specify an idempotency key on the BusMessage. #106

    Fixed Issues

    • Messages get re-queued when GlobalRawMessageHandler returns an error #188
    • AMQP channel shuts down when the GlobalRawMessageHandler returns an error #187
    • Wrong started by saga identifier value was set when creating a new saga #194
    • When a saga instance is not found in the saga store do not reject the message #196
    • The target service was not resolved properly in SagaInvocation #195
    • Saga store log entries do not contain the _service field #206
    • Logger is lost at initialization #200
    • Outbox stops trying to send a message after 50 attempts #203
    • Handlers not resolved properly for messages returned from dead lettering #191
    • Migration entry with incorrect name #212
    • Not all contextual data was logged when invoking handlers #217
  • v1.1.5(Sep 29, 2019)

  • v1.1.4(Sep 28, 2019)

    Point release with a few improvements and fixes

    Improvements

    • Improved documentation #38
    • Support logging of emperror errors #178
    • Provide the ability to create a BusMessage targeting a specific saga instance #134

    Fixed Issues

    • Logging via the invocation interface does not log contextual data #175
  • v1.1.3(Sep 25, 2019)

    Point release containing bug fixes and additional functionality

    Improvements

    • Allow getting the saga ID of the currently invoked saga #169
    • Improved wording of saga documentation article (#164)

    Fixed Issues

    • fixing ReplyToInitiator not working when the initiator sends a message via the RPC interface (#163)
    • fixed logging issues (#167)
  • v1.1.2(Sep 17, 2019)

    Improvements

    • added reply to initiator functionality to sagas #157
    • added generic handler metrics with message type as the label #144

    Fixed Issues

    • increased outbox VARCHAR column length to 2048 #155
    • Fix handle empty body #156
    • set the correct Type and Content-Type headers on outgoing messages #160
  • v1.1.1(Aug 31, 2019)

    Point release rolling up fixes and improvements for the following issues

    Improvements

    • Improved observability via opentracing #141
    • Ability to handle raw amqp messages #137

    Fixed Issues

    • When a dead letter handler panics grabbit fails to reject the message #135
    • Worker leaks message channels #148
    • Handling incoming messages with an empty body stops consumer #146
    • Migrations table name is not consistent with naming convention #139
  • v1.1.0(Aug 18, 2019)

    https://github.com/wework/grabbit/projects/1

    Improvements

    • Add the ability for handlers to track the number of execution attempts #95
    • Expose the logger in the invocation #87
    • Allow configuring max retries #84
    • Report message handling metrics #103
    • Persistent timeout manager #32
    • Saga timeouts should be reported via a metric #113
    • Improved documentation #38
    • Support database migrations for grabbit mysql persistent store #42
    • Cleanup logs #50
    • Allow Sending Raw Messages #102
    • Separate amqp connection for ingress and egress #46

    Fixed Issues

    • Shutting down txoutbox leaves dangling goroutines #55
    • Running two separate bus instances for the same service causes high CPU utilization #124
    • A completing saga action may get executed twice #109
    • Dead Letter Handler does not nack/ack after Processing #104
    • Messages that originate from a saga and routed to the same service get rejected #64
    • Closing rows when querying transactional outbox fails #91
    • Saga configuration functions only run when saga created #74
    • Transactions managed incorrectly between retries #81
  • v1.0.3(Jun 25, 2019)

  • v1.0.1(May 1, 2019)

  • v1.0.0(Apr 24, 2019)

    grabbit 1.0.0

    This is the initial release of grabbit and includes the following core functionality:

    • One way, request-reply and publish-subscribe message exchange
    • At least once messaging semantics via transactional outbox
    • Transactions
    • Support for distributed sagas
    • Dead-lettering
    • Support for inter-service tracing via OpenTracing
    • Protobuf, Avro and gob message serialization
Owner
WeWork
Working with and contributing to the open source community