Emits events the Go way, with wildcards, predicates, cancellation possibilities and many other good wins

Overview

Emitter

The emitter package implements a channel-based pubsub pattern. The design goals are to use Go's concurrency model instead of flat callbacks and to offer a very simple API that is easy to consume.

Why?

Go has an expressive concurrency model, but nobody uses it properly for pubsub as far as I can tell (as of 2015). I implemented my own solution because I could not find any other that met my expectations. Please read this article for more information.

What does it do?

Brief example

e := &emitter.Emitter{}
go func(){
	<-e.Emit("change", 42) // wait until the event has been sent
	<-e.Emit("change", 37)
	e.Off("*") // unsubscribe any listeners
}()

for event := range e.On("change") {
	// do something with event.Args
	println(event.Int(0)) // cast the first argument to int
}
// listener channel was closed

Constructor

emitter.New takes a uint as its first argument, which sets the buffer size used for listeners. The buffer capacity can also be changed at runtime: e.Cap = 10.

By default, the emitter uses one goroutine per listener to send an event. You can change that behavior from asynchronous to synchronous by passing the emitter.Sync flag as shown here: e.Use("*", emitter.Sync). I recommend specifying middlewares (see below) for the emitter at the beginning.
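
To see why the listener buffer size matters, here is a minimal sketch that uses only the standard library. It is not the library's code, and trySend is a hypothetical helper introduced for illustration. With capacity 0 a send needs a ready receiver, while a buffered channel accepts values until it is full.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a non-blocking send and
// reports whether the channel accepted the value.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)  // capacity 0: nobody is receiving
	buffered := make(chan int, 1) // capacity 1: one value fits without a receiver

	fmt.Println(trySend(unbuffered, 42)) // false
	fmt.Println(trySend(buffered, 42))   // true
	fmt.Println(trySend(buffered, 37))   // false, the buffer is full
}
```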

Wildcard

The package allows publications and subscriptions with wildcards. This feature is based on the path.Match function.

Example:

go e.Emit("something:special", 42)
event := <-e.Once("*") // receive any event
println(event.Int(0)) // will print 42

// or emit an event with a wildcard path
go e.Emit("*", 37) // emit for everyone
event = <-e.Once("something:special") // reuse the variable declared above
println(event.Int(0)) // will print 37

Note that wildcard matching uses path.Match, but the library does not return pattern-parsing errors because this is not its main feature. Check the topic pattern explicitly via the emitter.Test() function.
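
The matching rules can be explored directly with path.Match from the standard library. The sketch below is only an illustration of that function, not of the emitter internals; matches is a hypothetical wrapper. It shows a full wildcard, a prefix wildcard, a mismatch, and a malformed pattern that makes path.Match return an error.

```go
package main

import (
	"fmt"
	"path"
)

// matches is a hypothetical wrapper around path.Match. A malformed pattern
// yields false together with a non-nil error.
func matches(pattern, topic string) (bool, error) {
	return path.Match(pattern, topic)
}

func main() {
	for _, pattern := range []string{"*", "something:*", "other:*", "[bad"} {
		ok, err := matches(pattern, "something:special")
		fmt.Printf("%q matches: %v, err: %v\n", pattern, ok, err)
	}
}
```

Because path.Match treats "/" as a separator, "*" does not match topics that contain a slash.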

Middlewares

Predicates are an important part of a pubsub package: it should be possible to skip some events. Middlewares address this problem. A middleware is a function that takes a pointer to the Event as its first argument. A middleware can do the following:

  1. It allows you to modify an event.
  2. It allows skipping the event emitting if needed.
  3. It also allows modification of the event's arguments.
  4. It allows you to specify the mode to describe how exactly an event should be emitted(see below).

There are two ways to add middleware into the event emitting flow:

  • via .On("event", middlewares...)
  • via .Use("event", middlewares...)

The first one adds middlewares only for a particular listener, while the second one adds middlewares for all events with a given topic.

For example:

// use synchronous mode for all events, it also depends
// on the emitter capacity(buffered/unbuffered channels)
e.Use("*", emitter.Sync)
go e.Emit("something:special", 42)

// define predicate
event := <-e.Once("*", func(ev *emitter.Event){
	if ev.Int(0) == 42 {
		// skip sending
		ev.Flags |= emitter.FlagVoid
	}
})
panic("will never happen")

Flags

Flags describe how exactly the event should be emitted. The available options are listed here.

Every event (emitter.Event) has a field called Flags that contains flags as a binary mask. Flags can be set only via middlewares (see above).

There are several predefined middlewares that set the needed flags; the example below uses emitter.Void, emitter.Reset, emitter.Sync and emitter.Once.

You can chain these middlewares as shown below:

e.Use("*", emitter.Void) // skip sending for any events
go e.Emit("surprise", 65536)
event := <-e.On("*", emitter.Reset, emitter.Sync, emitter.Once) // set custom flags for this listener
println(event.Int(0)) // prints 65536
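
How a chain of flag-setting middlewares composes into one binary mask can be sketched with the standard library alone. Everything here (Flag, flagOnce, flagVoid, flagSync, setFlag, reset, apply) is a hypothetical stand-in that only mirrors the names used in this README; the library's actual definitions and values may differ.

```go
package main

import "fmt"

// Flag is a hypothetical bit mask type used only for this illustration.
type Flag int

const (
	flagOnce Flag = 1 << iota // 1
	flagVoid                  // 2
	flagSync                  // 4
)

// event is a stand-in for an emitted event carrying a flag mask.
type event struct {
	Flags Flag
}

// middleware mutates an event before it is delivered.
type middleware func(*event)

// setFlag returns a middleware that ORs the given flag into the mask.
func setFlag(f Flag) middleware {
	return func(ev *event) { ev.Flags |= f }
}

// reset clears every flag set by earlier middlewares.
func reset(ev *event) { ev.Flags = 0 }

// apply runs the middlewares in order and returns the resulting mask.
func apply(mws ...middleware) Flag {
	ev := &event{}
	for _, mw := range mws {
		mw(ev)
	}
	return ev.Flags
}

func main() {
	// A topic-wide "void" followed by a listener-level reset, sync and once.
	mask := apply(setFlag(flagVoid), reset, setFlag(flagSync), setFlag(flagOnce))
	fmt.Println(mask&flagVoid != 0) // false: reset cleared it
	fmt.Println(mask&flagSync != 0) // true
	fmt.Println(mask&flagOnce != 0) // true
}
```

The order matters: a reset placed after other middlewares discards whatever they set.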

Cancellation

Go gives developers powerful control over concurrency flow. We know the state of a channel and whether it would block a goroutine or not. So, by using this language construct, we can discard any emitted event. It's good practice to design your application with timeouts so that you can cancel operations if needed.

Assume you have a timeout for emitting the events:

done := e.Emit("broadcast", "the", "event", "with", "timeout")

select {
case <-done:
	// so the sending is done
case <-time.After(timeout):
	// time is out, let's discard emitting
	close(done)
}

This is useful for controlling any goroutines inside an emitter instance.
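
The same timeout pattern can be tried without the library. The sketch below is an assumption-laden illustration, not how Emit is implemented: send, sendWithTimeout and the timeout constant are hypothetical names, and a sync.Once guards the close so the done channel can never be closed twice, whichever side finishes first.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// timeout is an arbitrary value chosen for this illustration.
const timeout = 200 * time.Millisecond

// send delivers value to out in a background goroutine. The returned done
// channel is closed when the send completes or is cancelled. The sync.Once
// guard ensures done is closed exactly once, even if cancel races with
// completion.
func send(out chan<- int, value int) (done <-chan struct{}, cancel func()) {
	ch := make(chan struct{})
	var once sync.Once
	closeDone := func() { once.Do(func() { close(ch) }) }
	go func() {
		select {
		case out <- value: // delivered
		case <-ch: // cancelled by the caller
		}
		closeDone()
	}()
	return ch, closeDone
}

// sendWithTimeout reports whether the done signal arrived before the timeout.
func sendWithTimeout(out chan<- int, value int) bool {
	done, cancel := send(out, value)
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		cancel() // discard the pending send
		return false
	}
}

func main() {
	buffered := make(chan int, 1) // the buffer accepts the value immediately
	fmt.Println(sendWithTimeout(buffered, 42))

	blocked := make(chan int) // no receiver, so the send can never complete
	fmt.Println(sendWithTimeout(blocked, 37))
}
```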

Callbacks-only usage

Using the emitter in a more traditional way is possible as well. If you don't need the async mode, or you are very careful with application resources, the recipe is to use an emitter with zero capacity, or to use FlagVoid to skip sending into the listener channel, and to use a middleware as the callback:

e := &emitter.Emitter{}
e.Use("*", emitter.Void)

go e.Emit("change", "field", "value")
e.On("change", func(event *emitter.Event){
	// handle changes here
	field := event.String(0)
	value := event.String(1)
	println(field, value)
	// ...and so on
})

Groups

Group merges different listeners into one channel. Example:

e1 := &emitter.Emitter{}
e2 := &emitter.Emitter{}
e3 := &emitter.Emitter{}

g := &emitter.Group{Cap: 1}
g.Add(e1.On("first"), e2.On("second"), e3.On("third"))

for event := range g.On() {
	// handle the event
	// event has the fields OriginalTopic and Topic
	println(event.OriginalTopic, event.Topic)
}

Also you can combine several groups into one.

See the API here.
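
Merging several listener channels into one is a classic fan-in. The following sketch shows the general technique with the standard library only; merge and emitOnce are hypothetical helpers and do not reflect how emitter.Group is actually implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// merge forwards every value from the input channels into one output channel
// and closes the output once all inputs are closed.
func merge(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan string) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emitOnce returns a channel that yields a single value and is then closed.
func emitOnce(v string) <-chan string {
	ch := make(chan string, 1)
	ch <- v
	close(ch)
	return ch
}

func main() {
	count := 0
	for v := range merge(emitOnce("first"), emitOnce("second"), emitOnce("third")) {
		fmt.Println(v) // arrival order is not deterministic
		count++
	}
	fmt.Println("received", count)
}
```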

Event

Event is a struct that contains event information. The event also has helpers to cast arguments to bool, string, float64 or int by argument index, with an optional default value.

Example:

go e.Emit("*", "some string", 42, 37.0, true)
event := <-e.Once("*")

first := event.String(0)
second := event.Int(1)
third := event.Float(2)
fourth := event.Bool(3)

// use the default value if the argument does not exist
dontExists := event.Int(10, 64)
// or use the default value if the type doesn't match
def := event.Int(0, 128)

// .. and so on
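
The fallback behavior described above (missing index or mismatched type yields the default) can be sketched as follows. The event type and its Int method here are hypothetical stand-ins written for illustration; the library's own helpers may be implemented differently.

```go
package main

import "fmt"

// event is a hypothetical stand-in holding untyped arguments.
type event struct {
	Args []interface{}
}

// Int returns the argument at index as an int. If the index is out of range
// or the value is not an int, it returns the first default value, or 0 when
// no default is supplied.
func (e event) Int(index int, dflt ...int) int {
	if index >= 0 && index < len(e.Args) {
		if v, ok := e.Args[index].(int); ok {
			return v
		}
	}
	if len(dflt) > 0 {
		return dflt[0]
	}
	return 0
}

func main() {
	ev := event{Args: []interface{}{"some string", 42}}
	fmt.Println(ev.Int(1))      // 42
	fmt.Println(ev.Int(10, 64)) // 64: index out of range
	fmt.Println(ev.Int(0, 128)) // 128: argument 0 is a string
	fmt.Println(ev.Int(0))      // 0: no default supplied
}
```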

License

MIT

Comments
  • Recover from closed chan?

    I noticed that you commented out the recovery lines for sending to a closed chan: https://github.com/olebedev/emitter/blob/master/emitter.go#L343. When I use emitter, it errors out. What was the reason you decided to comment them out? Can we add them back?

    Thanks, Sea

    opened by SeaYo 5
  • how to transfer object into the listener

    e := &ev.Emitter{}
    e.Use("*", ev.Void)

    e.On("main_event", func(evt *ev.Event) {
    	message_like := evt.Args[0]
    	js_content, _ := json.Marshal(message_like)
    	fmt.Printf("\nBC mess:", js_content)
    })

    type Announcement struct {
    	GameID   int   `json:"gid"`
    	PlayerId int64 `json:"pid"`
    }

    b1 := &Announcement{
    	GameID:   1,
    	PlayerId: 4234,
    }
    go ev.Emit("main_event", b1)

    How do I cast the argument back to its type inside the listener?

    opened by jjhesk 2
  • Callbacks-only usage not working

    Hi, thank you for this nice package. I was trying to use it following the callback-only instructions provided in the README, with no luck. There, you specify:

    The recipe is use emitter with zero capacity, define FlagVoid to skip sending into the listener channel and use middleware as callback

    I've tried changing FlagVoid to FlagSkip and now things seem to be working. Could it be that documentation is not right at this point, or am I doing something wrong?

    opened by svera 2
  • Add result to event

    I don't know if this is the right approach:

    e.On("upload", func(ev *emitter.Event) {
    	// process the upload and return the result
    	ev.Result <- errors.New("Some error")
    	// or
    	ev.Result <- &CustomResult{}
    })
    
    ch := e.Emit("upload", file, header)
    
    select {
    case v := <-ch:
    	fmt.Println("Result:", v)
    case <-time.After(time.Second):
    	fmt.Println("Timeout")
    }
    

    // Also the following might be helpful

    bus.On("test", func(ev *Event, a, b int) {}) // Maybe event optional
    bus.Emit("test", 1, 2)
    
    opened by albulescu 0
  • Add cancellation possibilities

    Service events like listener:new were also removed, because there is no guarantee that an event will be delivered synchronously. Otherwise, cancellation would have to be added there as well.

    opened by olebedev 0
  • Cancellation example in readme has a race condition that could panic.

    Correct me if I'm wrong, but it seems that the below example that's currently on the README.md has a small chance of panicking.

    done := e.Emit("broadcast", "the", "event", "with", "timeout")
    
    select {
    case <-done:
    	// so the sending is done
    case <-time.After(timeout):
    	// time is out, let's discard emitting
    	close(done)
    }
    

    If timeout happens at the same time when Emit closes the done channel, the done channel can be closed twice, which will cause a panic. To illustrate the point, the below code causes a panic reproducibly.

    e := emitter.Emitter{}
    ch := e.On("broadcast")
    
    done := e.Emit("broadcast", "the", "event", "with", "timeout")
    
    go func() {
    	time.Sleep(200 * time.Millisecond)
    	fmt.Println("this happens second")
    	<-ch
    }()
    
    select {
    case <-done:
    case <-time.After(100 * time.Millisecond):
    	fmt.Println("this happens first")
    	time.Sleep(200 * time.Millisecond)
    	fmt.Println("this happens third")
    	close(done)
    }
    
    opened by Gaboose 3
  • Wercker builds seem dead?

    Not sure if the build list is supposed to be empty, or if the Oracle acquisition of Wercker maybe messed some things up, but currently Emitter has no build history.

    opened by sporkmonger 0
  • Abstracted path.Matcher to Matcher interface

    Hello,

    I had a project where I was using your library, but needed different matching than path.Match (our strings have "/" in them, but are not paths).

    So I pulled the match functionality out into an interface to make it easier to override. Thought I'd push it back and see if it was useful.

    Thanks for the cool library.

    opened by sampaioletti 1
Owner
Oleg Lebedev