🔊Minimalist message bus implementation for internal communication

Overview

🔊 Bus

Bus is a minimalist event/message bus implementation for internal communication. It is heavily inspired by my event_bus package for the Elixir language.

API

The method names and arities/args are now stable. No changes should be expected in the package for version 1.x.x apart from bug fixes.

Installation

Via go packages: go get github.com/mustafaturan/bus

Usage

Configure

The package requires a unique id generator to assign ids to events. You can write your own function to generate unique ids or use a package that provides unique id generation functionality.

The bus package respects the software design choices of the packages/projects that use it. It supports both singleton and dependency injection to initialize a bus instance.

Hint: Check the demo project for the singleton configuration.

Here is a sample initialization using the monoton id generator:

import (
    "github.com/mustafaturan/bus"
    "github.com/mustafaturan/monoton"
    "github.com/mustafaturan/monoton/sequencer"
)

func NewBus() *bus.Bus {
    // configure id generator (it doesn't have to be monoton)
    node        := uint64(1)
    initialTime := uint64(1577865600000) // set 2020-01-01 PST as initial time
    m, err := monoton.New(sequencer.NewMillisecond(), node, initialTime)
    if err != nil {
        panic(err)
    }

    // init an id generator
    var idGenerator bus.Next = (*m).Next

    // create a new bus instance
    b, err := bus.NewBus(idGenerator)
    if err != nil {
        panic(err)
    }

    // maybe register topics in here
    b.RegisterTopics("order.received", "order.fulfilled")

    return b
}
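
If you would rather not depend on monoton, a hand-rolled generator may be enough for simple cases. The sketch below assumes that bus.Next has the shape func() string (which the assignment from monoton's Next method above suggests); the local Next type and newCounterGenerator are hypothetical names used only for illustration. The ids are unique within a single process only.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// Next is a local stand-in for the assumed shape of bus.Next:
// a function that returns a new unique id on every call.
type Next func() string

// newCounterGenerator (hypothetical helper) returns a generator that yields
// "<prefix>-1", "<prefix>-2", ... and is safe for concurrent use.
func newCounterGenerator(prefix string) Next {
	var counter uint64
	return func() string {
		n := atomic.AddUint64(&counter, 1)
		return prefix + "-" + strconv.FormatUint(n, 10)
	}
}

func main() {
	next := newCounterGenerator("node1")
	fmt.Println(next()) // node1-1
	fmt.Println(next()) // node1-2
}
```

A function like this could then be passed wherever the id generator is expected, provided the types line up as assumed.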

Register Event Topics

To emit events to the topics, topic names need to be registered first:

// register topics
b.RegisterTopics("order.received", "order.fulfilled")

Register Event Handlers

To receive topic events, you need to register handlers. A handler requires two values: a Handle function and a topic Matcher regex pattern.

handler := bus.Handler{
    Handle: func(e *bus.Event) {
        // do something
        // NOTE: Highly recommended to process the event in an async way
    },
    Matcher: ".*", // matches all topics
}
b.RegisterHandler("a unique key for the handler", &handler)
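
To get a feel for which topics a given Matcher pattern would select, the pattern can be tried against topic names with the standard regexp package. This is only an illustration of regular expression semantics, not the package's internal matching code; matchTopics is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchTopics (hypothetical helper) returns the topics matched by pattern,
// or an error if the pattern is not a valid regular expression.
func matchTopics(pattern string, topics []string) ([]string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, err
	}
	var matched []string
	for _, t := range topics {
		if re.MatchString(t) {
			matched = append(matched, t)
		}
	}
	return matched, nil
}

func main() {
	topics := []string{"order.received", "order.fulfilled", "user.created"}

	// Anchored pattern: only topics that start with "order."
	matched, err := matchTopics(`^order\..*$`, topics)
	if err != nil {
		panic(err)
	}
	fmt.Println(matched) // [order.received order.fulfilled]
}
```

Anchoring the pattern with ^ and $ and escaping the dot avoids accidental matches on topic names that merely contain the same characters.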

Emit Events

// if the txID value is blank, the bus package generates one using the id generator
ctx := context.Background()
ctx = context.WithValue(ctx, bus.CtxKeyTxID, "some-transaction-id-if-exists")

// event topic name (must be registered before)
topic := "order.received"

// interface{} data for event
order := make(map[string]string)
order["orderID"]     = "123456"
order["orderAmount"] = "112.20"
order["currency"]    = "USD"

// emit the event
event, err := b.Emit(ctx, topic, order)

if err != nil {
    // report the err
    fmt.Println(err)
}

// if the caller needs the event, a reference to it is returned by the
// `Emit` call.
fmt.Println(event)
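
The transaction id fallback described in the first comment above can be pictured with a small standard-library sketch. It does not reproduce the package's code: ctxKey, ctxKeyTxID, txIDFrom and demoTxIDs are hypothetical names, and the generator is a stub.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a private key type, a hypothetical stand-in for the type
// behind bus.CtxKeyTxID.
type ctxKey int

const ctxKeyTxID ctxKey = 0

// txIDFrom returns the transaction id stored in ctx, or asks next for a
// fresh id when the context carries none (or an empty one).
func txIDFrom(ctx context.Context, next func() string) string {
	if id, ok := ctx.Value(ctxKeyTxID).(string); ok && id != "" {
		return id
	}
	return next()
}

// demoTxIDs runs txIDFrom once without and once with a tx id in the context.
func demoTxIDs() (generated, provided string) {
	gen := func() string { return "generated-id" } // stub id generator
	ctx := context.Background()
	generated = txIDFrom(ctx, gen)
	ctx = context.WithValue(ctx, ctxKeyTxID, "some-transaction-id-if-exists")
	provided = txIDFrom(ctx, gen)
	return generated, provided
}

func main() {
	generated, provided := demoTxIDs()
	fmt.Println(generated) // generated-id
	fmt.Println(provided)  // some-transaction-id-if-exists
}
```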

Processing Events

When an event is emitted, the topic handlers receive the event synchronously. It is highly recommended to process events asynchronously. The package leaves the choice of concurrency abstraction to the consuming packages/projects, depending on their use cases. Each handler receives the same event as a reference to the bus.Event struct:

// Event data structure
type Event struct {
	ID         string      // identifier
	TxID       string      // transaction identifier
	Topic      string      // topic name
	Data       interface{} // actual event data
	OccurredAt int64       // creation time in nanoseconds
}
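
One possible way to keep a handler fast is to have it only enqueue the event and let a worker goroutine do the actual work. The sketch below uses the standard library only; the local Event type mirrors a subset of the struct above, and asyncConsumer, newAsyncConsumer and Close are hypothetical names, not part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a local stand-in mirroring a subset of the bus.Event fields.
type Event struct {
	ID    string
	Topic string
	Data  interface{}
}

// asyncConsumer (hypothetical) queues events and processes them on a
// single worker goroutine, so Handle returns quickly.
type asyncConsumer struct {
	queue chan *Event
	wg    sync.WaitGroup
	seen  []string // ids processed by the worker, in order
}

func newAsyncConsumer(buffer int) *asyncConsumer {
	c := &asyncConsumer{queue: make(chan *Event, buffer)}
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for e := range c.queue {
			// Real work would go here; the sketch only records the id.
			c.seen = append(c.seen, e.ID)
		}
	}()
	return c
}

// Handle only enqueues the event; it blocks only when the buffer is full.
func (c *asyncConsumer) Handle(e *Event) {
	c.queue <- e
}

// Close stops accepting events, waits for the worker to drain the queue,
// and returns the processed ids.
func (c *asyncConsumer) Close() []string {
	close(c.queue)
	c.wg.Wait()
	return c.seen
}

func main() {
	c := newAsyncConsumer(16)
	c.Handle(&Event{ID: "1", Topic: "order.received"})
	c.Handle(&Event{ID: "2", Topic: "order.fulfilled"})
	fmt.Println(c.Close()) // [1 2]
}
```

Waiting on the WaitGroup in Close gives the producer a deterministic point at which all queued events have been processed, without relying on a fixed sleep.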

Sample Project

A demo project with three consumers: a counter consumer that increments a counter for each event topic, a printer consumer that prints all events, and a calculator consumer that sums amounts.

Benchmarks

BenchmarkEmit-4   	 5983903	       200 ns/op	     104 B/op	       2 allocs/op

Contributing

All contributors should follow the Contributing Guidelines before creating pull requests.

Credits

Mustafa Turan

License

Apache License 2.0

Copyright (c) 2020 Mustafa Turan

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Issues
  • [feature] Same topic support multiple handlers

    Currently, one topic supports only one handler, as the code shows:

    handlers map[string]*Handler

    But in some scenarios, a topic needs multiple handlers.

    opened by navono 5
  • How to setup the point of synchronization between consumers and producer?

    I see this in the example code:

    // give some time to process events for async consumers
    time.Sleep(time.Millisecond * 25)

    This approach cannot be used in production. Is there a more formal way to set up a join point between the producer and the consumers?

    With a single consumer, a channel or something similar inside Event.Data can be used to synchronize, but with multiple consumers this approach is not feasible.

    Any suggestion?

    opened by navono 4
  • Update README.md

    Prefixes the Event interface name with the bus package name as this is needed if copy/pasting from the provided example snippets.

    opened by tpberntsen 2
  • Clarify Semantics

    From the readme it's unclear how events are consumed. Will events always make it to all consumers? or can a consumer stop the processing? What is the order of processing of events? Is the order guaranteed? Would be great to clarify this.

    opened by tcurdt 2
  • Version 1.0.0 init

    🏄‍♂️

    opened by mustafaturan 0
  • Apply context best practices

    • Extract context from struct as first arg
    • Ref: https://blog.golang.org/context-and-structs
    opened by mustafaturan 0
  • Optimize for memory allocations on emit

    opened by mustafaturan 0
Releases(v3.0.1)