Golang client for NATS, the cloud native messaging system.

Overview

NATS - Go Client

A Go client for the NATS messaging system.

Installation

# Go client
go get github.com/nats-io/nats.go/

# Server
go get github.com/nats-io/nats-server

When using or transitioning to Go modules support:

# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.10.0

# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2

# NATS Server v1 is installed otherwise
# go get github.com/nats-io/nats-server

Basic Usage

import nats "github.com/nats-io/nats.go"

// Connect to a server
nc, _ := nats.Connect(nats.DefaultURL)

// Simple Publisher
nc.Publish("foo", []byte("Hello World"))

// Simple Async Subscriber
nc.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

// Responding to a request message
nc.Subscribe("request", func(m *nats.Msg) {
    m.Respond([]byte("answer is 42"))
})

// Simple Sync Subscriber
sub, err := nc.SubscribeSync("foo")
m, err := sub.NextMsg(timeout)

// Channel Subscriber
ch := make(chan *nats.Msg, 64)
sub, err := nc.ChanSubscribe("foo", ch)
msg := <- ch

// Unsubscribe
sub.Unsubscribe()

// Drain
sub.Drain()

// Requests
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)

// Replies
nc.Subscribe("help", func(m *nats.Msg) {
    nc.Publish(m.Reply, []byte("I can help!"))
})

// Drain connection (Preferred for responders)
// Close() not needed if this is called.
nc.Drain()

// Close connection
nc.Close()

Encoded Connections

nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer c.Close()

// Simple Publisher
c.Publish("foo", "Hello World")

// Simple Async Subscriber
c.Subscribe("foo", func(s string) {
    fmt.Printf("Received a message: %s\n", s)
})

// EncodedConn can Publish any raw Go type using the registered Encoder
type person struct {
     Name     string
     Address  string
     Age      int
}

// Go type Subscriber
c.Subscribe("hello", func(p *person) {
    fmt.Printf("Received a person: %+v\n", p)
})

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}

// Go type Publisher
c.Publish("hello", me)

// Unsubscribe
sub, err := c.Subscribe("foo", nil)
// ...
sub.Unsubscribe()

// Requests
var response string
err = c.Request("help", "help me", &response, 10*time.Millisecond)
if err != nil {
    fmt.Printf("Request failed: %v\n", err)
}

// Replying
c.Subscribe("help", func(subj, reply string, msg string) {
    c.Publish(reply, "I can help!")
})

// Close connection
c.Close()

New Authentication (Nkeys and User Credentials)

This requires a server with version >= 2.0.0.

NATS servers have a new security and authentication mechanism to authenticate with user credentials and Nkeys. The simplest form is to use the helper method UserCredentials(credsFilepath).

nc, err := nats.Connect(url, nats.UserCredentials("user.creds"))

The helper method creates two callback handlers to present the user JWT and sign the nonce challenge from the server. The core client library never has direct access to your private key and simply performs the callback for signing the server challenge. The helper loads the credentials for each connect or reconnect and wipes the memory it used afterwards.

The helper can also take two entries, one for the JWT and one for the NKey seed file.

nc, err := nats.Connect(url, nats.UserCredentials("user.jwt", "user.nk"))

You can also set the callback handlers directly and manage challenge signing directly.

nc, err := nats.Connect(url, nats.UserJWT(jwtCB, sigCB))

Bare Nkeys are also supported. The nkey seed should be in a read only file, e.g. seed.txt

> cat seed.txt
# This is my seed nkey!
SUAGMJH5XLGZKQQWAWKRZJIGMOU4HPFUYLXJMXOO5NLFEO2OOQJ5LPRDPM

This is a helper function which will load and decode and do the proper signing for the server nonce. It will clear memory in between invocations. You can choose to use the low level option and provide the public key and a signature callback on your own.

opt, err := nats.NkeyOptionFromSeed("seed.txt")
nc, err := nats.Connect(serverUrl, opt)

// Direct
nc, err := nats.Connect(serverUrl, nats.Nkey(pubNkey, sigCB))

TLS

// tls as a scheme will enable secure connections by default. This will also verify the server name.
nc, err := nats.Connect("tls://nats.demo.io:4443")

// If you are using a self-signed certificate, you need to have a tls.Config with RootCAs setup.
// We provide a helper method to make this case easier.
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))

// If the server requires a client certificate, there is a helper function for that too:
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
nc, err = nats.Connect("tls://localhost:4443", cert)

// You can also supply a complete tls.Config

certFile := "./configs/certs/client-cert.pem"
keyFile := "./configs/certs/client-key.pem"
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
    t.Fatalf("error parsing X509 certificate/key pair: %v", err)
}

config := &tls.Config{
    ServerName: 	opts.Host,
    Certificates: 	[]tls.Certificate{cert},
    RootCAs:    	pool,
    MinVersion: 	tls.VersionTLS12,
}

nc, err = nats.Connect("nats://localhost:4443", nats.Secure(config))
if err != nil {
	t.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)
}

Using Go Channels (netchan)

nc, _ := nats.Connect(nats.DefaultURL)
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer ec.Close()

type person struct {
     Name     string
     Address  string
     Age      int
}

recvCh := make(chan *person)
ec.BindRecvChan("hello", recvCh)

sendCh := make(chan *person)
ec.BindSendChan("hello", sendCh)

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}

// Send via Go channels
sendCh <- me

// Receive via Go channels
who := <- recvCh

Wildcard Subscriptions

// "*" matches any token, at any level of the subject.
nc.Subscribe("foo.*.baz", func(m *nats.Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

nc.Subscribe("foo.bar.*", func(m *nats.Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

// ">" matches any length of the tail of a subject, and can only be the last token
// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
nc.Subscribe("foo.>", func(m *nats.Msg) {
    fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})

// Matches all of the above
nc.Publish("foo.bar.baz", []byte("Hello World"))
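The matching rules described in the comments above can be checked locally with a small stand-alone matcher. This is only a sketch of the rules as stated here ("*" matches exactly one token, ">" matches the rest of the subject and must be the last token); matchSubject is a hypothetical helper, not part of the nats.go API, and it does not validate subjects.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches pattern.
// Hypothetical helper for illustration only, not a nats.go function.
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" is only valid as the last token and needs at least
			// one remaining subject token to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // pattern is longer than the subject
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	// Without a trailing ">", both must have the same number of tokens.
	return len(p) == len(s)
}

func main() {
	for _, pattern := range []string{"foo.*.baz", "foo.bar.*", "foo.>"} {
		fmt.Printf("%-10s matches foo.bar.baz: %v\n",
			pattern, matchSubject(pattern, "foo.bar.baz"))
	}
}
```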

Queue Groups

// All subscriptions with the same queue name will form a queue group.
// Each message will be delivered to only one subscriber per queue group,
// using queuing semantics. You can have as many queue groups as you wish.
// Normal subscribers will continue to work as expected.

nc.QueueSubscribe("foo", "job_workers", func(_ *nats.Msg) {
    received++
})
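As a rough in-process analogy (this is not how the NATS server implements queue groups), several goroutines receiving from one shared Go channel show the same property: each message is handled by exactly one member of the group. The function name deliver and the worker count are made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// deliver hands every message to exactly one of n workers that all
// receive from the same channel, and returns how many each handled.
func deliver(n int, msgs []string) []int {
	queue := make(chan string)
	counts := make([]int, n)

	var wg sync.WaitGroup
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range queue {
				counts[id]++ // each worker only writes its own slot
			}
		}(w)
	}

	for _, m := range msgs {
		queue <- m // received by whichever worker is ready
	}
	close(queue)
	wg.Wait()
	return counts
}

func main() {
	counts := deliver(3, []string{"a", "b", "c", "d", "e", "f"})
	total := 0
	for _, c := range counts {
		total += c
	}
	fmt.Println("messages handled in total:", total)
}
```

How the messages are split between workers varies from run to run; only the total is fixed.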

Advanced Usage

// Normally, the library will return an error when trying to connect and
// there is no server running. The RetryOnFailedConnect option will set
// the connection in reconnecting state if it failed to connect right away.
nc, err := nats.Connect(nats.DefaultURL,
    nats.RetryOnFailedConnect(true),
    nats.MaxReconnects(10),
    nats.ReconnectWait(time.Second),
    nats.ReconnectHandler(func(_ *nats.Conn) {
        // Note that this will be invoked for the first asynchronous connect.
    }))
if err != nil {
    // Should not return an error even if it can't connect, but you still
    // need to check in case there are some configuration errors.
}

// Flush connection to server, returns when all messages have been processed.
nc.Flush()
fmt.Println("All clear!")

// FlushTimeout specifies a timeout value as well.
if err := nc.FlushTimeout(1 * time.Second); err != nil {
    fmt.Println("Flush timed out!")
} else {
    fmt.Println("All clear!")
}

// Auto-unsubscribe after MAX_WANTED messages received
const MAX_WANTED = 10
sub, err := nc.SubscribeSync("foo")
sub.AutoUnsubscribe(MAX_WANTED)

// Multiple connections
nc1, _ := nats.Connect("nats://host1:4222")
nc2, _ := nats.Connect("nats://host2:4222")

nc1.Subscribe("foo", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

nc2.Publish("foo", []byte("Hello World!"))

Clustered Usage

var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"

nc, err := nats.Connect(servers)

// Optionally set ReconnectWait and MaxReconnect attempts.
// This example means 10 seconds total per backend.
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))

// You can also add some jitter for the reconnection.
// This call will add up to 500 milliseconds for non TLS connections and 2 seconds for TLS connections.
// If not specified, the library defaults to 100 milliseconds and 1 second, respectively.
nc, err = nats.Connect(servers, nats.ReconnectJitter(500*time.Millisecond, 2*time.Second))

// You can also specify a custom reconnect delay handler. If set, the library will invoke it when it has tried
// all URLs in its list. The value returned will be used as the total sleep time, so add your own jitter.
// The library will pass the number of times it went through the whole list.
nc, err = nats.Connect(servers, nats.CustomReconnectDelay(func(attempts int) time.Duration {
    return someBackoffFunction(attempts)
}))

// Optionally disable randomization of the server pool
nc, err = nats.Connect(servers, nats.DontRandomize())

// Setup callbacks to be notified on disconnects, reconnects and connection closed.
nc, err = nats.Connect(servers,
	nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
		fmt.Printf("Got disconnected! Reason: %q\n", err)
	}),
	nats.ReconnectHandler(func(nc *nats.Conn) {
		fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
	}),
	nats.ClosedHandler(func(nc *nats.Conn) {
		fmt.Printf("Connection closed. Reason: %q\n", nc.LastError())
	}),
)

// When connecting to a mesh of servers with auto-discovery capabilities,
// you may need to provide a username/password or token in order to connect
// to any server in that mesh when authentication is required.
// Instead of providing the credentials in the initial URL, you will use
// new option setters:
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))

// For token based authentication:
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))

// You can even pass the two at the same time in case one of the servers
// in the mesh requires a token instead of a user name and password.
nc, err = nats.Connect("nats://localhost:4222",
    nats.UserInfo("foo", "bar"),
    nats.Token("S3cretT0ken"))

// Note that if credentials are specified in the initial URLs, they take
// precedence over the credentials specified through the options.
// For instance, in the connect call below, the client library will use
// the user "my" and password "pwd" to connect to localhost:4222, however,
// it will use username "foo" and password "bar" when (re)connecting to
// a different server URL that it got as part of the auto-discovery.
nc, err = nats.Connect("nats://my:pwd@localhost:4222", nats.UserInfo("foo", "bar"))
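The precedence rule described above can be sketched with the standard library's net/url package. This only illustrates the rule and is not the client's actual code; credsFor and its parameters are hypothetical names, and the second URL stands in for a server learned through auto-discovery.

```go
package main

import (
	"fmt"
	"net/url"
)

// credsFor returns the user and password to use for serverURL.
// Credentials embedded in the URL win; otherwise the values that
// would come from the options (optUser, optPass) are used.
func credsFor(serverURL, optUser, optPass string) (string, string, error) {
	u, err := url.Parse(serverURL)
	if err != nil {
		return "", "", err
	}
	if u.User != nil {
		pass, _ := u.User.Password()
		return u.User.Username(), pass, nil
	}
	return optUser, optPass, nil
}

func main() {
	servers := []string{
		"nats://my:pwd@localhost:4222", // initial URL with credentials
		"nats://localhost:4223",        // assumed discovered URL, no credentials
	}
	for _, s := range servers {
		user, pass, err := credsFor(s, "foo", "bar")
		if err != nil {
			fmt.Println("bad URL:", err)
			continue
		}
		fmt.Printf("%s -> user=%s password=%s\n", s, user, pass)
	}
}
```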

Context support (+Go 1.7)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

nc, err := nats.Connect(nats.DefaultURL)

// Request with context
msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))

// Synchronous subscriber with context
sub, err := nc.SubscribeSync("foo")
msg, err := sub.NextMsgWithContext(ctx)

// Encoded Request with context
c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
type request struct {
	Message string `json:"message"`
}
type response struct {
	Code int `json:"code"`
}
req := &request{Message: "Hello"}
resp := &response{}
err := c.RequestWithContext(ctx, "foo", req, resp)

License

Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.

Comments
  • Cannot go get client

    I have a project that is outside of the Go src folder. I am running Go 1.12.5 on Windows amd64 (Win10). This is my first attempt to use NATS, not a great start!

    Getting this error:

    go get github.com/nats-io/nats.go/
    go: finding github.com/nats-io/nats.go v1.8.0
    go: finding github.com/nats-io/nkeys v0.0.2
    go: finding github.com/nats-io/nuid v1.0.1
    go: finding golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9
    CreateFile github.com/nats-io/nats.go: The system cannot find the path specified.

    opened by coffeefedora 55
  • Not seeing v2.0.0 for nats-io/nats.go

    $ mkdir mymod
    $ cd mymod
    $ touch go.mod
    $ go get github.com/nats-io/nats.go@v2.0.0
    stat github.com/nats-io/nats.go: no such file or directory
    $ go get github.com/nats-io/nats.go/@latest
    $ cat go.mod
    module mymod
    
    go 1.12
    
    require (
    	github.com/nats-io/go-nats v1.7.2 // indirect
    	github.com/nats-io/nats.go v1.7.2 // indirect
    	github.com/nats-io/nkeys v0.0.2 // indirect
    	github.com/nats-io/nuid v1.0.1 // indirect
    )
    $ 
    

    Shouldn't it be v2.0.0?

    opened by vangent 32
  • Add TokenHandler option to connection options

    This pull request adds TokenFunc to the Options struct. This adds a way to generate a new token every time a connectProto is built and allows the use of expiring tokens like JWTs when authenticating.

    opened by nicholaslam 31
  • Mock NATS server implementation in order to be able to write unit tests

    In order for consumers to be able to write unit tests for their subscription logic, they need a way to publish messages. Right now, this is not possible because a mock NATS server implementation has not been provided.

    This leaves the consumer with only one option: bring up an actual NATS server in your test suite. Which effectively means you are writing an integration test now. I want to avoid this as I want to test my service logic without bringing up external dependencies. Unit tests should be deterministic and not rely on any kind of network connectivity (e.g. bringing up NATS server locally would imply a port needs to be free on CI).

    The other option consumers have is to simply just unit test the message handler they pass in to their call to Subscribe(subj string, cb MsgHandler).

    What do you propose? Is there any workaround to this? Ideally, this package would provide some kind of mock NATS server somewhat similar to how opentracing go provides a mocktracer - https://github.com/opentracing/opentracing-go/tree/master/mocktracer so folks can write tests to ensure that their spans are being created as expected.

    opened by aaslamin 26
  • Subject interning

    @derekcollison Please review. The comparison costs about 2ns per operation, but for plain subscriptions it saves one allocation per operation, which results in a speedup. Looking forward to your comments. If we knew beforehand that a subscription uses a wildcard, the check could be made free. So I suggest merging this pull request first, then adding an isWildcard bool field to the subscription struct and switching to it, so that no string comparison is needed, just a single if isWildcard check and nothing else.

    opened by moredure 25
  • message consume too slow

    I created a stream with Retention=WorkQueuePolicy and published 1 million messages. Then I started to consume it.

    for {
        msgs, err := sub.Fetch(10000, nats.Context(ctx))
        for _, msg := range msgs {
            msg.AckSync()
        }
    }
    

    The first batch was fetched fast enough, but after a few loops Fetch() becomes too slow (> 10 seconds), and acknowledging the whole batch becomes too slow as well.

    bug 
    opened by carr123 21
  • Clean up logic around max handling to regress

    I'm trying to test changes against my new Java client and believe that this is a little cleaner and fixes some edge race conditions. There are several more and not sure that these changes should be applied until a wider look at delivered/max logic is looked at here...

    opened by camros 19
  • Unsubscribe does not allow consumption of already fetched messages

    Perhaps there is a good reason for this, but it appears that Unsubscribe sets its message channel to nil, which is potentially a destructive action.

    Unless there is another way to consume messages that have already been fetched by the client. I would expect the channel to be closed to prevent future writes but to remain available for reads.

    s.unsubscribe()
    ...consume remaining messages in channel
    

    I am working on a PR that addresses this. I am curious though if this was intended behavior and if I am misusing it.
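The behavior the issue expects corresponds to plain Go channel semantics: a closed buffered channel rejects further sends but still yields the values already in its buffer. The sketch below shows only that language property with the standard library; drainAfterClose is a made-up name and nothing here reflects how the client manages its internal channel.

```go
package main

import "fmt"

// drainAfterClose buffers the given messages, closes the channel,
// and then reads back everything that was buffered before the close.
func drainAfterClose(buffered []string) []string {
	ch := make(chan string, len(buffered))
	for _, m := range buffered {
		ch <- m
	}
	close(ch) // no further sends are allowed after this point

	var got []string
	for m := range ch { // range ends once the buffer is empty
		got = append(got, m)
	}
	return got
}

func main() {
	got := drainAfterClose([]string{"msg-1", "msg-2", "msg-3"})
	fmt.Println("read after close:", len(got), "messages")
}
```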

    opened by nickcarenza 19
  • Golang client for NATS web socket

    NATS 2.2 supports binary WebSockets. Does that mean I can use Go to build WASM-based web apps that communicate with a NATS server?

    I was told via the nats.ws repo that the nats.go client may support compiling to WASM.

    Is this true? It would be rather awesome.

    opened by gedw99 18
  • Return error if actual consumer configuration is different

    Feature Request

    When creating subscriptions I can pass extra options, e.g.:

    sub, err := js.PullSubscribe("TEST.*", "testing", nats.MaxDeliver(3))
    

    The consumer is created in NATS if it didn't exist yet:

    $ nats con info TEST testing
    Information for Consumer TEST > testing created 2021-08-17T11:37:10+02:00
    
    Configuration:
    
            Durable Name: testing
               Pull Mode: true
          Filter Subject: TEST.*
             Deliver All: true
              Ack Policy: Explicit
                Ack Wait: 30s
           Replay Policy: Instant
      Maximum Deliveries: 3
         Max Ack Pending: 20,000
       Max Waiting Pulls: 512
    
    State:
    ...
    

    If I then change the options, e.g.:

    sub, err := js.PullSubscribe("TEST.*", "testing", nats.MaxDeliver(5))
    

    Then the new value is silently ignored and the consumer still has Maximum Deliveries: 3. The options seem to be used only when creating a new consumer, but ignored otherwise (some of them at least).

    While it might not be feasible to transparently update the consumer, I would expect to get an error telling me that the requested options do not match the actual consumer configuration.

    Use Case:

    I can very easily imagine a situation where a developer changes the options (adds a new one, removes one, changes a value) thinking that it will work. For someone unfamiliar with NATS JS it's not obvious that some consumer object exists in NATS itself and that these options are not just client-side connection options. It's easy for the code to diverge from the actual configuration.

    Proposed Change:

    Compare the actual consumer configuration with the requested one and return an error if there's a mismatch.

    It seems that the existing configuration is already read (the info variable), so this shouldn't affect performance.

    Who Benefits From The Change(s)?

    Unwitting developers trying to change the consumer configuration.

    Alternative Approaches

    Update the consumer configuration to match the requested one.

    feature 
    opened by kszafran 18
  • Panic when publishing

    I saw this by accident and have no idea really why. Just thought I would drop it here.

    panic: sync: WaitGroup is reused before previous Wait has returned
    
    goroutine 249599 [running]:
    panic(0x7eba60, 0xc83d589f70)
        /usr/local/go/src/runtime/panic.go:481 +0x3e6
    sync.(*WaitGroup).Wait(0xc8200598e8)
        /usr/local/go/src/sync/waitgroup.go:129 +0x114
    github.com/nats-io/nats.(*Conn).waitForExits(0xc820059800)
        /data/jenkins/jobs/tgm_ng deploy/workspace/vendor/src/github.com/nats-io/nats/nats.go:733 +0x6a
    github.com/nats-io/nats.(*Conn).doReconnect(0xc820059800)
        /data/jenkins/jobs/tgm_ng deploy/workspace/vendor/src/github.com/nats-io/nats/nats.go:1084 +0x25
    created by github.com/nats-io/nats.(*Conn).processOpErr
        /data/jenkins/jobs/tgm_ng deploy/workspace/vendor/src/github.com/nats-io/nats/nats.go:1232 +0x25f
    
    opened by dahankzter 18
  • Mirror or Sourcing to different domain made easier

    This is possible today but not intuitive using the APIPrefix part of the ExternalStream.

    For instance if we have a leafnode and it wants to mirror or source from a stream in the hub system with domain "cloud", we can do this as follows.

    cfg.Sources = []*nats.StreamSource{
    	{
    		Name:          HubStreamName,
    		External:      &nats.ExternalStream{APIPrefix: fmt.Sprintf("$JS.%s.API", "cloud")},
    	},
    }
    

    It might be nice if the client (no server changes needed) directly supported a domain and stitched together the APIPrefix for the user.

    cfg.Sources = []*nats.StreamSource{
    	{
    		Name:    HubStreamName,
    		Domain: "cloud",
    	},
    }
    

    This would not be a direct part of the JSON and would simply create the External entry from above. Would need to do some checking client side if both Domain and External are set.
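A minimal sketch of the stitching and the client-side check described above, using only the standard library. The types externalStream and streamSource are local stand-ins that mirror just the fields shown in the issue, Domain is the proposed (not existing) field, and resolveDomain is a hypothetical helper name.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for illustration; they are not the nats.go types.
type externalStream struct {
	APIPrefix string
}

type streamSource struct {
	Name     string
	Domain   string // proposed field, assumed for this sketch
	External *externalStream
}

// resolveDomain turns Domain into the equivalent External entry and
// rejects sources that set both Domain and External.
func resolveDomain(src *streamSource) error {
	if src.Domain == "" {
		return nil
	}
	if src.External != nil {
		return errors.New("domain and external are both set")
	}
	src.External = &externalStream{
		APIPrefix: fmt.Sprintf("$JS.%s.API", src.Domain),
	}
	return nil
}

func main() {
	src := &streamSource{Name: "HUB", Domain: "cloud"}
	if err := resolveDomain(src); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("API prefix:", src.External.APIPrefix)
}
```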

    feature 
    opened by derekcollison 0
  • Add support for proxy configuration for websocket connection

    Feature Request

    Add support for the HTTP_PROXY, HTTPS_PROXY and NO_PROXY environment variables, same as the http package does: https://pkg.go.dev/net/http#ProxyFromEnvironment.

    Use Case:

    Proposed Change:

    Who Benefits From The Change(s)?

    Anyone who wants to run a NATS client behind a proxy server.

    Alternative Approaches

    Allowing the user to set the proxy configuration via one or more options. (nats.Option)

    feature 
    opened by chen-shmilovich-sysdig 2
  • `PublishAsyncComplete` can signal completion before all pending futures are resolved

    Defect

    Make sure that these boxes are checked before submitting your issue -- thank you!

    • [x] Included nats.go version
    • [x] Included a [Minimal, Complete, and Verifiable example] (https://stackoverflow.com/help/mcve)

    Versions of nats.go and the nats-server if one was involved:

    nats.go v1.16.1-0.20220810192301-fb5ca2cbc995
    nats-server HEAD fdbee9b

    OS/Container environment:

    Darwin/arm64

    Steps or code to reproduce the issue:

    This test is available as PR: https://github.com/nats-io/nats-server/pull/3426

    func TestJetStreamPubAckFutureComplete(t *testing.T) {
    	s := RunBasicJetStreamServer()
    	if config := s.JetStreamConfig(); config != nil {
    		defer removeDir(t, config.StoreDir)
    	}
    	defer s.Shutdown()
    
    	nc, js := jsClientConnect(t, s)
    	defer nc.Close()
    
    	if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Storage: nats.MemoryStorage}); err != nil {
    		t.Fatalf("Unexpected error: %v", err)
    	}
    
    	toSend := 1_000_000
    	pending := make([]nats.PubAckFuture, 0, toSend)
    	for i := 0; i < toSend; i++ {
    		pubAckFuture, err := js.PublishAsync("foo", []byte("OK"))
    		if err != nil {
    			t.Fatalf("Failed to publish: %v", err)
    		}
    		pending = append(pending, pubAckFuture)
    	}
    	<-js.PublishAsyncComplete()
    	for i, pubAckFuture := range pending {
    		select {
    		case <-pubAckFuture.Ok():
    		case err := <-pubAckFuture.Err():
    			t.Fatalf("Failed to publish: %v", err)
    		default:
    			t.Fatalf("PubAck %d still pending after publish completed", i)
    		}
    	}
    }
    

    This works most of the time, error chance is about 3-5% on my machine. Run with: go test --run TestJetStreamPubAckFutureComplete -count 100 -v, and probably you'll see at least a few failures.

    === RUN   TestJetStreamPubAckFutureComplete
    --- PASS: TestJetStreamPubAckFutureComplete (1.62s)
    === RUN   TestJetStreamPubAckFutureComplete
    --- PASS: TestJetStreamPubAckFutureComplete (1.62s)
    === RUN   TestJetStreamPubAckFutureComplete
    --- PASS: TestJetStreamPubAckFutureComplete (1.62s)
    === RUN   TestJetStreamPubAckFutureComplete
        jetstream_test.go:4609: PubAck 506721 still pending after publish completed
    --- FAIL: TestJetStreamPubAckFutureComplete (1.53s)
    === RUN   TestJetStreamPubAckFutureComplete
        jetstream_test.go:4609: PubAck 440322 still pending after publish completed
    --- FAIL: TestJetStreamPubAckFutureComplete (1.52s)
    === RUN   TestJetStreamPubAckFutureComplete
    --- PASS: TestJetStreamPubAckFutureComplete (1.62s)
    

    Expected result:

    After <-js.PublishAsyncComplete() all PubAckFuture are resolved (success or failure)

    Actual result:

    After <-js.PublishAsyncComplete() some PubAckFuture are still pending. Even giving them time to complete does not help.

    bug 
    opened by mprimi 3
  • Ordered consumer subscription can become invalid

    Defect

    Ordered consumer subscription can become invalid

    This bug is unlikely to manifest in normal circumstances. But it happens almost certainly in presence of failures (e.g., bouncing the cluster that hosts the stream).

    The sequence of events is:

    • Ordered subscription delivers some messages
    • Ordered subscription resets the internal ephemeral consumer
    • During reset, the nc.Request call (js.go:1956) fails (due to cluster being down)
    • Reset fails without recourse
    • The subscription is now in an unrecoverable state

    From this moment on, all calls to NextMsg() will return ErrBadSubscription. The application using this consumer is stuck.


    Make sure that these boxes are checked before submitting your issue -- thank you!

    • [x] Included nats.go version
    • [x] Included a [Minimal, Complete, and Verifiable example] (https://stackoverflow.com/help/mcve)

    Versions of nats.go and the nats-server if one was involved:

    nats.go: v1.16.0 nats-server: 2.9.0-beta.20 (probably not relevant)

    OS/Container environment:

    Not relevant, but reproduces on macOS and Linux

    Steps or code to reproduce the issue:

    You can reproduce this behavior by running TestJetStreamChaosConsumerOrdered from: https://github.com/nats-io/nats-server/pull/3334

    Expected result:

    Ordered consumer subscription delivers value in stream sequence order

    Actual result:

    Ordered consumer subscription can get stuck: any calls to NextMsg() will return ErrBadSubscription.

    bug 
    opened by mprimi 2
  • Partial PUT and Get objectstore

    Feature Request

    • append object with slice byte
    • watch / get object with total byte

    Use Case:

    Proposed Change:

    stream.PutBytes("key", []byte{}, ObjectOpt.Append, ObjectOpt.Any)
    stream.PutBytes("key", []byte{}, ObjectOpt.Append, ObjectOpt.EOF)
    stream.GetBytes("key", ObjectOpt.From(0), ObjectOpt.Total(3200))

    type ObjectInfo struct {
        ObjectMeta
        Bucket  string    `json:"bucket"`
        NUID    string    `json:"nuid"`
        Size    uint64    `json:"size"`
        ModTime time.Time `json:"mtime"`
        Chunks  uint32    `json:"chunks"`
        Digest  string    `json:"digest,omitempty"`
        Deleted bool      `json:"deleted,omitempty"`
        EOF     bool      `json:"eof"`
    }

    Who Benefits From The Change(s)?

    Anyone who needs a streaming method with minimal latency. Getting or putting small chunks of bytes prevents a failed upload of a huge object over a poor connection, and a client can read partial bytes before the producer has finished sending the object.

    Alternative Approaches

    feature 
    opened by Qhodok 0
Releases(v1.17.0)
  • v1.17.0(Sep 16, 2022)

    Changelog

    IMPORTANT

    This release uses a new consumer create API when interacting with nats-server version 2.9.0 or higher. This changes the subjects used by the client to create consumers, which might in some cases require changes in access and import/export configuration. To opt out of this feature, use UseLegacyDurableConsumers() option when creating JetStreamContext.

    Added

    • JetStream:

      • Support for enhanced stream purge. PurgeStream() now accepts StreamPurgeRequest as an option, allowing partial purge by subject, sequence number, or keeping a selected number of messages (#988)
      • Option to fetch DeletedDetails when fetching StreamInfo. StreamInfo() now accepts StreamInfoRequest allowing to pass DeletedDetails flag (#990)
      • Option to report subjects on StreamInfo() request. StreamInfo() now accepts StreamInfoRequest allowing to pass SubjectsFilter value (#1010)
      • Support AllowDirect in stream configuration, enabling faster access to individual messages on a stream (#991)
      • Support for DirectGet API in GetMsg() through DirectGet() and DirectGetNext() options (#1020, #1030)
      • HeadersOnly option to RePublish field on stream configuration and change struct name to RePublish (#991)
      • SecureDeleteMsg() method to securely delete and overwrite a message on a stream (#1025)
      • MaxRequestMaxBytes() PullConsumer() option allowing setting the maximum number of bytes a single Fetch() can receive (#1043)
      • Filter streams and stream names by subject in StreamsInfo() and StreamNames() using StreamListFilter() option (#1062)
      • Accept AckAll for pull consumers. Thanks to @neilalexander for the contribution (#1063)
      • Support for setting consumer replicas through Subscribe() options. Thanks to @goku321 for the contribution (#1019)
      • Support for setting memory storage on consumer with ConsumerMemoryStorage() option in Subscribe(). Thanks to @goku321 for the contribution (#1078)
    • KV:

      • RePublish option on key value configuration (#1031)
    • ObjectStore:

      • ObjectStores() and ObjectStoreNames() methods for listing object store buckets (#1074)
    • TLSConnectionState() to expose TLS connection state (#996)

    • UserJWTAndSeed helper function accepting JWT and seed as parameters (#1046)

    • natsProtoErr type for proto error normalization and comparison using errors.Is() (#1082)

    Improved

    • JetStream

      • Add JetStreamError type for all JetStream related errors, containing error codes (for API errors). JetStreamError supports comparing and unwrapping errors using native errors package (#1044, #1047)
      • Force Subscribe() to use memory storage and no replicas when using OrderedConsumer() (#989)
      • Consistent error value of context timeout when using Fetch() on pull subscription. Thanks to @wdhongtw for the contribution (#1011)
      • Add additional note to PullSubscribe() on durable semantics (#994)
    • KV:

      • Utilize DirectGet() in KV for improved performance (#1020)
    • Add support for reporting flusher errors. Thanks to @GeorgeEngland for the contribution (#1015)

    • Mention field defaults in Options struct documentation. Thanks to @costela for the contribution (#1013)

    Changed

    • JetStream:
      • DeleteMsg() now uses NoErase option by default, not overwriting the message on stream (only marking it as deleted) (#1025)
      • StreamInfo() will now return all subjects when requested (#1072)

    Updated

    • JetStream:

      • [BREAKING] Use new consumer create API when interacting with nats-server version 2.9.0 or higher. This changes the subjects used by the client to create consumers, which might in some cases require changes in access and import/export configuration. To opt out of this feature, use UseLegacyDurableConsumers() option when creating JetStreamContext (#1080)
      • Add missing fields to AccountInfo schema (#1026)
      • Align StreamSourceInfo schema with server (#1039)
    • Use nats-server 2.9.0 in tests (#1073)

    • Add new test TLS certs and run tests for go 1.18 in CI (#1023, #1055)

    Fixed

    • JetStream

      • Subscribe() automatically sending ACK when AckPolicyNone is set (#987)
      • Return error when attempting to ACK a message on an AckNone consumer (#1032)
      • Use native time.Time.Equal method for equality check when comparing consumer configs (#993)
      • Ephemeral PullConsumer's Fetch() failing with "no responders" (#1022)
      • ConsumerInfo nil pointer dereference when jsi is not initialized. Thanks to @Sergey-Belyakov for the contribution (#1024)
      • Paging in stream and consumer name listing (#1060)
    • ObjectStore

      • Update object Put() to avoid losing last chunk when Reader returns both value and EOF. Thanks to @tinou98 for the contribution (#995)
      • Invalid digest decoding on object Get(), not propagating errors from Get() to the user (#1052)
      • Allow updating meta if new name exists but is deleted (#1053)
      • Disallow adding links in Put() object meta (#1057)
    • Typo in CustomInboxPrefix() error message. Thanks to @subtle-byte for the contribution (#1028)

    • Ignore trailing comma at the end of URL lists (#1058)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.16.0...v1.17.0

  • v1.16.0(May 26, 2022)

    Changelog

    Added

    • JetStream:
      • Experimental: StreamConfig.RePublish configuration, which is a SubjectMapping (source/destination) which allows the republish of a message after being sequenced and stored (#980)
      • Experimental: Two new ConsumerConfig fields: Replicas and MemoryStorage which are generally inherited by parent stream, but can be configured directly (#980)
    • Websocket:
      • ProxyPath() option to add a path to websocket connection URLs. Thanks to @ido-gold-apolicy for the contribution (#974)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.15.0...v1.16.0

  • v1.15.0(May 4, 2022)

    Changelog

    Experimental

    The ObjectStore is still experimental and subject to change.

    Added

    • ObjectStore:
      • MaxBytes in the ObjectStoreConfig structure (#955)

    Improved

    • JetStream:
      • Ability to pass 0 to nats.ExpectLastSequence() and nats.ExpectLastSequencePerSubject() in the js.Publish() call. Previously, the value 0 would be ignored and the header would not be set. Note that currently, the server only accepts 0 as a valid value for nats.ExpectLastSequencePerSubject(). Thanks to @bruth for the contribution (#958)
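
    The underlying problem is distinguishing "option not set" from "option set to 0". A common Go approach, sketched below, is to store the value behind a pointer so that nil means unset. The pubOpts struct, the expectLastSequence option and the buildHeaders helper are hypothetical and only illustrate the idea; the header name is assumed for the example and the library may implement this differently.

```go
package main

import (
	"fmt"
	"strconv"
)

// pubOpts is a hypothetical options struct. A nil pointer means "not set",
// which keeps an explicit 0 distinguishable from the default.
type pubOpts struct {
	expectLastSeq *uint64
}

// pubOpt follows the functional-option style; the names are illustrative.
type pubOpt func(*pubOpts)

func expectLastSequence(seq uint64) pubOpt {
	return func(o *pubOpts) { o.expectLastSeq = &seq }
}

// buildHeaders sets the expectation header whenever the option was supplied,
// including when the supplied value is 0.
func buildHeaders(opts ...pubOpt) map[string]string {
	var o pubOpts
	for _, opt := range opts {
		opt(&o)
	}
	h := make(map[string]string)
	if o.expectLastSeq != nil {
		// Header name assumed for this sketch.
		h["Nats-Expected-Last-Sequence"] = strconv.FormatUint(*o.expectLastSeq, 10)
	}
	return h
}

func main() {
	fmt.Println(buildHeaders())                      // map[]
	fmt.Println(buildHeaders(expectLastSequence(0))) // map[Nats-Expected-Last-Sequence:0]
}
```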

    Fixed

    • JetStream:
      • A PullConsumer's Fetch() method with a batch greater than 1 and with other pull consumers running, may timeout although messages would have been available (#967)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.14.0...v1.15.0

  • v1.14.0(Apr 8, 2022)

    Changelog

    JetStream and KeyValue users

    Please see the "Changed" section for important (possibly breaking) changes compared to the previous version.

    Experimental

    The ObjectStore is still experimental and subject to change.

    Added

    • JetStream:
      • Support for tracing API calls (#849, #911)
      • Error ErrMsgAlreadyAckd for a more detailed error when trying to acknowledge a message that was already acknowledged. Thanks to @Berreek for the contribution (#862)
      • Made js.Subscribe() calls context aware (#872)
      • UpdateConsumer() and new consumer configuration options: MaxRequestBatch, MaxRequestExpires and InactiveThreshold (#893)
      • NakWithDelay() new acknowledgment call, and BackOff list of duration in the ConsumerConfig object (#894)
      • BackOff() subscription option. Thanks to @mfaizanse for the contribution (#933)
      • StallWait() publish option. This is the wait time (default is 200ms) that the library will wait when reaching the maximum inflight number of asynchronous publish calls (#941)
    • KeyValue:
      • Status functionality (#845)
      • MetaOnly() watcher option (#854)
      • GetRevision() to get the key at a specific revision, or ErrKeyNotFound if the key does not exist or not at the expected revision. Thanks to @boxboatmatt for the contribution (#903)
      • Placement in the KeyValueConfig structure (#929)
      • Expose nats.Context() option for the nats.KeyWatcher interface. Thanks to @boxboatmatt for the contribution (#904)
    • ObjectStore:
      • Status functionality (#845)
      • Placement in the ObjectStoreConfig structure (#929)
    • ConnectedUrlRedacted() a redacted version of ConnectedUrl() (#923)

    Changed

    • JetStream:
      • The library no longer sets a default MaxAckPending when creating a JetStream consumer on Subscribe(). The selection of the value is left for the server to pick (#920)
      • The library will now try to resend a message when getting an ErrNoResponders error on a Publish() or StreamInfo() call. This is to overcome "blips" that may happen during leader changes. The action will be retried up to 2 times with a 250ms wait in between. These can be changed with the new publish options RetryWait() and RetryAttempts() (#930)
      • PublishMsgAsync() will now be limited to 4,000 maximum asynchronous publish calls inflight, if no maximum has been specified with PublishAsyncMaxPending() option.
    • KeyValue:
      • Delete() and Purge() now accept optional DeleteOpt options. The option available is LastRevision(), which makes the purge or delete conditional on the key's last revision being equal to the given value; otherwise the operation will fail. Thanks to @steveh for the contribution (#856)
      • PurgeDeletes() will now keep markers that are less than 30 minutes old by default. Also, PurgeDeletes() now accepts optional PurgeOpt values, not WatchOpt. The new DeleteMarkersOlderThan() option can be provided to change the default 30 minutes. A negative value will remove markers, regardless of their age (#906)
      • When connecting to a v2.7.2+ server, the stream for the KeyValue should be created with DiscardNew instead of DiscardOld. The library will now automatically update an existing stream for a KeyValue from DiscardOld to DiscardNew (#917)

    Improved

    • Websocket:
      • Use 80 or 443 as default ports, depending on the scheme ws:// or wss://. Thanks to @cbrake for the suggestion (#879)
    • The connect failure error message when given an invalid user credentials file (#916)
    • The library will now auto-reconnect when the connection is closed on maximum connections reached from server, which could happen after a configuration reload. The library would previously have caused the connection to be permanently closed (#935)

    Updated

    • Examples:
      • The Nats-echo service example to simulate a status request (#950)
      • Comment for the demo servers. Removed the TLS specific version since one can connect with TLS or not to the same port (#952)

    Fixed

    • Documentation:
      • Typo in Bind go documentation. Thanks to @caspervonb for the contribution (#860)
      • Typo in SetClosedHandler. Thanks to @tormoder for the contribution (#877)
      • Typo in example_test.go. Thanks to @bvwells for the contribution (#882)
      • Comment for Subscribe method. Thanks to @ipromax for the contribution (#886)
      • Many API calls were not checking that stream and consumer names were valid, that is, did not contain a . in their name. This resulted in situations where the API would timeout because the server did not have interest in the malformed subject. Thanks to @sata-form3 for the report (#947)
    • JetStream:
      • Ordered consumers handling of auto unsubscribe (#847)
      • Activity check to handle cases when subscription was closed. Thanks to @boxboatmatt for the contribution (#873)
      • Return ErrStreamNotFound when calling AddConsumer against a missing stream (#881)
      • Prefix the error returned by StreamInfo() with nats: to match ConsumerInfo() (#928)
    • KeyValue:
      • Ensure Get() returns a nil and ErrKeyNotFound as per specification (#844)
      • Various issues, such as cancellation of the context not allowing the range on w.Updates() to exit, flow control, etc. (#900, #901)
      • Use of the APIPrefix to work correctly across accounts (#910)
    • Websocket:
      • When using secure connection wss:// and a host name that resolves to multiple IPs, or when trying to reconnect to discovered servers, the (re)connection would fail with websocket invalid connection (#915)
      • Deadlock on authentication failure that manifested by a Connect() hanging forever. Thanks to @wenerme for the report (#926)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.13.0...v1.14.0

  • v1.13.0(Oct 7, 2021)

    Changelog

    JetStream users

    Please review release notes from v1.12.0 regarding important changes if upgrading from an earlier release.

    Experimental

    This release introduces KeyValue and ObjectStore as experimental features. Note that their APIs are subject to change based on community feedback. Also, some features will not work unless using NATS Server from the main branch, or the version following the latest public release v2.6.1.

    Added

    • JetStream:
      • HeadersOnly() subscription option to only deliver headers but not the message payloads (#832)
      • Sealed, DenyDelete, DenyPurge and AllowRollup stream configuration options (#832)
      • GetLastMsg() retrieves the last raw stream message stored in JetStream by subject (#832)
    • KeyValue and ObjectStore (#832)
    • ConnectedServerVersion() returns the server's version string, or empty if not currently connected to a server (#832)

    Fixed

    • JetStream:
      • Flow control may stall in some conditions (#837)
      • Context usage for Fetch() and Ack(). Thanks to @andreib1 and @T-J-L for the reports (#838)
      • Queue name cannot contain "." character when used as the durable name. Thanks to @saschahagedorn-f3 for the report (#841)
      • PublishMsgAsync would fail if a message reply was already set (#832)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.12.2...v1.13.0

  • v1.12.3(Sep 21, 2021)

    Changelog

    JetStream users

    Please review release notes from v1.12.0 regarding important changes if upgrading from an earlier release.

    Fixed

    • JetStream:
      • Received message may have wrong subject. This is a regression due to an attempt to reduce subject string copy in v1.12.2 (#827)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.12.2...v1.12.3

  • v1.12.2(Sep 20, 2021)

    Changelog

    JetStream users

    Please review release notes from v1.12.0 regarding important changes if upgrading from an earlier release.

    Updated

    • JetStream:
      • Go doc for subscription calls in the interface (#818)

    Improved

    • Reduce memory allocation for inbound messages. Thanks to @moredure for the contribution (#824)

    Fixed

    • JetStream:
      • Unblock Pull Subscribe requests on a 408 with at least a message already fetched (#823)
    • Websocket:
      • Possible panic when decoding CLOSE frame. Thanks to @byazrail for the report (#821)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.12.1...v1.12.2

  • v1.12.1(Sep 3, 2021)

    Changelog

    JetStream users

    Please review release notes from v1.12.0 regarding important changes if upgrading from an earlier release.

    Added

    • Stringer for connection's Status(). Thanks to @JosephWoodward for the contribution (#812)

    Fixed

    • JetStream:
      • Fetch() could return immediately with a timeout error. Thanks to @izarraga for the report (#813)
    • Inboxes suffix would contain many zeros (#808)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.12.0...v1.12.1

  • v1.12.0(Aug 26, 2021)

    Changelog

    Breaking Changes

    This release has some important and breaking changes for the JetStream module. Previously, it was possible to create multiple instances of non queue subscriptions to the same JetStream consumer, which was wrong since each instance would get a copy of the same message and one instance acknowledging a message would mean that the other instance's message acknowledgement (or lack thereof) would be ignored. It was also possible to create queue groups against a JetStream consumer that was used by a non queue subscription.

    This update requires the upcoming server version v2.4.0 to behave correctly, that is, the library will reject a "plain" subscription on a JetStream consumer that is already bound (that is, there is already an instance actively consuming from it), or on a consumer that was created for a queue group. It will also reject a queue subscription on a JetStream consumer that was not created for a queue group, or to a consumer that has been created for a different queue group. But it means that this update will not be able to create a queue subscription on a server pre v2.4.0 because those servers do not have the concept of DeliverGroup in the consumer configuration.
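
    The rejection rules in the paragraph above can be summarized as a small decision function. This is a sketch for illustration only: the consumerState struct, the checkAttach function and the error values are hypothetical, and only the field names DeliverGroup and PushBound are taken from these notes.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerState holds the two pieces of consumer information the rules need.
// The struct itself is hypothetical.
type consumerState struct {
	DeliverGroup string // queue group the consumer was created for, "" if none
	PushBound    bool   // true when an instance is already actively consuming
}

// Illustrative error values; the library's errors may differ.
var (
	errBound          = errors.New("consumer is already bound to a subscription")
	errNeedsQueue     = errors.New("consumer was created for a queue group")
	errNotQueue       = errors.New("consumer was not created for a queue group")
	errDifferentQueue = errors.New("consumer was created for a different queue group")
)

// checkAttach applies the rules for a subscription with the given queue name
// ("" for a plain, non-queue subscription).
func checkAttach(c consumerState, queue string) error {
	if queue == "" {
		if c.DeliverGroup != "" {
			return errNeedsQueue
		}
		if c.PushBound {
			return errBound
		}
		return nil
	}
	if c.DeliverGroup == "" {
		return errNotQueue
	}
	if c.DeliverGroup != queue {
		return errDifferentQueue
	}
	return nil
}

func main() {
	fmt.Println(checkAttach(consumerState{PushBound: true}, ""))
	fmt.Println(checkAttach(consumerState{DeliverGroup: "workers"}, "workers"))
	fmt.Println(checkAttach(consumerState{DeliverGroup: "workers"}, "audit"))
}
```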

    Look at the Changed section below for the list of those changes.

    The repository master branch has been renamed main. If you have a fork or a clone of the repository, you should run those git commands:

    git branch -m master main
    git fetch origin
    git branch -u origin/main main
    git remote set-head origin -a
    

    Added

    • JetStream:
      • Bind() and BindStream() options to the subscribe calls (#740)
      • ChanQueueSubscribe() (#744)
      • APIPrefix() and Domain() options to specify prefix or domain. Thanks to @Jarema for the contribution (#750, #753)
      • Two new sentinel errors: ErrStreamNotFound and ErrConsumerNotFound. Thanks to @actatum for the contribution (#760)
      • MaxMsgsPerSubject option in the StreamConfig (#768)
      • OrderedConsumer() subscription option to create a FIFO ephemeral consumer for in order delivery of messages. There are no redeliveries and no ACKs, and flow control and heartbeats will be added but be taken care of without additional user code (#789, #793)
      • DeliverSubject() option to configure the deliver subject of a JetStream consumer created by the js.Subscribe() call (and variants) (#794)
      • Fields DeliverGroup in ConsumerConfig, PushBound in ConsumerInfo. They help prevent incorrect subscriptions to JetStream consumers (#794)
      • Field Description in StreamConfig and ConsumerConfig (#795)
      • ExpectLastSequencePerSubject() publish option (#797)
      • DeliverLastPerSubject() subscribe option (#798)
    • CustomInboxPrefix connection option to set the custom prefix instead of _INBOX. (#767)

    Changed

    • JetStream:
      • Conn.JetStream() no longer looks up account information (#739)
      • With a PullSubscription, calling NextMsg() or NextMsgWithContext() will now return ErrTypeSubscription. You must use the Fetch() API (#794)
      • If the library created internally a JetStream consumer, the consumer will be deleted on Unsubscribe() or when the Drain() completes (#794)
      • Fail multiple instances of a subscription on the same durable push consumer (only one active at a time). Also, consumers now have the concept of DeliverGroup, which is the queue group name they are created for. Only queue members from the same group can attach to this consumer, and a non queue subscription cannot attach to it. Note that this requires server v2.4.0 (#794)
      • Attempting to create a queue subscription with a consumer configuration that has idle heartbeats and/or flow control will now result in an error (#794)
      • ConsumerInfo's fields Delivered and AckFloor are now SequenceInfo objects that include the last activity (in UTC time) (#802)

    Improved

    • Avoid unnecessary data copy in some situations. Thanks to @Endeavourken for the contribution (#788)
    • JetStream:
      • js.PullSubscribe() implementation that reduces the number of internal subscriptions being created/auto-unsubscribed (#791)
      • Subscribe calls will now return an error if the consumer configuration specified does not match the current consumer's configuration. Thanks to @kszafran for the suggestion (#803)

    Fixed

    • JetStream:
      • Possible lock inversion (#794)
      • JetStream consumers could be incorrectly deleted on subscription's Unsubscribe() (#794)
    • Removed unused code. Thanks to @rutgerbrf for the contribution (#737)
    • Misspellings in go doc. Thanks to @dtest11 for the contributions (#758, #759)
    • Websocket:
      • Decompression of continuation frames (#755)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.11.0...v1.12.0

  • v1.11.0(May 4, 2021)

    Changelog

    Added

    • Message Headers support
      • Conn.HeadersSupported() returns a boolean indicating whether the server the client is currently connected to supports headers (#582)
    • JetStream support
    • Websocket support (#719)
    • LameDuckModeHandler to be notified when the server the application is connected to enters lame-duck mode (#572)
    • RequestMsg(), RequestMsgWithContext() and RespondMsg() APIs, which makes it possible to use message headers with requests and replies (#574)
    • RetryOnFailedConnect() option, which means that the Connect() calls will not fail right away if servers are not running at the first attempt to connect. Instead, the library will behave as if it was just disconnected. Thanks to @tomwilkie, @Zambiorix, @kekoav, @mnarrell, @serajam and @hbobenicio for their feedback (#581)
    • Support for "no responders" error on requests (#576)
    • Examples:
      • TLS options in nats-pub and nats-sub examples. Thanks to @egodigitus for the contribution (#615)
      • --nkey flag to enable NKey authentication (#728)

    Changed

    • ErrNoResponders is now returned for a request call for which there is no application servicing the request's subject (when connecting to a server that supports the feature). Applications checking for ErrTimeout only as a "normal" error would need to be updated (#576)

    Updated

    • Dependencies (#573)
    • Added a go_test.mod file to limit tests dependencies versus library dependencies (#705)

    Fixed

    • Comment for PublishRequest API. Thanks to @hasheddan for the contribution (#570)
    • Comment for encoded connection's Handler. Thanks to @aym-v for the contribution (#590)
    • Possible data race between an AutoUnsubscribe() and the routine delivering messages (#726)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.10.0...v1.11.0

  • v1.10.0(May 14, 2020)

    Changelog

    Added

    • Conn.GetClientIP() returns the client IP as known by the server. Works if connected to server 2.1.6+ (#559)
    • Conn.RTT() calculates the round trip time between this client and the server (#559)

    Updated

    • Protobuf import for encoder (#557)
    • Change default of Conn.Flush() from 60 to 10 seconds (#561)
    • Bump default MAX_CONTROL_LINE to 4096 (#567)

    Fixed

    • Possible panic on connect if discovered server list shrinks in that process. Thanks to @kirill256 for the contribution (#550)
    • Conn.Request() with UseOldRequest option was not returning on connection Conn.Close() (#558)
    • Added jitter to the reconnection logic (#564)
    • Randomize discovered server URLs (unless NoRandomize() option is set) (#566)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.9.2...v1.10.0

  • v1.9.2(Mar 30, 2020)

    Changelog

    Added

    • Option to set the reply subject in nats-pub example (#539)
    • The queue group name in nats-qsub example (#541)

    Improved

    • Reduced lock contention for connections receiving their own messages (#529)

    Updated

    • Dependencies, covering the golang crypto package CVE. The CVE mentions the ssh package, which this library does not use, so NATS Go client should not be affected. Thanks to @KauzClay for the contribution (#548)

    Fixed

    • Do not use sync.Once.Do() in connection's first request. Thanks to @hubinix for the report (#538)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.9.1...v1.9.2

  • v1.9.1(Oct 31, 2019)

    Changelog

    Fixed

    • In preparation for JetStream and support of subject rewriting, a bug was introduced that would cause multiple deliveries where there should have been only one per response. This affects only v1.9.0 (#528)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.9.0...v1.9.1

  • v1.9.0(Oct 30, 2019)

    Changelog

    With the release of Go 1.13.3, go get github.com/nats-io/nats.go is now working!

    Added

    • Notes in README on using go get and Go modules (#481)
    • NoCallbacksAfterClientClose() connection option to prevent invocation of connection callback on explicit Close() (#514)

    Updated

    • Examples (log error, use Drain(), etc.)
    • Replaced use of DisconnectHandler with DisconnectErrHandler in our examples. Thanks to @kaxap for the contribution (#504)

    Improved

    • When creating a subscription, error will now be reported if using a bad subject or queue name (#500)
    • Support for ~ when providing the user credentials file(s) (#512)

    Fixed

    • Document issues. Thanks to @JensRantil for the report.
    • Some errors in code examples in the README. Thanks to @thylong for the contribution (#507)
    • Allow synchronous subscriptions to use msg.Respond() when AutoUnsubscribe() is used (#489)
    • Wrong error returned in NextMsg() and NextMsgWithContext(). Thanks to @ekle for the report (#492)
    • Handling of expiration and auth errors on reconnect (#499, #506)
    • Data race between processMsg() and Stats(). Thanks to @Will2817 for the report (#521)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.8.1...v1.9.0

  • v1.8.1(Oct 3, 2019)

    Changelog

    Updated

    • NATS Server import paths in tests (include the /v2 token) (#479)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.8.0...v1.8.1

  • v1.8.0(Jun 4, 2019)

    Changelog

    Some of you may have noticed that there was temporarily a v2.0.0 tag. We wanted to align the client version to the upcoming NATS Server 2.0 release. However, there were no backward compatibility breaking changes in the client and the go.mod rules for a v2 would have forced us, and users, to add v2 to the import paths. This would not be justified since, again, there are no breaking changes.

    So we have removed the tag and instead released v1.8.0. We are deeply sorry for the inconvenience but we hope that you will understand why we did this.

    Changed

    • The repository has been renamed to nats.go. When doing a go get github.com/nats-io/nats.go/, make sure to include the trailing / to avoid this error: stat github.com/nats-io/nats.go: no such file or directory.
    • The default URL (nats.DefaultURL) has been changed from nats://localhost:4222 to nats://127.0.0.1:4222 (#460)

    Added

    • ConnErrHandler: handler which can be invoked when a disconnect event occurs. Unlike ConnHandler, you can get the error that caused the disconnect event. Thanks to @mkorolyov for the contribution (#462, #464)
    • Message.Respond() to be able to conveniently reply to a request message from the message handler (#472)

    Deprecated

    • DisconnectedCB, DisconnectHandler: Use DisconnectedErrCB, DisconnectErrHandler instead (#462, #464)

    Improved

    • Some cleanup with use of RWMutex, removal of un-needed defer, etc... Thanks to @MaruHyl for the contributions (#434, #437, #438)
    • Refactor some of the examples. Thanks to @andyxning for the contribution (#440)

    Fixed

    • Proper randomization of IPs resolved from a hostname prior to dial (#445)
    • Misleading TLS error report due to credential file errors (#446)
    • Race on synchronous subscription type setting (#447)
    • Examples would not exit after printing usage, possibly causing panic (#456, #465)
    • Some typos. Thanks to @0xflotus for the contribution (#471)

    Complete Changes

    https://github.com/nats-io/nats.go/compare/v1.7.2...v1.8.0

  • v1.7.2(Feb 21, 2019)

    Changelog

    Added

    • Ability to set MaxPingsOutstanding as an Option (#414)
    • Conn.ConnectedAddr() (#426)
    • FlushWithContext() (#433)

    Updated

    • Handle host names that resolve to more than one IP (#417)
    • Disable automatic TLS skip verify (#420)
    • Examples and Benchmarks can use new credentials based authentication and authorization (#419)
    • Smarter kickFlusher behavior (#429)
    • Smarter processing on NextMsg() when a message is already available (#432)

    Improved

    • Better handling of TLS errors. Thanks to @brianshannan for the contribution (#418)
    • Updates to samples in docs (#422, #421)
    • Uses staticcheck now vs megacheck

    Fixed

    • Maintain string case for async errors from the server (#415)

    Complete Changes

    https://github.com/nats-io/go-nats/compare/v1.7.0...v1.7.2

  • v1.7.0(Dec 11, 2018)

    Changelog

    Added

    • Ability to retrieve CID from server the client is currently connected to (#395)
    • NKey support (#399)
    • TokenHandler option. Thanks to @nicholaslam for the contribution (#405)
    • Support for User JWTs (#408)
    • FlusherTimeout option to allow all writes to honor the deadline. Thanks to @gwik for the contribution (#393)

    Updated

    • Examples moved to separate directories so they can be "go install"'ed (#406)
    • Allow expressed url parameters to influence implicit servers (#409, #410)
    • Some updates for NGS support (#400)

    Improved

    • Faster and smaller reply subjects (#402)

    Fixed

    • Set connection's last error if async error occurs during Drain (#392)

    Complete Changes

    https://github.com/nats-io/go-nats/compare/v1.6.0...v1.7.0

  • v1.6.0(Aug 29, 2018)

    Changelog

    Added

    • PingInterval() option. Thanks to @karpovicham for the contribution (#361)
    • Ability to suppress receiving messages published by own connection, aka NoEcho() option. Requires server 1.2.0+ (#375)
    • Ability to drain subscriptions and/or connections (#378, #380)

    Improved

    • Simpler URLs. It is now possible to simply pass a hostname without scheme or port (the port defaults to 4222). That is, nats.Connect("myhost") is now equivalent to nats.Connect("nats://myhost:4222"). Also switches automatically to secure connection (TLS) if server requests it (#381)
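
    The URL shorthand described above amounts to filling in a default scheme and port. The sketch below shows one way to do that with the standard library. The normalizeURL helper is hypothetical and is not the library's parsing code; it does not cover the automatic switch to TLS.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strings"
)

// normalizeURL adds the nats:// scheme and the default port 4222 when they
// are missing from s.
func normalizeURL(s string) (string, error) {
	if !strings.Contains(s, "://") {
		s = "nats://" + s
	}
	u, err := url.Parse(s)
	if err != nil {
		return "", err
	}
	if u.Port() == "" {
		u.Host = net.JoinHostPort(u.Hostname(), "4222")
	}
	return u.String(), nil
}

func main() {
	fmt.Println(normalizeURL("myhost"))
	fmt.Println(normalizeURL("myhost:4333"))
}
```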

    Fixed

    • Possible deadlock in asynchronous connection callbacks. Thanks to @teh-cmc for the contribution (#365, #369)
    • Close() may not release resources immediately. Thanks to @mjgarton for the report (#370)
    • Benchmark tool (nats-bench) now starts subscriber's timer when getting the first message (#379)
    • API Documentation related to creation of various subscription types. Thanks to @nussjustin for the report (#383)
    • Panic if passing a nil option to nats.Connect(). They will now be ignored (#385)
    • RequestWithContext or NextMsgWithContext could return a message although the context was canceled or timed-out (#387)

    Complete Changes

    https://github.com/nats-io/go-nats/compare/v1.5.0...v1.6.0

  • v1.5.0(Mar 23, 2018)

    Changelog

    Added

    • Conn.Barrier() API. This is an advanced API that can be useful in some specific situations. Thanks to @nussjustin for reporting an issue during development of this feature (#338, #346)
    • ReconnectBufSize() option setter. Thanks to @ripienaar (#340)

    Improved

    • Reduce memory usage during reconnect. Thanks to @charlievieth (#341)
    • No need for regex in Connect(). Thanks to @charlievieth (#342)
    • List of servers is now updated when the cluster topology changes. Will require server at version 1.0.7+ (#344, #352)

    Fixed

    • Protocol(s) received right after initial PONG may be missed. This would manifest with the handling of cluster topology state on connect with server 1.0.7+ (#348)

    Changed

    • Moved to Apache 2.0 License as part of the move to CNCF (#354)

    Complete Changes

    https://github.com/nats-io/go-nats/compare/v1.4.0...v1.5.0

  • v1.4.0(Dec 20, 2017)

    Changelog

    Added

    • CustomDialer interface and SetCustomDialer option setter. Thanks to @joyhope and @mdevan for the report (#334)

    Improved

    • Reduce memory allocations for subscription protocol. Thanks to @nussjustin (#319)

    Fixed

    • Possible repeated timeout of Flush/FlushTimeout and inability to dispatch (#322)
    • Some typos and unnecessary type conversion. Thanks to @marmotini (#324)
    • ErrorHandler not always reporting proper error (#326)
    • TLSConfig cloning for Go 1.8+. Thanks to @johanbrandhorst (#336)

    Deprecated

    • Dialer option/setter, which deals with a *net.Dialer. Use CustomDialer instead (#334)

    Complete Changes

    https://github.com/nats-io/go-nats/compare/v1.3.0...v1.4.0

  • v1.3.0(Aug 10, 2017)

    Changelog

    Added

    • FlusherTimeout option to limit the time the flusher can block (#252)
    • DiscoveredServersCB connection handler invoked when a server joins the cluster (#282)
    • context.Context support with new APIs: RequestWithContext, NextMsgWithContext (#275)
    • GetDefaultOptions() produces new default Options. Should be used instead of DefaultOptions. Thanks to @nogoegst (#308)

    Improved

    • Performance when connection publishes to itself and reduces risk of getting slow consumer (#285)
    • Less chatty request protocol (#295, #299)
    • Use of pool for timers in Request/NextMsg/FlushTimeout calls (#297)

    Changed

    • Repo name (now go-nats) (#239)
    • Authorization errors now trigger the async error callback and do not close the connection (#300)

    Fixed

    • Server pool shuffling when adding new URLs when servers join the cluster (#260)
    • Very rare panic in WaitGroup.Wait() (#268)
    • Locking in handling permission violation error from server (#289)

    Deprecated

    • DefaultOptions. You should use GetDefaultOptions() instead. See #308

    Complete Changes

    https://github.com/nats-io/go-nats/compare/v1.2.2...v1.3.0

  • v1.2.2(Oct 26, 2016)

    Changelog

    Added

    • Support for cluster auto-discovery with servers v0.9.4+
    • Conn.IsConnected method to verify if client is connected
    • Conn.DiscoveredServers to get the list of servers discovered after initial connect to server part of a cluster (with servers v0.9.4+)
    • Custom Dialers for nats.Connect()
    • staticcheck in Travis build

    Updated

    • README’s Clustered Usage section
    • Travis build with go 1.7.3

    Removed

    • Travis build with go 1.5

    Fixed

    • Use default connect timeout in opts.Connect() if none is specified
    • Chan subscribers could not receive more than 65536 messages
    • Allow message size of 0 in examples/nats-bench.go
    • Ensure message count is greater than 0 in examples/nats-bench.go
    • If Options.Url is set, ensure it is tried first on initial connect
    • Data race with Conn.LastError()
    • Warning from go 1.7.3 with TLS config copy

    Complete Changes

    https://github.com/nats-io/nats/compare/v1.2.0...v1.2.2

Owner
NATS - The Cloud Native Messaging System
NATS is a simple, secure and performant communications system for digital systems, services and devices.
Nats-subject-profiler - NATS Subject Profiler With Golang

NATS Subject Profiler Example Connect it to the demo NATS server. nats-subject-p

Byron Ruth 2 Feb 7, 2022
Simple-messaging - Brokerless messaging. Pub/Sub. Producer/Consumer. Pure Go. No C.

Simple Messaging Simple messaging for pub/sub and producer/consumer. Pure Go! Usage Request-Response Producer: consumerAddr, err := net.ResolveTCPAddr

IchHabeKeineNamen 1 Jan 20, 2022
Queue with NATS Jetstream to remove all the erlangs from cloud

Saf in Persian means Queue. One of the problems we face on projects with queues is deploying RabbitMQ in the cloud, which brings many challenges such as CPU load. I want to see how NATS with JetStream can work as the queue to replace RabbitMQ.

Parham Alvani 11 Jan 7, 2022
stratus is a cross-cloud identity broker that allows workloads with an identity issued by one cloud provider to exchange this identity for a workload identity issued by another cloud provider.

stratus stratus is a cross-cloud identity broker that allows workloads with an identity issued by one cloud provider to exchange this identity for a w

robert lestak 1 Dec 26, 2021
💨 A real-time messaging system for building scalable in-app notifications, multiplayer games, and chat apps in web and mobile apps.

Beaver A Real Time Messaging Server. Beaver is a real-time messaging server. With beaver you can easily build scalable in-app notifications, realtime

Ahmed 1.4k Sep 27, 2022
Go client library SDK for Ably realtime messaging service

Ably Go A Go client library for www.ably.io, the realtime messaging service. Installation ~ $ go get -u github.com/ably/ably-go/ably Feature support T

Ably Realtime - our client library SDKs and libraries 55 Sep 25, 2022
NATS example to store and retrieve large file assets from JetStream.

njs-xfer njs-xfer is a sample application that demonstrates the ability to use NATS and JetStream to store and retrieve large file assets. This sample

Derek Collison 20 Mar 3, 2022
NATS Key-Value Store based Leader Election

What? A Leader Election system that uses keys in a NATS Key-Value Store to perform leader election. How? NATS KV Buckets have a TTL, creating a bucket

R.I.Pienaar 11 Sep 13, 2022
Bridge between NATS Streaming and MQ Series

Bridge between NATS Streaming and MQ Series. Original repository: https://github.com/nats-io/nats-mq NATS-MQ Bridge This project implements a simple, but

Temur Yunusov 0 Nov 26, 2021
Basic kick the tires on NATS Key-Value API (Go)

nats-kv-101 Basic kick the tires on NATS Key-Value API (Go) Usage # Get ./mybucket -s "nats://vbox1.tinghus.net" -creds "/home/todd/lab/nats-cluster1/

Todd Beets 1 Feb 15, 2022
A basic pub-sub project using NATS

NATS Simple Pub-Sub Mechanism This is a basic pub-sub project using NATS. There is one publisher who publishes a "Hello World" message to a subject ca

Prosonul Haque 1 Dec 13, 2021
Vigia-go-nats - Program for processing camera metadata

VIGIA MIGRATE THE HOUSEKEEPER TO PYTHON Program for processing metadata

Filipe Andrade 0 Jan 10, 2022
Kafka implemented in Golang with built-in coordination (No ZooKeeper, single binary install, Cloud Native)

Jocko Distributed commit log service in Go that is wire compatible with Kafka. Created by @travisjeffery, continued by nash. Goals: Protocol compatibl

Nash.io 104 Aug 9, 2021
Tool for collecting statistics from an AMQP (RabbitMQ) broker. Good for cloud native service calculation.

amqp-statisticator Tool for collecting statistics around your AMQP broker. For example, RabbitMQ exposes a lot of information through the management API, but

Jan Seidl 0 Dec 13, 2021
websocket based messaging server written in golang

Guble Messaging Server Guble is a simple user-facing messaging and data replication server written in Go. Overview Guble is in an early state (release

Sebastian Mancke 152 Jul 19, 2022
Golang Restful API Messaging using GORM ORM (MySQL) Gorilla Mux

Golang Restful API Messaging using GORM ORM (MySQL) Gorilla Mux Getting Started Folder Structure This is my folder structure under my $GOPATH or $HOME

Septian Ramadhan 0 Dec 14, 2021
Scalable real-time messaging server that works in a language-agnostic way

Centrifugo is a scalable real-time messaging server that works in a language-agnostic way. Centrifugo works in conjunction with application backend written in any

Centrifugal 6.4k Sep 26, 2022
Abstraction layer for simple rabbitMQ connection, messaging and administration

Jazz Abstraction layer for quick and simple rabbitMQ connection, messaging and administration. Inspired by Jazz Jackrabbit and his eternal hatred towa

SOCIFI Ltd. 16 Aug 24, 2022
A dead simple Go library for sending notifications to various messaging services.

A dead simple Go library for sending notifications to various messaging services. About Notify arose from my own need for one of my api server running

Niko Köser 1.3k Sep 30, 2022