Apache Pulsar Go Client Library

Overview

A Go client library for the Apache Pulsar project.

Goal

This project is developing a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.

Once feature parity and stability are reached, this will supersede the current CGo-based library.

Requirements

  • Go 1.11+

Status

See the Projects page at https://github.com/apache/pulsar-client-go/projects to track status and progress.

Usage

Import the client library:

import "github.com/apache/pulsar-client-go/pulsar"

Create a Producer:

client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "my-topic",
})
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

// Send blocks until the broker acknowledges the message or an error occurs.
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
	Payload: []byte("hello"),
})
if err != nil {
	fmt.Println("Failed to publish message", err)
} else {
	fmt.Println("Published message")
}

Create a Consumer:

client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            "my-topic",
	SubscriptionName: "my-sub",
	Type:             pulsar.Shared,
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()

// Receive blocks until a message is available or the context is cancelled.
msg, err := consumer.Receive(context.Background())
if err != nil {
	log.Fatal(err)
}

fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
	msg.ID(), string(msg.Payload()))

Create a Reader:

client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
	log.Fatal(err)
}

defer client.Close()

reader, err := client.CreateReader(pulsar.ReaderOptions{
	Topic:          "topic-1",
	StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
	log.Fatal(err)
}
defer reader.Close()

for reader.HasNext() {
	msg, err := reader.Next(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
		msg.ID(), string(msg.Payload()))
}

Contributing

Contributions are welcome and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.

Contact

Mailing lists

Name              Scope
[email protected]  User-related discussions (Subscribe, Unsubscribe, Archives)
[email protected]  Development-related discussions (Subscribe, Unsubscribe, Archives)

Slack

Pulsar Slack channel #dev-go at https://apache-pulsar.slack.com/

You can self-register at https://apache-pulsar.herokuapp.com/

License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

Comments
  • HasNext returns false when reading last message inclusive

    Expected behavior

    Reader.HasNext() should return true when reading the last message inclusive.

    Actual behavior

    Reader.HasNext() returns false.

    Steps to reproduce

    package main
    
    import (
    	"context"
    	"fmt"
    	"testing"
    
    	"github.com/apache/pulsar-client-go/pulsar"
    	"github.com/stretchr/testify/assert"
    )
    
    const (
    	lookupURL = "pulsar://localhost:6650"
    	topic     = "persistent://public/default/test"
    )
    
    func TestReaderLatestInclusiveHasNext(t *testing.T) {
    	client, err := pulsar.NewClient(pulsar.ClientOptions{
    		URL: lookupURL,
    	})
    
    	assert.Nil(t, err)
    	defer client.Close()
    
    	ctx := context.Background()
    
    	// create producer
    	producer, err := client.CreateProducer(pulsar.ProducerOptions{
    		Topic:           topic,
    		DisableBatching: true,
    	})
    	assert.Nil(t, err)
    	defer producer.Close()
    
    	// send 10 messages
    	var lastMsgID pulsar.MessageID
    	for i := 0; i < 10; i++ {
    		lastMsgID, err = producer.Send(ctx, &pulsar.ProducerMessage{
    			Payload: []byte(fmt.Sprintf("hello-%d", i)),
    		})
    		assert.NoError(t, err)
    		assert.NotNil(t, lastMsgID)
    	}
    
    	// create reader on the last message (inclusive)
    	reader, err := client.CreateReader(pulsar.ReaderOptions{
    		Topic:                   topic,
    		StartMessageID:          pulsar.LatestMessageID(),
    		StartMessageIDInclusive: true,
    	})
    
    	assert.Nil(t, err)
    	defer reader.Close()
    
    	var msgID pulsar.MessageID
    	if reader.HasNext() {
    		msg, err := reader.Next(context.Background())
    		assert.NoError(t, err)
    
    		assert.Equal(t, []byte("hello-9"), msg.Payload()) // updated after original post
    		msgID = msg.ID()
    	}
    
    	assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize()) // updated after original post
    }
    

    System configuration

    Pulsar version: 2.6.0

    type/bug triage/week-30 
    opened by quintans 17
  • Go build fails to compile

    Problem

    I'm trying to build code that just imports "github.com/apache/pulsar-client-go/pulsar", and the output is below.

    go: finding module for package github.com/apache/pulsar-client-go/pulsar
    go: downloading github.com/apache/pulsar-client-go v0.3.0
    go: found github.com/apache/pulsar-client-go/pulsar in github.com/apache/pulsar-client-go v0.3.0
    go: downloading github.com/prometheus/client_golang v1.7.1
    go: downloading github.com/pkg/errors v0.9.1
    go: downloading github.com/yahoo/athenz v1.8.55
    go: downloading github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb
    go: downloading github.com/sirupsen/logrus v1.4.2
    go: downloading github.com/gogo/protobuf v1.3.1
    go: downloading github.com/klauspost/compress v1.10.8
    go: downloading github.com/spaolacci/murmur3 v1.1.0
    go: downloading github.com/pierrec/lz4 v2.0.5+incompatible
    go: downloading github.com/linkedin/goavro/v2 v2.9.8
    go: downloading github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32
    go: downloading golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
    go: downloading golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1
    go: downloading github.com/golang/snappy v0.0.1
    go: downloading github.com/dgrijalva/jwt-go v3.2.0+incompatible
    go: downloading github.com/99designs/keyring v1.1.5
    go: downloading golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7
    go: downloading github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a
    go: downloading github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d
    go: downloading github.com/mtibben/percent v0.2.1
    go: downloading golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
    go: downloading github.com/mitchellh/go-homedir v1.1.0
    go: downloading github.com/ardielle/ardielle-go v1.5.2
    go: downloading github.com/golang/protobuf v1.4.2
    go: downloading github.com/prometheus/client_model v0.2.0
    go: downloading github.com/prometheus/common v0.10.0
    go: downloading github.com/cespare/xxhash/v2 v2.1.1
    go: downloading github.com/beorn7/perks v1.0.1
    go: downloading github.com/prometheus/procfs v0.1.3
    go: downloading github.com/matttproud/golang_protobuf_extensions v1.0.1
    go: downloading google.golang.org/protobuf v1.23.0
    # github.com/keybase/go-keychain
    ../../../../pkg/mod/github.com/keybase/go-keychain@v0.0.0-20190712205309-48d3d31d256d/macos_1.10.go:89:13: could not determine kind of name for C.SecTrustedApplicationCreateFromPath
    

    It may be caused by the build referring to macos_1.10.go in the github.com/keybase/go-keychain version downloaded above (v0.0.0-20190712205309-48d3d31d256d), which is not the latest one.

    Environment

    os: macOS Mojave 10.14.6
    go: go1.15.3 darwin/amd64
    pulsar-client-go: 0.3.0

    area/build type/question triage/week-49 
    opened by yukshimizu 16
  • [ISSUE #68][feat]add Option config for init (#68)

    Change-Id: I85a9c9f20e61e126b617eab919d2405a3ebda087

    Master Issue: #68

    Motivation

    Simplify initialization of the Producer.

    Modifications

    Add ProducerOption for the client's newProducer.

    Verifying this change

    • [x] Make sure that the change passes the CI checks.

    Does this pull request potentially affect one of the following parts:

    If yes was chosen, please highlight the changes

    • Dependencies (does it add or upgrade a dependency): (yes / no)
    • The public API: (yes / no)
    • The schema: (yes / no / don't know)
    • The default values of configurations: (yes / no)
    • The wire protocol: (yes / no)

    Documentation

    • Does this pull request introduce a new feature? (yes / no)
    • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
    • If a feature is not applicable for documentation, explain why?
    • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
    type/enhancement 
    opened by xujianhai666 16
  • [Issue 662] Fix race in connection.go waitUntilReady()

    Fixes #662

    Motivation

    As described in #662, there appears to be a potential race condition in connection.go function waitUntilReady(): the cond.Broadcast() can occur before the cond.Wait().

    [EDIT:] To be explicit, this is the issue:

    • [goroutine A] calls c.getState() and sees that it is not set to connectionReady.
    • [goroutine B] changes the state to connectionReady.
    • [goroutine B] sends a cond.Broadcast(), which goes nowhere because no goroutine is waiting.
    • [goroutine A] calls cond.Wait(), which never completes.

    Modifications

    Function waitUntilReady() was previously holding the global c.Lock() on the connection. From my reading of the code, this mutex is intended to protect the cnx variable. I think that the use of c.Lock() in waitUntilReady() was probably just a typo.

    Instead, I think the author probably intended to grab the lock associated with the sync.Cond, i.e. c.cond.L.Lock(). This looks like the correct thing to do when using a sync.Cond. I think there should be a corresponding lock around the cond.Broadcast() also. See https://stackoverflow.com/questions/36857167/how-to-correctly-use-sync-cond/42772799#42772799 for more details.
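
The locking pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the code in connection.go: the names conn, connState, connReady, and setReady are hypothetical stand-ins. The state check and the Wait both happen under cond.L, and the state change takes the same lock, so a Broadcast cannot fall between the check and the Wait.

```go
package main

import (
	"fmt"
	"sync"
)

// connState is a hypothetical stand-in for the connection state in the PR.
type connState int

const (
	connInit connState = iota
	connReady
)

// conn is an illustrative type, not the library's connection struct.
type conn struct {
	cond  *sync.Cond
	state connState
}

func newConn() *conn {
	return &conn{cond: sync.NewCond(&sync.Mutex{})}
}

// waitUntilReady checks the state and waits while holding cond.L.
func (c *conn) waitUntilReady() {
	c.cond.L.Lock()
	defer c.cond.L.Unlock()
	for c.state != connReady {
		c.cond.Wait() // releases cond.L while blocked, reacquires it on wake-up
	}
}

// setReady changes the state under the same lock, then wakes all waiters.
func (c *conn) setReady() {
	c.cond.L.Lock()
	c.state = connReady
	c.cond.L.Unlock()
	c.cond.Broadcast()
}

func main() {
	c := newConn()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.waitUntilReady()
		}()
	}
	c.setReady()
	wg.Wait()
	fmt.Println("all waiters observed connReady")
}
```

Because the waiter re-checks the state in a loop, it also returns immediately if setReady ran before waitUntilReady was called at all.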

    Verifying this change

    I am unsure if this change is covered by existing tests. It fixes a rare race condition, so I think it would be quite difficult to test for.

    I have deployed this change on my own production system, and it doesn't obviously break anything. I will report back if I see any issues that might be related to it.

    Does this pull request potentially affect one of the following parts:

    • Dependencies (does it add or upgrade a dependency): no
    • The public API: no
    • The schema: no
    • The default values of configurations: no
    • The wire protocol: no

    Documentation

    • Does this pull request introduce a new feature? no
    • If yes, how is the feature documented? not applicable
    type/bug 
    opened by bschofield 14
  • feat: support multiple schema version for producer and consumer

    Motivation

    Implement PIP-43. Support multiple schema versions for producer and consumer.

    Modifications

    • add schema cache for producer and consumer
    • add DisableMultiSchema option for producer

    Verifying this change

    • [x] Make sure that the change passes the CI checks.

    (Please pick either of the following options)

    Does this pull request potentially affect one of the following parts:

    If yes was chosen, please highlight the changes

    • Dependencies (does it add or upgrade a dependency): (no)
    • The public API: (yes)
    • The schema: (yes)
    • The default values of configurations: (yes)
    • The wire protocol: (no)

    Documentation

    • Does this pull request introduce a new feature? (no)
    • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
    • If a feature is not applicable for documentation, explain why?
    • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
    opened by oryx2 14
  • consumer.Receive () is blocking, availablePermits are decreasing, but msgBacklog is increasing

    Expected behavior

    The topic stats are as follows:

    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 0,
      "publishers" : [ ],
      "subscriptions" : {
        "response:proxy:8787:9:1577364969737" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 0,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Failover",
          "activeConsumerName" : "lxxlc",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "lxxlc",
            "availablePermits" : 1024,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2019-12-26T20:56:09.832+08:00",
            "address" : "/10.32.68.213:57108"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 2967,
      "publishers" : [ {
        "msgRateIn" : 0.0,
        "msgThroughputIn" : 0.0,
        "averageMsgSize" : 0.0,
        "producerId" : 1,
        "metadata" : { },
        "producerName" : "qa-pulsar-ten-5-279",
        "connectedSince" : "2019-12-26T20:56:40.736+08:00",
        "address" : "/10.32.68.213:43156"
      } ],
      "subscriptions" : {
        "response:proxy:8787:9:1577364969737" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 1,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Failover",
          "activeConsumerName" : "lxxlc",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "lxxlc",
            "availablePermits" : 1021,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2019-12-26T20:56:09.832+08:00",
            "address" : "/10.32.68.213:57108"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 5681,
      "publishers" : [ {
        "msgRateIn" : 0.0,
        "msgThroughputIn" : 0.0,
        "averageMsgSize" : 0.0,
        "producerId" : 1,
        "metadata" : { },
        "producerName" : "qa-pulsar-ten-5-279",
        "connectedSince" : "2019-12-26T20:56:40.736+08:00",
        "address" : "/10.32.68.213:43156"
      } ],
      "subscriptions" : {
        "response:proxy:8787:9:1577364969737" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 3,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Failover",
          "activeConsumerName" : "lxxlc",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "lxxlc",
            "availablePermits" : 1019,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2019-12-26T20:56:09.832+08:00",
            "address" : "/10.32.68.213:57108"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 7169,
      "publishers" : [ {
        "msgRateIn" : 0.0,
        "msgThroughputIn" : 0.0,
        "averageMsgSize" : 0.0,
        "producerId" : 1,
        "metadata" : { },
        "producerName" : "qa-pulsar-ten-5-279",
        "connectedSince" : "2019-12-26T20:56:40.736+08:00",
        "address" : "/10.32.68.213:43156"
      } ],
      "subscriptions" : {
        "response:proxy:8787:9:1577364969737" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 5,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Failover",
          "activeConsumerName" : "lxxlc",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "lxxlc",
            "availablePermits" : 1017,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2019-12-26T20:56:09.832+08:00",
            "address" : "/10.32.68.213:57108"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 8718,
      "publishers" : [ {
        "msgRateIn" : 0.0,
        "msgThroughputIn" : 0.0,
        "averageMsgSize" : 0.0,
        "producerId" : 1,
        "metadata" : { },
        "producerName" : "qa-pulsar-ten-5-279",
        "connectedSince" : "2019-12-26T20:56:40.736+08:00",
        "address" : "/10.32.68.213:43156"
      } ],
      "subscriptions" : {
        "response:proxy:8787:9:1577364969737" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 8,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Failover",
          "activeConsumerName" : "lxxlc",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "lxxlc",
            "availablePermits" : 1014,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2019-12-26T20:56:09.832+08:00",
            "address" : "/10.32.68.213:57108"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "averageMsgSize" : 0.0,
      "storageSize" : 6,
      "publishers" : [ {
        "msgRateIn" : 0.0,
        "msgThroughputIn" : 0.0,
        "averageMsgSize" : 0.0,
        "producerId" : 1,
        "metadata" : { },
        "producerName" : "qa-pulsar-ten-5-279",
        "connectedSince" : "2019-12-26T20:56:40.736+08:00",
        "address" : "/10.32.68.213:43156"
      } ],
      "subscriptions" : {
        "response:proxy:8787:9:1577364969737" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "msgRateRedeliver" : 0.0,
          "msgBacklog" : 0,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "type" : "Failover",
          "activeConsumerName" : "lxxlc",
          "msgRateExpired" : 0.0,
          "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "msgRateRedeliver" : 0.0,
            "consumerName" : "lxxlc",
            "availablePermits" : 1011,
            "unackedMessages" : 0,
            "blockedConsumerOnUnackedMsgs" : false,
            "metadata" : { },
            "connectedSince" : "2019-12-26T20:56:09.832+08:00",
            "address" : "/10.32.68.213:57108"
          } ],
          "isReplicated" : false
        }
      },
      "replication" : { },
      "deduplicationStatus" : "Disabled"
    }
    

    System configuration

    Pulsar version: 2.4.0
    type: Failover

    type/bug 
    opened by dsmlily 14
  • [oauth2] Remove oauth2 go.mod and go.sum

    Motivation

    When the oauth2 sub-directory was added in PR #313 there wasn't any justification given (AFAICT) for creating a separate sub-module instead of just treating the oauth2 sources as a normal directory. Since the oauth2 module does not have a separate release cycle, treating it as a sub-module just adds unnecessary complexity.

    Modifications

    Removed go.mod and go.sum from the oauth2 sub-directory. Updated the main go.mod and go.sum to include the necessary dependencies.

    opened by pgier 12
  • [WIP][consumer] Deadlock with in-flight acknowledgements

    Motivation

    This is an integration test built with the intent of replicating the bug reported in this issue. I opened this PR to gather some feedback before working on a fix.

    A possible solution would be to move the connectionClosed events out of the eventsCh channel and into their own channel (e.g. connectionEventsCh): https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L752

    This is because some of the events travelling through the eventsCh channel rely on an open connection to finish. If eventsCh fills up with, say, in-flight ackRequest events, a cycle forms: the connectionClosed event cannot be pushed to eventsCh until the ackRequest events finish, and the ackRequest events cannot finish because the connection is closed. The connectionClosed event, which should trigger a reconnection to the broker, stays stuck.

    Given that the connectionClosed event is needed to trigger a reconnection to the broker I think it could make sense to have it in a separate channel.
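
The separate-channel idea can be sketched as follows. This is only an illustration under stated assumptions, not the code in consumer_partition.go: the event type, the runEventLoop function, and the maxSteps parameter are hypothetical, and connectionEventsCh is the example name suggested above. The loop looks at connectionEventsCh first, so a full eventsCh cannot delay a connection-closed notification.

```go
package main

import "fmt"

// event is a hypothetical stand-in for requests such as ackRequest.
type event struct{ id int }

// runEventLoop handles up to maxSteps events and returns a log of what it did.
// A pending connection event is always handled before regular events.
func runEventLoop(eventsCh <-chan event, connectionEventsCh <-chan struct{}, maxSteps int) []string {
	var handled []string
	for i := 0; i < maxSteps; i++ {
		// Priority check: take a pending connection event without blocking.
		select {
		case <-connectionEventsCh:
			handled = append(handled, "reconnect")
			continue
		default:
		}
		// Nothing urgent is pending, so wait for whichever event arrives next.
		select {
		case <-connectionEventsCh:
			handled = append(handled, "reconnect")
		case ev := <-eventsCh:
			handled = append(handled, fmt.Sprintf("ack %d", ev.id))
		}
	}
	return handled
}

func main() {
	eventsCh := make(chan event, 2)
	connectionEventsCh := make(chan struct{}, 1)

	// Fill the regular queue, as in-flight ackRequest events would.
	eventsCh <- event{1}
	eventsCh <- event{2}
	// The connection-closed signal still fits, because it has its own channel.
	connectionEventsCh <- struct{}{}

	for _, line := range runEventLoop(eventsCh, connectionEventsCh, 3) {
		fmt.Println(line)
	}
}
```

In this sketch the sender of the connection-closed signal never competes with ack requests for buffer space, which is the property the proposal relies on.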

    If this approach sounds sensible to you I can start making modifications.

    Replicate the bug

    The test doesn't always fail, so you may have to run it more than once. In my case it fails roughly 1 out of 3 times. If you run it once and the test completes instead of failing, please run it again.

    Once the test fails you should see that it timed out after five minutes and that it printed a lot of logs saying Trying to ack {messageID}.

    Please check the attached deadlock.zip which shows the logs of a failing test.

    Running the test

    • make sure you have docker running and usable from the user that runs the test
    • make sure you do docker pull apachepulsar/pulsar:2.6.1 first
    • then do: go test ./pulsar/ -test.run ^TestDeadlock$ -v -tags integration -count 1

    deadlock.zip

    opened by fracasula 12
  • short read when reading frame size

    Hi. While using the Pulsar client, I got the following log:

    level=info message "Error reading from connection" error="Short read when reading frame size"
    

    My question is: what does this log mean, and how should I deal with it?

    Thx for your help. @sijie @wolfstudy
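
For context, Pulsar's binary protocol prefixes each frame with a 4-byte big-endian size, and a message like the one above is what a reader would report when the stream ends before those four bytes arrive, for example because the peer closed the connection. The sketch below reproduces that situation with the standard library only. It is an assumption-laden illustration, not the client's actual reader: readFrameSize and frameSizeOf are hypothetical helpers.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readFrameSize reads the 4-byte big-endian size prefix of a frame.
// (Hypothetical helper for illustration only.)
func readFrameSize(r io.Reader) (uint32, error) {
	var prefix [4]byte
	if _, err := io.ReadFull(r, prefix[:]); err != nil {
		// io.ReadFull fails if the stream ends before 4 bytes were read.
		return 0, fmt.Errorf("short read when reading frame size: %w", err)
	}
	return binary.BigEndian.Uint32(prefix[:]), nil
}

// frameSizeOf runs readFrameSize over an in-memory byte slice.
func frameSizeOf(data []byte) (uint32, error) {
	return readFrameSize(bytes.NewReader(data))
}

func main() {
	// A complete prefix announcing a 16-byte frame.
	size, err := frameSizeOf([]byte{0, 0, 0, 16})
	fmt.Println(size, err)

	// Only two bytes arrive before the stream ends, as when a peer disconnects.
	_, err = frameSizeOf([]byte{0, 0})
	fmt.Println(err)
}
```

If this reading is right, the log line points at a dropped or closed connection rather than at corrupted message data.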

    triage/week-15 
    opened by mileschao 12
  • Catch up Java client function list

    Non-Support Features:

    • [ ] Schema
    • [x] seek by time
    • [x] nack
    • [x] replication
    • [x] Interceptor
    • [x] DeadLetterPolicy
    • [x] receive with timeout
    • [x] multi topics
    • [x] topics pattern
    • [x] auto update partitions
    • [ ] read compact
    • [x] auth
    opened by wolfstudy 12
  • define logger interface and add Logger field to ClientOptions

    Motivation

    Enable users to configure the logger used by the client and to supply their own implementation. If no logger is provided, a wrapped logrus.StandardLogger() is used. This PR solves only part of the problem described in https://github.com/apache/pulsar-client-go/issues/228.

    Modifications

    • define Logger and Entry interfaces used by the client
    • add Logger field to ClientOptions
    • add logger field to internal structures
    • provide a logger implementation backed by logrus
    • implement a no-op logger
    opened by shohi 11
  • [feature] Max Retry per msg feature added

    Fixes #890

    Motivation

    Add a feature to allow Max Retry per msg.

    Modifications

    Each message can carry a max reconsume times property. If it is not present, the default from the consumer's max deliveries setting is used.

    Verifying this change

    • [ ] Make sure that the change passes the CI checks.

    This change added tests and can be verified as follows:

    • Added integration tests for the per-message retry logic

    Does this pull request potentially affect one of the following parts:

    If yes was chosen, please highlight the changes

    • Dependencies (does it add or upgrade a dependency): no
    • The public API: (yes / no)
    • The schema: (yes / no / don't know)
    • The default values of configurations: no
    • The wire protocol: (yes / no)

    Documentation

    • Does this pull request introduce a new feature? yes
    • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
    • If a feature is not applicable for documentation, explain why?
    • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
    opened by ngoyal16 3
  • [feature] Support batch index ACK

    Fixes https://github.com/apache/pulsar-client-go/issues/894

    Modifications

    Add an EnableBatchIndexAcknowledgment to specify whether batch index ACK is enabled. Since this feature requires the conversion between a bit set and its underlying long array, which is similar to Java's BitSet, this commit introduces github.com/bits-and-blooms/bitset dependency to replace the big.Int based implementation of the bit set.

    Add a BatchSize() method to MessageId to indicate the size of the ack_set field. When the batch index ACK happens, convert the []uint64 to the []int64 as the ack_set field in CommandAck. When receiving messages, convert the ack_set field in CommandMessage to filter the acknowledged single messages.

    Remove the duplicated code in AckID and AckIDWithResponse.
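
The bit set to long array conversion mentioned above can be sketched with plain slices. This is not the PR's implementation, which uses github.com/bits-and-blooms/bitset; the helpers newAckSet, ack, and toLongArray are hypothetical, and the sketch assumes that a set bit marks a message in the batch that is still unacknowledged.

```go
package main

import "fmt"

// newAckSet returns a bit set with batchSize bits, all set: one bit per
// message in the batch (assumed meaning: not acknowledged yet).
func newAckSet(batchSize int) []uint64 {
	words := make([]uint64, (batchSize+63)/64)
	for i := 0; i < batchSize; i++ {
		words[i/64] |= 1 << (uint(i) % 64)
	}
	return words
}

// ack clears the bit of the message at batchIndex.
func ack(words []uint64, batchIndex int) {
	words[batchIndex/64] &^= 1 << (uint(batchIndex) % 64)
}

// toLongArray reinterprets each 64-bit word as a signed value, which is the
// shape of the long array behind Java's BitSet.
func toLongArray(words []uint64) []int64 {
	out := make([]int64, len(words))
	for i, w := range words {
		out[i] = int64(w) // same bit pattern, signed interpretation
	}
	return out
}

func main() {
	set := newAckSet(10) // bits 0..9 set
	ack(set, 0)
	ack(set, 3)
	fmt.Println(toLongArray(set))

	// A full 64-message batch has every bit set, which reads as -1 when signed.
	fmt.Println(toLongArray(newAckSet(64)))
}
```

The conversion keeps the bit pattern unchanged; only the sign interpretation differs, which is why a fully set word shows up as a negative number.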

    Verifications

    TestBatchIndexAck is added to cover the cases where AckWithResponse is enabled or disabled, for both individual and cumulative ACK.

    opened by BewareMyPower 0
  • [feature request] Support SNAPPY CompressionType

    Motivation

    As the title says, Snappy compression is not yet supported in the Go client.

    https://github.com/apache/pulsar-client-go/blob/77c7ccbd144b00d17c320c5f67cadaedb53f6b1e/pulsar/producer.go#L38-L43

    opened by shibd 1
  • [feature request] Expose the chunk config of consumer to the reader

    Motivation

    The Consumer's chunk configuration should also be exposed in the Reader.

    https://github.com/apache/pulsar-client-go/blob/23777362f3d132f98eff56c4f511f761227d8cf4/pulsar/consumer_partition.go#L112-L114

    opened by shibd 0
  • [feature request] Support Transaction API

    Motivation

    Currently, pulsar-client-go does not support transactions, so I want to add Transaction API support to the Go client.

    Solution

    Add Transaction API for go client.

    opened by liangyepianzhou 0
Releases (v0.9.0)
  • v0.9.0 (Oct 9, 2022)

    What's Changed

    • add 0.8.0 changelog for repo by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/727
    • [Issue 729]stop ticker when create producer failed by @NaraLuwan in https://github.com/apache/pulsar-client-go/pull/730
    • Update version file to 0.8.0 by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/728
    • [Issue 725][Dependencies] Upgrade beefsack/go-rate by @shubham1172 in https://github.com/apache/pulsar-client-go/pull/735
    • Upgrade klauspost/compress to v1.14.4 by @dferstay in https://github.com/apache/pulsar-client-go/pull/740
    • Upgrade prometheus client to 1.11.1 by @nicoloboschi in https://github.com/apache/pulsar-client-go/pull/738
    • add 0.8.1 changelog by @freeznet in https://github.com/apache/pulsar-client-go/pull/742
    • Temporarily point ci to pulsar 2.8.2 by @Shoothzj in https://github.com/apache/pulsar-client-go/pull/747
    • [build] make go version consistent by @Shoothzj in https://github.com/apache/pulsar-client-go/pull/751
    • Exposing broker metadata by @Shoothzj in https://github.com/apache/pulsar-client-go/pull/745
    • Add schema support to Reader by @ZiyaoWei in https://github.com/apache/pulsar-client-go/pull/741
    • allow config reader subscription name by @Shoothzj in https://github.com/apache/pulsar-client-go/pull/754
    • Cleanup topics after unit tests by @ZiyaoWei in https://github.com/apache/pulsar-client-go/pull/755
    • Add TableView support by @ZiyaoWei in https://github.com/apache/pulsar-client-go/pull/743
    • Fix ack timeout cause reconnect by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/756
    • fix: add service not ready check by @nodece in https://github.com/apache/pulsar-client-go/pull/757
    • Dlq producer on topic with schema by @GPrabhudas in https://github.com/apache/pulsar-client-go/pull/723
    • fix annotation typo in consumer.go by @Shoothzj in https://github.com/apache/pulsar-client-go/pull/758
    • Fix producer unable register when cnx closed by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/761
    • Add -c/--max-connections parameter to pulsar-perf-go and set it to 1 by default by @lhotari in https://github.com/apache/pulsar-client-go/pull/765
    • [Issue 763][producer] Fix deadlock in Producer Send when message fails to encode. by @samuelhewitt in https://github.com/apache/pulsar-client-go/pull/762
    • [Issue 749]Fix panic caused by flushing current batch with an incorrect internal function. by @shileiyu in https://github.com/apache/pulsar-client-go/pull/750
    • Add consumer state check when request commands by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/772
    • Fix sequenceID is not equal to cause the connection to be closed incorrectly by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/774
    • Add error response for Ack func by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/775
    • Revert "Fix stuck when reconnect broker (#703)" by @lhotari in https://github.com/apache/pulsar-client-go/pull/767
    • [Issue 718] Fix nil pointer dereference in TopicNameWithoutPartitionPart by @hantmac in https://github.com/apache/pulsar-client-go/pull/734
    • Support ack response for Go SDK by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/776
    • [Issue 779]Fix ack request not set requestId when enable AckWithResponse option by @liushengzhong0927 in https://github.com/apache/pulsar-client-go/pull/780
    • [issue 791] allow to add at least one message on batch builder by @zzzming in https://github.com/apache/pulsar-client-go/pull/792
    • schema creation and validation functions without panicking by @zzzming in https://github.com/apache/pulsar-client-go/pull/794
    • [github ci] add go 1.18 to the test matrix by @pgier in https://github.com/apache/pulsar-client-go/pull/790
    • Fix using closed connection in consumer by @hrsakai in https://github.com/apache/pulsar-client-go/pull/785
    • feat: add basic authentication by @nodece in https://github.com/apache/pulsar-client-go/pull/778
    • [Issue 781][add consumer seek by time on partitioned topic] by @GPrabhudas in https://github.com/apache/pulsar-client-go/pull/782
    • [ci] update and simplify GitHub workflow by @pgier in https://github.com/apache/pulsar-client-go/pull/796
    • feat: support multiple schema version for producer and consumer by @oryx2 in https://github.com/apache/pulsar-client-go/pull/611
    • [issue #752] replace go-rate rate limiter with a buffered channel implementation by @zzzming in https://github.com/apache/pulsar-client-go/pull/799
    • [issue 814] consumer and producer reconnect failure metrics counter by @zzzming in https://github.com/apache/pulsar-client-go/pull/815
    • [cleanup] Fix to follow lint error (File is not goimports-ed (goimports)) by @equanz in https://github.com/apache/pulsar-client-go/pull/824
    • [oauth2] Remove oauth2 go.mod and go.sum by @pgier in https://github.com/apache/pulsar-client-go/pull/802
    • [client] Rename test_helper.go to follow the test code naming convention by @equanz in https://github.com/apache/pulsar-client-go/pull/822
    • [security] Bump github.com/stretchr/testify to update gopkg.in/yaml.v3 by @massakam in https://github.com/apache/pulsar-client-go/pull/813
    • [client] Add MetricsRegisterer to ClientOptions by @pragkent in https://github.com/apache/pulsar-client-go/pull/826
    • NackBackoffPolicy.Next return time.Duration by @h-hy in https://github.com/apache/pulsar-client-go/pull/825
    • Add golang 1.19 in test matrix by @Shoothzj in https://github.com/apache/pulsar-client-go/pull/832
    • ci: add makefile by @pgier in https://github.com/apache/pulsar-client-go/pull/800
    • Make keepalive interval configurable by @nodece in https://github.com/apache/pulsar-client-go/pull/838
    • [issue #807] dlq topic producer options by @zzzming in https://github.com/apache/pulsar-client-go/pull/809
    • [log-format] remove redundant "[]" pair in the head and tail of log content by @shenqianjin in https://github.com/apache/pulsar-client-go/pull/831
    • Update proto file latest by @liangyuanpeng in https://github.com/apache/pulsar-client-go/pull/841
    • [bugfix] Fix wrong check of event time by default by @liangyuanpeng in https://github.com/apache/pulsar-client-go/pull/843
    • Fix: NackBackoffPolicy.Next return time.Duration by @h-hy in https://github.com/apache/pulsar-client-go/pull/834
    • Introduce doneCh for ack error by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/777
    • Parameterize the reconnection option by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/853
    • add 0.9.0 release changelog by @freeznet in https://github.com/apache/pulsar-client-go/pull/804
    • Embed Go SDK version to 0.9.0 by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/854

    New Contributors

    • @NaraLuwan made their first contribution in https://github.com/apache/pulsar-client-go/pull/730
    • @shubham1172 made their first contribution in https://github.com/apache/pulsar-client-go/pull/735
    • @nicoloboschi made their first contribution in https://github.com/apache/pulsar-client-go/pull/738
    • @ZiyaoWei made their first contribution in https://github.com/apache/pulsar-client-go/pull/741
    • @lhotari made their first contribution in https://github.com/apache/pulsar-client-go/pull/765
    • @samuelhewitt made their first contribution in https://github.com/apache/pulsar-client-go/pull/762
    • @shileiyu made their first contribution in https://github.com/apache/pulsar-client-go/pull/750
    • @hantmac made their first contribution in https://github.com/apache/pulsar-client-go/pull/734
    • @liushengzhong0927 made their first contribution in https://github.com/apache/pulsar-client-go/pull/780
    • @oryx2 made their first contribution in https://github.com/apache/pulsar-client-go/pull/611
    • @massakam made their first contribution in https://github.com/apache/pulsar-client-go/pull/813
    • @pragkent made their first contribution in https://github.com/apache/pulsar-client-go/pull/826
    • @h-hy made their first contribution in https://github.com/apache/pulsar-client-go/pull/825
    • @shenqianjin made their first contribution in https://github.com/apache/pulsar-client-go/pull/831

    Full Changelog: https://github.com/apache/pulsar-client-go/compare/v0.8.0...v0.9.0

  • v0.8.1(Mar 15, 2022)

    What's Changed

    • Upgrade beefsack/go-rate by @shubham1172 in https://github.com/apache/pulsar-client-go/pull/735
    • Upgrade prometheus client to 1.11.1 by @nicoloboschi in https://github.com/apache/pulsar-client-go/pull/738

    New Contributors

    • @shubham1172 made their first contribution in https://github.com/apache/pulsar-client-go/pull/735
    • @nicoloboschi made their first contribution in https://github.com/apache/pulsar-client-go/pull/738

    Full Changelog: https://github.com/apache/pulsar-client-go/compare/v0.8.0...v0.8.1

  • v0.8.0(Feb 16, 2022)

    What's Changed

    • Update release docs with missing information by @cckellogg in https://github.com/apache/pulsar-client-go/pull/656
    • Update change log for 0.7.0 release by @cckellogg in https://github.com/apache/pulsar-client-go/pull/655
    • Update version to 0.7.0 by @cckellogg in https://github.com/apache/pulsar-client-go/pull/654
    • fix issue 650,different handle sequence value by @baomingyu in https://github.com/apache/pulsar-client-go/pull/651
    • Support nack backoff policy for SDK by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/660
    • Remove unused dependency in oauth2 module by @reugn in https://github.com/apache/pulsar-client-go/pull/661
    • [Issue 662] Fix race in connection.go waitUntilReady() by @bschofield in https://github.com/apache/pulsar-client-go/pull/663
    • Update dependencies by @reugn in https://github.com/apache/pulsar-client-go/pull/665
    • [Issue 652] Quick fixes to the documentation for the main building blocks of the library by @reugn in https://github.com/apache/pulsar-client-go/pull/667
    • Add subscription properties for ConsumerOptions by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/671
    • Add new bug-resistant build constraints by @reugn in https://github.com/apache/pulsar-client-go/pull/670
    • Handle the parameters parsing error in NewProvider by @reugn in https://github.com/apache/pulsar-client-go/pull/673
    • Update email template of release docs by @izumo27 in https://github.com/apache/pulsar-client-go/pull/674
    • Add properties field for batch container by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/683
    • [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method by @bschofield in https://github.com/apache/pulsar-client-go/pull/678
    • Upgrade DataDog/zstd to v1.5.0 by @dferstay in https://github.com/apache/pulsar-client-go/pull/690
    • fix:add order key to message by @leizhiyuan in https://github.com/apache/pulsar-client-go/pull/688
    • Set default go version to 1.13 in CI related files by @reugn in https://github.com/apache/pulsar-client-go/pull/692
    • [Partition Consumer] Provide lock-free access to compression providers by @dferstay in https://github.com/apache/pulsar-client-go/pull/689
    • Use a separate goroutine to handle the logic of reconnect by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/691
    • [DefaultRouter] add a parallel bench test by @dferstay in https://github.com/apache/pulsar-client-go/pull/693
    • Revert "Use a separate goroutine to handle the logic of reconnect" by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/700
    • Fix data race while accessing connection in partitionProducer by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/701
    • Fix stuck when reconnect broker by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/703
    • Fix slice bounds out of range for readSingleMessage by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/709
    • Encryption failure test case fix by @GPrabhudas in https://github.com/apache/pulsar-client-go/pull/708
    • [DefaultRouter] fix unnecessary system clock reads due to races accessing router state by @dferstay in https://github.com/apache/pulsar-client-go/pull/694
    • Fix negative WaitGroup counter issue by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/712
    • [issue 675] oauth2 use golang-jwt address CVE-2020-26160 by @zzzming in https://github.com/apache/pulsar-client-go/pull/713
    • readme: add note about how to build and test by @pgier in https://github.com/apache/pulsar-client-go/pull/714
    • Bump oauth2 package version to the latest in master by @iorvd in https://github.com/apache/pulsar-client-go/pull/715
    • Fix closed connection leak by @billowqiu in https://github.com/apache/pulsar-client-go/pull/716
    • [Bugfix] producer runEventsLoop for reconnect early exit by @billowqiu in https://github.com/apache/pulsar-client-go/pull/721
    • [issue 679][oauth2] Fix macos compiler warnings by @pgier in https://github.com/apache/pulsar-client-go/pull/719
    • [optimize] Stop batchFlushTicker when Disable batching by @Shoothzj in https://github.com/apache/pulsar-client-go/pull/720
    • Markdown error fix by @Shoothzj in https://github.com/apache/pulsar-client-go/pull/722

    New Contributors

    • @bschofield made their first contribution in https://github.com/apache/pulsar-client-go/pull/663
    • @izumo27 made their first contribution in https://github.com/apache/pulsar-client-go/pull/674
    • @pgier made their first contribution in https://github.com/apache/pulsar-client-go/pull/714
    • @iorvd made their first contribution in https://github.com/apache/pulsar-client-go/pull/715
    • @billowqiu made their first contribution in https://github.com/apache/pulsar-client-go/pull/716

    Full Changelog: https://github.com/apache/pulsar-client-go/compare/v0.7.0...v0.8.0-candidate-1

  • v0.7.0(Nov 16, 2021)

    Feature

    • Encryption support for producer, see PR-560
    • Decryption support for consumer, see PR-612
    • User-defined metric cardinality, see PR-604
    • Better support for Azure AD OAuth 2.0, see PR-630, PR-633, PR-634
    • Removed testing for go versions 1.11 and 1.12, see PR-632
    • Add epoch to create producer to prevent a duplicate producer when broker is not available, see PR-582

    Improve

    • Fix batch size limit validation, see PR-528
    • Fix logic of command for sendError, see PR-622
    • Drain connection requests channel without closing, see PR-645
    • Fix ConsumersOpened counter not incremented when using a multi-topic or regexp consumer, see PR-619
    • Fix reconnection logic when topic is deleted, see PR-627
    • Fix panic when scaling down partitions, see PR-601
    • Fix missing metrics for topics by registration of existing collector, see PR-600
    • Fix producer panic by oldProducers, see PR-598
    • Fail pending messages when topic is terminated, see PR-588
    • Fix handle send error panic, see PR-576
  • v0.6.0(Jul 26, 2021)

    Feature

    • Make PartitionsAutoDiscoveryInterval configurable, see PR-514.
    • Always check connection close channel before attempting to put requests, see PR-521.
    • Add LedgerId, EntryId, BatchIdx, PartitionIdx func for MessageId interface, see PR-529.
    • Add DisableReplication to Producer Message, see PR-543.
    • Updating comments to conform to golang comment specification, see PR-532.
    • Producer respects Context passed to Send() and SendAsync() when applying backpressure, see PR-534.
    • Simplify connection close logic, see PR-559.
    • Add open tracing to pulsar go client, see PR-518.
    • Update proto file, see PR-562.
    • Add send error logic for connection, see PR-566.
    • Add license file for dependency libraries, see PR-567.

    Improve

    • Update jwt-go dependency to resolve vulnerabilities, see PR-524.
    • Fixed Athenz repository name, see PR-522.
    • Fix reader latest position, see PR-525.
    • Fix timeout guarantee for RequestOnCnx, see PR-492.
    • Fix nil pointer error with GetPartitionedTopicMetadata, see PR-536.
    • Release locks before calling producer consumer response callbacks, see PR-542.
    • Fix lookup service not implemented GetTopicsOfNamespace, see PR-541.
    • Regenerate the certs to work with Pulsar 2.8.0 and Java 11, see PR-548.
    • Fix race condition when resending pendingItems, see PR-551.
    • Fix data race while accessing connection in partitionConsumer, see PR-535.
    • Fix channel data race, see PR-558.
    • Fix write to closed channel panic() in internal/connection during connection close, see PR-539.
    • Fix possible race condition in connection pool, see PR-561.
    • Fix default connection timeout, see PR-563.
    • Add lock for compressionProviders to fix data race problem, see PR-533.
    • Fix send goroutine blocked, see PR-530.
  • v0.5.0(May 13, 2021)

    Fixes

    • #465 Reverted datadog to DataDog
    • #499 Fix range channel deadlock error
    • #509 Add sentAt when put item into pendingQueue
    • #474 Fix race condition/goroutine leak in partition discovery goroutine
    • #494 Close cnxPool when closing a Client
    • #478 Move GetPartitionedTopicMetadata to lookup service
    • #470 Fix unexpected nil pointer when reading item from keyring
    • #467 Fix reader with start latest message id inclusive

    Improvements

    • #510 Added http lookup service support
    • #502 Support listener name for go client
    • #484 Add multiple hosts support
    • #471 Use newError to build return error
    • #459 Improve error log for frame size too big and maxMessageSize
  • v0.4.0(Mar 5, 2021)

    Feature

    • Support send timeout in Producer side, see PR-394.
    • Add metric for internal publish latency, see PR-397.
    • Add key_based Batch logic, see PR-363.
    • Add error label to publish errors metric, see PR-405.
    • Add const client label to metrics, see PR-406.
    • Attach topic and custom labels to Prometheus metrics, see PR-410.
    • Add orderingKey apis, see PR-427.
    • Support jwt and trusted cert for pulsar perf client, see PR-427.

    Improve

    • Fix bot action CI yaml file, see PR-395.
    • Update go-keyring to v1.1.6 to remove warnings on macOS Catalina+, see PR-404.
    • Read the clock fewer times during message routing, see PR-372.
    • Close dangling autoDiscovery goroutine in consumer, see PR-411.
    • Fix discard unacked messages, see PR-413.
    • Fixed logic to attempt reconnections to same broker, see PR-414.
    • Reduce the default TCP connection timeout from 30 to 5 seconds, see PR-415.
    • Removed unused import "C" statement, see PR-416.
    • Renamed Metrics.RpcRequestCount to RPCRequestCount to conform to style check, see PR-417.
    • Fix leaked nack tracker goroutine, see PR-418.
    • Clearing message queues after seek requests, see PR-419.
    • Fix retry policy not effective with partitioned topic, see PR-425.
    • Deduplicate user provided topics before using them, see PR-426.
    • The reader.HasNext() returns true on empty topic, see PR-441.
    • Upgrade gogo/protobuf to 1.3.2, see PR-446.
    • Fix logrusWrapper shrink interfaces slice into an interface, see PR-449.
    • Logging what really caused lookup failure, see PR-450.
    • Make state thread safe in consumer_partition and connection, see PR-451.
    • Fix KeyFileTypeServiceAccount not found compile error, see PR-455.
    • Fix unsubscribe blocked when consumer is closing or has closed, see PR-457.
    • Check the asynchronous send timeout without holding the pending queue lock, see PR-460.
  • v0.3.0(Nov 17, 2020)

    Feature

    • Support retry letter topic in Go client, see PR-359.
    • Support limiting the number of reconnectToBroker retries, see PR-360.
    • Support key shared policy in Go client, see PR-363.
    • Add schema logic in producer and consumer, see PR-368.

    Improve

    • Fix panic on receiverQueueSize set to -1, see PR-361.
    • Fix test case that may lead to a panic, see PR-369.
    • Send delayed messages individually even when batching is enabled, see PR-372.
    • Fixed buffer resize when writing request on connection, see PR-374.
    • Fixed deadlock in DLQ ack processing, see PR-375.
    • Fix deadlock when connection closed, see PR-376.
    • Fix producer deadlock after write failure, see PR-378.
    • Fix maxMessageSize not effective even if aligned with broker, see PR-381.
    • Update default router to switch partition on all batching thresholds, see PR-383.
    • Replaced github.com/DataDog/zstd with github.com/datadog/zstd, see PR-385.
    • Fix retry policy not effective with non-FQDN topics, see PR-386.
  • v0.2.0(Aug 27, 2020)

    Feature

    • Expose BatchingMaxSize from ProducerOptions, see PR-280.
    • Allow applications to configure the compression level, see PR-290.
    • Support producer name for Message, see PR-299.
    • Support oauth2 authentication for pulsar-client-go, see PR-313.
    • Add interceptor feature for Go client, see PR-314.
    • Export client metrics to Prometheus, see PR-317.
    • Add Name method to Consumer interface, see PR-321.
    • Add oauth2 to the provider, see PR-338.
    • Support specifying the oauth2 private key with the file:// and data:// prefixes, see PR-343.
    • Fix the keyfile unmarshal error, see PR-339.
    • Add a new method to create auth provider from tls cert supplier, see PR-347.
    • Add seek logic for reader, see PR-356.

    Improve

    • Use .asf.yaml to configure github repo, see PR-216.
    • Auto update the client to handle changes in number of partitions, see PR-221.
    • Clean callbacks of connection after run loop stopped, see PR-248.
    • Fix unable to close consumer after unsubscribe in Shared Subscription, see PR-283.
    • Introduced lifecycle for compression providers, see PR-284.
    • Use maxPendingMessages for sizing producer eventsChan, see PR-285.
    • Avoid contention on producer mutex on critical path, see PR-286.
    • Switched to DataDog zstd wrapper, reusing the compression ctx, see PR-287.
    • Fix panic when creating consumer with ReceiverQueueSize set to -1, see PR-289.
    • Used pooled buffering for compression and batch serialization, see PR-292.
    • Use gogofast to have in-place protobuf serialization, see PR-294.
    • Added semaphore implementation with lower contention, see PR-298.
    • Fixed pooled buffer lifecycle, see PR-300.
    • Removed blocking queue iterator, see PR-301.
    • Fix panic in CreateReader API using custom MessageID for ReaderOptions, see PR-305.
    • Change connection failed warn log to error and print error message, see PR-309.
    • Share buffer pool across all partitions, see PR-310.
    • Add rerun feature test command to repo, see PR-311.
    • Fix CompressMaxSize() for ZLib provider, see PR-312.
    • Reduce the size of the MessageID structs by one word on 64-bit arch, see PR-316.
    • Do not allocate MessageIDs on the heap, see PR-319.
    • Different MessageID implementations for message Production and Consumption, see PR-324.
    • Fix producer block when the producer with the same id, see PR-326.
    • Get the last message when LatestMessageID and inclusive, see PR-329.
    • Fix go.mod issue with invalid version, see PR-330.
    • Fix producer goroutine leak, see PR-331.
    • Fix producer state by reconnecting when receiving unexpected receipts, see PR-336.
    • Avoid producer deadlock on connection closing, see PR-337.
  • v0.1.0(Apr 3, 2020)

    Client

    • Support TLS logic
    • Support Authentication logic
    • Support Proxy logic
    • Support Hostname verification logic

    Producer

    • Add Send() method in Producer interface
    • Add SendAsync() method in Producer interface
    • Add LastSequenceID() method in Producer interface
    • Add Flush() method in Producer interface
    • Add Close() method in Producer interface
    • Add Topic() method in Producer interface
    • Add Name() method in Producer interface
    • Support MessageRouter logic
    • Support batch logic
    • Support compression message logic
    • Support HashingScheme logic
    • Support User defined properties producer logic

    Consumer

    • Add Subscription() method in Consumer interface
    • Add Unsubscribe() method in Consumer interface
    • Add Receive() method in Consumer interface
    • Add Ack() method in Consumer interface
    • Add AckID() method in Consumer interface
    • Add Nack() method in Consumer interface
    • Add NackID() method in Consumer interface
    • Add Seek() method in Consumer interface
    • Add SeekByTime() method in Consumer interface
    • Add Close() method in Consumer interface
    • Support Dead Letter Queue consumer policy
    • Support Topics Pattern and Topics logic
    • Support topic consumer regex logic
    • Support multi topics consumer logic
    • Support Exclusive, Failover, Shared and KeyShared subscribe type logic
    • Support Latest and Earliest logic
    • Support ReadCompacted logic
    • Support ReplicateSubscriptionState logic
    • Support User defined properties consumer logic
    • Support Delayed Delivery Messages logic

    Reader

    • Add Topic() method in Reader interface
    • Add Next() method in Reader interface
    • Add HasNext() method in Reader interface
    • Add Close() method in Reader interface
    • Support read compacted logic
    • Support start messageID logic
    • Support User defined properties reader logic
Owner
The Apache Software Foundation