The official Go package for NSQ

Overview

go-nsq

Docs

See godoc and the main repo apps directory for examples of clients built using this package.

Tests

Tests are run via ./test.sh (which requires nsqd and nsqlookupd to be installed).

Comments
  • make lookup functionality pluggable

    We'd like to be able to override how a consumer queries from NSQLookupds. I think this approach would also allow others to switch NSQLookupd for etcd, zookeeper, etc.

    This is still a WIP but I hope you can see where I'm going with this. Suggestions and Criticism welcome.

    cc @oliver-bitly @jehiah @mreiferson

    enhancement 
    opened by ploxiln 31
  • panic: runtime error: close of closed channel

    I'm guessing this is user error, but I'm getting a panic when stopping a consumer as of this commit: 4e74fa1f8933064a4f04786e65d3ee7b611be598. Reading the comment in there, I'm trying to understand what's going on.

    My shutdown code looks something like this:

    func (n *NSQPeer) Teardown() {
        n.producer.Stop()
        if n.consumer != nil {
            n.consumer.DisconnectFromNSQD(n.host)
            n.consumer.Stop()
            <-n.consumer.StopChan
        }
    }
    

    Here's the error:

    runtime.panic(0x509600, 0x84c1d5)
            /usr/local/Cellar/go/1.3/libexec/src/pkg/runtime/panic.c:279 +0xf5
    github.com/bitly/go-nsq.(*Consumer).exit(0xc20800ef00)
            /Users/tylertreat/Go/src/github.com/bitly/go-nsq/consumer.go:1082 +0x2e
    github.com/bitly/go-nsq.func·007()
            /Users/tylertreat/Go/src/github.com/bitly/go-nsq/consumer.go:990 +0x29
    created by time.goFunc
            /usr/local/Cellar/go/1.3/libexec/src/pkg/time/sleep.go:121 +0x47
    

    Can anyone provide some insight on why I'm seeing this now?

    bug 
    opened by tylertreat 22
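The "close of closed channel" panic in the report above is what the Go runtime raises when close is called twice on the same channel. As a general illustration only (this is not go-nsq's code; the stopper type, its fields, and newStopper are hypothetical names), a sync.Once guard is one common way to make a stop method safe to call more than once:

```go
package main

import (
	"fmt"
	"sync"
)

// stopper is a hypothetical stand-in for a component that signals
// shutdown by closing a channel.
type stopper struct {
	stopOnce sync.Once
	stopChan chan struct{}
}

func newStopper() *stopper {
	return &stopper{stopChan: make(chan struct{})}
}

// Stop closes stopChan at most once, however many times it is called.
func (s *stopper) Stop() {
	s.stopOnce.Do(func() { close(s.stopChan) })
}

func main() {
	s := newStopper()
	s.Stop()
	s.Stop() // second call is a no-op instead of a panic
	<-s.stopChan
	fmt.Println("stopped cleanly")
}
```

Without the guard, the second Stop call would hit the same runtime panic shown in the stack trace.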
  • consumer: sometimes got heartbeat but not message

    1. Environment

    nsqd --version: nsqd v1.0.0-compat (built w/go1.8)

    go-nsq version: commit b9762cdcb6d5cc5ac5287ca076354143d332cc97 (Tue Feb 14 16:13:23 2017 -0800)

    go version: go1.8 linux/amd64

    2. What I observe

    • The program runs, but the channel is not created (see http://127.0.0.1:4151/stats).
    • This happens only sometimes, and always at program start. When it happens, the program never receives any message (I waited two days).
    • Once the program has started successfully, the problem does not seem to reproduce.
    • I subscribe to several topics. When the problem appears on one topic, the other topics may be unaffected.
    • With go-nsq debug logging on, the program prints [d.c2.cp#ephemeral/detect#ephemeral] (127.0.0.3:4150) heartbeat received, but it does not seem to receive any other NSQ message.
    # curl -XGET http://127.0.0.1:4151/stats; 
    nsqd v1.0.0-compat (built w/go1.8)
    start_time 2017-03-27T19:07:02+08:00
    uptime 45m7.260003591s
    
    Health: OK
    
       [.raw.raw#ephemeral] depth: 0     be-depth: 0     msgs: 6656     e2e%: 
          [processor#ephemeral      ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 6656     e2e%: 
            [V2 18-190               ] state: 3 inflt: 0    rdy: 1    fin: 6656     re-q: 0        msgs: 6656     connected: 8s
    
       [d.c2.cp#ephemeral] depth: 10000 be-depth: 0     msgs: 791227   e2e%: 
    
       [d.c2.cp.done#ephemeral] depth: 0     be-depth: 0     msgs: 0        e2e%: 
          [processor#ephemeral      ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 0        e2e%: 
            [V2 18-190               ] state: 3 inflt: 0    rdy: 1    fin: 0        re-q: 0        msgs: 0        connected: 8s
    
       [d.c2.domain#ephemeral] depth: 10000 be-depth: 0     msgs: 208218   e2e%: 
    
    ...
    

    Code snippet:

    func nsqSubscribe(addr string, topic string, channel string, hdlr nsq.HandlerFunc) error {
        consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig())
        if err != nil {
            print("new consumer error: ", err, "\n")
            time.Sleep(1 * time.Second) //wait 1s
            panic(err)
        }   
        consumer.AddHandler(hdlr)
        err = consumer.ConnectToNSQD(addr)
        if err != nil {
            print("connect nsqd error: ", err, "\n")
            time.Sleep(1 * time.Second) //wait 1s
            panic(err)
        }   
        _ = <-consumer.StopChan
        panic("nsq conn dead topic=" + topic + " channel=" + channel)
        return nil 
    }
    
    func main() {
        producer, err := nsq.NewProducer(nsqConf.Local.Addr, nsq.NewConfig())
        if err != nil {
            panic(err)
        }
        go func() {
            detector := new(c2.C2Sdk)
    
            if detectorEnabled {
                err = detector.Init()
                if err != nil {
                    fmt.Println("Failed to init c2")
                    panic(err)
                }
            }
            nsqSubscribe(nsqConf.Local.Addr, "d.c2.cp#ephemeral", "detect#ephemeral",
                nsq.HandlerFunc(func(message *nsq.Message) error {
                    return handler_scan(message, detector,
                        producer, unmarshal_url, scan_url,
                        "d.c2.cp.done#ephemeral")
                }))
        }()
        go func() {
            detector := new(c2.C2Sdk)
    
            if detectorEnabled {
                err = detector.Init()
                if err != nil {
                    fmt.Println("Failed to init c2")
                    panic(err)
                }
            } 
            nsqSubscribe(nsqConf.Local.Addr, "d.c2.url#ephemeral", "detect#ephemeral",
                nsq.HandlerFunc(func(message *nsq.Message) error {
                    return handler_scan(message, detector,
                        producer, unmarshal_url, scan_url,
                        "d.c2.url.done#ephemeral")
                }))
        }()
    
    .....
    
    }
    
    

    3. How to reproduce

    • Connect the program to a busy nsqd server.
    • Start and restart it several times (5 to 10). If the problem does not reproduce, retry.
    bug 
    opened by slayercat 20
  • consumer: unexpected re-connection attempts

    I'm currently investigating an issue where even after calling either DisconnectFromNSQLookupd or DisconnectFromNSQD on a host that is already disconnected, re-connection attempts still occur.

    In our scenario we are consuming from 3 hosts: 10.0.0.1, 10.0.0.2, and 10.0.0.3. When host 10.0.0.1 becomes unhealthy (service stopped), we remove it from the list of nodes and call DisconnectFromNSQLookupd or DisconnectFromNSQD.

    As you can see from the following log output, the connection to host 10.0.0.1 was lost and re-connection attempts are made. The host is then removed from our list and DisconnectFromNSQLookupd is called. However, the re-connection attempts still occur:

    2016-04-29 14:35:26 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:35:26 [DBG] IDENTIFY response: &{MaxRdyCount:2500 TLSv1:false Deflate:false Snappy:false AuthRequired:false} (conn.go 328)
    2016-04-29 14:35:29 [ERR] IO error - EOF (conn.go 471)
    2016-04-29 14:35:29 [INF] beginning close (conn.go 611)
    2016-04-29 14:35:29 [INF] readLoop exiting (conn.go 528)
    2016-04-29 14:35:29 [INF] breaking out of writeLoop (conn.go 535)
    2016-04-29 14:35:29 [INF] writeLoop exiting (conn.go 581)
    2016-04-29 14:35:29 [INF] finished draining, cleanup exiting (conn.go 660)
    2016-04-29 14:35:29 [INF] clean close complete (conn.go 668)
    2016-04-29 14:35:29 [DBG] there are 2 connections left alive (consumer.go 741)
    2016-04-29 14:35:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:35:44 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:35:44 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:35:44 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:35:59 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:35:59 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:35:59 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:36:14 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:36:14 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:36:14 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:36:26 [DBG] querying nsqlookupd http://10.0.0.3:4161/lookup?topic=helloworld%23ephemeral (consumer.go 419)
    2016-04-29 14:36:29 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:36:29 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:36:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:36:44 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:36:44 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:36:44 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:36:59 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:36:59 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:36:59 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:37:14 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:37:14 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:37:14 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:37:26 [DBG] querying nsqlookupd http://10.0.0.2:4161/lookup?topic=helloworld%23ephemeral (consumer.go 419)
    2016-04-29 14:37:29 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:37:29 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:37:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:37:32 [INF] Connecting to NSQ via lookupd hosts: [10.0.0.3:4161 10.0.0.2:4161] (subscriber.go 166)
    2016-04-29 14:37:32 [INF] Connecting to NSQ via lookupd hosts: [10.0.0.3:4161 10.0.0.2:4161] (subscriber.go 166)
    2016-04-29 14:37:32 [INF] Disconnecting from NSQ hosts: [10.0.0.1:4150] (subscriber.go 176)
    2016-04-29 14:37:32 [INF] Disconnecting from NSQ lookupds: [10.0.0.1:4161] (subscriber.go 187)
    2016-04-29 14:37:44 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:37:44 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:37:44 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:37:59 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:37:59 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:37:59 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:38:14 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:38:14 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:38:14 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    2016-04-29 14:38:26 [DBG] querying nsqlookupd http://10.0.0.3:4161/lookup?topic=helloworld%23ephemeral (consumer.go 419)
    2016-04-29 14:38:29 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
    2016-04-29 14:38:29 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
    2016-04-29 14:38:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
    

    Is this intended/expected behaviour?

    bug 
    opened by imjoshholloway 20
  • Consumer stall after RequeueWithoutBackoff

    If a consumer is in backoff, attempts to recover with RDY 1, and then receives a message but responds with RequeueWithoutBackoff, it stalls and does not exit backoff. (failing test attached)

    cc: @georgicodes

    bug 
    opened by jehiah 16
  • producer: I/O timeout in PublishAsync benchmark

    I ran some benchmarks against NSQ and hit a strange problem. Publishing 100000 messages to nsqd with go-nsq always fails, while a smaller count (such as 10000 messages) works. The go-nsq log shows write timeouts to nsqd, and the nsqd log shows the producer's connection being reset. What could cause this?

    go-nsq log:

    2019/02/26 22:44:52 INF    1 (192.168.1.125:4150) connecting to nsqd
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 INF    1 (192.168.1.125:4150) beginning close
    2019/02/26 22:45:01 INF    1 (192.168.1.125:4150) readLoop exiting
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 ERR    1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
    2019/02/26 22:45:01 INF    1 (192.168.1.125:4150) breaking out of writeLoop
    2019/02/26 22:45:01 INF    1 (192.168.1.125:4150) writeLoop exiting
    2019/02/26 22:45:01 INF    1 (192.168.1.125:4150) finished draining, cleanup exiting
    2019/02/26 22:45:01 INF    1 (192.168.1.125:4150) clean close complete
    2019/02/26 22:45:01 INF    1 exiting router
    
    

    nsqd log:

    [nsqd] 2019/02/26 22:39:55.590775 INFO: TCP: new client(192.168.1.99:4908)
    [nsqd] 2019/02/26 22:39:55.590882 INFO: CLIENT(192.168.1.99:4908): desired protocol magic '  V2'
    [nsqd] 2019/02/26 22:39:55.591299 INFO: [192.168.1.99:4908] IDENTIFY: {ClientID:HuangWei-PC Hostname:HuangWei-PC HeartbeatInterval:30000 OutputBufferSize:16384 OutputBufferTimeout:250 FeatureNegotiation:true TLSv1:false Deflate:false DeflateLevel:6 Snappy:false SampleRate:0 UserAgent:go-nsq/1.0.7 MsgTimeout:0}
    [nsqd] 2019/02/26 22:40:04.104850 INFO: PROTOCOL(V2): [192.168.1.99:4908] exiting ioloop
    [nsqd] 2019/02/26 22:40:04.104955 ERROR: client(192.168.1.99:4908) - failed to send response - write tcp 192.168.1.125:4150->192.168.1.99:4908: write: connection reset by peer
    [nsqd] 2019/02/26 22:40:04.104980 INFO: PROTOCOL(V2): [192.168.1.99:4908] exiting messagePump
    

    go benchmark code:

    func BenchmarkProducer(b *testing.B) {
    	b.StopTimer()
    
    	config := nsq.NewConfig()
    	p, _ := nsq.NewProducer("192.168.1.125:4150", config)
    
    	body := make([]byte, 512)
    	startCh := make(chan struct{})
    	var wg sync.WaitGroup
    	parallel := runtime.GOMAXPROCS(0)
    	b.Log(parallel)
    	b.N = 100000
    	doneCh := make(chan *nsq.ProducerTransaction, parallel)
    	wg.Add(b.N)
    
    	for j := 0; j < parallel; j++ {
    		go func() {
    			<-startCh
    			for i := 0; i < b.N/parallel; i++ {
    				p.PublishAsync("test", body, doneCh)
    			}
    		}()
    
    		go func() {
    			for _ = range doneCh {
    				wg.Done()
    			}
    		}()
    	}
    
    	b.StartTimer()
    	close(startCh)
    	wg.Wait()
    }
    

    nsqd config:

    nsqd --lookupd-tcp-address=0.0.0.0:4160 --snappy=true --max-msg-size=67108864 --max-msg-timeout=1h --mem-queue-size=100000 
    
    question needs-info 
    opened by huangwei1024 14
  • centralize logging and make it pluggable

    This request does two things. It centralizes logging in log.go to remove some duplicated logging code, and it makes the logger an interface so that people can easily drop in their own logging structures.

    The structure simply needs to implement the following methods:

        Debugf(format string, v ...interface{})
        Infof(format string, v ...interface{})
        Warnf(format string, v ...interface{})
        Errorf(format string, v ...interface{})
    
    enhancement question 
    opened by devdazed 14
  • consumer: expose a way to retrieve runtime stats

    Currently, the ConnectToNSQLookupd method on Consumer fails to return any error when invoked with an invalid nsqlookupd address. Instead the error is logged (and swallowed) in the queryLookupd func, right after the call to apiRequestNegotiateV1, since queryLookupd doesn't return anything. This leaves a consumer with no reliable way to tell that the lookupd address provided was invalid.

    To repro:

    func foo() {
        q, _ := nsq.NewConsumer("the-topic", "the-channel", nsq.NewConfig())
        handler := &SomeHandler{}
        q.AddHandler(handler)
        err := q.ConnectToNSQLookupd("1.2.3.4:9999")
        // err is nil!
    }
    
    question 
    opened by dcarney 12
  • producer: unexpected io.EOF when calling Stop

    We're calling Stop on an NSQ producer, and we're getting the following output after a number of MultiPublish calls. Is this io.EOF error expected? We're using the tag 1.0.6.

    INFO[0011] INF    1 stopping
    INFO[0011] ERR    1 (127.0.0.1:4150) IO error - EOF
    INFO[0011] INF    1 (127.0.0.1:4150) beginning close
    INFO[0011] INF    1 (127.0.0.1:4150) readLoop exiting
    
    bug question 
    opened by larzconwell 11
  • Performance Question

    Hey there, I'm using this code to make a producer that sends messages to NSQ. I'm using PublishAsync(), with a "responder" that basically does nothing:

    var respChan chan *nsq.ProducerTransaction
    
    func responder(respChan chan *nsq.ProducerTransaction) {
        for _ = range respChan {
            //Get rid of the messages or something... should probably respond, but not sure
            //how to do that yet...
        }
    }
    

    I'm sending as many messages as possible, calling PublishAsync() in a loop reading from memory. My code maxes out the CPU and manages about 14,000 messages per second (the messages are quite small: a msgpack'd struct containing a string of fewer than 16 characters).

    In doing a code profile, my code apparently spends 50.5% of its time in "ExternalCode" (no idea what that is).

    I've attached my code profile: cpu copy

    Any idea what is going on here?

    question 
    opened by virtuallynathan 11
  • unintuitive behavior when using &nsq.Config{}

    I got this on Ubuntu 12.04, go 1.2

    panic: non-positive interval for NewTicker
    
    goroutine 141 [running]:
    runtime.panic(0x815520, 0xc21010d230)
        /usr/local/go/src/pkg/runtime/panic.c:266 +0xb6
    time.NewTicker(0x0, 0x3fd8a050fdd8d7f5)
        /usr/local/go/src/pkg/time/tick.go:22 +0x9f
    github.com/bitly/go-nsq.(*Consumer).lookupdLoop(0xc21005a8c0)
        /tmp/godep/rev/2b/9a8b5edc036235580256e9a5e5f118a0025151/src/github.com/bitly/go-nsq/consumer.go:228 +0x6e
    created by github.com/bitly/go-nsq.(*Consumer).ConnectToNSQLookupd
        /tmp/godep/rev/2b/9a8b5edc036235580256e9a5e5f118a0025151/src/github.com/bitly/go-nsq/consumer.go:216 +0x282
    

    This is https://github.com/bitly/go-nsq/commit/2b9a8b5edc036235580256e9a5e5f118a0025151

    I'm connecting to 8 nsqlookupd hosts (which might increase the chance of this happening)

    I'm not setting config.lookupdPollInterval in my code.

    bug 
    opened by kjk 11
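The stack trace in the report above ends in time.NewTicker, which panics when given a non-positive interval. The sketch below shows how a zero-valued config struct can lead there while a constructor that fills in defaults does not. The pollConfig type, newPollConfig, tickerPanics, and the 60-second default are hypothetical stand-ins for this illustration, not go-nsq's actual definitions.

```go
package main

import (
	"fmt"
	"time"
)

// pollConfig is a hypothetical config struct with a poll interval.
type pollConfig struct {
	PollInterval time.Duration
}

// newPollConfig is a hypothetical constructor that applies a default.
func newPollConfig() *pollConfig {
	return &pollConfig{PollInterval: 60 * time.Second}
}

// tickerPanics reports whether creating a ticker from cfg panics.
func tickerPanics(cfg *pollConfig) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	t := time.NewTicker(cfg.PollInterval)
	t.Stop()
	return false
}

func main() {
	// A struct literal leaves PollInterval at its zero value.
	fmt.Println(tickerPanics(&pollConfig{})) // true
	// The constructor sets a positive interval.
	fmt.Println(tickerPanics(newPollConfig())) // false
}
```

The design point is that a struct literal bypasses whatever defaults a constructor would set, so every duration field starts at zero.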
  • fix(consumer): remove old nsqd connections if addresses change

    Right now, in the event that nsqd addresses change, the old connections stay around in the consumer's connections map. This simply cleans up old connections from that map.

    opened by jrkt 4
  • error connecting to nsqd - dial tcp 127.0.0.1:4150: connect: connection refused?

    The NSQ services are deployed on a public network (106.75.49.135:4161, 106.75.49.135:4150). Messages can be sent and received normally, but this error still appears: 'error connecting to nsqd - dial tcp 127.0.0.1:4150: connect: connection refused'.

    • docker-compose :
      nsqlookupd:
        image: nsqio/nsq
        command: /nsqlookupd
        ports:
          - "4160:4160"
          - "4161:4161"
      nsqd:
        image: nsqio/nsq
        command:  /nsqd --broadcast-address=nsqd --lookupd-tcp-address=nsqlookupd:4160
        depends_on:
          - nsqlookupd
        ports:
          - "4151:4151"
          - "4150:4150"
    
    • Error message:
    2/10/12 11:21:15 INF    1 [Study/Chegg] (nsqd:4150) connecting to nsqd
    2022/10/12 11:21:15 ERR    1 [Study/Chegg] (nsqd:4150) error connecting to nsqd - dial tcp 127.0.0.1:4150: connect: connection refused
    {"Address":["106.75.49.141:4150"],"level":"info","msg":"Receiving NSQ messages from addresses","time":"2022-10-12T11:21:15+08:00"}
    

    opened by shumintao 0
  • Client doesn't know it's disconnected

    I have a system where I'm getting log messages from nsqd such as:

    INFO: PROTOCOL(V2): [127.0.0.1:53376] exiting ioloop
    ERROR: client(127.0.0.1:53376) - failed to read command - read tcp 1270.0.1:4150->127.0.0.1:53376: i/o timeout
    INFO: PROTOCOL(v) [127.0.0.1:53376] exiting messagePump

    From looking at the /stats endpoint I can see after this message that the consumer is no longer connected (two producers are still connected fine). I can see that there were 2 messages in timeout which I presume is related to the disconnection of the consumer.

    However, there is no indication that the consumer knows that it has lost its connection, so there's no attempt to reconnect (or even just notify me that it's stopped). The StopChan isn't being signalled, and the Stats() method on the consumer still reports a connection count of 1.

    Is this the correct behaviour? Have I missed something around how to detect this i/o timeout situation? Unfortunately I can't share the source code, but happy to try things and report back.

    Thanks

    Andre

    needs-info 
    opened by watchforstock 2
  • Disconnects seem to be quite ungraceful

    I've been playing around with NSQ a lot lately, and I keep hitting walls when trying to write test suites for assembling various network topologies. Most of the issues seem to stem from nsqd not properly handling consumer disconnects (I'm using go-nsq). I don't even know where to start describing the strange things:

    • When stopping a consumer, sometimes the CLS message is sent to NSQD, sometimes it is not.
    • Even if the CLS does get to NSQD, sometimes it seems to not respond with CLOSE_WAIT, rather nukes the stream.
    • The logs are full of error messages on both consumer and broker side during shutdowns that one side or another tries to read/write but the stream is already dead (no graceful disconnect).
    • Disconnecting the last consumer doesn't seem to decrement the client count of a topic/channel.
    • Disconnecting a consumer doesn't seem to abort/reschedule the in-flight messages for that consumer.

    It seems to me that the entire shutdown pathway is very wrong, and that various timeouts merely hack around the root cause. For example, the client heartbeats (or the lack of them after a disconnect) are what trigger the cleanup of leftover client counts, and the in-flight timeout is what reschedules messages not processed by a disconnected client.

    I'm unsure if I'm doing something weird here, but it seems that NSQ is very very prone to weird behavior when I have very short lived connections.

    bug 
    opened by karalabe 2
  • Singleton Producer

    Is it possible to use a singleton NSQ Producer? I'm running an API with an NSQ Producer, and it seems like I have to open and close a connection every time I hit an endpoint.

    question 
    opened by wilhart 2
Releases(v1.1.0)
  • v1.1.0(Oct 24, 2021)

    There should be no backward-incompatible changes, but there are a couple new API methods.

    • #275/#281 - support separate Logger for each log level (thanks @crazyweave)
    • #282 - consumer: reduce duplicate RDY (ready) count updates (thanks @andyxning)
    • #283 - remove redundant Config initialized check (thanks @SwanSpouse)
    • #313 - add Authorization header to lookup queries
    • #321 - consumer: fix panic with some invalid lookupd http addresses (thanks @martin-sucha)
    • #317 - producer: connect() code-style improvement (thanks @martin-sucha)
    • #330 - fix random backoff jitter on 32-bit architectures
    • #333 - consumer: re-use http client with keepalives for lookupd requests (thanks @JieTrancender)
    • #336 - producer: shutdown logging prefix consistent with other logging (thanks @karalabe)
    • #294 - docs: fix producer example (thanks @nikitabuyevich)
    • #307 - docs: add exit signal handling to consumer example
    • #324 - docs: fix Consumer.SetLogger() description (thanks @gabriel-vasile)
    • #297 - add AUTHORS file
    • #329/#330 - switch to GitHub Actions for CI
  • v1.0.8(Dec 25, 2019)

    Upgrading from 1.0.7: There are no backward incompatible changes.

    Thanks to @judwhite, @vitaliytv, and @HaraldNordgren for contributing to testing and dependency management improvements

    Changes in this release

    • #249 - update RDY when setting MaxInFlight to 0
    • #267 - check response message size is positive (thanks @andyxning)
    • #271 - godoc for publisher and consumer (thanks @skateinmars)
    • #270 - set log level (thanks @YongHaoWu)
    • #255 - go vet tls.Config copying (thanks @iaburton)

  • v1.0.7(Aug 4, 2017)

    Upgrading from 1.0.6: There are no backward incompatible changes.

    • #97/#209 - consumer: retry nsqlookupd queries
    • #179/#208 - consumer: redistribute RDY when connections are active
    • #184/#201 - producer: fix misleading Stop() EOF (thanks @mengskysama)
    • #203 - switch to golang/snappy (addressing potential snappy related deadlocks)
    • #202 - consumer: fix backoff logging
  • v1.0.6(Jun 4, 2016)

    Upgrading from 1.0.5: There are no backward incompatible changes.

    • #175 - consumer: reduce garbage generation in DecodeMessage (thanks @Dieterbe)
    • #162 - producer: support DeferredPublish (thanks @DanielHeckrath)
  • v1.0.5(Sep 20, 2015)

    Upgrading from 1.0.4: There are no backward incompatible changes.

    • #156 - consumer: prevent data race on RNG
    • #155 - config: support flag.Value interface
    • #147/#150 - consumer: fix application of max_backoff_duration (thanks @judwhite)
    • #138 - fix lint, vet, fmt issues
    • #137 - remove go-simplejson dependency
  • v1.0.4(Apr 8, 2015)

    Upgrading from 1.0.3: There are no backward incompatible changes.

    • #133 - fix ErrNotConnected race during Producer connection (thanks @jeddenlea)
    • #132 - fix RDY redistribution after backoff with no connections
    • #128 - fix backoff stall when using RequeueWithoutBackoff
    • #127 - fix handling of connection closing when resuming after backoff (thanks @jnewmano)
    • #126 - allow BackoffStrategy to be set via flag (thanks @twmb)
    • #125 - add pluggable consumer BackoffStrategy; add full-jitter strategy (thanks @hden)
    • #124 - add DialTimeout and LocalAddr config (thanks @yashkin)
    • #119 - add Producer.Ping() method (thanks @zulily)
    • #122 - refactor log level string handling
    • #120 - fix Message data races on responded
    • #114 - fix lookupd jitter having no effect (thanks @judwhite)
  • v1.0.3(Feb 7, 2015)

    Upgrading from 1.0.2: There are no backward incompatible changes.

    • #104 - fix reconnect address bug (thanks @ryanslade)
    • #106 - fix backoff reconnect deadlock (thanks @ryanslade)
    • #107 - fix out-of-bounds error when removing nsqlookupd addresses (thanks @andreas)
    • #108 - fix potential logger race conditions (thanks @judwhite)
    • #111 - fix resolved address error in reconnect loop (thanks @twmb)
  • v1.0.2(Jan 22, 2015)

    Upgrading from 1.0.1: There are no backward incompatible changes.

    • #102 - TLS min/max config defaults (thanks @twmb)
    • #99 - fix Consumer.Stop() race and Producer.Stop() deadlock (thanks @tylertreat)
    • #92 - expose Message.NSQDAddress
    • #95 - cleanup panic during Consumer.Stop() if handlers are deadlocked
    • #98 - add tls-min-version option (thanks @twmb)
    • #93 - expose a way to get Consumer runtime stats (thanks @dcarney)
    • #94 - allow #ephemeral topic names (thanks @jamesgroat)
  • v1.0.1(Nov 10, 2014)

    Upgrading from 1.0.0: There are no functional backward incompatible changes; however, this release no longer compiles with Go 1.0.x.

    • #89 - don't spam connection teardown cleanup messages
    • #91 - add consumer DisconnectFrom*
    • #87 - allow heartbeat_interval and output_buffer_timeout to be disabled
    • #86 - pluggable nsqlookupd behaviors
    • #83 - send RDY before FIN/REQ (forwards compatibility with bitly/nsq#404)
    • #82 - fix panic when conn isn't assigned
    • #75/#76 - minor config related bug fixes
    • #75/#77/#78 - add tls-cert and tls-key config options
  • v1.0.0(Aug 11, 2014)

    Upgrading from 0.3.7: The public API was significantly refactored and is not backwards compatible; please read UPGRADING.

    • #58 - support IDENTIFY msg_timeout
    • #54 - per-connection TLS config and set ServerName
    • #49 - add common connect helpers
    • #43/#63 - more flexible nsqlookupd URL specification
    • #35 - AUTH support
    • #41/#62 - use package private RNG
    • #36 - support 64 character topic/channel names
    • #30/#38/#39/#42/#45/#46/#48/#51/#52/#65/#70 - refactor public API (see UPGRADING)
  • v0.3.7(May 25, 2014)

    Upgrading from 0.3.6: There are no backward incompatible changes. THIS IS THE LAST STABLE RELEASE PROVIDING THIS API. Future releases will be based on the API in #30 and will not be backwards compatible!

    This is a bug fix release relating to the refactoring done in 0.3.6.

    • #32 - fix potential panic for race condition when # conns == 0
    • #33/#34 - more granular connection locking
  • v0.3.6(Apr 30, 2014)

    Upgrading from 0.3.5: There are no backward incompatible changes.

    This release includes a significant internal refactoring, designed to better encapsulate responsibility (see #19).

    Specifically:

    • make Conn public
    • move transport responsibilities into Conn from Reader/Writer
    • supply callbacks for hooking into Conn events

    As part of the refactoring, a few additional clean exit related issues were resolved:

    • wait group now includes all exit related goroutines
    • ensure that readLoop exits before exiting cleanup
    • always check messagesInFlight at readLoop exit
    • close underlying connection last
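    The exit ordering described above (every exit-related goroutine tracked by one wait group, loops finishing before cleanup completes, the underlying connection closed last) can be sketched in isolation. This is a simplified illustration, not the package's internals: the `conn` type, its `exitChan` field, and the event log are hypothetical, and the "connection" is only a recorded event rather than a real socket.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for a connection with background loops.
type conn struct {
	wg       sync.WaitGroup // tracks every goroutine that must exit
	exitChan chan struct{}  // closed to tell the loops to stop

	mu     sync.Mutex
	events []string // ordered log of shutdown steps, for illustration
}

func newConn() *conn {
	return &conn{exitChan: make(chan struct{})}
}

func (c *conn) record(event string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.events = append(c.events, event)
}

// start launches the loops, registering each one with the wait group
// before the goroutine begins.
func (c *conn) start() {
	for _, name := range []string{"readLoop", "writeLoop"} {
		c.wg.Add(1)
		go func(name string) {
			defer c.wg.Done()
			<-c.exitChan // a real loop would do work until told to exit
			c.record(name + " exited")
		}(name)
	}
}

// shutdown signals the loops, waits for all of them, and only then
// "closes" the underlying connection.
func (c *conn) shutdown() []string {
	close(c.exitChan)
	c.wg.Wait()
	c.record("connection closed")

	c.mu.Lock()
	defer c.mu.Unlock()
	return append([]string(nil), c.events...)
}

func main() {
	c := newConn()
	c.start()
	for _, e := range c.shutdown() {
		fmt.Println(e)
	}
}
```

    The two loops may exit in either order, but because `shutdown` waits on the group before recording the close, the close is always the final step.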
  • v0.3.5(Apr 5, 2014)

    Upgrading from 0.3.4: There are no backward incompatible changes.

    This release includes a few new features such as support for channel sampling and sending along a user agent string (which is now displayed in nsqadmin).

    It also includes a critical bug fix for potential deadlocks (thanks @kjk for reporting and help testing).

    New Features/Improvements:

    • #27 - reader logs disambiguate topic/channel
    • #22 - channel sampling
    • #23 - user agent

    Bug Fixes:

    • #24 - fix racy reader IDENTIFY buffering
    • #29 - fix recursive RLock deadlocks
  • v0.3.4(Nov 20, 2013)

    Upgrading from 0.3.3: There are no backward incompatible changes.

    This is a bug fix release. Notably, it fixes potential deadlocks in Message.Requeue() and Message.Touch() as well as a potential busy loop when cleaning up closed connections with in-flight messages.

    New Features/Improvements:

    • #14 - add Reader.Configure()
    • #18 - return an exported error when an nsqlookupd address is already configured

    Bug Fixes:

    • #15 - don't let handleError() loop if already connected
    • #17 - resolve potential deadlocks on Message responders
    • #16 - eliminate busy loop when draining finishedMessages
  • v0.3.3(Oct 23, 2013)

    Upgrading from 0.3.2: This release requires NSQ binary version 0.2.23+ for compression support.

    This release contains significant Reader refactoring of the RDY handling code paths. The motivation is documented in #1; the commits in #8 identify the individual changes. Additionally, we eliminated deadlocks during connection cleanup in Writer.

    As a result, both user-facing APIs should now be considerably more robust and stable. Additionally, Reader should behave better when backing off.

    New Features/Improvements:

    • #9 - ability to ignore publish responses in Writer
    • #12 - Requeue() method on Message
    • #6 - Touch() method on Message
    • #4 - snappy/deflate feature negotiation

    Bug Fixes:

    • #8 - Reader RDY handling refactoring (race conditions, deadlocks, consolidation)
    • #13 - fix Writer deadlocks
    • #10 - stop accessing simplejson internals
    • #5 - fix max-in-flight race condition
  • v0.3.2(Aug 29, 2013)

    Upgrading from 0.3.1: This release requires NSQ binary version 0.2.22+ for TLS support.

    New Features/Improvements:

    • #227 - TLS feature negotiation
    • #164/#202/#255 - add Writer
    • #186 - MaxBackoffDuration of 0 disables backoff
    • #175 - support for nsqd config option --max-rdy-count
    • #169 - auto-reconnect to hard-coded nsqd

    Bug Fixes:

    • #254/#256/#257 - new connection RDY starvation
    • #250 - nsqlookupd polling improvements
    • #243 - limit IsStarved() to connections w/ in-flight messages
    • #169 - use last RDY count for IsStarved(); redistribute RDY state
    • #204 - fix early termination blocking
    • #177 - support broadcast_address
    • #161 - connection pool goroutine safety
  • v0.3.1(Aug 29, 2013)

    Upgrading from 0.3.0: This release requires NSQ binary version 0.2.17+ for TOUCH support.

    • #119 - add TOUCH command
    • #133 - improved handling of errors/magic
    • #127 - send IDENTIFY (missed in #90)
    • #16 - add backoff to Reader
  • v0.3.0(Aug 29, 2013)

    Upgrading from 0.2.4: There are no backward incompatible changes to applications written against the public nsq.Reader API.

    However, there are a few backward incompatible changes to the API for applications that directly use other public methods, or properties of a few NSQ data types:

    nsq.Message IDs are now a type nsq.MessageID (a [16]byte array). The signatures of nsq.Finish() and nsq.Requeue() reflect this change.
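    As a rough illustration of what a fixed-size array ID type implies for calling code, the sketch below defines a local `MessageID` with the same shape (a `[16]byte` array). The `parseMessageID` helper and the sample ID value are hypothetical and exist only for this example; they are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// msgIDLength mirrors the 16-byte ID size stated in the release notes.
const msgIDLength = 16

// MessageID is a local stand-in with the same shape as the described type.
type MessageID [msgIDLength]byte

// parseMessageID is a hypothetical helper that copies a byte slice into
// a MessageID, rejecting input of the wrong length.
func parseMessageID(b []byte) (MessageID, error) {
	var id MessageID
	if len(b) != msgIDLength {
		return id, errors.New("message ID must be exactly 16 bytes")
	}
	copy(id[:], b)
	return id, nil
}

func main() {
	id, err := parseMessageID([]byte("0123456789abcdef"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}

	// Arrays are comparable values, so an ID can be used as a map key.
	inFlight := map[MessageID]bool{id: true}

	// Slice the array when an API expects []byte.
	fmt.Printf("id=%s tracked=%v\n", id[:], inFlight[id])
}
```

    Unlike a `[]byte`, an array value is copied on assignment and can be compared with `==`, which is why code that previously passed slices needs adjusting when a signature switches to the array type.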

    nsq.SendCommand() and nsq.Frame() were removed in favor of nsq.SendFramedResponse().

    nsq.Subscribe() no longer accepts shortId and longId. If you upgrade your consumers before upgrading your nsqd binaries to 0.2.16-rc.1, they will not be able to send the optional custom identifiers.

    • #90 performance optimizations
    • #81 reader performance improvements / MPUB support
Owner
NSQ
A realtime distributed messaging platform
NSQ
A tiny wrapper around NSQ topic and channel :rocket:

Event Bus NSQ A tiny wrapper around go-nsq topic and channel. Protect nsq calls with gobreaker. Installation go get -u github.com/rafaeljesus/nsq-even

Rafael Jesus 75 Sep 27, 2022
Pause / Unpause NSQ Topics and Channels

Action pause unpause empty info check Worker Pool 1 <= n <= len(target) 0 for unlimited pool depend on how many the targets Target Array of topics or

Muhammad Zaki Al-Afrani 7 Jun 29, 2022
Package notify provides an implementation of the Gnome DBus Notifications Specification.

go-notify Package notify provides an implementation of the Gnome DBus Notifications Specification. Examples Display a simple notification. ntf := noti

null 65 Dec 27, 2022
A simple pubsub package for go.

Package pubsub implements a simple multi-topic pub-sub library. Install pubsub with, go get github.com/cskr/pubsub This repository is a go module and

Chandra Sekar S 387 Dec 31, 2022
Simple synchronous event pub-sub package for Golang

event-go Simple synchronous event pub-sub package for Golang This is a Go language package for publishing/subscribing domain events. This is useful to

itchyny 19 Jun 16, 2022
Scalable package delivery logistics simulator built using SingleStore and Vectorized Redpanda

Reference Architecture using SingleStore and Redpanda for global logistics INFO: For the story behind this code (and epic dashboards), check out th

SingleStore Labs 31 Oct 29, 2022
An opinionated package that helps you print user-friendly output messages from your Go command line applications.

github.com/eth-p/clout (Command Line Output) clout is a package that helps you print user-friendly output messages from your Go command line applicati

Ethan P. 4 Jan 15, 2022
Redis as backend for Queue Package

redis Redis as backend for Queue package Setup start the redis server redis-server start the redis cluster, see the config # server 01 mkdir server01

golang-queue 11 Oct 16, 2022
Package htmltopdf implements wkhtmltopdf Go bindings.

htmltopdf Package htmltopdf implements wkhtmltopdf Go bindings. It can be used to convert HTML documents to PDF files. The package does not use the wk

DigitalData s.r.o. 3 Sep 19, 2022
NSQ as backend for Queue Package

NSQ as backend for Queue Package

golang-queue 10 Jul 4, 2022
Nsq http auth service for golang

nsq-auth nsq http auth service ./nsq-auth -h Usage: 2021/12/25 17:10:56 Usage:

纸喵 15 Nov 21, 2022
Package set is a small wrapper around the official reflect package that facilitates loose type conversion and assignment into native Go types.

Package set is a small wrapper around the official reflect package that facilitates loose type conversion and assignment into native Go types. Read th

null 44 Dec 27, 2022
🤖 Automatically scrape PortableApps.com (or official release page) and convert into Edgeless plugin package

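Edgeless Automatic Plugin Bot 2 Introduction This project re-implements the Edgeless automatic plugin bot in Golang. Features (WIP) Fully compatible with the Edgeless automatic plugin bot, including Tasks, for seamless migration; faster builds; better code structure; greater extensibility. Progress As of 2021/11/28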

Hydrogen 2 Sep 12, 2022
Mongo Go Models (mgm) is a fast and simple MongoDB ODM for Go (based on official Mongo Go Driver)

Mongo Go Models Important Note: We changed package name from github.com/Kamva/mgm/v3(uppercase Kamva) to github.com/kamva/mgm/v3(lowercase kamva) in v

kamva 607 Jan 2, 2023
Qmgo - The Go driver for MongoDB. It's based on official mongo-go-driver but easier to use like Mgo.

Qmgo English | 简体中文 Qmgo is a Go driver for MongoDB . It is based on MongoDB official driver, but easier to use like mgo (such as the chain call). Qmg

Qiniu Cloud 1k Dec 28, 2022
The Official Twilio SendGrid Led, Community Driven Golang API Library

NEW: Subscribe to email notifications for releases and breaking changes. The default branch name for this repository has been changed to main as of 07

Twilio SendGrid 872 Dec 15, 2022
GoThanks automatically stars Go's official repository and your go.mod github dependencies, providing a simple way to say thanks to the maintainers of the modules you use and the contributors of Go itself.

Give thanks (in the form of a GitHub ★) to your fellow Go modules maintainers. About GoThanks performs the following operations Sends a star to Go's r

psampaz 117 Dec 24, 2022
Casbin-forum is the official forum for Casbin developers and users.

Casbin-forum is the official forum for Casbin developers and users. Link https://forum.casbin.com/ Architecture Casbin-forum contains 2 p

Casbin 922 Jan 3, 2023