A tiny wrapper around NSQ topic and channel :rocket:

Overview

Event Bus NSQ

  • A tiny wrapper around go-nsq topic and channel.
  • Protect nsq calls with gobreaker.

Installation

go get -u github.com/rafaeljesus/nsq-event-bus

Usage

The nsq-event-bus package exposes a interface for emitting and listening events.

Emitter

import "github.com/rafaeljesus/nsq-event-bus"

topic := "events"
emitter, err := bus.NewEmitter(bus.EmitterConfig{
  Address: "localhost:4150",
  MaxInFlight: 25,
})

e := event{}
if err = emitter.Emit(topic, &e); err != nil {
  // handle failure to emit message
}

// emitting messages on a async fashion
if err = emitter.EmitAsync(topic, &e); err != nil {
  // handle failure to emit message
}

Listener

import "github.com/rafaeljesus/nsq-event-bus"

if err = bus.On(bus.ListenerConfig{
  Topic:              "topic",
  Channel:            "test_on",
  HandlerFunc:        handler,
  HandlerConcurrency: 4,
}); err != nil {
  // handle failure to listen a message
}

func handler(message *Message) (reply interface{}, err error) {
  e := event{}
  if err = message.DecodePayload(&e); err != nil {
    message.Finish()
    return
  }

  if message.Attempts > MAX_DELIVERY_ATTEMPTS {
    message.Finish()
    return
  }

  err, _ = doWork(&e)
  if err != nil {
    message.Requeue(BACKOFF_TIME)
    return
  }

  message.Finish()
  return
}

Request (Request/Reply)

import "github.com/rafaeljesus/nsq-event-bus"

topic := "user_signup"
emitter, err = bus.NewEmitter(bus.EmitterConfig{})

e := event{Login: "rafa", Password: "ilhabela_is_the_place"}
if err = bus.Request(topic, &e, handler); err != nil {
  // handle failure to listen a message
}

func handler(message *Message) (reply interface{}, err error) {
  e := event{}
  if err = message.DecodePayload(&e); err != nil {
    message.Finish()
    return
  }

  reply = &Reply{}
  message.Finish()
  return
}

Contributing

  • Fork it
  • Create your feature branch (git checkout -b my-new-feature)
  • Commit your changes (git commit -am 'Add some feature')
  • Push to the branch (git push origin my-new-feature)
  • Create new Pull Request

Badges

Build Status Go Report Card Go Doc


GitHub @rafaeljesus  ·  Medium @_jesus_rafael  ·  Twitter @_jesus_rafael

Comments
  • Fix default value from EmitterConfig Address

    Fix default value from EmitterConfig Address

    Fixed default value to Producer's address.

    # before changes
    nsq-event-bus (fix-conf-address)$ go test -run TestNewEmitter
    --- FAIL: TestNewEmitter (0.00s)
    	emitter_test.go:19: Expected emitter address "new-address", got "" 
    FAIL
    exit status 1
    FAIL	github.com/rafaeljesus/nsq-event-bus	0.003s
    
    # after changes
    nsq-event-bus (fix-conf-address)$ go test -run TestNewEmitter
    PASS
    ok  	github.com/rafaeljesus/nsq-event-bus	0.004s
    
    bug 
    opened by aalvesjr 2
  • Change handlerfunc signature

    Change handlerfunc signature

    This PR exposes the bus.Message which wraps nsq.Message allowing developers to handle messages pretty like the nsq.Message

    closes https://github.com/rafaeljesus/nsq-event-bus/issues/7

    feature 
    opened by rafaeljesus 2
  • Allows emitter and listener to be configurable

    Allows emitter and listener to be configurable

    Allows emitter and listener to be configurable. To achieve this both event bus methods must be totally separated. This PR separates them having each one with it's own configuration.

    • [x] Expose Emitter with Emit methods
    • [x] Expose Emitter with Request methods
    • [x] Expose Listener with On method

    closes https://github.com/rafaeljesus/nsq-event-bus/issues/3

    ⚠ important feature 
    opened by rafaeljesus 2
  • go get URL wrong on README

    go get URL wrong on README

    When using the example from README to get the package, we get this error:

    go get -u https://github.com/rafaeljesus/nsq-event-bus
    package https:/github.com/rafaeljesus/nsq-event-bus: "https://" not allowed in import path
    

    There is no need for the https in the go get... ;)

    enhancement 
    opened by joaoh82 1
  • nsq on client side (consumer)

    nsq on client side (consumer)

    I created a route for http request that handles all incoming requests then i push them on event bus by bus.NewEmitter() method, then return status code 200 to user. on the other side (consumer) what is the best approach to handle event bus ? waitGroup with large number like wg.Add(100000) or infinite loop ? or .. ?

    can you help me with example ...

    opened by farzad-845 0
  • Make message a nsq.message struct

    Make message a nsq.message struct

    In order to keep exposing the most important features from NSQ and give all flexibility the official driver does for developers, the event bus pkg should expose an bus.Message wrapping nsq.Message allowing developers to handle messages pretty like the same when with nsq.Message

    feature 
    opened by rafaeljesus 0
  • Unable to receive messages, but nsq_tail works

    Unable to receive messages, but nsq_tail works

    I've set up a simple test of the event-bus library. Its a simple Listener for messages on a channel called SE on a Topic called hypatia-inbound. I use the following to_nsq command line to send messages

    echo "JOHN,CHERIE" | to_nsq -delimiter="," -topic="hypatia-inbound" -nsqd-tcp-address="127.0.0.1:4150" -rate=2
    

    When I run tail_nsq, I receive the messages. However, from a small utility built with this code

    type event struct{ Name string }
    
    var wg sync.WaitGroup
    
    func main() {
    
    	wg.Add(1)
    	if err := bus.On(bus.ListenerConfig{
    		Topic:               "hypatia-inbound",
    		Channel:             "se",
    		HandlerFunc:         recv_handler,
    		Lookup:              []string{"127.0.0.1:4161"},
    		HandlerConcurrency:  4,
    		DialTimeout:         time.Second * 5,
    		ReadTimeout:         time.Second * 60,
    		WriteTimeout:        time.Second * 1,
    		LookupdPollInterval: time.Second * 60,
    		LookupdPollJitter:   0.3,
    		MaxRequeueDelay:     time.Second * 5,
    		DefaultRequeueDelay: time.Second * 5,
    	}); err != nil {
    		fmt.Println("Error in main. err ", err)
    		fmt.Println("---------------------------------------")
    	}
    	wg.Wait()
    }
    
    func recv_handler(message *bus.Message) (reply interface{}, err error) {
    	e := event{}
    
    	fmt.Println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
    	fmt.Println(message)
    	fmt.Println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
    
    	if err = message.DecodePayload(&e); err != nil {
    		message.Finish()
    		wg.Done()
    		fmt.Println("Error decoding message ", err)
    		return
    	}
    
    	fmt.Println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
    	fmt.Println(e)
    	fmt.Println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
    
    	message.Finish()
    	wg.Done()
    
    	return
    }
    

    I receive the error shown below. My handler is never called. Have I mis-configured this?

    017/07/12 20:30:43 ERR    1 [hypatia-inbound/se] Handler returned error (invalid character 'C' looking for beginning of value) for msg 083dce4335207000
    
    opened by johnptoohey 4
Owner
Rafael Jesus
fork() -> exec(...)
Rafael Jesus
The official Go package for NSQ

go-nsq The official Go package for NSQ. Docs See godoc and the main repo apps directory for examples of clients built using this package. Tests Tests

NSQ 2.3k Sep 24, 2022
NSQ as backend for Queue Package

NSQ as backend for Queue Package

golang-queue 10 Jul 4, 2022
Topictool - Batch replace, add or remove Github repository topic labels

Topictool CLI Tool to manage topic labels on Github repositories Installation go

Christian Bargmann 0 Feb 3, 2022
A tiny wrapper over amqp exchanges and queues 🚌 ✨

Rabbus ?? ✨ A tiny wrapper over amqp exchanges and queues. In memory retries with exponential backoff for sending messages. Protect producer calls wit

Rafael Jesus 93 May 29, 2022
The Xiaomi message push service is a system-level channel on MIUI and is universal across the platform, which can provide developers with stable, reliable, and efficient push services.

Go-Push-API MiPush、JiPush、UMeng MiPush The Xiaomi message push service is a system-level channel on MIUI and is universal across the platform, which c

houseme 4 Jun 25, 2022
A library for scheduling when to dispatch a message to a channel

gosd go-schedulable-dispatcher (gosd), is a library for scheduling when to dispatch a message to a channel. Implementation The implementation provides

Alexander Sniffin 20 Aug 17, 2022
replicate messages from streaming channel to jetstream

NATS Streaming/Jetstream Replicator [SJR] Introduction This project replicates messages from streaming channels to jetstream. but why? At Snapp when w

Snapp Cab Incubators 15 Mar 25, 2022
A wrapper of streadway/amqp that provides reconnection logic and sane defaults

go-rabbitmq Wrapper of streadway/amqp that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐ Supporte

Lane Wagner 385 Sep 24, 2022
goczmq is a golang wrapper for CZMQ.

goczmq Introduction A golang interface to the CZMQ v4.2 API. Install Dependencies libsodium libzmq czmq For CZMQ master go get github.com/zeromq/goczm

The ZeroMQ project 519 Aug 22, 2022
Golang API wrapper for MangaDex v5's MVP API.

mangodex Golang API wrapper for MangaDex v5's MVP API. Full documentation is found here. This API is still in Open Beta, so testing may not be complet

null 33 Aug 7, 2022
Golang AMQP wrapper for RabbitMQ with better API

go-rabbitmq Golang AMQP wrapper for RabbitMQ with better API Table of Contents Background Features Usage Installation Connect to RabbitMQ Declare Queu

Hadi Hidayat Hammurabi 7 Sep 6, 2022
High abstraction wrapper for Golang Rabbit MQ Client

GRMQ Go Rabbit MQ What are the typical use-cases of RabbitMQ broker ? We create

null 5 Jan 12, 2022
⚡️ A lightweight service that will build and store your go projects binaries, Integrated with Github, Gitlab, Bitbucket and Bitbucket Server.

Rabbit A lightweight service that will build and store your go projects binaries. Rabbit is a lightweight service that will build and store your go pr

Ahmed 196 Aug 11, 2022
Cadence is a distributed, scalable, durable, and highly available orchestration engine to execute asynchronous long-running business logic in a scalable and resilient way.

Cadence Visit cadenceworkflow.io to learn about Cadence. This repo contains the source code of the Cadence server. To implement workflows, activities

Uber Open Source 6.3k Sep 24, 2022
⚡ HTTP/2 Apple Push Notification Service (APNs) push provider for Go — Send push notifications to iOS, tvOS, Safari and OSX apps, using the APNs HTTP/2 protocol.

APNS/2 APNS/2 is a go package designed for simple, flexible and fast Apple Push Notifications on iOS, OSX and Safari using the new HTTP/2 Push provide

Adam Jones 2.7k Sep 22, 2022
Asynq: simple, reliable, and efficient distributed task queue in Go

Asynq Overview Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be sc

Ken Hibino 4.2k Sep 22, 2022
💨 A real time messaging system to build a scalable in-app notifications, multiplayer games, chat apps in web and mobile apps.

Beaver A Real Time Messaging Server. Beaver is a real-time messaging server. With beaver you can easily build scalable in-app notifications, realtime

Ahmed 1.4k Sep 27, 2022
Build event-driven and event streaming applications with ease

Commander ?? Commander is Go library for writing event-driven applications. Enabling event sourcing, RPC over messages, SAGA's, bidirectional streamin

Jeroen Rinzema 60 Jun 13, 2022
Emits events in Go way, with wildcard, predicates, cancellation possibilities and many other good wins

Emitter The emitter package implements a channel-based pubsub pattern. The design goals are to use Golang concurrency model instead of flat callbacks

Oleg Lebedev 434 Sep 27, 2022