go-nsq
The official Go package for NSQ.
Docs
See godoc and the main repo apps directory for examples of clients built using this package.
Tests
Tests are run via ./test.sh (which requires nsqd and nsqlookupd to be installed).
We'd like to be able to override how a consumer queries from NSQLookupds. I think this approach would also allow others to switch NSQLookupd for etcd, zookeeper, etc.
This is still a WIP but I hope you can see where I'm going with this. Suggestions and Criticism welcome.
cc @oliver-bitly @jehiah @mreiferson
enhancement
I'm guessing this is user error, but I'm getting a panic when stopping a consumer as of this commit: 4e74fa1f8933064a4f04786e65d3ee7b611be598. Reading the comment in there, I'm trying to understand what's going on.
My shutdown code looks something like this:
func (n *NSQPeer) Teardown() {
	n.producer.Stop()
	if n.consumer != nil {
		n.consumer.DisconnectFromNSQD(n.host)
		n.consumer.Stop()
		<-n.consumer.StopChan
	}
}
Here's the error:
runtime.panic(0x509600, 0x84c1d5)
/usr/local/Cellar/go/1.3/libexec/src/pkg/runtime/panic.c:279 +0xf5
github.com/bitly/go-nsq.(*Consumer).exit(0xc20800ef00)
/Users/tylertreat/Go/src/github.com/bitly/go-nsq/consumer.go:1082 +0x2e
github.com/bitly/go-nsq.func·007()
/Users/tylertreat/Go/src/github.com/bitly/go-nsq/consumer.go:990 +0x29
created by time.goFunc
/usr/local/Cellar/go/1.3/libexec/src/pkg/time/sleep.go:121 +0x47
Can anyone provide some insight on why I'm seeing this now?
bug
# nsqd --version
nsqd v1.0.0-compat (built w/go1.8)
go-nsq version: commit b9762cdcb6d5cc5ac5287ca076354143d332cc97 Tue Feb 14 16:13:23 2017 -0800
# go version
go version go1.8 linux/amd64
http://127.0.0.1:4151/stats
[d.c2.cp#ephemeral/detect#ephemeral] (127.0.0.3:4150) heartbeat received
The consumer logs heartbeats like the one above, but it does not seem to receive any other NSQ messages.
# curl -XGET http://127.0.0.1:4151/stats;
nsqd v1.0.0-compat (built w/go1.8)
start_time 2017-03-27T19:07:02+08:00
uptime 45m7.260003591s
Health: OK
[.raw.raw#ephemeral] depth: 0 be-depth: 0 msgs: 6656 e2e%:
[processor#ephemeral ] depth: 0 be-depth: 0 inflt: 0 def: 0 re-q: 0 timeout: 0 msgs: 6656 e2e%:
[V2 18-190 ] state: 3 inflt: 0 rdy: 1 fin: 6656 re-q: 0 msgs: 6656 connected: 8s
[d.c2.cp#ephemeral] depth: 10000 be-depth: 0 msgs: 791227 e2e%:
[d.c2.cp.done#ephemeral] depth: 0 be-depth: 0 msgs: 0 e2e%:
[processor#ephemeral ] depth: 0 be-depth: 0 inflt: 0 def: 0 re-q: 0 timeout: 0 msgs: 0 e2e%:
[V2 18-190 ] state: 3 inflt: 0 rdy: 1 fin: 0 re-q: 0 msgs: 0 connected: 8s
[d.c2.domain#ephemeral] depth: 10000 be-depth: 0 msgs: 208218 e2e%:
...
Code snippet:
func nsqSubscribe(addr string, topic string, channel string, hdlr nsq.HandlerFunc) error {
	consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig())
	if err != nil {
		print("new consumer error: ", err, "\n")
		time.Sleep(1 * time.Second) //wait 1s
		panic(err)
	}
	consumer.AddHandler(hdlr)
	err = consumer.ConnectToNSQD(addr)
	if err != nil {
		print("connect nsqd error: ", err, "\n")
		time.Sleep(1 * time.Second) //wait 1s
		panic(err)
	}
	_ = <-consumer.StopChan
	panic("nsq conn dead topic=" + topic + " channel=" + channel)
	return nil
}
func main() {
	producer, err := nsq.NewProducer(nsqConf.Local.Addr, nsq.NewConfig())
	if err != nil {
		panic(err)
	}
	go func() {
		detector := new(c2.C2Sdk)
		if detectorEnabled {
			err = detector.Init()
			if err != nil {
				fmt.Println("Failed to init c2")
				panic(err)
			}
		}
		nsqSubscribe(nsqConf.Local.Addr, "d.c2.cp#ephemeral", "detect#ephemeral",
			nsq.HandlerFunc(func(message *nsq.Message) error {
				return handler_scan(message, detector,
					producer, unmarshal_url, scan_url,
					"d.c2.cp.done#ephemeral")
			}))
	}()
	go func() {
		detector := new(c2.C2Sdk)
		if detectorEnabled {
			err = detector.Init()
			if err != nil {
				fmt.Println("Failed to init c2")
				panic(err)
			}
		}
		nsqSubscribe(nsqConf.Local.Addr, "d.c2.url#ephemeral", "detect#ephemeral",
			nsq.HandlerFunc(func(message *nsq.Message) error {
				return handler_scan(message, detector,
					producer, unmarshal_url, scan_url,
					"d.c2.url.done#ephemeral")
			}))
	}()
	.....
}
I'm currently investigating an issue where even after calling either DisconnectFromNSQLookupd or DisconnectFromNSQD on a host that is already disconnected, re-connection attempts still occur.
In our scenario we are consuming from 3 hosts: 10.0.0.1/10.0.0.2/10.0.0.3. When the host 10.0.0.1 becomes unhealthy (service stopped), we remove it from the list of nodes and call DisconnectFromNSQLookupd or DisconnectFromNSQD.
As you can see from the following log output, the connection to host 10.0.0.1 was lost and re-connection attempts are made. The host is then removed from our list and DisconnectFromNSQLookupd is called. However, the re-connection attempts still occur:
2016-04-29 14:35:26 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:35:26 [DBG] IDENTIFY response: &{MaxRdyCount:2500 TLSv1:false Deflate:false Snappy:false AuthRequired:false} (conn.go 328)
2016-04-29 14:35:29 [ERR] IO error - EOF (conn.go 471)
2016-04-29 14:35:29 [INF] beginning close (conn.go 611)
2016-04-29 14:35:29 [INF] readLoop exiting (conn.go 528)
2016-04-29 14:35:29 [INF] breaking out of writeLoop (conn.go 535)
2016-04-29 14:35:29 [INF] writeLoop exiting (conn.go 581)
2016-04-29 14:35:29 [INF] finished draining, cleanup exiting (conn.go 660)
2016-04-29 14:35:29 [INF] clean close complete (conn.go 668)
2016-04-29 14:35:29 [DBG] there are 2 connections left alive (consumer.go 741)
2016-04-29 14:35:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:35:44 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:35:44 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:35:44 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:35:59 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:35:59 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:35:59 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:36:14 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:36:14 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:36:14 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:36:26 [DBG] querying nsqlookupd http://10.0.0.3:4161/lookup?topic=helloworld%23ephemeral (consumer.go 419)
2016-04-29 14:36:29 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:36:29 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:36:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:36:44 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:36:44 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:36:44 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:36:59 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:36:59 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:36:59 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:37:14 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:37:14 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:37:14 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:37:26 [DBG] querying nsqlookupd http://10.0.0.2:4161/lookup?topic=helloworld%23ephemeral (consumer.go 419)
2016-04-29 14:37:29 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:37:29 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:37:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:37:32 [INF] Connecting to NSQ via lookupd hosts: [10.0.0.3:4161 10.0.0.2:4161] (subscriber.go 166)
2016-04-29 14:37:32 [INF] Connecting to NSQ via lookupd hosts: [10.0.0.3:4161 10.0.0.2:4161] (subscriber.go 166)
2016-04-29 14:37:32 [INF] Disconnecting from NSQ hosts: [10.0.0.1:4150] (subscriber.go 176)
2016-04-29 14:37:32 [INF] Disconnecting from NSQ lookupds: [10.0.0.1:4161] (subscriber.go 187)
2016-04-29 14:37:44 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:37:44 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:37:44 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:37:59 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:37:59 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:37:59 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:38:14 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:38:14 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:38:14 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
2016-04-29 14:38:26 [DBG] querying nsqlookupd http://10.0.0.3:4161/lookup?topic=helloworld%23ephemeral (consumer.go 419)
2016-04-29 14:38:29 [INF] (10.0.0.1:4150) connecting to nsqd (consumer.go 508)
2016-04-29 14:38:29 [ERR] (10.0.0.1:4150) error connecting to nsqd - dial tcp 10.0.0.1:4150: getsockopt: connection refused (consumer.go 790)
2016-04-29 14:38:29 [DBG] (10.0.0.1:4150) re-connecting in 15 seconds... (consumer.go 776)
Is this intended/expected behaviour?
bug
If a consumer is in backoff and attempts to recover with RDY 1, then receives a message but responds with RequeueWithoutBackoff, it will stall and not exit backoff. (failing test attached)
cc: @georgicodes
bug
I ran some benchmarks against nsq and hit a strange problem. Publishing 100000 messages to nsq with go-nsq always fails, while a smaller count (such as 10000 messages) works. The go-nsq log shows writes to nsqd timing out, and the nsqd log says the producer's connection was reset. What could cause this?
go-nsq log:
2019/02/26 22:44:52 INF 1 (192.168.1.125:4150) connecting to nsqd
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 INF 1 (192.168.1.125:4150) beginning close
2019/02/26 22:45:01 INF 1 (192.168.1.125:4150) readLoop exiting
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) IO error - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 ERR 1 (192.168.1.125:4150) sending command - write tcp 192.168.1.99:4908->192.168.1.125:4150: i/o timeout
2019/02/26 22:45:01 INF 1 (192.168.1.125:4150) breaking out of writeLoop
2019/02/26 22:45:01 INF 1 (192.168.1.125:4150) writeLoop exiting
2019/02/26 22:45:01 INF 1 (192.168.1.125:4150) finished draining, cleanup exiting
2019/02/26 22:45:01 INF 1 (192.168.1.125:4150) clean close complete
2019/02/26 22:45:01 INF 1 exiting router
nsqd log:
[nsqd] 2019/02/26 22:39:55.590775 INFO: TCP: new client(192.168.1.99:4908)
[nsqd] 2019/02/26 22:39:55.590882 INFO: CLIENT(192.168.1.99:4908): desired protocol magic ' V2'
[nsqd] 2019/02/26 22:39:55.591299 INFO: [192.168.1.99:4908] IDENTIFY: {ClientID:HuangWei-PC Hostname:HuangWei-PC HeartbeatInterval:30000 OutputBufferSize:16384 OutputBufferTimeout:250 FeatureNegotiation:true TLSv1:false Deflate:false DeflateLevel:6 Snappy:false SampleRate:0 UserAgent:go-nsq/1.0.7 MsgTimeout:0}
[nsqd] 2019/02/26 22:40:04.104850 INFO: PROTOCOL(V2): [192.168.1.99:4908] exiting ioloop
[nsqd] 2019/02/26 22:40:04.104955 ERROR: client(192.168.1.99:4908) - failed to send response - write tcp 192.168.1.125:4150->192.168.1.99:4908: write: connection reset by peer
[nsqd] 2019/02/26 22:40:04.104980 INFO: PROTOCOL(V2): [192.168.1.99:4908] exiting messagePump
go benchmark code:
func BenchmarkProducer(b *testing.B) {
	b.StopTimer()
	config := nsq.NewConfig()
	p, _ := nsq.NewProducer("192.168.1.125:4150", config)
	body := make([]byte, 512)
	startCh := make(chan struct{})
	var wg sync.WaitGroup
	parallel := runtime.GOMAXPROCS(0)
	b.Log(parallel)
	b.N = 100000
	doneCh := make(chan *nsq.ProducerTransaction, parallel)
	wg.Add(b.N)
	for j := 0; j < parallel; j++ {
		go func() {
			<-startCh
			for i := 0; i < b.N/parallel; i++ {
				p.PublishAsync("test", body, doneCh)
			}
		}()
		go func() {
			for _ = range doneCh {
				wg.Done()
			}
		}()
	}
	b.StartTimer()
	close(startCh)
	wg.Wait()
}
nsqd config:
nsqd --lookupd-tcp-address=0.0.0.0:4160 --snappy=true --max-msg-size=67108864 --max-msg-timeout=1h --mem-queue-size=100000
question needs-info
This request does two things: it centralizes logging in log.go to remove some duplicated logging code, and it makes the logger an interface so that people can easily drop in their own logging structures.
The structure simply needs to implement the following methods:
Debugf(format string, v ...interface{})
Infof(format string, v ...interface{})
Warnf(format string, v ...interface{})
Errorf(format string, v ...interface{})
enhancement question
Currently, the ConnectToNSQLookupd method on Consumer fails to return any error when invoked with an invalid nsqlookupd address. Instead the error is logged (and swallowed) in the queryLookupd func, right after the call to apiRequestNegotiateV1, since queryLookupd doesn't return anything. This leaves a consumer with no reliable way to tell that the lookupd address provided was invalid.
To repro:
func foo() {
	q, _ := nsq.NewConsumer("the-topic", "the-channel", nsq.NewConfig())
	handler := &SomeHandler{}
	q.AddHandler(handler)
	err := q.ConnectToNSQLookupd("1.2.3.4:9999")
	// err is nil!
}
question
We're calling Stop on a NSQ producer, and we're getting the following output after a number of MultiPublishes. Is this io.EOF error expected? We're using the tag 1.0.6.
INFO[0011] INF 1 stopping
INFO[0011] ERR 1 (127.0.0.1:4150) IO error - EOF
INFO[0011] INF 1 (127.0.0.1:4150) beginning close
INFO[0011] INF 1 (127.0.0.1:4150) readLoop exiting
bug question
Hey there, I'm using this code to make a producer that sends messages to NSQ. I'm using PublishAsync(), with a "responder" that basically does nothing:
var respChan chan *nsq.ProducerTransaction

func responder(respChan chan *nsq.ProducerTransaction) {
	for _ = range respChan {
		//Get rid of the messages or something... should probably respond, but not sure
		//how to do that yet...
	}
}
I'm sending as many messages as possible, calling PublishAsync() in a loop reading from memory. My code maxes out the CPU, and manages about 14,000 messages per second (the messages are quite small, a msgpack'd struct containing a < 16 char string).
In doing a code profile, my code apparently spends 50.5% of its time in "ExternalCode" (no idea what that is).
I've attached my code profile:
Any idea what is going on here?
question
I got this on Ubuntu 12.04, go 1.2
panic: non-positive interval for NewTicker
goroutine 141 [running]:
runtime.panic(0x815520, 0xc21010d230)
/usr/local/go/src/pkg/runtime/panic.c:266 +0xb6
time.NewTicker(0x0, 0x3fd8a050fdd8d7f5)
/usr/local/go/src/pkg/time/tick.go:22 +0x9f
github.com/bitly/go-nsq.(*Consumer).lookupdLoop(0xc21005a8c0)
/tmp/godep/rev/2b/9a8b5edc036235580256e9a5e5f118a0025151/src/github.com/bitly/go-nsq/consumer.go:228 +0x6e
created by github.com/bitly/go-nsq.(*Consumer).ConnectToNSQLookupd
/tmp/godep/rev/2b/9a8b5edc036235580256e9a5e5f118a0025151/src/github.com/bitly/go-nsq/consumer.go:216 +0x282
This is https://github.com/bitly/go-nsq/commit/2b9a8b5edc036235580256e9a5e5f118a0025151
I'm connecting to 8 nsqlookupd hosts (which might increase the chance of this happening)
I'm not setting config.lookupdPollInterval in my code.
bug
Right now, in the event that nsqd addresses change, the old connections stay around in the consumer's connections map. This simply cleans up old connections from that map.
The NSQ service is deployed on a public network (106.75.49.135:4161, 106.75.49.135:4150). Messages can be sent and received normally, but the log still shows 'error connecting to nsqd - dial tcp 127.0.0.1:4150: connect: connection refused'.
nsqlookupd:
  image: nsqio/nsq
  command: /nsqlookupd
  ports:
    - "4160:4160"
    - "4161:4161"
nsqd:
  image: nsqio/nsq
  command: /nsqd --broadcast-address=nsqd --lookupd-tcp-address=nsqlookupd:4160
  depends_on:
    - nsqlookupd
  ports:
    - "4151:4151"
    - "4150:4150"
2/10/12 11:21:15 INF 1 [Study/Chegg] (nsqd:4150) connecting to nsqd
2022/10/12 11:21:15 ERR 1 [Study/Chegg] (nsqd:4150) error connecting to nsqd - dial tcp 127.0.0.1:4150: connect: connection refused
{"Address":["106.75.49.141:4150"],"level":"info","msg":"Receiving NSQ messages from addresses","time":"2022-10-12T11:21:15+08:00"}
send message:
sub message:
I have a system where I'm getting log messages from nsqd such as:
INFO: PROTOCOL(V2): [127.0.0.1:53376] exiting ioloop
ERROR: client(127.0.0.1:53376) - failed to read command - read tcp 127.0.0.1:4150->127.0.0.1:53376: i/o timeout
INFO: PROTOCOL(v) [127.0.0.1:53376] exiting messagePump
From looking at the /stats endpoint I can see after this message that the consumer is no longer connected (two producers are still connected fine). I can see that there were 2 messages in timeout which I presume is related to the disconnection of the consumer.
However, there is no indication that the consumer knows that it has lost its connection so there's no attempt to reconnect (or even just notify me that it's stopped). The StopChan isn't being called and the Stats() end point on the consumer is still giving a connection count of 1.
Is this the correct behaviour? Have I missed something around how to detect this i/o timeout situation? Unfortunately I can't share the source code, but happy to try things and report back.
Thanks
Andre
needs-info
I've been playing around with NSQ a lot lately and I keep hitting walls when trying to write test suites for assembling various network topologies. Most of the issues seem to stem from NSQD not properly handling consumer disconnects (I'm using go-nsq). I don't even know where to start describing the strange things:
CLS message is sent to NSQD, sometimes it is not.
CLOSE_WAIT, rather nukes the stream.
Seems to me that the entire shutdown pathway is very very wrong, just that various timeouts hack around the root cause. E.g. the client heartbeats (or lack thereof after a disconnect) is the one that will trigger the cleanup of leftover client counts; the in-flight timeout is the one that reschedules messages not processed by a disconnected client.
I'm unsure if I'm doing something weird here, but it seems that NSQ is very very prone to weird behavior when I have very short lived connections.
bug
Is it possible to use a singleton NSQ Producer? I'm running an API with an NSQ Producer and it seems like I have to open and close a connection every time I hit an endpoint.
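One common way to share a single long-lived client across request handlers in Go is lazy initialisation with sync.Once. The sketch below shows only that pattern: publisher and fakeProducer are hypothetical stand-ins so the example runs without nsqd, and it assumes the real producer is safe for concurrent use, which is worth confirming in the godoc before relying on it.

```go
package main

import (
	"fmt"
	"sync"
)

// publisher is a hypothetical stand-in for the part of a producer
// that an API handler needs.
type publisher interface {
	Publish(topic string, body []byte) error
}

// fakeProducer counts published messages instead of talking to nsqd.
type fakeProducer struct {
	mu        sync.Mutex
	published int
}

func (f *fakeProducer) Publish(topic string, body []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.published++
	return nil
}

var (
	once    sync.Once
	shared  publisher
	created int // how many times the constructor ran
)

// sharedProducer builds the producer on first use and returns the same
// instance on every later call, from any goroutine.
func sharedProducer() publisher {
	once.Do(func() {
		created++
		// Assumption: real code would construct the NSQ producer here.
		shared = &fakeProducer{}
	})
	return shared
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Each simulated request reuses the shared instance.
			_ = sharedProducer().Publish("test", []byte(fmt.Sprintf("msg %d", i)))
		}(i)
	}
	wg.Wait()
	fmt.Println("producers created:", created)
}
```

With this shape the producer would be stopped once at process shutdown rather than per request.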
question
There should be no backward-incompatible changes, but there are a couple new API methods.
Upgrading from 1.0.7: There are no backward incompatible changes.
Thanks to @judwhite, @vitaliytv, and @HaraldNordgren for contributing to testing and dependency management improvements
Changes in this release
#249 update RDY when setting MaxInFlight to 0
#267 check response message size is positive (thanks @andyxning)
#271 godoc for publisher and consumer (thanks @skateinmars)
#270 set log level (thanks @YongHaoWu)
#255 go vet tls.Config copying (thanks @iaburton)
Upgrading from 1.0.6: There are no backward incompatible changes.
Upgrading from 1.0.5: There are no backward incompatible changes.
DeferredPublish (thanks @DanielHeckrath)
Upgrading from 1.0.4: There are no backward incompatible changes.
flag.Value interface
max_backoff_duration (thanks @judwhite)
go-simplejson dependency
Upgrading from 1.0.3: There are no backward incompatible changes.
ErrNotConnected race during Producer connection (thanks @jeddenlea)
RDY redistribution after backoff with no connections
RequeueWithoutBackoff
BackoffStrategy to be set via flag (thanks @twmb)
BackoffStrategy; add full-jitter strategy (thanks @hden)
DialTimeout and LocalAddr config (thanks @yashkin)
Producer.Ping() method (thanks @zulily)
Message data races on responded
Upgrading from 1.0.2: There are no backward incompatible changes.
Upgrading from 1.0.1: There are no backward incompatible changes.
Consumer.Stop() race and Producer.Stop() deadlock (thanks @tylertreat)
Message.NSQDAddress
Consumer.Stop() if handlers are deadlocked
tls-min-version option (thanks @twmb)
Consumer runtime stats (thanks @dcarney)
#ephemeral topic names (thanks @jamesgroat)
Upgrading from 1.0.0: There are no backward incompatible changes functionally, however this release no longer compiles with Go 1.0.x.
DisconnectFrom*
heartbeat_interval and output_buffer_timeout to be disabled
nsqlookupd behaviors
RDY before FIN/REQ (forwards compatibility with bitly/nsq#404)
tls-cert and tls-key config options
Upgrading from 0.3.7: The public API was significantly refactored and is not backwards compatible, please read UPGRADING.
IDENTIFY
msg_timeout
ServerName
nsqlookupd URL specification
AUTH support
Upgrading from 0.3.6: There are no backward incompatible changes. THIS IS THE LAST STABLE RELEASE PROVIDING THIS API. Future releases will be based on the api in #30 and will not be backwards compatible!
This is a bug fix release relating to the refactoring done in 0.3.6.
Upgrading from 0.3.5: There are no backward incompatible changes.
This release includes a significant internal refactoring, designed to better encapsulate responsibility, see #19.
Specifically:
Conn public
Conn from Reader/Writer
Conn events
As part of the refactoring, a few additional clean exit related issues were resolved:
readLoop exits before exiting cleanup
messagesInFlight at readLoop exit
Upgrading from 0.3.4: There are no backward incompatible changes.
This release includes a few new features such as support for channel sampling and sending along a user agent string (which is now displayed in nsqadmin).
Also, a critical bug fix for potential deadlocks (thanks @kjk for reporting and help testing).
New Features/Improvements:
Bug Fixes:
Upgrading from 0.3.3: There are no backward incompatible changes.
This is a bug fix release, notably potential deadlocks in Message.Requeue() and Message.Touch() as well as a potential busy loop cleaning up closed connections with in-flight messages.
New Features/Improvements:
Reader.Configure()
nsqlookupd address is already configured
Bug Fixes:
handleError() loop if already connected
Message responders
finishedMessages
Upgrading from 0.3.2: This release requires NSQ binary version 0.2.23+ for compression support.
This release contains significant Reader refactoring of the RDY handling code paths. The motivation is documented in #1 however the commits in #8 identify individual changes. Additionally, we eliminated deadlocks during connection cleanup in Writer.
As a result, both user-facing APIs should now be considerably more robust and stable. Additionally, Reader should behave better when backing off.
New Features/Improvements:
Writer
Requeue() method on Message
Touch() method on Message
Bug Fixes:
Reader RDY handling refactoring (race conditions, deadlocks, consolidation)
Writer deadlocks
max-in-flight race condition
Upgrading from 0.3.1: This release requires NSQ binary version 0.2.22+ for TLS support.
New Features/Improvements:
Writer
MaxBackoffDuration of 0 disables backoff
nsqd config option --max-rdy-count
nsqd
Bug Fixes:
nsqlookupd polling improvements
IsStarved() to connections w/ in-flight messages
IsStarved(); redistribute RDY state
broadcast_address
Upgrading from 0.3.0: This release requires NSQ binary version 0.2.17+ for TOUCH support.
Upgrading from 0.2.4: There are no backward incompatible changes to applications written against the public nsq.Reader API.
However, there are a few backward incompatible changes to the API for applications that directly use other public methods, or properties of a few NSQ data types:
nsq.Message IDs are now a type nsq.MessageID (a [16]byte array). The signatures of nsq.Finish() and nsq.Requeue() reflect this change.
nsq.SendCommand() and nsq.Frame() were removed in favor of nsq.SendFramedResponse().
nsq.Subscribe() no longer accepts shortId and longId. If upgrading your consumers before upgrading your nsqd binaries to 0.2.16-rc.1 they will not be able to send the optional custom identifiers.