High performance, distributed and low latency publish-subscribe platform.

Overview

Join the chat at https://gitter.im/emitter-io/public

Emitter: Distributed Publish-Subscribe Platform

Emitter is a distributed, scalable and fault-tolerant publish-subscribe platform built on the MQTT protocol and featuring message storage, security, monitoring and more:

  • Publish/Subscribe using MQTT over TCP or Websockets.
  • Resilient, highly available and partition tolerant (AP in CAP terms).
  • Able to handle 3+ million messages/sec on a single broker.
  • Supports message storage with history and message-level expiry.
  • Provides secure channel keys with permissions and can face the internet.
  • Automatic TLS/SSL and encrypted inter-broker communication.
  • Built-in monitoring with Prometheus, StatsD and more.
  • Shared subscriptions, links and private links for channels.
  • Easy deployment of production-ready clusters with Docker and Kubernetes.

Emitter can be used for online gaming and mobile apps, where it satisfies the requirements for low latency, binary messaging and high throughput. It can also be used for real-time web applications such as dashboards, visual analytics or chat systems. Moreover, Emitter is well suited to the Internet of Things, allowing sensors to be controlled and their data gathered and analyzed.

Tutorials & Demos

The following video tutorials demonstrate various features of Emitter in action.

  • FOSDEM 2018
  • FOSDEM 2019
  • PubSub in Go
  • Message Storage
  • Using MQTTSpy
  • ISS Tracking
  • Self-Signed TLS
  • Monitor with eTop
  • StatsD and DataDog
  • Links & Private Links
  • Building a Client-Server app with Publish-Subscribe in Go
  • Distributed Actor Model with Publish/Subscribe and Golang
  • Online Multiplayer Platformer Game with Emitter
  • Keeping one Last Message per Channel using MQTT Retain
  • Load-balance Messages using Subscriber Groups

How to Deploy

  • Local Emitter Cluster
  • K8s and DigitalOcean
  • K8s and Google Cloud
  • K8s and Azure

Quick Start

Run Server

The quickest way to start an Emitter broker is with the docker run command shown below.

Notice: On later runs you must pass the license to Docker as an environment variable with -e. You can get the license value from your Docker log.

docker run -d --name emitter -p 8080:8080 --restart=unless-stopped emitter/server

Alternatively, you can use the go get command to build and run the broker from source.

go get -u github.com/emitter-io/emitter && emitter

Get License

Both commands above start a new server. If no configuration file or environment variables were supplied, the server prints a message similar to the one below once it has started (with Docker, you can find it in the container log):

[service] unable to find a license, make sure 'license' value is set in the config file or EMITTER_LICENSE environment variable
[service] generated new license: uppD0PFIcNK6VY-7PTo7uWH8EobaOGgRAAAAAAAAAAI
[service] generated new secret key: JUoOxjoXLc4muSxXynOpTc60nWtwUI3o

This message shows that a new security configuration was generated. You can then re-run the broker with EMITTER_LICENSE set to the generated value. Alternatively, you can set the "license" property in the emitter.conf configuration file.
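As a minimal sketch of how the generated values could be picked out of the log text, the function below scans for the two "generated new ..." lines. It assumes the log format matches the sample output above, which is not a documented interface; parseGeneratedCredentials is a hypothetical helper and not part of Emitter.

```javascript
// Sketch: extract the generated license and secret key from broker log text.
// The line format is assumed from the sample output in this document.
function parseGeneratedCredentials(logText) {
    const license = /generated new license:\s*(\S+)/.exec(logText);
    const secret = /generated new secret key:\s*(\S+)/.exec(logText);
    return {
        license: license ? license[1] : null,
        secretKey: secret ? secret[1] : null
    };
}

// Sample log lines copied from the output shown above.
const sampleLog = [
    "[service] unable to find a license, make sure 'license' value is set in the config file or EMITTER_LICENSE environment variable",
    "[service] generated new license: uppD0PFIcNK6VY-7PTo7uWH8EobaOGgRAAAAAAAAAAI",
    "[service] generated new secret key: JUoOxjoXLc4muSxXynOpTc60nWtwUI3o"
].join("\n");

const creds = parseGeneratedCredentials(sampleLog);
console.log("EMITTER_LICENSE=" + creds.license);
```

With Docker, the input text would typically come from the container log (for example the output of docker logs for the container named emitter).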

Re-Run Command

For reference, the re-run command looks like the one below. Replace the EMITTER_LICENSE value with your own license.

docker run -d --name emitter -p 8080:8080 -e EMITTER_LICENSE=uppD0PFIcNK6VY-7PTo7uWH8EobaOGgRAAAAAAAAAAI --restart=unless-stopped emitter/server

Generate Key

Finally, open a browser and navigate to http://127.0.0.1:8080/keygen in order to generate your key. Now you can use the secret key generated to create channel keys, which allow you to secure individual channels and start using emitter.

Warning: If you use the command above as-is, your secret key is JUoOxjoXLc4muSxXynOpTc60nWtwUI3o. That key is public, so it is not safe!

Usage Example

The code below shows a small example of using Emitter with the JavaScript SDK. As you can see, the API exposes straightforward methods such as publish and subscribe, which can take a binary payload and are secured through channel keys.

// connect to emitter service
var connection = emitter.connect({ host: '127.0.0.1' });

// once we're connected, subscribe to the 'chat' channel
connection.on('connect', function(){
    connection.subscribe({
        key: "<channel key>",
        channel: "chat"
    });
});

// publish a message to the chat channel
connection.publish({
    key: "<channel key>",
    channel: "chat/my_name",
    message: "hello, emitter!"
});

Further documentation, demos and language/platform SDKs are available in the develop section of our website. Make sure to check out the getting started tutorial which explains the basic usage of emitter and MQTT.

Command line arguments

The Emitter broker accepts command line arguments that let you specify a configuration file. Usage is shown below.

-config string
   The configuration file to use for the broker. (default "emitter.conf")

-help
   Shows the help and usage instead of running the broker.

Configuration File

The configuration file (emitter.conf by default) is the main way of configuring the broker. It is, however, not the only way: values can be overridden at multiple levels through environment variables and/or HashiCorp Vault.

The configuration file is in JSON format, but you can override any value by providing an environment variable that follows a particular naming format. For example, to provide a license through the environment, define the EMITTER_LICENSE environment variable; similarly, to specify a certificate, define the EMITTER_TLS_CERTIFICATE environment variable. Example of a configuration file:

{
    "listen": ":8080",
    "license": "/*The license*/",
    "tls": {
        "listen": ":443",
        "host": "example.com"
    },
    "cluster": {
        "listen": ":4000",
        "seed": "192.168.0.2:4000",
        "advertise": "public:4000"
    },
    "storage": {
        "provider": "inmemory"
    }
}

The structure of the configuration is described below:

| Property | Env. Variable | Description |
|---|---|---|
| license | EMITTER_LICENSE | The license file to use for the broker. This contains the encryption key. |
| listen | EMITTER_LISTEN | The API address used for TCP & Websocket communication, in IP:PORT format (e.g. :8080). |
| limit.messageSize | EMITTER_LIMIT_MESSAGESIZE | Maximum message size. Default is 64KB. |
| tls.listen | EMITTER_TLS_LISTEN | The API address used for secure TCP & Websocket communication, in IP:PORT format (e.g. :443). |
| tls.host | EMITTER_TLS_HOST | The hostname to whitelist for the certificate. |
| tls.email | EMITTER_TLS_EMAIL | The email account to use for autocert. |
| vault.address | EMITTER_VAULT_ADDRESS | The HashiCorp Vault address to use to further override configuration. |
| vault.app | EMITTER_VAULT_APP | The HashiCorp Vault application ID to use. |
| cluster.name | EMITTER_CLUSTER_NAME | The name of this node. This must be unique in the cluster. If this is not set, Emitter will set it to the external IP address of the running machine. |
| cluster.listen | EMITTER_CLUSTER_LISTEN | The IP address and port used to bind the inter-node communication network. This is used for the actual binding of the port. |
| cluster.advertise | EMITTER_CLUSTER_ADVERTISE | The address and port to advertise for the inter-node communication network. This is used for NAT traversal. |
| cluster.seed | EMITTER_CLUSTER_SEED | The seed address (or a domain name) for cluster join. |
| cluster.passphrase | EMITTER_CLUSTER_PASSPHRASE | The passphrase used to initialize the primary encryption key in a keyring. This key is used for encrypting all the gossip messages (message-level encryption). |
| storage.provider | EMITTER_STORAGE_PROVIDER | The storage mode for published messages. Two providers are available, inmemory and ssd; the default is inmemory. |
| storage.config.dir | EMITTER_STORAGE_CONFIG | If the storage mode is ssd, this property indicates where the messages are stored (Emitter server nodes on the same machine are not allowed to use the same directory). |
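The naming pattern for most of the properties listed above can be sketched as follows: prefix EMITTER_, replace dots with underscores, and upper-case the result. This rule is inferred from the property list in this document rather than taken from the broker's source, and storage.config.dir is listed with EMITTER_STORAGE_CONFIG, so it does not follow the pattern. The helpers envNameFor and resolveSetting are hypothetical names used for illustration only, and the override host broker.example.org is made up.

```javascript
// Sketch: derive the environment variable name for a configuration property.
// Rule inferred from the documented property list, not from Emitter's code.
function envNameFor(property) {
    return "EMITTER_" + property.split(".").join("_").toUpperCase();
}

// Hypothetical helper: prefer the environment value over the file value.
function resolveSetting(config, env, property) {
    const name = envNameFor(property);
    if (Object.prototype.hasOwnProperty.call(env, name)) {
        return env[name];
    }
    // Walk the nested JSON object along the dotted path.
    return property.split(".").reduce(
        (node, key) => (node == null ? undefined : node[key]),
        config
    );
}

// Values from the example configuration file, plus a made-up override.
const config = { listen: ":8080", tls: { listen: ":443", host: "example.com" } };
const env = { EMITTER_TLS_HOST: "broker.example.org" };

console.log(envNameFor("limit.messageSize"));
console.log(resolveSetting(config, env, "tls.host"));
console.log(resolveSetting(config, env, "tls.listen"));
```

The sketch only models precedence between the environment and the file; how the broker converts environment strings into typed values, and how Vault fits into the override chain, is not covered here.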

Building and Testing

The server requires Go 1.9 to be installed. Once you have it installed, run the following command to download the package and run the server.

go get -u github.com/emitter-io/emitter && emitter

If you want to run the tests, run the go test command as shown below.

go test ./...

Deploying as Docker Container


Emitter is conveniently packaged as a Docker container. To run the Emitter service on a single server, use the command below. Once the server has started, it generates a new security configuration; you can then re-run the same command with the additional option -e EMITTER_LICENSE set to the provided value.

docker run -d -p 8080:8080 emitter/server

For clustered (multi-server) mode, the container can be started using docker run with three main parameters: the license, the cluster seed and the cluster passphrase.

docker run -d -p 8080:8080 -p 4000:4000 -e EMITTER_LICENSE=[key] -e EMITTER_CLUSTER_SEED=[seed] -e EMITTER_CLUSTER_PASSPHRASE=[name] emitter/server
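To make the three parameters explicit, here is a minimal sketch that assembles the same argument list and rejects a missing value. clusterRunArgs is a hypothetical helper, not part of Emitter or Docker; the seed address is taken from the example configuration file, and the license and passphrase are placeholders.

```javascript
// Sketch: assemble the clustered-mode `docker run` arguments shown above.
// clusterRunArgs is a hypothetical helper used for illustration only.
function clusterRunArgs(options) {
    const required = ["license", "seed", "passphrase"];
    for (const name of required) {
        if (!options[name]) {
            throw new Error("missing required option: " + name);
        }
    }
    return [
        "run", "-d",
        "-p", "8080:8080",   // client API port
        "-p", "4000:4000",   // inter-node (cluster) port
        "-e", "EMITTER_LICENSE=" + options.license,
        "-e", "EMITTER_CLUSTER_SEED=" + options.seed,
        "-e", "EMITTER_CLUSTER_PASSPHRASE=" + options.passphrase,
        "emitter/server"
    ];
}

// Placeholder values; substitute your own license and passphrase.
const args = clusterRunArgs({
    license: "<license>",
    seed: "192.168.0.2:4000",
    passphrase: "<passphrase>"
});
console.log("docker " + args.join(" "));
```

Building the arguments as an array rather than one string keeps each value a separate argument, which avoids shell quoting problems if the list is later passed to a process launcher.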

Support, Discussion, and Community

If you need any help with Emitter Server or any of our client SDKs, please either join our gitter chat, where most of our team hangs out, or drop us an e-mail at [email protected].

Please submit any Emitter bugs, issues, and feature requests to emitter-io/emitter. If there are any security issues, please email [email protected] instead of posting an open issue on GitHub.

Contributing

If you'd like to contribute, please fork the repository and use a feature branch. Pull requests are warmly welcome.

Licensing

Copyright (c) 2009-2019 Misakai Ltd. This project is licensed under Affero General Public License v3.

Emitter offers support contracts and is now also offered via a commercial license. Please contact [email protected] for more information.

Issues
  • Private link feature request


    Hi, We have a feature request for the create link function.

    When we create a link on the client side and publish a message to that link, the server answers the message but there is no generic way to match the request and the response on the client side and we need an "id" to match the request with the response.

    So here is our proposition:

    The server app creates a key for the private link main channel:

    emitter.keygen({ key: secretKey, channel: "rpc/", type: "e", ttl: 0 });

    Emitter generates a key which has permission to create a link on channel "rpc/".

    The key is provided to the client application.

    The client sends a create link request to emitter:

    emitter.link({ key: key, channel: "rpc/", subscribe: true, ttl: 0 });

    Note: name, me and private options are omitted since they are not needed.

    If subscribe option = true, Emitter automatically subscribes user to channel "rpc/{userid}/".

    Emitter creates a key with "w" permission on "rpc/{userid}/#".

    Emitter returns the channel name "rpc/{userid}" and the generated key in response to the link request.

    Now the client can publish messages to "rpc/{userid}/{requestid}" using the key provided.

    emitter.publish({ key: key, channel: "rpc/{userid}/{requestid}", message: "A sample request", me: false });

    Using this mechanism, no shortcut is needed. Private messages are sent via normal publish messages.

    The server subscribes to channel "rpc/" and responds to messages it receives on this channel with the same channel name which also includes the request id.

    This mechanism can be added as an alternative link function with a name like "link2" not to break current implementations.

    Does it sound reasonable?

    opened by postacik 21
  • How to get channel key?


    My steps:

    1. Download version 2.7 for linux amd64.
    2. Run ./emitter.
    3. Edit emitter.conf and fill in the license.
    4. Re-run ./emitter.
    5. The console output is:

    $ ./emitter 
    2020/11/16 14:15:11 [service] configured logging provider (stderr)
    2020/11/16 14:15:11 [service] configured message storage (inmemory)
    2020/11/16 14:15:11 [service] configured usage metering (noop)
    2020/11/16 14:15:11 [service] configured contracts provider (single)
    2020/11/16 14:15:11 [service] configured monitoring sink (self)
    2020/11/16 14:15:11 [service] configured node name (00:0c:29:dc:50:43)
    2020/11/16 14:15:11 [service] starting the listener (0.0.0.0:8080)
    2020/11/16 14:15:11 [tls] unable to configure certificates, make sure a valid cache or certificate is configured
    2020/11/16 14:15:11 [service] service started
    

    6. Open the URL http://127.0.0.1:8080/keygen in a browser.
    7. Fill in Secret Key and Target Channel and click the Generate Key button.
    8. I did not get a channel key; instead I got a toast saying "the security key provided is not authorized to perform this operation".

    I tried the same with Docker, following https://github.com/emitter-io/emitter#get-license, and the result is the same.

    Question: How do I get the channel key?

    opened by leenux 14
  • Failover to other cluster node


    Do the emitter client libraries perform automatic failover to a cluster node when the initially connected emitter node goes down or do we have to keep a list of all emitter nodes to do manual failover?

    opened by postacik 14
  • Presence issue in cluster mode


    Hi,

    We have a problem with the following scenario:

    A is connected to cluster1 and subscribes to "channel/" and also subscribes to presence of "channel/".

    B is connected to cluster2 and subscribes to "channel/" and also subscribes to presence of "channel/".

    When A disconnects from cluster1, B gets an "unsubscribe" event with the id of A.

    But when cluster1 goes down while A is still connected, B does not get an "unsubscribe" event.

    Is this behavior by design or is it a bug?

    bug 
    opened by postacik 13
  • Presence question


    Hi,

    If I subscribe to "channel/" for presence changes, I get presence events for subchannels (e.g. channel/sub), too.

    Is this the normal behavior?

    opened by postacik 11
  • How to emulate request/response pattern (or RPC) on emitter?


    Some brokers like Kafka or RabbitMQ have built-in support for the request/response pattern or remote procedure calls (RPC).

    Do you have a working example of implementing this pattern on emitter?

    Thanks :)

    opened by postacik 11
  • Frequent crash, fatal error: runtime: out of memory

    This happened when disconnecting from the emitter server. I don't know whether the log is helpful enough; if you need more details to trace it, I will add them later.

    2018/04/15 19:39:33 [conn] created (63FUOYXV3K244TDPDMNQUTZH5Q)
    runtime stack:
    runtime.throw(0xb9f620, 0x16)
    	/usr/local/go/src/runtime/panic.go:619 +0x81
    runtime.sysMap(0xc440500000, 0x20000000, 0x5db00, 0x10a4bb8)
    	/usr/local/go/src/runtime/mem_linux.go:216 +0x20a
    runtime.(*mheap).sysAlloc(0x108b480, 0x20000000, 0x5db93)
    	/usr/local/go/src/runtime/malloc.go:470 +0xd4
    runtime.(*mheap).grow(0x108b480, 0x10000, 0x0)
    	/usr/local/go/src/runtime/mheap.go:907 +0x60
    runtime.(*mheap).allocSpanLocked(0x108b480, 0x10000, 0x10a4be0, 0xc51f01)
    	/usr/local/go/src/runtime/mheap.go:820 +0x301
    runtime.(*mheap).allocManual(0x108b480, 0x10000, 0x10a4be0, 0x0)
    	/usr/local/go/src/runtime/mheap.go:782 +0x53
    runtime.stackalloc(0x7ff620000000, 0x622c57, 0x0)
    	/usr/local/go/src/runtime/stack.go:384 +0x20e
    runtime.copystack(0xc42013d980, 0x20000000, 0x7ff66d3f4d01)
    	/usr/local/go/src/runtime/stack.go:840 +0x7d
    runtime.newstack()
    	/usr/local/go/src/runtime/stack.go:1063 +0x30f
    runtime.morestack()
    	/usr/local/go/src/runtime/asm_amd64.s:480 +0x89
    
    goroutine 1156 [copystack]:
    runtime.heapBitsSetType(0xc4201fa7b0, 0x30, 0x30, 0xb64b20)
    	/usr/local/go/src/runtime/mbitmap.go:864 +0x61c fp=0xc430500378 sp=0xc430500370 pc=0x6256fc
    runtime.mallocgc(0x30, 0xb64b20, 0xb08d01, 0x0)
    	/usr/local/go/src/runtime/malloc.go:740 +0x548 fp=0xc430500418 sp=0xc430500378 pc=0x622658
    runtime.newobject(0xb64b20, 0xc420339ce8)
    	/usr/local/go/src/runtime/malloc.go:839 +0x38 fp=0xc430500448 sp=0xc430500418 pc=0x622c58
    runtime.makemap(0xae91a0, 0x1, 0x0, 0xc4305004d0)
    	/usr/local/go/src/runtime/hashmap.go:312 +0x190 fp=0xc430500488 sp=0xc430500448 pc=0x619600
    github.com/emitter-io/emitter/broker/message.toCompressed(0xc420339ce8, 0xc430500578)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:472 +0x64 fp=0xc430500530 sp=0xc430500488 pc=0x9a53d4
    github.com/emitter-io/emitter/broker/message.clean(0xc42000e698)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:402 +0x45 fp=0xc430500568 sp=0xc430500530 pc=0x9a4f35
    github.com/emitter-io/emitter/broker/message.(*Trie).iinsert(0xc42000e598, 0xc42000f638, 0xc42000e698, 0xc420490ca8, 0x1, 0x1, 0xbfdae0, 0xc42037a6c0, 0xc430500660)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:235 +0x46b fp=0xc4305005e8 sp=0xc430500568 pc=0x9a441b
    github.com/emitter-io/emitter/broker/message.(*Trie).iinsert(0xc42000e598, 0xc42000e698, 0xc42000e588, 0xc420490ca4, 0x2, 0x2, 0xbfdae0, 0xc42037a6c0, 0xc420323b40)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:213 +0x28a fp=0xc430500668 sp=0xc4305005e8 pc=0x9a423a
    github.com/emitter-io/emitter/broker/message.(*Trie).iinsert(0xc42000e598, 0xc42000e588, 0x0, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xc420188890)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:213 +0x28a fp=0xc4305006e8 sp=0xc430500668 pc=0x9a423a
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:178 +0x75 fp=0xc430500740 sp=0xc4305006e8 pc=0x9a3ea5
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500798 sp=0xc430500740 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305007f0 sp=0xc430500798 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500848 sp=0xc4305007f0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305008a0 sp=0xc430500848 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305008f8 sp=0xc4305008a0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500950 sp=0xc4305008f8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305009a8 sp=0xc430500950 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500a00 sp=0xc4305009a8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500a58 sp=0xc430500a00 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500ab0 sp=0xc430500a58 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500b08 sp=0xc430500ab0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500b60 sp=0xc430500b08 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500bb8 sp=0xc430500b60 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500c10 sp=0xc430500bb8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500c68 sp=0xc430500c10 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500cc0 sp=0xc430500c68 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500d18 sp=0xc430500cc0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500d70 sp=0xc430500d18 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500dc8 sp=0xc430500d70 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500e20 sp=0xc430500dc8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500e78 sp=0xc430500e20 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500ed0 sp=0xc430500e78 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500f28 sp=0xc430500ed0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500f80 sp=0xc430500f28 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430500fd8 sp=0xc430500f80 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501030 sp=0xc430500fd8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501088 sp=0xc430501030 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305010e0 sp=0xc430501088 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501138 sp=0xc4305010e0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501190 sp=0xc430501138 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305011e8 sp=0xc430501190 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501240 sp=0xc4305011e8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501298 sp=0xc430501240 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305012f0 sp=0xc430501298 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501348 sp=0xc4305012f0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305013a0 sp=0xc430501348 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305013f8 sp=0xc4305013a0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501450 sp=0xc4305013f8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305014a8 sp=0xc430501450 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501500 sp=0xc4305014a8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501558 sp=0xc430501500 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305015b0 sp=0xc430501558 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501608 sp=0xc4305015b0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501660 sp=0xc430501608 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305016b8 sp=0xc430501660 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501710 sp=0xc4305016b8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501768 sp=0xc430501710 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305017c0 sp=0xc430501768 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501818 sp=0xc4305017c0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501870 sp=0xc430501818 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305018c8 sp=0xc430501870 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501920 sp=0xc4305018c8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501978 sp=0xc430501920 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305019d0 sp=0xc430501978 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501a28 sp=0xc4305019d0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501a80 sp=0xc430501a28 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501ad8 sp=0xc430501a80 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501b30 sp=0xc430501ad8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501b88 sp=0xc430501b30 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501be0 sp=0xc430501b88 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501c38 sp=0xc430501be0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501c90 sp=0xc430501c38 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501ce8 sp=0xc430501c90 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501d40 sp=0xc430501ce8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501d98 sp=0xc430501d40 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501df0 sp=0xc430501d98 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501e48 sp=0xc430501df0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501ea0 sp=0xc430501e48 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501ef8 sp=0xc430501ea0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501f50 sp=0xc430501ef8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430501fa8 sp=0xc430501f50 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502000 sp=0xc430501fa8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502058 sp=0xc430502000 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305020b0 sp=0xc430502058 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502108 sp=0xc4305020b0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502160 sp=0xc430502108 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305021b8 sp=0xc430502160 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502210 sp=0xc4305021b8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502268 sp=0xc430502210 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305022c0 sp=0xc430502268 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502318 sp=0xc4305022c0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502370 sp=0xc430502318 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305023c8 sp=0xc430502370 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502420 sp=0xc4305023c8 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502478 sp=0xc430502420 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305024d0 sp=0xc430502478 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502528 sp=0xc4305024d0 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502580 sp=0xc430502528 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc4305025d8 sp=0xc430502580 pc=0x9a3f95
    github.com/emitter-io/emitter/broker/message.(*Trie).Subscribe(0xc42000e598, 0xc420490ca0, 0x3, 0x3, 0xbfdae0, 0xc42037a6c0, 0xbfdae0, 0xc42037a6c0, 0xc420188800)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/message/subtrie.go:179 +0x165 fp=0xc430502630 sp=0xc4305025d8 pc=0x9a3f95
    ...additional frames elided...
    created by github.com/emitter-io/emitter/broker.(*Service).onAcceptConn
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:267 +0x61
    
    goroutine 1 [select (no cases), 17 minutes]:
    github.com/emitter-io/emitter/broker.(*Service).Listen(0xc42008b4a0, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:190 +0x2e5
    main.main()
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/main.go:59 +0x375
    
    goroutine 29 [chan receive]:
    github.com/emitter-io/emitter/network/listener.muxListener.Accept(...)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:235
    github.com/emitter-io/emitter/network/listener.(*muxListener).Accept(0xc4200f5fa0, 0xc4201882b0, 0xc00540, 0xc4200b86e0, 0x0)
    	<autogenerated>:1 +0x5b
    github.com/emitter-io/emitter/vendor/github.com/kelindar/tcp.(*Server).Serve(0xc4200f59c0, 0xbfe120, 0xc4200f5fa0, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/kelindar/tcp/server.go:44 +0x72
    github.com/emitter-io/emitter/vendor/github.com/kelindar/tcp.(*Server).Serve-fm(0xbfe120, 0xc4200f5fa0, 0xbfe120, 0xc4200f5fa0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:206 +0x3e
    created by github.com/emitter-io/emitter/network/listener.(*Listener).ServeAsync
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:118 +0x97
    
    goroutine 28 [chan receive]:
    github.com/emitter-io/emitter/network/listener.muxListener.Accept(...)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:235
    github.com/emitter-io/emitter/network/listener.(*muxListener).Accept(0xc4200f5f80, 0xbb72a8, 0xc42008a0a0, 0xbfe5e0, 0xc4201b44b0)
    	<autogenerated>:1 +0x5b
    net/http.(*Server).Serve(0xc420185040, 0xbfe120, 0xc4200f5f80, 0x0, 0x0)
    	/usr/local/go/src/net/http/server.go:2770 +0x1a5
    net/http.(*Server).Serve-fm(0xbfe120, 0xc4200f5f80, 0xbfe120, 0xc4200f5f80)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:205 +0x3e
    created by github.com/emitter-io/emitter/network/listener.(*Listener).ServeAsync
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:118 +0x97
    
    goroutine 15 [sleep]:
    time.Sleep(0x3b9aca00)
    	/usr/local/go/src/runtime/time.go:102 +0x166
    github.com/emitter-io/emitter/vendor/github.com/valyala/fasthttp.init.1.func1()
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/valyala/fasthttp/header.go:1384 +0x2a
    created by github.com/emitter-io/emitter/vendor/github.com/valyala/fasthttp.init.1
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/valyala/fasthttp/header.go:1382 +0x3a
    
    goroutine 14 [sleep]:
    time.Sleep(0x3b9aca00)
    	/usr/local/go/src/runtime/time.go:102 +0x166
    github.com/emitter-io/emitter/vendor/github.com/valyala/fasthttp.init.0.func1()
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/valyala/fasthttp/coarseTime.go:21 +0x51
    created by github.com/emitter-io/emitter/vendor/github.com/valyala/fasthttp.init.0
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/valyala/fasthttp/coarseTime.go:19 +0xb4
    
    goroutine 18 [syscall, 17 minutes]:
    os/signal.signal_recv(0x0)
    	/usr/local/go/src/runtime/sigqueue.go:139 +0xa6
    os/signal.loop()
    	/usr/local/go/src/os/signal/signal_unix.go:22 +0x22
    created by os/signal.init.0
    	/usr/local/go/src/os/signal/signal_unix.go:28 +0x41
    
    goroutine 19 [select]:
    github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh.(*localPeer).actorLoop(0xc420183530, 0xc420068f00)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh/local_peer.go:141 +0xeb
    created by github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh.newLocalPeer
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh/local_peer.go:31 +0x112
    
    goroutine 20 [select]:
    github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh.(*routes).run(0xc420020e00, 0xc420069020, 0xc420072b40, 0xc420072ba0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh/routes.go:177 +0x115
    created by github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh.newRoutes
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh/routes.go:44 +0x312
    
    goroutine 21 [select, 17 minutes]:
    github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh.(*connectionMaker).queryLoop(0xc42014f260, 0xc420069080)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh/connection_maker.go:226 +0x118
    created by github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh.newConnectionMaker
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh/connection_maker.go:75 +0x168
    
    goroutine 22 [select, 17 minutes]:
    github.com/emitter-io/emitter/vendor/github.com/karlseguin/ccache.(*Cache).worker(0xc420082f00)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/karlseguin/ccache/cache.go:159 +0x12b
    created by github.com/emitter-io/emitter/vendor/github.com/karlseguin/ccache.(*Cache).restart
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/karlseguin/ccache/cache.go:128 +0xec
    
    goroutine 23 [select, 17 minutes, locked to thread]:
    runtime.gopark(0xbb78e8, 0x0, 0xb960ad, 0x6, 0x18, 0x1)
    	/usr/local/go/src/runtime/proc.go:291 +0x11a
    runtime.selectgo(0xc42003cf50, 0xc420072de0)
    	/usr/local/go/src/runtime/select.go:392 +0xe50
    runtime.ensureSigM.func1()
    	/usr/local/go/src/runtime/signal_unix.go:549 +0x1f4
    runtime.goexit()
    	/usr/local/go/src/runtime/asm_amd64.s:2361 +0x1
    
    goroutine 24 [chan receive, 17 minutes]:
    github.com/emitter-io/emitter/broker.(*Service).hookSignals.func1(0xc420069260, 0xc42008b4a0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:458 +0x69
    created by github.com/emitter-io/emitter/broker.(*Service).hookSignals
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:457 +0xcb
    
    goroutine 25 [select]:
    github.com/emitter-io/emitter/broker.(*Service).notifyPresenceChange.func1(0xc42008b4a0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:220 +0x136
    created by github.com/emitter-io/emitter/broker.(*Service).notifyPresenceChange
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:217 +0x3f
    
    goroutine 26 [select]:
    github.com/emitter-io/emitter/utils.Repeat.func1(0xc420072a80, 0xc420082fa0, 0xc420188490)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/utils/timer.go:13 +0xd9
    created by github.com/emitter-io/emitter/utils.Repeat
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/utils/timer.go:11 +0x70
    
    goroutine 27 [IO wait, 17 minutes]:
    internal/poll.runtime_pollWait(0x7ff670d60e30, 0x72, 0x0)
    	/usr/local/go/src/runtime/netpoll.go:173 +0x57
    internal/poll.(*pollDesc).wait(0xc420020f98, 0x72, 0xc420064000, 0x0, 0x0)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0x9b
    internal/poll.(*pollDesc).waitRead(0xc420020f98, 0xffffffffffffff00, 0x0, 0x0)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
    internal/poll.(*FD).Accept(0xc420020f80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
    	/usr/local/go/src/internal/poll/fd_unix.go:372 +0x1a8
    net.(*netFD).accept(0xc420020f80, 0x0, 0x0, 0x0)
    	/usr/local/go/src/net/fd_unix.go:238 +0x42
    net.(*TCPListener).accept(0xc42000e5b8, 0x63ba19, 0xc4200827b0, 0xc4201aef90)
    	/usr/local/go/src/net/tcpsock_posix.go:136 +0x2e
    net.(*TCPListener).AcceptTCP(0xc42000e5b8, 0xbb7120, 0xc42000e5b8, 0x0)
    	/usr/local/go/src/net/tcpsock.go:246 +0x49
    github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh.(*Router).listenTCP.func1(0xc42000e5b8, 0xc420185110)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh/router.go:113 +0x59
    created by github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh.(*Router).listenTCP
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/weaveworks/mesh/router.go:110 +0x154
    
    goroutine 30 [IO wait]:
    internal/poll.runtime_pollWait(0x7ff670d60f00, 0x72, 0x0)
    	/usr/local/go/src/runtime/netpoll.go:173 +0x57
    internal/poll.(*pollDesc).wait(0xc420021018, 0x72, 0xc420064200, 0x0, 0x0)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0x9b
    internal/poll.(*pollDesc).waitRead(0xc420021018, 0xffffffffffffff00, 0x0, 0x0)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
    internal/poll.(*FD).Accept(0xc420021000, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
    	/usr/local/go/src/internal/poll/fd_unix.go:372 +0x1a8
    net.(*netFD).accept(0xc420021000, 0xc42057fab0, 0x0, 0x0)
    	/usr/local/go/src/net/fd_unix.go:238 +0x42
    net.(*TCPListener).accept(0xc42000e5d8, 0x6699f0, 0xc420439f60, 0xc420439f68)
    	/usr/local/go/src/net/tcpsock_posix.go:136 +0x2e
    net.(*TCPListener).Accept(0xc42000e5d8, 0xbb6c50, 0xc420083040, 0xc008a0, 0xc42057fab0)
    	/usr/local/go/src/net/tcpsock.go:259 +0x49
    github.com/emitter-io/emitter/network/listener.(*Listener).Serve(0xc420083040, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:155 +0x88
    created by github.com/emitter-io/emitter/broker.(*Service).listen
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:207 +0x1df
    
    goroutine 31 [runnable]:
    github.com/emitter-io/emitter/utils.Repeat.func1(0xc420072a80, 0xc420083180, 0xc4201885f0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/utils/timer.go:13 +0xd9
    created by github.com/emitter-io/emitter/utils.Repeat
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/utils/timer.go:11 +0x70
    
    goroutine 45 [IO wait]:
    internal/poll.runtime_pollWait(0x7ff670d60bc0, 0x72, 0xc42005f800)
    	/usr/local/go/src/runtime/netpoll.go:173 +0x57
    internal/poll.(*pollDesc).wait(0xc420021798, 0x72, 0xffffffffffffff00, 0xbf7d20, 0x1054810)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0x9b
    internal/poll.(*pollDesc).waitRead(0xc420021798, 0xc420211000, 0x1000, 0x1000)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
    internal/poll.(*FD).Read(0xc420021780, 0xc420211000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    	/usr/local/go/src/internal/poll/fd_unix.go:157 +0x17d
    net.(*netFD).Read(0xc420021780, 0xc420211000, 0x1000, 0x1000, 0x8, 0x0, 0x0)
    	/usr/local/go/src/net/fd_unix.go:202 +0x4f
    net.(*conn).Read(0xc42000e870, 0xc420211000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    	/usr/local/go/src/net/net.go:176 +0x6a
    github.com/emitter-io/emitter/network/listener.(*sniffer).Read(0xc4200b9cf0, 0xc420211000, 0x1000, 0x1000, 0x8, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:294 +0x143
    github.com/emitter-io/emitter/network/listener.(*Conn).Read(0xc4200b9ce0, 0xc420211000, 0x1000, 0x1000, 0x8, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:260 +0x4f
    bufio.(*Reader).fill(0xc42022e000)
    	/usr/local/go/src/bufio/bufio.go:100 +0x11e
    bufio.(*Reader).Peek(0xc42022e000, 0x2, 0x6461bc, 0xc420232000, 0x300000002, 0xc420232000, 0xc42005fa98)
    	/usr/local/go/src/bufio/bufio.go:132 +0x3a
    github.com/emitter-io/emitter/vendor/github.com/gorilla/websocket.(*Conn).read(0xc4200c4a00, 0x2, 0x3, 0xc42002c070, 0xc42002c000, 0xc4201e41d8, 0xc4201e41e0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/gorilla/websocket/conn_read.go:12 +0x40
    github.com/emitter-io/emitter/vendor/github.com/gorilla/websocket.(*Conn).advanceFrame(0xc4200c4a00, 0x0, 0x0, 0xc420066838)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/gorilla/websocket/conn.go:779 +0x5c
    github.com/emitter-io/emitter/vendor/github.com/gorilla/websocket.(*Conn).NextReader(0xc4200c4a00, 0xc42001e070, 0xc42001e070, 0x10001, 0x0, 0xbf6660)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/gorilla/websocket/conn.go:939 +0xa3
    github.com/emitter-io/emitter/network/websocket.(*websocketTransport).Read(0xc4201b5620, 0xc420234000, 0x10000, 0x10000, 0x0, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/websocket/websocket.go:105 +0x14e
    bufio.(*Reader).Read(0xc42022e120, 0xc420022192, 0x1, 0x1, 0x0, 0x0, 0x0)
    	/usr/local/go/src/bufio/bufio.go:216 +0x238
    io.ReadAtLeast(0xbf63a0, 0xc42022e120, 0xc420022192, 0x1, 0x1, 0x1, 0xac26c0, 0x1, 0xc420022192)
    	/usr/local/go/src/io/io.go:309 +0x86
    io.ReadFull(0xbf63a0, 0xc42022e120, 0xc420022192, 0x1, 0x1, 0x1, 0x0, 0xc42005fe18)
    	/usr/local/go/src/io/io.go:327 +0x58
    github.com/emitter-io/emitter/network/mqtt.decodeStaticHeader(0xbf63a0, 0xc42022e120, 0xbeacec77821b3f96, 0x11155610883, 0x1084c20, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/mqtt/mqtt.go:525 +0x8e
    github.com/emitter-io/emitter/network/mqtt.DecodePacket(0xbf63a0, 0xc42022e120, 0x11155610883, 0x1084c20, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/mqtt/mqtt.go:171 +0x42
    github.com/emitter-io/emitter/broker.(*Conn).Process(0xc42022e060, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/conn.go:100 +0x158
    created by github.com/emitter-io/emitter/broker.(*Service).onAcceptConn
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:267 +0x61
    
    goroutine 1185 [IO wait]:
    internal/poll.runtime_pollWait(0x7ff670d60a20, 0x72, 0xc420061800)
    	/usr/local/go/src/runtime/netpoll.go:173 +0x57
    internal/poll.(*pollDesc).wait(0xc420020198, 0x72, 0xffffffffffffff00, 0xbf7d20, 0x1054810)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0x9b
    internal/poll.(*pollDesc).waitRead(0xc420020198, 0xc420230000, 0x1000, 0x1000)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
    internal/poll.(*FD).Read(0xc420020180, 0xc420230000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    	/usr/local/go/src/internal/poll/fd_unix.go:157 +0x17d
    net.(*netFD).Read(0xc420020180, 0xc420230000, 0x1000, 0x1000, 0xc42056000c, 0x78, 0xc420061978)
    	/usr/local/go/src/net/fd_unix.go:202 +0x4f
    net.(*conn).Read(0xc4203381e8, 0xc420230000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    	/usr/local/go/src/net/net.go:176 +0x6a
    github.com/emitter-io/emitter/network/listener.(*sniffer).Read(0xc4200b8590, 0xc420230000, 0x1000, 0x1000, 0x1, 0xc4200619d0, 0xa479fb)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:294 +0x143
    github.com/emitter-io/emitter/network/listener.(*Conn).Read(0xc4200b8580, 0xc420230000, 0x1000, 0x1000, 0x78, 0xff4, 0x78)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:260 +0x4f
    bufio.(*Reader).fill(0xc4200683c0)
    	/usr/local/go/src/bufio/bufio.go:100 +0x11e
    bufio.(*Reader).Peek(0xc4200683c0, 0x2, 0x2, 0xbeacec78af5846a8, 0x1127109379e, 0x1084c20, 0xc42056000c)
    	/usr/local/go/src/bufio/bufio.go:132 +0x3a
    github.com/emitter-io/emitter/vendor/github.com/gorilla/websocket.(*Conn).read(0xc4200c4000, 0x2, 0xc4200c4000, 0x6833c2, 0x0, 0xbb7a20, 0xc420061b40)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/gorilla/websocket/conn_read.go:12 +0x40
    github.com/emitter-io/emitter/vendor/github.com/gorilla/websocket.(*Conn).advanceFrame(0xc4200c4000, 0x0, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/gorilla/websocket/conn.go:779 +0x5c
    github.com/emitter-io/emitter/vendor/github.com/gorilla/websocket.(*Conn).NextReader(0xc4200c4000, 0xc42001e070, 0xc42001e070, 0x10001, 0x0, 0xbf6660)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/vendor/github.com/gorilla/websocket/conn.go:939 +0xa3
    github.com/emitter-io/emitter/network/websocket.(*websocketTransport).Read(0xc420336d50, 0xc42062e000, 0x10000, 0x10000, 0x0, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/websocket/websocket.go:105 +0x14e
    bufio.(*Reader).Read(0xc4200684e0, 0xc42002259e, 0x1, 0x1, 0x0, 0x0, 0x0)
    	/usr/local/go/src/bufio/bufio.go:216 +0x238
    io.ReadAtLeast(0xbf63a0, 0xc4200684e0, 0xc42002259e, 0x1, 0x1, 0x1, 0xac26c0, 0x1, 0xc42002259e)
    	/usr/local/go/src/io/io.go:309 +0x86
    io.ReadFull(0xbf63a0, 0xc4200684e0, 0xc42002259e, 0x1, 0x1, 0x1, 0x9a8d06, 0xc420061e18)
    	/usr/local/go/src/io/io.go:327 +0x58
    github.com/emitter-io/emitter/network/mqtt.decodeStaticHeader(0xbf63a0, 0xc4200684e0, 0xbeacec78af5956b9, 0x112710a47a8, 0x1084c20, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/mqtt/mqtt.go:525 +0x8e
    github.com/emitter-io/emitter/network/mqtt.DecodePacket(0xbf63a0, 0xc4200684e0, 0x112710a47a8, 0x1084c20, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/mqtt/mqtt.go:171 +0x42
    github.com/emitter-io/emitter/broker.(*Conn).Process(0xc420068420, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/conn.go:100 +0x158
    created by github.com/emitter-io/emitter/broker.(*Service).onAcceptConn
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:267 +0x61
    
    goroutine 494 [IO wait]:
    internal/poll.runtime_pollWait(0x7ff670d60950, 0x72, 0xc420438ac0)
    	/usr/local/go/src/runtime/netpoll.go:173 +0x57
    internal/poll.(*pollDesc).wait(0xc420020318, 0x72, 0xffffffffffffff00, 0xbf7d20, 0x1054810)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0x9b
    internal/poll.(*pollDesc).waitRead(0xc420020318, 0xc4201c6000, 0x10000, 0x10000)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
    internal/poll.(*FD).Read(0xc420020300, 0xc4201c6000, 0x10000, 0x10000, 0x0, 0x0, 0x0)
    	/usr/local/go/src/internal/poll/fd_unix.go:157 +0x17d
    net.(*netFD).Read(0xc420020300, 0xc4201c6000, 0x10000, 0x10000, 0x2, 0x0, 0x0)
    	/usr/local/go/src/net/fd_unix.go:202 +0x4f
    net.(*conn).Read(0xc42000eec0, 0xc4201c6000, 0x10000, 0x10000, 0x0, 0x0, 0x0)
    	/usr/local/go/src/net/net.go:176 +0x6a
    github.com/emitter-io/emitter/network/listener.(*sniffer).Read(0xc42013a220, 0xc4201c6000, 0x10000, 0x10000, 0xc4201e4358, 0xc4201e4360, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:294 +0x143
    github.com/emitter-io/emitter/network/listener.(*Conn).Read(0xc42013a210, 0xc4201c6000, 0x10000, 0x10000, 0xb, 0xc420438ce0, 0x6a2912)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:260 +0x4f
    bufio.(*Reader).Read(0xc4201c0420, 0xc4200220ae, 0x1, 0x1, 0x1, 0xc420438d60, 0x0)
    	/usr/local/go/src/bufio/bufio.go:216 +0x238
    io.ReadAtLeast(0xbf63a0, 0xc4201c0420, 0xc4200220ae, 0x1, 0x1, 0x1, 0xac26c0, 0x1, 0xc4200220ae)
    	/usr/local/go/src/io/io.go:309 +0x86
    io.ReadFull(0xbf63a0, 0xc4201c0420, 0xc4200220ae, 0x1, 0x1, 0x1, 0xc420020300, 0xbeacec751b4c5c0c)
    	/usr/local/go/src/io/io.go:327 +0x58
    github.com/emitter-io/emitter/network/mqtt.decodeStaticHeader(0xbf63a0, 0xc4201c0420, 0x1084c20, 0x0, 0x0, 0x65ca38)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/mqtt/mqtt.go:525 +0x8e
    github.com/emitter-io/emitter/network/mqtt.DecodePacket(0xbf63a0, 0xc4201c0420, 0x10f1a8640fb, 0x1084c20, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/mqtt/mqtt.go:171 +0x42
    github.com/emitter-io/emitter/broker.(*Conn).Process(0xc4201c0360, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/conn.go:100 +0x158
    created by github.com/emitter-io/emitter/broker.(*Service).onAcceptConn
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:267 +0x61
    
    goroutine 497 [IO wait]:
    internal/poll.runtime_pollWait(0x7ff670d60880, 0x72, 0xc42043bac0)
    	/usr/local/go/src/runtime/netpoll.go:173 +0x57
    internal/poll.(*pollDesc).wait(0xc420020398, 0x72, 0xffffffffffffff00, 0xbf7d20, 0x1054810)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0x9b
    internal/poll.(*pollDesc).waitRead(0xc420020398, 0xc420444000, 0x10000, 0x10000)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
    internal/poll.(*FD).Read(0xc420020380, 0xc420444000, 0x10000, 0x10000, 0x0, 0x0, 0x0)
    	/usr/local/go/src/internal/poll/fd_unix.go:157 +0x17d
    net.(*netFD).Read(0xc420020380, 0xc420444000, 0x10000, 0x10000, 0x2, 0x0, 0x0)
    	/usr/local/go/src/net/fd_unix.go:202 +0x4f
    net.(*conn).Read(0xc42000eec8, 0xc420444000, 0x10000, 0x10000, 0x0, 0x0, 0x0)
    	/usr/local/go/src/net/net.go:176 +0x6a
    github.com/emitter-io/emitter/network/listener.(*sniffer).Read(0xc42013a2d0, 0xc420444000, 0x10000, 0x10000, 0xc4201e4458, 0xc4201e4460, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:294 +0x143
    github.com/emitter-io/emitter/network/listener.(*Conn).Read(0xc42013a2c0, 0xc420444000, 0x10000, 0x10000, 0xb, 0xc42043bce0, 0x6a2912)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:260 +0x4f
    bufio.(*Reader).Read(0xc4201c0540, 0xc4200220aa, 0x1, 0x1, 0x1, 0xc42043bd60, 0x0)
    	/usr/local/go/src/bufio/bufio.go:216 +0x238
    io.ReadAtLeast(0xbf63a0, 0xc4201c0540, 0xc4200220aa, 0x1, 0x1, 0x1, 0xac26c0, 0x1, 0xc4200220aa)
    	/usr/local/go/src/io/io.go:309 +0x86
    io.ReadFull(0xbf63a0, 0xc4201c0540, 0xc4200220aa, 0x1, 0x1, 0x1, 0xc420020380, 0xbeacec751b4c07bf)
    	/usr/local/go/src/io/io.go:327 +0x58
    github.com/emitter-io/emitter/network/mqtt.decodeStaticHeader(0xbf63a0, 0xc4201c0540, 0x1084c20, 0x0, 0x0, 0x65ca38)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/mqtt/mqtt.go:525 +0x8e
    github.com/emitter-io/emitter/network/mqtt.DecodePacket(0xbf63a0, 0xc4201c0540, 0x10f1a85ecae, 0x1084c20, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/mqtt/mqtt.go:171 +0x42
    github.com/emitter-io/emitter/broker.(*Conn).Process(0xc4201c0480, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/conn.go:100 +0x158
    created by github.com/emitter-io/emitter/broker.(*Service).onAcceptConn
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:267 +0x61
    
    goroutine 1197 [IO wait]:
    internal/poll.runtime_pollWait(0x7ff670d60d60, 0x72, 0xc420434ac0)
    	/usr/local/go/src/runtime/netpoll.go:173 +0x57
    internal/poll.(*pollDesc).wait(0xc420020218, 0x72, 0xffffffffffffff00, 0xbf7d20, 0x1054810)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0x9b
    internal/poll.(*pollDesc).waitRead(0xc420020218, 0xc42063e000, 0x10000, 0x10000)
    	/usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
    internal/poll.(*FD).Read(0xc420020200, 0xc42063e000, 0x10000, 0x10000, 0x0, 0x0, 0x0)
    	/usr/local/go/src/internal/poll/fd_unix.go:157 +0x17d
    net.(*netFD).Read(0xc420020200, 0xc42063e000, 0x10000, 0x10000, 0xc420020200, 0xc42063e000, 0x10000)
    	/usr/local/go/src/net/fd_unix.go:202 +0x4f
    net.(*conn).Read(0xc42057fab0, 0xc42063e000, 0x10000, 0x10000, 0x0, 0x0, 0x0)
    	/usr/local/go/src/net/net.go:176 +0x6a
    github.com/emitter-io/emitter/network/listener.(*sniffer).Read(0xc4200b86f0, 0xc42063e000, 0x10000, 0x10000, 0xc420064460, 0x0, 0x6a2b46)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:294 +0x143
    github.com/emitter-io/emitter/network/listener.(*Conn).Read(0xc4200b86e0, 0xc42063e000, 0x10000, 0x10000, 0xe, 0x6a2912, 0xc420020200)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/listener/listener.go:260 +0x4f
    bufio.(*Reader).Read(0xc4200686c0, 0xc420022561, 0x1, 0x1, 0x1, 0xc420434d60, 0x0)
    	/usr/local/go/src/bufio/bufio.go:216 +0x238
    io.ReadAtLeast(0xbf63a0, 0xc4200686c0, 0xc420022561, 0x1, 0x1, 0x1, 0xac26c0, 0x1, 0xc420022561)
    	/usr/local/go/src/io/io.go:309 +0x86
    io.ReadFull(0xbf63a0, 0xc4200686c0, 0xc420022561, 0x1, 0x1, 0x1, 0xc420020200, 0xbeacec7b4d293ce7)
    	/usr/local/go/src/io/io.go:327 +0x58
    github.com/emitter-io/emitter/network/mqtt.decodeStaticHeader(0xbf63a0, 0xc4200686c0, 0x1084c20, 0x0, 0x0, 0x65ca38)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/mqtt/mqtt.go:525 +0x8e
    github.com/emitter-io/emitter/network/mqtt.DecodePacket(0xbf63a0, 0xc4200686c0, 0x114de80dbd8, 0x1084c20, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/network/mqtt/mqtt.go:171 +0x42
    github.com/emitter-io/emitter/broker.(*Conn).Process(0xc420068480, 0x0, 0x0)
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/conn.go:100 +0x158
    created by github.com/emitter-io/emitter/broker.(*Service).onAcceptConn
    	/mnt/d/golang/workspace/src/github.com/emitter-io/emitter/broker/service.go:267 +0x61
    
    opened by vjeson 10
  • Android Platform Support

    Android Platform Support

    This is a great project ❤️, thanks for open sourcing it. I was trying to build something of this kind for my mobile application project. Will there be platform support for Android in the near future? As of now, most Android developers use Firebase for realtime communication, which is a paid service. I see great potential in this project; it could be the next big thing. Android platform support would be a great feature and would let many more developers build great apps. I am very interested in this project and want to develop Android platform support tools for it. Let me know whether this would be worthwhile.

    help wanted wontfix 
    opened by dharan1011 10
  • Go! Broker timeout disconnection.

    Go! Broker timeout disconnection.

    Hi.

    As per my Blog entry (ID pauldy1000, 21 November 2017) and response from Roman I am enclosing the situation and some source code. I am using the Go! version of the Broker.

    My use case is to have a Subscriber connected to the Broker for the length of time I choose.

    For example, I want a Subscriber in the UK to be ready to receive messages from a Publisher in the USA at any time the USA Publisher chooses to publish messages. If the UK Subscriber connection times out after 1 minute of inactivity, how can I detect that the Publisher in the USA has published a message destined for the UK Subscriber if the USA Publisher does not come on-line for 3 hours?

    Emitter.Connection emitterMagnetLocal = new Emitter.Connection(magnetLocalIPAddress, magnetLocalPort, "myserverkeygoeshere");
    emitterMagnetLocal.On(cChannelKeyCommander, channel, (channel, msg) =>
    {
       // Message received, go handle it.
       HandleInboundMessage(channel.TrimEnd(new char[] { '/' }), msg);
    });
    // Wire up the Disconnect event.
    emitterMagnetLocal.Disconnected += EmitterMagnetLocal_Disconnected;
    // Connect to Magnet.Local emitter.
    ConnectToMagnetLocal();  // This is a class member method which connects to the Broker.
    
    When the Broker forcibly disconnects the Subscriber (as above) the Disconnected event handles it:
    
    private void EmitterMagnetLocal_Disconnected(object sender, EventArgs e)
    {
       Logging.LogIt(cCodeName + ": Magnet.Local: Disconnected(): Waiting for five seconds before trying to reconnect.");
    
       // Wait for five seconds.
       Thread.Sleep(5000);
    
       Logging.LogIt(cCodeName + ": Magnet.Local: Disconnected(): Trying to reconnect again now.");
    
       // Try to connect to the magnet local service again.
       ConnectToMagnetLocal();     // This is a class member method which connects to the Broker.
    }
    

    I have had to architect the "wait 5 seconds then reconnect" due to the forced disconnection by the Broker. I would much prefer to be able to declare how long I would like to keep the Subscriber connected - even if this is "never disconnect".
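
The same wait-then-reconnect pattern can be sketched in Go, the broker's own language. This is only an illustration under stated assumptions: `connect` is a hypothetical stand-in for whatever call dials the broker, `errRefused` is a made-up error, and the delays are arbitrary. Instead of a fixed five-second sleep, the delay doubles after each failed attempt up to a cap.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRefused is a hypothetical error used to simulate a failed connection.
var errRefused = errors.New("connection refused")

// reconnect calls connect until it succeeds or the attempts run out.
// connect is a placeholder for the real dial/connect call (an assumption).
// The delay starts at base and doubles after every failure, capped at maxDelay.
// It returns the number of attempts made and the last error, if any.
func reconnect(connect func() error, base, maxDelay time.Duration, attempts int) (int, error) {
	delay := base
	var err error
	for i := 1; i <= attempts; i++ {
		if err = connect(); err == nil {
			return i, nil
		}
		if i < attempts {
			time.Sleep(delay)
			if delay *= 2; delay > maxDelay {
				delay = maxDelay
			}
		}
	}
	return attempts, fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	// Simulated broker that refuses the first two connection attempts.
	connect := func() error {
		calls++
		if calls < 3 {
			return errRefused
		}
		return nil
	}
	n, err := reconnect(connect, 10*time.Millisecond, 80*time.Millisecond, 5)
	fmt.Println(n, err)
}
```

Capping the delay keeps the subscriber from waiting arbitrarily long once the broker is reachable again.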

    I hope this is useful.

    Regards,

    Paul.

    opened by pauldy1000 10
  • 418000 connection cause memory leak in 8G container

    418000 connection cause memory leak in 8G container

    Hi, I run emitter in an 8 GB container. When the number of connections grows to 418,000, emitter restarts because of a memory leak. Why does one connection cost so much memory? Is it true that a single broker with 8 GB of memory can only hold 418,000 connections?

    wontfix 
    opened by Estheriii 9
  • How to learn the version of a binary

    How to learn the version of a binary

    Is there a way to learn the version of an emitter binary?

    Where in the source code is the version number defined?

    Is it possible to learn the version of the server from the client side?

    opened by postacik 9
  • Installation Failure

    Installation Failure

    The "get" command is deprecated as of 1.17 last August.

    https://go.dev/doc/go-get-install-deprecation#what-to-use-instead

    The alternative "install" command using [email protected] fails here:

    # github.com/emitter-io/emitter/config
    go/pkg/mod/github.com/emitter-io/[email protected]+incompatible/config/config.go:66:18: undefined: config.VaultConfig
    go/pkg/mod/github.com/emitter-io/[email protected]+incompatible/config/config.go:75:31: undefined: config.VaultConfig
    go/pkg/mod/github.com/emitter-io/[email protected]+incompatible/config/config.go:84:21: undefined: config.NewVaultCache
    
    

    Downloading the repo and running go build fails with security warning:

    verifying github.com/google/[email protected]: checksum mismatch
    	downloaded: h1:N8EguYFm2wwdpoNcpchQY0tPs85vOJkboFb2dPxmixo=
    	go.sum:   h1:/PtAHvnBY4Kqnx/xCQ3OIV9uYcSFGScBsWI3Oogeh6w=
    
    opened by cogitaria-admin 0
  • BUG reports

    BUG reports

    Hello,

    Dear developers of Emitter, while studying Emitter we discovered some new security flaws that could lead to unauthorized access to the topics/channels managed by Emitter. We sent the details of these flaws to you last month but have received no response. We would therefore like to check whether the email “[email protected]” is correct and whether these security flaws have been confirmed. Thanks! If you need any further information, please email me back at [email protected]

    opened by MQTTactic 1
  • Use over ssl?

    Use over ssl?

    First, I want to thank you for this great package.

    I have a problem when using this package with the https scheme: the connection always fails. Is there a tutorial on adding a certificate with Docker?

    opened by jazz7381 0
  • Bump github.com/valyala/fasthttp from 1.12.0 to 1.34.0

    Bump github.com/valyala/fasthttp from 1.12.0 to 1.34.0

    Bumps github.com/valyala/fasthttp from 1.12.0 to 1.34.0.

    Release notes

    Sourced from github.com/valyala/fasthttp's releases.

    v1.34.0

    • 59f94a3 Update github.com/klauspost/compress (#1237) (Mikhail Faraponov)
    • 62c15a5 Don't reset RequestCtx.s (#1234) (Erik Dubbelboer)
    • 7670c6e Fix windows tests (#1235) (Erik Dubbelboer)
    • f54ffa1 feature: Keep the memory usage of the service at a stable level (#1216) (Rennbon)
    • 15262ec Warn about unsafe ServeFile usage (#1228) (Erik Dubbelboer)
    • 1116d03 Fix panic while reading invalid trailers (Erik Dubbelboer)
    • 856ca8e Update dependencies (#1230) (Mikhail Faraponov)
    • 6b5bc7b Add windows support to normalizePath (Erik Dubbelboer)
    • f0b0cfe Don't log ErrBadTrailer by default (Erik Dubbelboer)
    • 6937fee fix: (useless check), skip Response body if http method HEAD (#1224) (Pavel Burak)
    • b85d2a2 Fix http proxy behavior (#1221) (Aoang)
    • ad8a07a RequestHeader support set no default ContentType (#1218) (Jack.Ju)
    • c94581c support configure HostClient (#1214) (lin longhjui)
    • 632e222 Client examples (#1208) (Sergey Ponomarev)
    • 6a3cc23 uri_test.go use example.com for clearness (#1212) (Sergey Ponomarev)
    • 9d665e0 Update dependencies (#1204) (Mikhail Faraponov)
    • 8d7953e Fix scheme check for not yet parsed requests (#1203) (ArminBTVS)

    v1.33.0

    • 61aa8b1 remove redundant code (#1202) (tyltr)
    • 4369776 fix(hijack): reuse RequestCtx (#1201) (Sergio VS)
    • 2aca3e8 fix(hijack): reset userValues after hijack handler execution (#1199) (Sergio VS)
    • 9123060 Updated dependencies (#1194) (Mikhail Faraponov)

    v1.32.0

    • 7eeb00e Make tests less flaky (#1189) (Erik Dubbelboer)
    • d19b872 Update tcpdialer.go (#1188) (Mikhail Faraponov)
    • c727b99 Release UseHostHeader in ReleaseRequest() (#1185) (Tolyar)
    • 6c0518b Fix UseHostHeader for DoTimeout + tests (#1184) (Tolyar)
    • 6b55811 Add MaxIdleWorkerDuration to Server. (#1183) (Kilos Liu)
    • 4517204 Allow to set Host header for Client (#1169) (Tolyar)
    • 258a4c1 fix: reset response after reset user values on keep-alive connections (#1176) (Sergio VS)
    • e9db537 Use %w to wrap errors (#1175) (Erik Dubbelboer)
    • 7db0597 Fix bad request trailer panic (Erik Dubbelboer)
    • 4aadf9a Fix parseTrailer panic (Erik Dubbelboer)
    • da7ff7a Add trailer support (#1165) (ichx)
    • 017f0aa fix: reset request after reset user values on keep-alive connections (#1162) (Sergio VS)
    • 3b117f8 feat: close idle connections when server shutdown (#1155) (ichx)
    • a94a2c3 Remove redundant code (#1154) (ichx)
    • f7c354c Fix race condition in Client.mCleaner (Erik Dubbelboer)
    • c078a9d Add string and bytes buffer convert trick in README (#1151) (ichx)
    • 3ff6aaa uri: isHttps() and isHttp() (#1150) (Sergey Ponomarev)
    • 8febad0 http.go: Request.SetURI() (Fix #1141) (#1148) (Sergey Ponomarev)
    • 2ca01c7 fix: Status Line parsing and writing (#1135) (Shivansh Vij)
    • 931d0a4 Fix lint (Erik Dubbelboer)
    • d613502 use sync.map is better (#1145) (halst)
    • c15e642 Don't run all race tests on windows (#1143) (Erik Dubbelboer)
    • 6006c87 chore (#1137) (tyltr)
    • 6d4db9b Fix race condition in getTCPAddrs (Erik Dubbelboer)

    ... (truncated)

    wontfix dependencies 
    opened by dependabot[bot] 1
  • message TTL not expire with inmemory storage provider

    message TTL not expire with inmemory storage provider

    app.js

    var client = require('emitter-io').connect({ host: '127.0.0.1', username: 'emitter-io' });
    
    const key = '<key>';
    const channel = 'demo';
    
    if (process.argv.includes('--pub')) {
    
        client.publish({ key, channel, message: 'hello, emitter! ' + (new Date()).toISOString(), ttl: 10 });
        client.disconnect();
    
    }
    
    if (process.argv.includes('--sub')) {
    
        client.subscribe({ key, channel, last: 5 });
        client.on('message', msg => {
            console.log('>>> message: ', msg.channel, msg.asString());
        });
    
    }
    

    just run:

    docker run -d --name emitter -p 8080:8080 -e EMITTER_LICENSE=<licence> emitter/server
    node app.js --pub
    # after 10 sec
    node app.js --sub
    
    bug 
    opened by cravler 2
  • occasionally panic

    occasionally panic

    panic: interface conversion: crdt.Map is *crdt.Durable, not *crdt.Volatile
    
    goroutine 70 [running]:
    github.com/emitter-io/emitter/internal/event/crdt.(*Durable).Merge(0xc000094b20, 0x4f4a180, 0xc000094b20)
    	/Users/shushenghong/Documents/workspace/demo/emitter/internal/event/crdt/durable.go:145 +0x185
    github.com/emitter-io/emitter/internal/event.(*State).Merge(0xc000094150, 0x4f35ee0, 0xc000094150, 0x0, 0x0)
    	/Users/shushenghong/Documents/workspace/demo/emitter/internal/event/state.go:105 +0x196
    github.com/weaveworks/mesh.(*gossipSender).Send(0xc0001b4550, 0x4f35ee0, 0xc000094150)
    	/Users/shushenghong/Documents/tools/go/pkg/mod/github.com/weaveworks/[email protected]/gossip.go:186 +0x15d
    github.com/weaveworks/mesh.(*gossipChannel).relay(0xc0003a2300, 0x1, 0x4f35ee0, 0xc000094150)
    	/Users/shushenghong/Documents/tools/go/pkg/mod/github.com/weaveworks/[email protected]/gossip_channel.go:123 +0x1c5
    github.com/weaveworks/mesh.(*gossipChannel).Send(0xc0003a2300, 0x4f35ee0, 0xc000094150)
    	/Users/shushenghong/Documents/tools/go/pkg/mod/github.com/weaveworks/[email protected]/gossip_channel.go:94 +0x5f
    github.com/weaveworks/mesh.(*Router).sendAllGossip(0xc0003f82a0)
    	/Users/shushenghong/Documents/tools/go/pkg/mod/github.com/weaveworks/[email protected]/router.go:209 +0xff
    github.com/weaveworks/mesh.(*localPeer).actorLoop(0xc0001b4140, 0xc00008e240)
    	/Users/shushenghong/Documents/tools/go/pkg/mod/github.com/weaveworks/[email protected]/local_peer.go:160 +0x205
    created by github.com/weaveworks/mesh.newLocalPeer
    	/Users/shushenghong/Documents/tools/go/pkg/mod/github.com/weaveworks/[email protected]/local_peer.go:42 +0x245
    
    bug 
    opened by shushenghong 3
Releases(v3.0)
  • v3.0(Dec 26, 2021)

    This release contains various bug fixes and migrates emitter to use badger/v3 as the underlying message storage. This message storage is incompatible with v1, hence the bump of the major version (breaking change). It also rebases docker to run on Go 1.17 which increases overall throughput.

    What's Changed

    • Added last will & refactored service by @kelindar in https://github.com/emitter-io/emitter/pull/321
    • fix a typo error by @lzh2nix in https://github.com/emitter-io/emitter/pull/364
    • MQTT topic matching strategy by @cravler in https://github.com/emitter-io/emitter/pull/360
    • Upgrading emitter/address by @kelindar in https://github.com/emitter-io/emitter/pull/370
    • Fix conn.Track(): CompareAndSwapUint32 by @Florimond in https://github.com/emitter-io/emitter/pull/372
    • Replace in-memory storage with badger/v3 by @kelindar in https://github.com/emitter-io/emitter/pull/371
    • Bump go version to 1.17 by @Florimond in https://github.com/emitter-io/emitter/pull/384
    • Fix bug allowing one connection to subscribe twice by @Florimond in https://github.com/emitter-io/emitter/pull/383

    New Contributors

    • @cravler made their first contribution in https://github.com/emitter-io/emitter/pull/360

    Full Changelog: https://github.com/emitter-io/emitter/compare/v2.8...v3.0

    Source code(tar.gz)
    Source code(zip)
  • v2.8(Jun 4, 2020)

    This release revamps how internal state is replicated with gossip and adds other, smaller features. The brokers now use a durable CRDT map to replicate and synchronize internal state (e.g. subscriptions, disallow lists) within the cluster. This makes the cluster more reliable and allows us to support more complex features in the future, such as cluster-wide last will & testament.
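
To illustrate why a CRDT map suits gossip replication, here is a minimal last-write-wins map sketch. It is not emitter's implementation: the `entry` and `lwwMap` types, the logical timestamps and the tie-breaking rule are all assumptions chosen for brevity. The property it demonstrates is that merging is commutative and idempotent, so replicas converge regardless of the order in which state arrives.

```go
package main

import "fmt"

// entry is a hypothetical replicated value. Deleted marks a tombstone so
// that removals propagate through merges as well.
type entry struct {
	Value   string
	Time    int64 // logical timestamp of the last write
	Deleted bool
}

// lwwMap is a hypothetical last-write-wins map keyed by string.
type lwwMap map[string]entry

// wins reports whether a should replace b. Newer timestamps win; ties are
// broken deterministically so every replica picks the same entry.
func wins(a, b entry) bool {
	if a.Time != b.Time {
		return a.Time > b.Time
	}
	if a.Deleted != b.Deleted {
		return a.Deleted
	}
	return a.Value > b.Value
}

// apply stores e under key only if it beats the current entry.
func (m lwwMap) apply(key string, e entry) {
	if cur, ok := m[key]; !ok || wins(e, cur) {
		m[key] = e
	}
}

// Set records a write at logical time t.
func (m lwwMap) Set(key, value string, t int64) {
	m.apply(key, entry{Value: value, Time: t})
}

// Delete records a removal at logical time t.
func (m lwwMap) Delete(key string, t int64) {
	m.apply(key, entry{Time: t, Deleted: true})
}

// Merge folds the state received from another replica into m.
func (m lwwMap) Merge(other lwwMap) {
	for k, e := range other {
		m.apply(k, e)
	}
}

// Get returns the live value for key, if any.
func (m lwwMap) Get(key string) (string, bool) {
	e, ok := m[key]
	if !ok || e.Deleted {
		return "", false
	}
	return e.Value, true
}

func main() {
	a, b := lwwMap{}, lwwMap{}
	a.Set("sub/1", "broker-a", 1)
	b.Set("sub/1", "broker-b", 2)
	b.Set("sub/2", "broker-b", 3)
	a.Delete("sub/2", 4)

	// Merging in either direction leaves both replicas with the same state.
	a.Merge(b)
	b.Merge(a)
	fmt.Println(a.Get("sub/1"))
	fmt.Println(b.Get("sub/2"))
}
```

Keeping tombstones rather than deleting keys outright is what lets a removal win over an older write that arrives later.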

    Changelog

    • Added support for blocking and unblocking keys. This can be done using an emitter/keyban/ request. The block list is replicated across the cluster and persisted to disk on each broker. In order to specify the directory, use the newly added dir parameter in the cluster configuration section. (#317)
    • Fixed a bug with frame iteration (#282).
    • Fixed alignment of int64, to be able to access it with atomic.LoadInt64 (#294).
    • Fixed a panic when receiving a corrupt/bad MQTT packet (#305) (#308).
    • Fixed an issue where presence events could be received with an extension key (#295).
    • Limited the gossip broadcast message size to a random set of 100K subscriptions, in order to keep the maximum message size under 10MB. Exceeding this limit would cause peers to be disconnected from the cluster. (#312)
    • Fixed an issue where presence notifications were not sent when a node goes down (#304) (#313).
    • Refactored gossip state and cluster-wide survey (#314) (#315).
    • Changed the license to v3 with shuffled salsa cipher for better randomness when generating keys (#311).
    • Changed the local CRDT set to be durable, so it can potentially be persisted to disk going forward. Incoming sets are now called "volatile" and are implemented using a Go map. (#316)
    • Changed the current CRDT set to a key/value dictionary, added more tests around it and made other changes to serialization. This is the necessary plumbing to allow for cluster-global state which is replicated with gossip (#310) (#320).
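
The int64 alignment fix in the list above reflects a documented constraint of Go's `sync/atomic` package: on 32-bit platforms (ARM, 386, 32-bit MIPS), a 64-bit value accessed atomically must be 64-bit aligned, and only the first word of an allocated struct is guaranteed to be. A minimal sketch follows; the `stats` type and its fields are hypothetical and not taken from emitter.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stats is a hypothetical counter struct. The int64 field comes first so it
// is 64-bit aligned even on 32-bit platforms, where a field placed after a
// smaller one (such as a bool or int32) may land on a 4-byte boundary and
// cause the atomic functions to panic.
type stats struct {
	messages int64 // accessed only through sync/atomic
	closed   bool
}

func (s *stats) inc()        { atomic.AddInt64(&s.messages, 1) }
func (s *stats) load() int64 { return atomic.LoadInt64(&s.messages) }

func main() {
	s := new(stats)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.inc()
		}()
	}
	wg.Wait()
	fmt.Println(s.load()) // 100
}
```

On Go 1.19 and later, the `atomic.Int64` type is aligned automatically and removes the need to order fields by hand.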
    Source code(tar.gz)
    Source code(zip)
  • v2.7(Jun 5, 2019)

    This release removes private links (normal links are still there) and introduces a way of creating private keys by simply calling keygen with an "extendable" key (i.e. a key with the e permission). On top of that, it has a few optimisations, and emitter now defaults to using and generating license v2, which was introduced a few months ago.

    Changelog

    • Added chunking before sending through the gossip unicast, given that the maximum frame we can send at once is 10MB (#254)
    • Added a debug mode flag in the config. pprof HTTP endpoints are now disabled by default and you'll need to flip the flag on to enable them (#255)
    • Fixed a security issue by disallowing pub/sub on extended keys (#262) (#261)
    • Added a way to extend keys through a normal keygen method. Users will need to provide a key which has extend permissions in the request along with the desired access permissions. Note that the resulting permissions are limited to those of the key itself (e.g. if a key has re permissions and the keygen request contains wr, the resulting key will only have the r permission) (#268)
    • Upgraded the dependencies and made v2 license default. (#278)
    • Removed private links as they have been replaced with a keygen. This is to make sure there's only one way of extending keys and keep the API relatively simple to understand. (#279)
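
The permission rule for extended keys described in the list above amounts to a set intersection: the new key gets only the permissions present in both the parent key and the request. The sketch below models that with a bitmask. The flag names, the letter mapping and the `parse`, `String` and `extend` helpers are hypothetical and do not mirror emitter's actual key format.

```go
package main

import "fmt"

// perm is a hypothetical permission bitmask.
type perm uint8

const (
	permRead   perm = 1 << iota // r
	permWrite                   // w
	permExtend                  // e
)

// letters maps each flag to the letter used in the release note.
var letters = []struct {
	flag perm
	char byte
}{{permRead, 'r'}, {permWrite, 'w'}, {permExtend, 'e'}}

// parse converts a string such as "re" into a bitmask, ignoring unknown letters.
func parse(s string) perm {
	var p perm
	for i := 0; i < len(s); i++ {
		for _, l := range letters {
			if s[i] == l.char {
				p |= l.flag
			}
		}
	}
	return p
}

// String renders the bitmask back into letters in a fixed order.
func (p perm) String() string {
	out := make([]byte, 0, len(letters))
	for _, l := range letters {
		if p&l.flag != 0 {
			out = append(out, l.char)
		}
	}
	return string(out)
}

// extend returns the permissions granted to a derived key: only those
// present in both the parent key and the request.
func extend(parent, requested perm) perm {
	return parent & requested
}

func main() {
	fmt.Println(extend(parse("re"), parse("wr"))) // r
}
```

A bitwise AND guarantees that a derived key can never carry a permission the parent key lacks.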
    Source code(tar.gz)
    Source code(zip)
    emitter-darwin-386(21.65 MB)
    emitter-darwin-amd64(25.21 MB)
    emitter-linux-386(23.06 MB)
    emitter-linux-amd64(26.97 MB)
    emitter-linux-arm(22.81 MB)
    emitter-windows-386.exe(21.61 MB)
    emitter-windows-amd64.exe(24.99 MB)
  • v2.6(Jun 5, 2019)

    This release makes flush and read rate limiting configurable which allows brokers to be more protected against misbehaving clients.

    Changelog:

    • Made flush rate configurable for tuning socket writes (#250)
    • Removed 1 alloc in channel validation (#251)
    • Added a read rate limiter which allows limiting the read QpS per client (#252)
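
A per-client read rate limiter of this kind is commonly built as a token bucket. The sketch below is one such construction and is an assumption, not the code from #252: the `limiter` type, its fields and the `epoch` helper variable are hypothetical. Time is passed in explicitly so the behaviour is easy to reason about.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is an arbitrary reference time used by the example.
var epoch = time.Unix(0, 0)

// limiter is a hypothetical per-client token bucket. It holds up to burst
// tokens and refills at rate tokens per second.
type limiter struct {
	rate   float64
	burst  float64
	tokens float64
	last   time.Time
}

// newLimiter returns a limiter that starts with a full bucket.
func newLimiter(rate, burst float64, now time.Time) *limiter {
	return &limiter{rate: rate, burst: burst, tokens: burst, last: now}
}

// allow reports whether one more read may proceed at time now. It first
// refills the bucket for the elapsed time, then spends one token if available.
func (l *limiter) allow(now time.Time) bool {
	if elapsed := now.Sub(l.last).Seconds(); elapsed > 0 {
		l.tokens += elapsed * l.rate
		if l.tokens > l.burst {
			l.tokens = l.burst
		}
		l.last = now
	}
	if l.tokens < 1 {
		return false
	}
	l.tokens--
	return true
}

func main() {
	l := newLimiter(2, 2, epoch) // 2 reads per second, burst of 2
	fmt.Println(l.allow(epoch), l.allow(epoch), l.allow(epoch))
	fmt.Println(l.allow(epoch.Add(500 * time.Millisecond)))
}
```

Keeping one limiter per connection means a single misbehaving client exhausts only its own budget.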
    Source code(tar.gz)
    Source code(zip)
    emitter-darwin-386(20.76 MB)
    emitter-darwin-amd64(23.86 MB)
    emitter-linux-386(22.13 MB)
    emitter-linux-amd64(25.56 MB)
    emitter-linux-arm(21.91 MB)
    emitter-windows-386.exe(20.90 MB)
    emitter-windows-amd64.exe(23.87 MB)
short-url distributed and high-performance

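durl is a distributed, high-performance short-link service. Its logic is simple and it provides a set of API endpoints, so developers can integrate it quickly; it also works as a practice project for Go beginners.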

宋昂 401 Jun 24, 2022
Distributed-Services - Distributed Systems with Golang to consequently build a fully-fledged distributed service

Distributed-Services This project is essentially a result of my attempt to under

Hamza Yusuff 6 Jun 1, 2022
A feature complete and high performance multi-group Raft library in Go.

Dragonboat - A Multi-Group Raft library in Go / Chinese version News 2021-01-20 Dragonboat v3.3 has been released, please check CHANGELOG for all changes. 2020-03

lni 4.3k Jun 23, 2022
High-Performance server for NATS, the cloud native messaging system.

NATS is a simple, secure and performant communications system for digital systems, services and devices. NATS is part of the Cloud Native Computing Fo

NATS - The Cloud Native Messaging System 11.1k Jun 29, 2022
Collection of high performance, thread-safe, lock-free go data structures

Garr - Go libs in a Jar Collection of high performance, thread-safe, lock-free go data structures. adder - Data structure to perform highly-performant

LINE 311 Jun 26, 2022
Distributed lock manager. Warning: very hard to use it properly. Not because it's broken, but because distributed systems are hard. If in doubt, do not use this.

What Dlock is a distributed lock manager [1]. It is designed after flock utility but for multiple machines. When client disconnects, all his locks are

Sergey Shepelev 25 Dec 24, 2019
Distributed reliable key-value store for the most critical data of a distributed system

etcd Note: The main branch may be in an unstable or even broken state during development. For stable versions, see releases. etcd is a distributed rel

etcd-io 40.4k Jun 30, 2022
Lockgate is a cross-platform locking library for Go with distributed locks using Kubernetes or lockgate HTTP lock server as well as the OS file locks support.

Lockgate Lockgate is a locking library for Go. Classical interface: 2 types of locks: shared and exclusive; 2 modes of locking: blocking and non-block

werf 230 Jun 16, 2022
A realtime distributed messaging platform

Source: https://github.com/nsqio/nsq Issues: https://github.com/nsqio/nsq/issues Mailing List: [email protected] IRC: #nsq on freenode Docs:

NSQ 21.5k Jun 29, 2022
Simple, fast and scalable golang rpc library for high load

gorpc Simple, fast and scalable golang RPC library for high load and microservices. Gorpc provides the following features useful for highly loaded pro

Aliaksandr Valialkin 651 Jun 20, 2022
Fast, efficient, and scalable distributed map/reduce system, DAG execution, in memory or on disk, written in pure Go, runs standalone or distributedly.

Gleam Gleam is a high performance and efficient distributed execution system, and also simple, generic, flexible and easy to customize. Gleam is built

Chris Lu 3.1k Jun 22, 2022
Go Open Source, Distributed, Simple and efficient Search Engine

Go Open Source, Distributed, Simple and efficient full text search engine.

ego 6.1k Jun 25, 2022
Dapr is a portable, event-driven, runtime for building distributed applications across cloud and edge.

Dapr is a portable, serverless, event-driven runtime that makes it easy for developers to build resilient, stateless and stateful microservices that run on the cloud and edge and embraces the diversity of languages and developer frameworks.

Dapr 18.4k Jun 28, 2022
Asynq: simple, reliable, and efficient distributed task queue in Go

Asynq: simple, reliable, and efficient distributed task queue in Go

Ken Hibino 3.4k Jun 25, 2022
Build share and run your distributed applications.

sealer[ˈsiːlər] provides the way for distributed application package and delivery based on kubernetes.

Alibaba 1.5k Jun 29, 2022
A distributed and coördination-free log management system

OK Log is archived I hoped to find the opportunity to continue developing OK Log after the spike of its creation. Unfortunately, despite effort, no su

OK Log 3k Jun 16, 2022
JuiceFS is a distributed POSIX file system built on top of Redis and S3.

JuiceFS is a high-performance POSIX file system released under GNU Affero General Public License v3.0. It is specially optimized for the cloud-native

Juicedata, Inc 5.5k Jun 23, 2022