Sarama is a Go library for Apache Kafka 0.8 and up.

Overview


Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later).

Getting started

  • API documentation and examples are available via pkg.go.dev.
  • Mocks for testing are available in the mocks subpackage.
  • The examples directory contains more elaborate example applications.
  • The tools directory contains command line tools that can be useful for testing, diagnostics, and instrumentation.

You might also want to look at the Frequently Asked Questions.
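For orientation, here is a minimal producer sketch (assuming a single broker on localhost:9092 and a topic named "my-topic"; error handling is trimmed to the essentials):

    package main

    import (
    	"log"

    	"github.com/Shopify/sarama"
    )

    func main() {
    	config := sarama.NewConfig()
    	config.Producer.Return.Successes = true // required by SyncProducer

    	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    	if err != nil {
    		log.Fatalln(err)
    	}
    	defer producer.Close()

    	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
    		Topic: "my-topic",
    		Value: sarama.StringEncoder("hello world"),
    	})
    	if err != nil {
    		log.Fatalln(err)
    	}
    	log.Printf("message stored at partition %d, offset %d", partition, offset)
    }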

Compatibility and API stability

Sarama provides a "2 releases + 2 months" compatibility guarantee: we support the two latest stable releases of Kafka and Go, and we provide a two-month grace period for older releases. This means we currently officially support Go 1.14 through 1.15 and Kafka 2.5 through 2.7, although older releases are still likely to work.

Sarama follows semantic versioning and provides API stability via the gopkg.in service. You can import a version with a guaranteed stable API via http://gopkg.in/Shopify/sarama.v1. A changelog is available in the repository's CHANGELOG.md.
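In code that looks like:

    import "gopkg.in/Shopify/sarama.v1"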

Contributing

Comments
  • Implement a higher-level consumer group

    Implement a higher-level consumer group

Related to my other PR #1083, this is another attempt, this time with a higher-level consumer API. I have been going back and forth on this one; I can't find a better way to make it flexible enough to support all the use cases and comprehensible at the same time. I would really appreciate some feedback.
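For context, the consumer-group API that eventually shipped is built around a handler interface; a minimal sketch (the topic, group ID, and handler type here are illustrative):

    import (
    	"context"

    	"github.com/Shopify/sarama"
    )

    type handler struct{}

    func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
    func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

    func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    	// see the consumer-group discussion further down this page for a
    	// caveat about draining claim.Messages() during a rebalance
    	for msg := range claim.Messages() {
    		sess.MarkMessage(msg, "")
    	}
    	return nil
    }

    func consume(ctx context.Context, brokers []string, cfg *sarama.Config) error {
    	group, err := sarama.NewConsumerGroup(brokers, "example-group", cfg)
    	if err != nil {
    		return err
    	}
    	defer group.Close()
    	for ctx.Err() == nil {
    		// Consume blocks for one session and must be called again
    		// after every rebalance
    		if err := group.Consume(ctx, []string{"example-topic"}, handler{}); err != nil {
    			return err
    		}
    	}
    	return ctx.Err()
    }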

    opened by dim 83
  • OffsetManager Implementation

    OffsetManager Implementation

    There are still some TODOs sprinkled throughout, but not all of them necessarily need to be fixed before we ship this (it's already pretty close to the minimum viable implementation). The only critical missing piece is implementing AsyncClose for the PartitionOffsetManager.

    Regardless, it's at the state where it can be opened up for comments. This should completely supersede #379.

    opened by eapache 55
  • Lockless multiproducer

    Lockless multiproducer

    CC @sirupsen @wvanbergen @burke @fw42 @graemej

    Simple non-batching synchronous tests are working. Needs batching tests, but as Burke mentioned yesterday they are very messy with the current framework.

    opened by eapache 51
  • New producer design [beta]

    New producer design [beta]

    @wvanbergen @burke @Sirupsen @graemej and whoever else might be interested.

    This is an idea for a different API for the producer that is somewhat more channel/goroutine based. I was thinking about alternative architectures after reviewing the PR which adds the channels returning failed/successful messages. I started playing with things (after also reviewing the original multiproducer PRs) and came up with this.

It ended up being basically a complete rewrite, so reviewing the diff won't be too helpful; just read the new producer.go straight. It's very much a work-in-progress; I'm more interested in architectural thoughts right now than code nits. It seems to pass the simple tests, but that's primarily accidental, shrug.

    Misc. notes on things that might cause questions:

• User communication (input of new messages, receipt of errors) is all done with channels now, so the common idiom I expect would be a select on those two channels to avoid blocking (see the sketch after these notes).
    • It doesn't have a "synchronous" mode, instead the separate SimpleProducer uses the AckSuccesses flag to effectively imitate that.
• In the normal success path, a message ends up passing through five goroutines before being bundled into a message and passed to the Broker object to put on the wire. Not sure if this will impact performance noticeably, but it means that if any intermediate step blocks (e.g. a metadata request for a new leader for a topic) then all other messages not directly impacted will continue processing, which is nice (assuming you configure a large enough channel buffer).
    • I delayed actually converting the Key/Value elements to bytes until the very last moment. This seems cleaner, and also potentially simplifies the addition of streaming messages which somebody asked for recently.
    • Hopefully the retry path is easier to reason about than the old one, as it is exactly the same as the normal path now (the message gets resubmitted on exactly the same channel as the user submits messages on, it just has an extra flag set). It should still preserve ordering correctly though (see the leaderDispatcher logic for this).
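The select idiom from the first note above, written against the AsyncProducer API as it ultimately landed (a sketch; the wrapper function is illustrative):

    import (
    	"log"

    	"github.com/Shopify/sarama"
    )

    // produceOne submits one message while draining errors, so a stalled
    // error channel can never block the submission path.
    func produceOne(producer sarama.AsyncProducer, msg *sarama.ProducerMessage) {
    	for sent := false; !sent; {
    		select {
    		case producer.Input() <- msg: // submit new work without blocking
    			sent = true
    		case err := <-producer.Errors(): // drain failures as they arrive
    			log.Println("produce failed:", err)
    		}
    	}
    }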
    opened by eapache 47
  • panic: sync: negative WaitGroup counter

    panic: sync: negative WaitGroup counter

    Seeing this occasionally in production:

    Revision: 23d523386ce0c886e56c9faf1b9c78b07e5b8c90

    panic: sync: negative WaitGroup counter
    
    goroutine 444 [running]:
    sync.(*WaitGroup).Add(0xc208e6c510, 0xffffffffffffffff)
            /usr/src/go/src/sync/waitgroup.go:67 +0x96
    sync.(*WaitGroup).Done(0xc208e6c510)
            /usr/src/go/src/sync/waitgroup.go:85 +0x31
    github.com/Shopify/sarama.(*asyncProducer).returnSuccesses(0xc208e6c4d0, 0xc29ffe9600, 0x3f, 0x40)
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:753 +0xd3
    github.com/Shopify/sarama.(*asyncProducer).flusher(0xc208e6c4d0, 0xc20814c540, 0xc2080b9800)
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:560 +0xafe
    github.com/Shopify/sarama.func·006()
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:430 +0x43
    github.com/Shopify/sarama.withRecover(0xc20802b220)
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/utils.go:42 +0x3a
    created by github.com/Shopify/sarama.(*asyncProducer).messageAggregator
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:430 +0x205
    
    bug :-( producer 
    opened by cep21 38
  • Not seeing all messages when syncProducer called from goroutines

    Not seeing all messages when syncProducer called from goroutines

    Versions

Sarama Version: 1.15.0
Kafka Version: 0.11.0
Go Version: 1.9.2 darwin/amd64

    Configuration


    Sarama configuration

    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 10
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    config.Version = sarama.V0_11_0_0
    

    Kafka configuration

    broker.id=0
    delete.topic.enable=true
    listeners=PLAINTEXT://localhost:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/usr/local/Cellar/kafka/kafka-log-1
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    auto.create.topics.enable=false 
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    
    Logs


    sarama: client.go:115: Initializing new client
    sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: broker.go:146: Connected to broker at localhost:9092 (unregistered)
    sarama: client.go:429: client/brokers registered new broker #0 at localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:635: client/metadata retrying after 250ms... (3 attempts remaining)
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:635: client/metadata retrying after 250ms... (2 attempts remaining)
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:635: client/metadata retrying after 250ms... (1 attempts remaining)
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:161: Successfully initialized new client
    sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    sarama: async_producer.go:601: producer/broker/0 starting up
    sarama: async_producer.go:612: producer/broker/0 state change to [open] on people/0
    sarama: broker.go:144: Connected to broker at localhost:9092 (registered as #0)
    
    Problem Description

When calling SendMessage on a single instance of syncProducer from multiple goroutines, some messages seem to fail to be produced to Kafka. I've looked at what ends up on the stream using Apache's kafka-console-consumer and it shows only a fraction of the messages, anywhere from half of them down to none. I wrote my own consumer using sarama and it shows the same issue, except I get the error message below back from sarama. I want to use syncProducer because I need to guarantee that messages will be published to the stream in the order that they're received by my application. Maybe I've just implemented it wrong, but right now I'm out of ideas and I'm hoping someone here can help me out.

    sarama: consumer.go:755: consumer/broker/0 abandoned subscription to people/0 because kafka: response did not contain all the expected topic/partition blocks
    Error: kafka: error while consuming people/0: kafka: response did not contain all the expected topic/partition blocks
    sarama: consumer.go:345: consumer/people/0 finding new broker
    sarama: client.go:644: client/metadata fetching metadata for [people] from broker localhost:9092
    sarama: consumer.go:711: consumer/broker/0 added subscription to people/0
    

    Here's how I created my topic: bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic people

    I'm running a single broker on my local machine. I've written a sample program that can reproduce the issue. It's also worth noting that none of the calls to sendMessage() are returning errors when I run the code.

    main.go

    package main
    
    import (
    	"bytes"
    	"fmt"
    	"log"
    	"strconv"
    	"sync"
    	"syncProducer/streamer"
    
    	"github.com/Shopify/sarama"
    	"github.com/linkedin/goavro"
    	uuid "github.com/satori/go.uuid"
    )
    
    const personSchema = `{
    	"type":"record",
    	"name":"Person",
    	"namespace":"com.example.people",
    	"fields":[
    		{
    			"name":"Name",
    			"type":"string"
    		},
    		{
    			"name":"Address",
    			"type":"string"
    		},{
    			"name":"City",
    			"type":"string"
    		},
    		{
    			"name":"State",
    			"type":"string"
    		},
    		{
    			"name":"ZIP",
    			"type":"long"
    		}
    	]
    }`
    
    var (
    	personCodec *goavro.Codec
    	buf         bytes.Buffer
    )
    
    type (
    	person struct {
    		Name    string
    		Address string
    		City    string
    		State   string
    		ZIP     int64
    	}
    )
    
    func main() {
    	var err error
    	personCodec, err = goavro.NewCodec(personSchema)
    	if err != nil {
    		panic(err)
    	}
    
    	producer, err := newSyncProducer()
    	if err != nil {
    		panic(err)
    	}
    	streamer := streamer.New(producer)
    
    	// Create 10 avro message bodies
    	var people [][]byte
    	for i := 1; i < 11; i++ {
    		aPerson := person{
    			Name:    "Bob #" + strconv.Itoa(i),
    			Address: strconv.Itoa(i) + " Main St.",
    			City:    "SomeTown",
    			State:   "CA",
    			ZIP:     90210,
    		}
    		data, err := convertToAvro(aPerson)
    		if err != nil {
    			panic("Could not convert aPerson " + strconv.Itoa(i) + " to avro.")
    		}
    		people = append(people, data)
    	}
    
    	errc := make(chan error, 10)
    
    	var wg sync.WaitGroup
    	// Send messages
    	for _, person := range people {
    		wg.Add(1)
    		go func(person []byte, c chan error, wg *sync.WaitGroup) {
    			uuid := uuid.NewV4().String()
    			err := streamer.SendActivity("people", uuid, "CreatePerson", person, nil)
    			c <- err
    			wg.Done()
    		}(person, errc, &wg)
    	}
    
    	wg.Wait()
    	close(errc)
    	fmt.Println("Completed!")
    	for i := range errc {
    		fmt.Println(i)
    		if i != nil {
    			fmt.Printf("Exit: %v\n", i)
    		}
    	}
    
    	fmt.Print(&buf)
    }
    
    func convertToAvro(aPerson person) ([]byte, error) {
    	data, err := personCodec.BinaryFromNative(nil, map[string]interface{}{
    		"Name":    aPerson.Name,
    		"Address": aPerson.Address,
    		"City":    aPerson.City,
    		"State":   aPerson.State,
    		"ZIP":     aPerson.ZIP,
    	})
    	if err != nil {
    		return nil, err
    	}
    
    	return data, nil
    }
    
    func newSyncProducer() (sarama.SyncProducer, error) {
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
    	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
    	config.Producer.Return.Successes = true          // Required when using syncproducer
    	config.Producer.Return.Errors = true
    	config.Version = sarama.V0_11_0_0
    
    	sarama.Logger = log.New(&buf, "sarama: ", log.Lshortfile)
    
    	return sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    }
    

    streamer.go

    package streamer
    
    import (
    	"github.com/Shopify/sarama"
    	"github.com/pkg/errors"
    )
    
    const (
    	MessageTypeHeaderKey = "message-type"
    	MessageIDHeaderKey = "message-id"
    )
    
    type (
    	// Metadata contains metadata for a given activity.
    	Metadata map[string][]string
    
    	// Streamer handles streaming activities to a topic.
    	Streamer struct {
    		producer sarama.SyncProducer
    	}
    )
    
    var (
    	// ErrNoSubjects denotes that no subjects were provided.
    	ErrNoSubjects = errors.New("At least one subject is required")
    )
    
    // New creates a new streamer.
    func New(producer sarama.SyncProducer) *Streamer {
    	return &Streamer{
    		producer: producer,
    	}
    }
    
// SendActivity encapsulates the provided metadata and data in a message and sends it to a topic.
    func (s *Streamer) SendActivity(topic string, messageID string, messageHeaderValue string, data []byte, metadata Metadata) error {
    	_, _, err := s.producer.SendMessage(&sarama.ProducerMessage{
    		Topic: topic,
    		Key:   sarama.StringEncoder(messageID),
    		Value: sarama.ByteEncoder(data),
    		Headers: []sarama.RecordHeader{
    			sarama.RecordHeader{
    				Key:   []byte(MessageIDHeaderKey),
    				Value: []byte(messageID),
    			},
    			sarama.RecordHeader{
    				Key:   []byte(MessageTypeHeaderKey),
    				Value: []byte(messageHeaderValue),
    			},
    		},
    	})
    	if err != nil {
    		return errors.Wrapf(err, "Error sending message to topic %s for ID %s", topic, messageID)
    	}
    
    	return nil
    }
    
    bug :-( producer 
    opened by kenschneider18 36
  • MultiConsumer

    MultiConsumer

    @wvanbergen @snormore @mkobetic (and anyone else who cares)

    The current consumer works just fine, but is limited to fetching just a single partition of a single topic at once. You can run multiple consumers, but this has several issues:

    • They can't use the same client, or else the broker connection has contention problems (brokers handle requests on a single connection serially, so if it has to wait for MaxWaitTime before responding, this introduces severe latency when multiplexing busy and non-busy topics). Using separate clients works, but generates a lot of extra connections and metadata traffic.
    • You don't get messages from multiple partitions batched into a single response, when that makes sense for efficiency.

    The ideal solution is to have a consumer capable of fetching from multiple topic/partitions at once. This has been at the back of my mind for a while now, so I already have a design ready; if the design makes sense to y'all, then somebody just needs to write it :)

    While the user currently specifies a single topic/partition to the constructor, they should now be able to specify a set of topic/partitions. Since some of the configuration that is currently a singleton actually needs to be per-partition (OffsetMethod and OffsetValue at least), this would probably be of type map[string]map[int32]*ConsumerPartitionConfig. I considered permitting the dynamic adding/removing of partitions to the set, but I don't see a strong use case and it complicated a bunch of things.

    Events are returned to the user exactly the same way they are now, over a single channel. I considered a separate channel per topic/partition but it complicated the base case, and the events already contain information on topic/partition so it's not hard for the user to dispatch appropriately if they really want to.

    The constructor starts up one "controller" goroutine, which starts up and manages one goroutine per broker-that-has-a-partition-we-care-about and is responsible for (initially) dispatching each topic/partition to the appropriate broker's goroutine. The broker goroutine looks a lot like the current fetchMessages method with a few tweaks:

    • Some minor work needed to handle multiple blocks in the requests and responses.
    • When a topic/partition is reassigned to a new broker, that topic/partition gets returned to the controller via a channel; the goroutine tracks how many it is "responsible" for and exits if that reaches 0.
    • Similarly when a broker goes down hard, all topic/partitions are returned to the controller for re-dispatching and the goroutine exits.

    I expect the success case to be fairly straightforward - as always, the complexity will come when reasoning through the failure cases and ensuring that topic/partitions are redispatched correctly, messages are not accidentally skipped in that case, etc. etc.

    When the consumer is closed, it signals the controller which cleans up its children before exiting.
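A compressed sketch of the controller/broker split described above (topicPartition, brokerFetcher, and the channel shapes are hypothetical stand-ins; offset tracking, re-dispatch on failure, and shutdown are elided):

    type topicPartition struct {
    	topic     string
    	partition int32
    }

    func controller(client sarama.Client, assign <-chan topicPartition, out chan<- *sarama.ConsumerMessage) {
    	workers := map[int32]chan topicPartition{} // broker ID -> that broker's goroutine
    	for tp := range assign {
    		leader, err := client.Leader(tp.topic, tp.partition)
    		if err != nil {
    			continue // real code would re-queue tp for redispatch
    		}
    		inbox, ok := workers[leader.ID()]
    		if !ok {
    			inbox = make(chan topicPartition)
    			workers[leader.ID()] = inbox
    			go brokerFetcher(leader, inbox, out)
    		}
    		inbox <- tp
    	}
    }

    // brokerFetcher is a stub: the real loop would batch all of its claims
    // into a single fetch request per cycle, deliver messages on out, hand
    // claims back to the controller when leadership moves, and exit once it
    // is responsible for none.
    func brokerFetcher(b *sarama.Broker, inbox <-chan topicPartition, out chan<- *sarama.ConsumerMessage) {
    	for range inbox {
    		// fetch loop elided
    	}
    }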

    Thoughts?

    enhancement consumer 
    opened by eapache 31
  • Better Broker Connection Management

    Better Broker Connection Management

    Following up to #7, #9, #10, #13 our connection management still has problems.

    First some things to know about Kafka that complicate the issue:

• Broker metadata requests do not seem to be required to return all the brokers they know about (it is not documented which ones they do return, but I suspect it is only the subset of brokers that are leading a partition whose data is also in the response).
    • The user only specifies (host, port) pairs while Kafka metadata returns (host, port, brokerId) triples, with no guarantee that the host matches the user-specified hostname, and with no apparent way to query a broker's own brokerId.

    Now some of the issues with our current implementation:

    • If the user provides three addresses, only the last of which is valid, we wait for the first two connections to completely fail when fetching metadata before we try the third. Since the TCP connection timeout can be very long (many minutes) this can take a long time. Trying them all in parallel and using the first valid one would be better.
    • Once we've had an address fail, we discard it forever. Over the lifetime of a long-running cluster, all of the nodes may be down at one point or another, meaning that eventually we may 'run out' of nodes and abort.

    Thoughts on potential future design:

    • We should connect to brokers lazily, ie only when we actually want to talk to them, not just when we know they exist.
• Since there is only ever one leader for a partition, if the connection fails it doesn't make sense to just try another broker. Therefore the caller should decide how to proceed, which means that the client's leader function shouldn't guarantee that the broker it returns is connected, only connecting.
• However, when fetching metadata, any broker will do, so the any function should guarantee that the broker it returns is actually connected. Walking through bad brokers to find a good one could be slow, but connecting to all the brokers just to pick one and drop the others is gross. We could try to connect to one, and if it hasn't succeeded in 5 seconds, start another, etc. (see the dialing sketch after this list). That should cover the common case, since if it hasn't responded after 5 seconds it probably won't, and if it does then we'll drop it right away, oh well. More importantly, we won't spawn hundreds of connections if the first broker does respond normally.
    • User-provided addresses should be managed separately (which they already are) since they don't have brokerIds. They're only useful for metadata requests, so they should be disconnected entirely 99% of the time. If one of them fails, we should probably just mark it as recently-failed, and retry it after some time has passed. Only if all user-supplied nodes have recently failed (and we have no metadata from the cluster itself) should we abort somehow.
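The staggered-connection idea from the third point might look like this (a sketch assuming plain TCP and a fixed 5-second stagger; as the comments note, real code must also close the losing connections):

    import (
    	"net"
    	"time"
    )

    // dialAny starts a dial to the first address immediately and to each
    // subsequent address 5s later, returning the first connection to succeed.
    func dialAny(addrs []string) (net.Conn, error) {
    	type result struct {
    		conn net.Conn
    		err  error
    	}
    	results := make(chan result, len(addrs))
    	for i, addr := range addrs {
    		go func(addr string, delay time.Duration) {
    			time.Sleep(delay)
    			conn, err := net.Dial("tcp", addr)
    			results <- result{conn, err}
    		}(addr, time.Duration(i)*5*time.Second)
    	}
    	var lastErr error
    	for i := 0; i < len(addrs); i++ {
    		r := <-results
    		if r.err == nil {
    			go func(remaining int) { // close any late winners
    				for j := 0; j < remaining; j++ {
    					if s := <-results; s.err == nil {
    						s.conn.Close()
    					}
    				}
    			}(len(addrs) - i - 1)
    			return r.conn, nil
    		}
    		lastErr = r.err
    	}
    	return nil, lastErr
    }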

    Open Questions:

• If a user-supplied broker is used to satisfy a call to any, when should it be disconnected? The extraBroker field in client currently is a hack to work around this, and it still leaves one connection floating around for no reason. Presumably disconnectBroker could be used and made smarter? However, if a metadata-supplied broker is used, calling disconnectBroker will pull it out from underneath other code that could be using it as a leader. Do we need reference-counting?
• There is currently no way to tell that a Broker is 'Connecting'; you can only tell that the lock is currently held. This means that if two concurrent leader calls would return the same broker and both call Open, the second one will block when we don't want it to. Do we need a second lock in the brokers? That gets really messy...
    enhancement client 
    opened by eapache 29
  • Async producer can overflow itself at high rate

    Async producer can overflow itself at high rate

    Versions

Sarama Version: b1da1753dedcf77d053613b7eae907b98a2ddad5
Kafka Version: Irrelevant
Go Version: b1da1753dedcf77d053613b7eae907b98a2ddad5

    Configuration


    Sarama async producer:

    	conf := sarama.NewConfig()
    	conf.Metadata.Retry.Max = 1
    	conf.Metadata.Retry.Backoff = 250 * time.Millisecond
    	conf.Producer.RequiredAcks = sarama.RequiredAcks(sarama.WaitForLocal)
    	conf.Producer.Timeout = 1 * time.Second
    	conf.Producer.MaxMessageBytes = 16 << 20 // 16MB
    	conf.Producer.Flush.Bytes = 16 << 20 // 16MB
    	conf.Producer.Flush.Frequency = time.Minute
    	conf.Producer.Compression = sarama.CompressionNone // otherwise Kafka goes nuts
    	conf.Producer.Return.Errors = true
    	conf.Producer.Partitioner = NewIdentityPartitioner
    
    Logs

    Sarama logs:

    2017-01-09T23:30:21.504 myhost 2017/01/09 23:30:19 Kafka producer err: kafka: Failed to produce message to topic requests: kafka server: Message was too large, server rejected it to avoid allocation error.
    
    Problem Description

    Problem is in this function:

    • https://github.com/Shopify/sarama/blob/0fb560e5f7fbcaee2f75e3c34174320709f69944/async_producer.go#L583-L648

We produce messages at such a rate (100K/s+) that select always picks processing of new incoming messages over rolling over and flushing the existing ones.

I applied the following patch:

    diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go
    index e7ae8c2..13a888b 100644
    --- a/vendor/github.com/Shopify/sarama/async_producer.go
    +++ b/vendor/github.com/Shopify/sarama/async_producer.go
    @@ -2,6 +2,7 @@ package sarama
     
     import (
     	"fmt"
    +	"log"
     	"sync"
     	"time"
     
    @@ -249,6 +250,7 @@ func (p *asyncProducer) dispatcher() {
     		}
     
     		if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
    +			log.Printf("Got message size bigger than allowed max message bytes %d > %d", msg.byteSize(), p.conf.Producer.MaxMessageBytes)
     			p.returnError(msg, ErrMessageSizeTooLarge)
     			continue
     		}
    @@ -577,9 +579,12 @@ func (bp *brokerProducer) run() {
     	var output chan<- *produceSet
     	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())
     
    +	wasReadyTimes := 0
    +
     	for {
     		select {
     		case msg := <-bp.input:
    +			log.Println("INPUT MESSAGE")
     			if msg == nil {
     				bp.shutdown()
     				return
    @@ -625,14 +630,23 @@ func (bp *brokerProducer) run() {
     				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
     			}
     		case <-bp.timer:
    +			log.Println("TIMER FIRED")
     			bp.timerFired = true
     		case output <- bp.buffer:
    +			wasReadyTimes = 0
    +			log.Println("ROLL OVER")
     			bp.rollOver()
     		case response := <-bp.responses:
    +			log.Println("HANDLING RESPONSE")
     			bp.handleResponse(response)
     		}
     
     		if bp.timerFired || bp.buffer.readyToFlush() {
    +			log.Println("READY TO FLUSH YAY")
    +			wasReadyTimes++
    +			if wasReadyTimes > 10 {
    +				log.Fatal("We were ready for a long time, but it did not happen. Exiting.")
    +			}
     			output = bp.output
     		} else {
     			output = nil
    diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
    index 9fe5f79..91a127f 100644
    --- a/vendor/github.com/Shopify/sarama/produce_set.go
    +++ b/vendor/github.com/Shopify/sarama/produce_set.go
    @@ -1,6 +1,9 @@
     package sarama
     
    -import "time"
    +import (
    +	"log"
    +	"time"
    +)
     
     type partitionSet struct {
     	msgs        []*ProducerMessage
    @@ -147,6 +150,7 @@ func (ps *produceSet) readyToFlush() bool {
     		return true
     	// If we've passed the byte trigger-point
     	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
    +		log.Printf("ready to flush because buffer bytes are big enough: %d >= %d", ps.bufferBytes, ps.parent.conf.Producer.Flush.Bytes)
     		return true
     	default:
     		return false
    
    

    The output looks like this: https://gist.github.com/bobrik/27071d61d5ec98ed15ffd1cb5331f3f4.

    Without log.Fatal I've seen buffer sizes go as high as 40MB, which is bigger than message.max.bytes=33554432 in our cluster.

    The solution is probably to do rollOver outside of select.
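In general terms, the fix is to give the flush priority over new input once the buffer is ready. A generic sketch of that shape (not Sarama's actual fix; the 10-message trigger and string payloads are arbitrary stand-ins):

    func runLoop(input <-chan string, responses <-chan string, output chan<- []string) {
    	var buffer []string
    	readyToFlush := func() bool { return len(buffer) >= 10 }
    	for {
    		if readyToFlush() {
    			// flush wins: stop accepting input, but keep handling responses
    			select {
    			case output <- buffer:
    				buffer = nil // rollOver
    			case resp := <-responses:
    				_ = resp // handleResponse
    			}
    			continue
    		}
    		select {
    		case msg := <-input:
    			buffer = append(buffer, msg)
    		case resp := <-responses:
    			_ = resp // handleResponse
    		}
    	}
    }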

    bug :-( producer upstream 
    opened by bobrik 26
  • Memory leak sometimes when restarting kafka partitions

    Memory leak sometimes when restarting kafka partitions

    I know this isn't a lot to go on, but hope you can give me pointers to debug this.

Restarting kafka brokers occasionally produces ever increasing memory usage until the go process dies OOM. This isn't consistent, but it especially happens when we have to restart multiple kafka nodes at once. When I run pprof/heap I see the memory usage by kafka messages but cannot tell where inside the kafka code the memory is being held onto :( It only happens occasionally and only on our production tier. I also don't see increased goroutine numbers. We're running with 256 brokers and about 12 kafka hosts, using the kafka revision just after you fixed the WaitGroup bug. The log has messages like

Failed to connect to XYZ: getsockopt: connection refused
client/brokers deregistering XYZ
client/brokers deregistered broker #-1 at kafkaXYZ:9092
...

    Even when kafka is stable, and the messages go away, memory usage still increases forever until we OOM. A restart and things work great.

    Here is the config we use

        conf.Net.DialTimeout = time.Second * 30
    
        conf.ChannelBufferSize = 256
    
        conf.Producer.RequiredAcks = sarama.WaitForLocal
        conf.Producer.Return.Errors = true
        conf.Producer.Return.Successes = true
        conf.Producer.Partitioner = partitioner
        conf.Producer.Flush.Frequency = time.Millisecond * 250
        conf.Producer.Flush.Messages = 3000
        conf.Producer.Flush.Bytes = 83886080
    

The biggest issue for us is that even when kafka recovers, memory usage still increases forever in a straight line upward.

    When kafka failures happen, is it possible that multiple buffers could be created internally that end up buffering points?

    Note we also don't see increases in messages on the error channel.

    producer 
    opened by cep21 26
  • [WIP] Offset manager tests

    [WIP] Offset manager tests

    This PR adds tests for the offset manager in PR #461. The majority of the code now has coverage.

    Notes

    • I was unable to reach the bom.updateSubscriptions loop in the brokerOffsetManager.abort() method; pointers for that would be appreciated.
• The tests use a pretty simple setup; adding a second partition offset manager (in an attempt to reach the code in the previous point) led to a broker unreference/refresh and a need for more mock responses that I didn't get to.
    • After spending a while with it, the code is still somewhat difficult to reason about, due to a lot of channels and resultant side effects. Also, 3 different objects whose responsibilities are not totally clear.

    @eapache @wvanbergen @horkhe

    opened by aaronkavlie-wf 25
  • add: unit test for consumer group strategy compatibility

    add: unit test for consumer group strategy compatibility

    Sorry for the trouble in https://github.com/Shopify/sarama/issues/2351

    Add a simple unit test for strategy backward compatibility.

This test should fail now; after the fix in https://github.com/Shopify/sarama/pull/2352, it should pass.

    opened by Jacob-bzx 0
  • Configure backward compatibility on Group.Rebalance.GroupStrategies

    Configure backward compatibility on Group.Rebalance.GroupStrategies

In commit d66826a0, a new configuration option, Group.Rebalance.GroupStrategies, was introduced.

But it is not backward compatible.

If I set the strategy as below in my older code but use the latest sarama package, I will encounter a Consumer.Group.Rebalance.GroupStrategies and Consumer.Group.Rebalance.Strategy cannot be set at the same time error. Please see https://github.com/Shopify/sarama/blob/v1.37.0/config.go#L808

    // config setting in my older setting
    kconfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
    

That is because sarama always sets a default value on Group.Rebalance.GroupStrategies. Please see https://github.com/Shopify/sarama/blob/v1.37.0/config.go#L542

The code is very confusing until you read the d66826a0 commit. Doesn't this violate object-oriented design principles? The Config fields should not be exposed directly, so that setting Consumer.Group.Rebalance.Strategy and Consumer.Group.Rebalance.GroupStrategies could be atomic. Just marking Consumer.Group.Rebalance.Strategy as deprecated is not enough.
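Until that is addressed, the workaround is to migrate the old assignment to the new field rather than setting both:

    // new-style setting for sarama v1.37.0+, replacing the deprecated
    // Consumer.Group.Rebalance.Strategy assignment above
    kconfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
    	sarama.BalanceStrategyRoundRobin,
    }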

    opened by spongecaptain 3
  • Sarama API about consumer groups are not idiot-proof enough.

    Sarama API about consumer groups are not idiot-proof enough.

It's particularly tricky to have a consumer group working correctly when a rebalancing happens. The sample code was wrong; it was corrected with https://github.com/Shopify/sarama/pull/2240/commits/bddf37e2c3f7241c1398478c73205094507dac90. It is still wrong:

    https://github.com/Shopify/sarama/blob/d0a00ae5507996d3b7c06aa9e8d1fa58f8013c6b/examples/consumergroup/main.go#L184

    The remaining mistake is that the close of the channel might be received before the cancellation of the context, so the correct code should be like this:

    for {
      select {
      case message, ok := <-claim.Messages():
         if !ok {
           return nil
         }
         log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
         session.MarkMessage(message, "")
    …
    

    By the way, the example code there https://pkg.go.dev/github.com/Shopify/sarama?utm_source=godoc#example-ConsumerGroup maybe deserves to be fixed.

I was bitten by the first version of the example (like many others, I presume). I had a slow consumer, so claim.Messages() was a buffered Go channel saturated with 256 elements at the time the rebalancing happened. And of course, naively iterating until the end of the channel triggers various io timeouts.

    The big issue is that the naïve code works well until a rebalancing occurs with slow consumers. Something that typically happens in prod and not during the tests. I would say that the APIs don't respect the principle of least astonishment (POLA) and you should do something about it. Here are some suggestions:

    add a helper function/method

    var message *sarama.ConsumerMessage
    for claim.NextMessage(&message) {
        log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
         session.MarkMessage(message, "")
    }
    

I agree that the channel-based API has some elegance and fits the business logic perfectly. The naive code is pretty (but wrong). But given how complex it is to get it right, let's drop the channel-based API and use some iterator-based API.

    make the naive code correct

That can be done very easily with a helper like this (totally untested):

// forward anything from in to out until in is closed or the context is cancelled.
    func forward[A any](ctx context.Context, in <-chan A, out chan<- A) {
    	defer close(out)
    
    	var pending A
    	somethingToWrite := false
    
    	for {
    		var input <-chan A
    		var output chan<- A
    		if somethingToWrite {
    			output = out
    		} else {
    			input = in
    		}
    		select {
    		case a, ok := <-input:
    			if !ok {
    				return
    			}
    			somethingToWrite = true
    			pending = a
    		case output <- pending:
    			somethingToWrite = false
    		case <-ctx.Done():
    			return
    		}
    	}
    }
    
    // Unbuffered returns a new unbuffered read channel with the same content of `in` that is closed immediately when the context is cancelled.
    func Unbuffered[A any](ctx context.Context, in <-chan A) <-chan A {
    	ret := make(chan A, 0) // unbuffered
    
    	go forward(ctx, in, ret)
    
    	return ret
    }
    

claim.Messages() shall return an unbuffered channel and all the naive consumer code on earth suddenly becomes correct. The runtime cost is one additional goroutine and no additional allocation; I doubt this has any performance impact. I don't see how it could break existing clients (existing clients that assume the channel is buffered and play with len/cap on it look very dirty; maybe they deserve to be broken).
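Building on that (equally untested) helper, ConsumeClaim would shrink to something like this (the handler type is illustrative):

    func (h handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    	// the forwarded channel closes as soon as the session's context is
    	// cancelled, so a plain range is safe even during a rebalance
    	for message := range Unbuffered(session.Context(), claim.Messages()) {
    		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
    		session.MarkMessage(message, "")
    	}
    	return nil
    }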

    Warn in the docs

    At the very least, you should warn people better than what is done. I had read the doc here https://github.com/Shopify/sarama/blob/d0a00ae5507996d3b7c06aa9e8d1fa58f8013c6b/consumer_group.go#L46 and also the doc here https://github.com/Shopify/sarama/blob/d0a00ae5507996d3b7c06aa9e8d1fa58f8013c6b/consumer_group.go#L987 but I wrongly interpreted them.
    You should add in both places a warning that the channel is buffered and the user must watch for the cancellation of the context to exit early.

    opened by fdejaeger 1
  • Enabling batching/compression for Sync Producers

    Enabling batching/compression for Sync Producers

    Versions


| Sarama | Kafka | Go |
|--------|-------|----|
| 1.33.0 | 2.2.1 | 1.16 |

    Configuration


    config.Consumer.Group.Session.Timeout = 20 * time.Second
    config.Consumer.Group.Heartbeat.Interval = 6 * time.Second
    config.Consumer.Fetch.Min = 2000
    config.Consumer.MaxWaitTime  = 100 * time.Millisecond
    config.Consumer.MaxProcessingTime = 15 * time.Second
    config.Consumer.Offsets.AutoCommit.Enable = false
    config.Consumer.Return.Errors = true
    config.Version = sarama.V2_2_1_0
    
    config.Producer.Return.Successes = true
    config.Version = sarama.V2_2_1_0
    config.Producer.Compression = sarama.CompressionSnappy
    config.Producer.Flush.Frequency = 100 * time.Millisecond
    config.Producer.Flush.Bytes = 2000
    
    Logs



    2022/09/15 09:25:11.901347 sync_producer.go:53: - DEBUG > message sent to partition 2at offset 47443
    
    2022/09/15 09:25:11.901688 sync_producer.go:53: - DEBUG > message sent to partition 2at offset 47442
    
    

    Problem Description

We are using SyncProducer in our application. We are trying to enable batching and compression on the producer side by setting the configuration below:

config.Producer.Compression = sarama.CompressionSnappy
config.Producer.Flush.Frequency = 100 * time.Millisecond
config.Producer.Flush.Bytes = 2000

The expectation is that this would increase the throughput and reduce lag, as we now won't be connecting to kafka to produce each message individually but only once for a batch of messages.

    But strangely after introducing batching and compression, we observe a little lag in our producer application. Is batching and compression applicable for Sync producers as well or is it only for Async producers?

Based on the above configuration, does the Sync producer wait 100ms on each message before it can return and produce the next message, since the Sync producer blocks on acknowledgement? Have we now introduced a lag of 100ms on each message? From the logs we see a "message sent to partition" line for each individual message; does that mean we produce messages one at a time even though the batch size allows more? Our message size is less than 1KB per message.

    Please clarify! Thanks in advance.
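One thing worth noting (an observation on the exposed API, not a confirmed diagnosis of the lag): SyncProducer also has a SendMessages method that blocks once for a whole slice of messages, so callers that already hold a batch avoid paying a blocking round trip per message. A sketch, where pending is a hypothetical [][]byte of payloads and producer is a sarama.SyncProducer:

    batch := make([]*sarama.ProducerMessage, 0, len(pending))
    for _, payload := range pending {
    	batch = append(batch, &sarama.ProducerMessage{
    		Topic: "people",
    		Value: sarama.ByteEncoder(payload),
    	})
    }
    // one blocking call for the whole batch instead of one per message
    if err := producer.SendMessages(batch); err != nil {
    	log.Println("batch failed:", err)
    }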

    opened by shweta-fourkites 1
  • Large-scale unrecoverable failure of consumer groups.

    Large-scale unrecoverable failure of consumer groups.

    Versions


| Sarama | Kafka | Go |
|--------|-------|----|
| 1.36.0 | 2.8.12 | 1.14-1.19 |

    Configuration


    k := sarama.NewConfig()
    k.Consumer.Offsets.Initial = sarama.OffsetNewest
    
    Logs



    [2022-09-19T11:14:28.416+08:00] DBG Kafka: kafka: error while consuming *****/24: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition
    [2022-09-19T11:14:28.417+08:00] DBG Kafka: kafka: error while consuming *****/3: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition
    [2022-09-19T11:14:28.417+08:00] DBG Kafka: kafka: error while consuming *****/6: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition
    [2022-09-19T11:14:28.417+08:00] DBG Kafka: kafka: error while consuming *****/18: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition
    [2022-09-19T11:14:28.417+08:00] DBG Kafka: consumergroup/session/test-1663557258425244539-a9819ae8-4030-4529-a3c8-6a5d5067e684/944 heartbeat loop stopped
    [2022-09-19T11:14:28.417+08:00] DBG Kafka: consumergroup/session/test-1663557258425244539-a9819ae8-4030-4529-a3c8-6a5d5067e684/944 released
    [2022-09-19T11:14:28.417+08:00] DBG Kafka: Initializing new client
    [2022-09-19T11:14:28.417+08:00] DBG Kafka: client/metadata fetching metadata for all topics from broker *****:9092
    [2022-09-19T11:14:28.417+08:00] DBG Kafka: Connected to broker at *****:9092 (unregistered)
    [2022-09-19T11:14:28.418+08:00] DBG Kafka: client/brokers registered new broker #476156 at *****:9092
    [2022-09-19T11:14:28.418+08:00] DBG Kafka: client/brokers registered new broker #476157 at *****:9092
    [2022-09-19T11:14:28.418+08:00] DBG Kafka: client/brokers registered new broker #476158 at *****:9092
    [2022-09-19T11:14:28.418+08:00] DBG Kafka: Successfully initialized new client
    [2022-09-19T11:14:28.418+08:00] DBG Kafka: client/metadata fetching metadata for [*****] from broker *****:9092
    [2022-09-19T11:14:28.419+08:00] DBG Kafka: client/coordinator requesting coordinator for consumergroup ***** from *****:9092
    [2022-09-19T11:14:28.419+08:00] DBG Kafka: client/coordinator coordinator for consumergroup ***** is #476157 (*****:9092)
    [2022-09-19T11:14:28.420+08:00] DBG Kafka: Connected to broker at *****:9092 (registered as #476157)
    

    Problem Description

    https://github.com/Shopify/sarama/pull/2252

    Has this new feature been tested at scale? At present, our production environment has experienced large-scale consumption stagnation after upgrading to the latest version, and the failure cannot be recovered by restarting the program. The final analysis determined that it was caused by this new feature (not fully backward compatible, heartbeat loop stopped).

    opened by edoger 5
Releases (v1.37.0)
• v1.37.0 (Sep 27, 2022)

    What's Changed

    :tada: New Features / Improvements

    • feat(consumer): support multiple balance strategies by @Jacob-bzx in https://github.com/Shopify/sarama/pull/2339
    • feat(producer): transactional API by @ryarnyah in https://github.com/Shopify/sarama/pull/2295
    • feat(mocks): support key in MockFetchResponse. by @Skandalik in https://github.com/Shopify/sarama/pull/2328

    :bug: Fixes

    • fix: avoid panic when Metadata.RefreshFrequency is 0 by @Jacob-bzx in https://github.com/Shopify/sarama/pull/2329
    • fix(consumer): avoid pushing unrelated responses to paused children by @pkoutsovasilis in https://github.com/Shopify/sarama/pull/2317
    • fix: prevent metrics leak with cleanup by @auntan in https://github.com/Shopify/sarama/pull/2340
    • fix: race condition(may panic) when closing consumer group by @Jacob-bzx in https://github.com/Shopify/sarama/pull/2331
    • fix(consumer): default ResetInvalidOffsets to true by @dnwe in https://github.com/Shopify/sarama/pull/2345
    • Validate the Config when creating a mock producer/consumer by @joewreschnig in https://github.com/Shopify/sarama/pull/2327

    :package: Dependency updates

    • chore(deps): bump module github.com/pierrec/lz4/v4 to v4.1.16 by @dnwe in https://github.com/Shopify/sarama/pull/2335
    • chore(deps): bump golang.org/x/net digest to bea034e by @dnwe in https://github.com/Shopify/sarama/pull/2333
    • chore(deps): bump golang.org/x/sync digest to 7f9b162 by @dnwe in https://github.com/Shopify/sarama/pull/2334
    • chore(deps): bump golang.org/x/net digest to f486391 by @dnwe in https://github.com/Shopify/sarama/pull/2348
    • chore(deps): bump module github.com/shopify/toxiproxy/v2 to v2.5.0 by @dnwe in https://github.com/Shopify/sarama/pull/2336
    • chore(deps): bump module github.com/klauspost/compress to v1.15.11 by @dnwe in https://github.com/Shopify/sarama/pull/2349
    • chore(deps): bump module github.com/pierrec/lz4/v4 to v4.1.17 by @dnwe in https://github.com/Shopify/sarama/pull/2350

    :wrench: Maintenance

    • chore(ci): bump kafka-versions to latest by @dnwe in https://github.com/Shopify/sarama/pull/2346
    • chore(ci): bump go-versions to N and N-1 by @dnwe in https://github.com/Shopify/sarama/pull/2347

    New Contributors

    • @Jacob-bzx made their first contribution in https://github.com/Shopify/sarama/pull/2329
    • @pkoutsovasilis made their first contribution in https://github.com/Shopify/sarama/pull/2317
    • @Skandalik made their first contribution in https://github.com/Shopify/sarama/pull/2328
    • @auntan made their first contribution in https://github.com/Shopify/sarama/pull/2340
    • @ryarnyah made their first contribution in https://github.com/Shopify/sarama/pull/2295

    Full Changelog: https://github.com/Shopify/sarama/compare/v1.36.0...v1.37.0

• v1.36.0 (Aug 11, 2022)

    What's Changed

    :tada: New Features / Improvements

    • feat: add option to propagate OffsetOutOfRange error by @dkolistratova in https://github.com/Shopify/sarama/pull/2252
    • feat(producer): expose ProducerMessage.byteSize() function by @k8scat in https://github.com/Shopify/sarama/pull/2315
    • feat(metrics): track consumer fetch request rates by @dnwe in https://github.com/Shopify/sarama/pull/2299

    :bug: Fixes

    • fix(consumer): avoid submitting empty fetch requests when paused by @raulnegreiros in https://github.com/Shopify/sarama/pull/2143

    :package: Dependency updates

    • chore(deps): bump module github.com/klauspost/compress to v1.15.9 by @dnwe in https://github.com/Shopify/sarama/pull/2304
    • chore(deps): bump golang.org/x/net digest to c7608f3 by @dnwe in https://github.com/Shopify/sarama/pull/2301
    • chore(deps): bump golangci/golangci-lint-action action to v3 by @dnwe in https://github.com/Shopify/sarama/pull/2311
    • chore(deps): bump golang.org/x/net digest to 07c6da5 by @dnwe in https://github.com/Shopify/sarama/pull/2307
    • chore(deps): bump github actions versions (major) by @dnwe in https://github.com/Shopify/sarama/pull/2313
    • chore(deps): bump module github.com/jcmturner/gofork to v1.7.6 by @dnwe in https://github.com/Shopify/sarama/pull/2305
    • chore(deps): bump golang.org/x/sync digest to 886fb93 by @dnwe in https://github.com/Shopify/sarama/pull/2302
    • chore(deps): bump module github.com/jcmturner/gokrb5/v8 to v8.4.3 by @dnwe in https://github.com/Shopify/sarama/pull/2303

    :wrench: Maintenance

    • chore: add kafka 3.1.1 to the version matrix by @dnwe in https://github.com/Shopify/sarama/pull/2300

    :heavy_plus_sign: Other Changes

    • Migrate off probot-CLA to new GitHub Action by @cursedcoder in https://github.com/Shopify/sarama/pull/2294
    • Forgot to remove cla probot by @cursedcoder in https://github.com/Shopify/sarama/pull/2297
    • chore(lint): re-enable a small amount of go-critic by @dnwe in https://github.com/Shopify/sarama/pull/2312

    New Contributors

    • @cursedcoder made their first contribution in https://github.com/Shopify/sarama/pull/2294
    • @dkolistratova made their first contribution in https://github.com/Shopify/sarama/pull/2252
    • @k8scat made their first contribution in https://github.com/Shopify/sarama/pull/2315

    Full Changelog: https://github.com/Shopify/sarama/compare/v1.35.0...v1.36.0

• v1.35.0 (Jul 22, 2022)

    What's Changed

    :bug: Fixes

    • fix: fix metadata retry backoff invalid when get metadata failed by @Stephan14 in https://github.com/Shopify/sarama/pull/2256
    • fix(balance): sort and de-deplicate memberIDs by @dnwe in https://github.com/Shopify/sarama/pull/2285
    • fix: prevent DescribeLogDirs hang in admin client by @zerowidth in https://github.com/Shopify/sarama/pull/2269
    • fix: include assignment-less members in SyncGroup by @dnwe in https://github.com/Shopify/sarama/pull/2292

    :package: Dependency updates

    • chore(deps): bump module github.com/stretchr/testify to v1.8.0 by @dnwe in https://github.com/Shopify/sarama/pull/2284
    • chore(deps): bump module github.com/eapache/go-resiliency to v1.3.0 by @dnwe in https://github.com/Shopify/sarama/pull/2283
    • chore(deps): bump golang.org/x/net digest to 1185a90 by @dnwe in https://github.com/Shopify/sarama/pull/2279
    • chore(deps): bump module github.com/pierrec/lz4/v4 to v4.1.15 by @dnwe in https://github.com/Shopify/sarama/pull/2281
    • chore(deps): bump module github.com/klauspost/compress to v1.15.8 by @dnwe in https://github.com/Shopify/sarama/pull/2280

    :wrench: Maintenance

    • chore: rename any func to avoid identifier by @dnwe in https://github.com/Shopify/sarama/pull/2272
    • chore: add and test against kafka 3.2.0 by @dnwe in https://github.com/Shopify/sarama/pull/2288
    • chore: document Fetch protocol fields by @dnwe in https://github.com/Shopify/sarama/pull/2289

    :heavy_plus_sign: Other Changes

    • chore(ci): fix redirect with GITHUB_STEP_SUMMARY by @dnwe in https://github.com/Shopify/sarama/pull/2286
    • fix(test): permit ECONNRESET in TestInitProducerID by @dnwe in https://github.com/Shopify/sarama/pull/2287
    • fix: ensure empty or devel version valid by @dnwe in https://github.com/Shopify/sarama/pull/2291

    New Contributors

    • @zerowidth made their first contribution in https://github.com/Shopify/sarama/pull/2269

    Full Changelog: https://github.com/Shopify/sarama/compare/v1.34.1...v1.35.0

• v1.34.1 (Jun 7, 2022)

    What's Changed

    :bug: Fixes

    • fix(examples): check session.Context().Done() in examples/consumergroup by @zxc111 in https://github.com/Shopify/sarama/pull/2240
    • fix(protocol): move AuthorizedOperations into GroupDescription of DescribeGroupsResponse by @aiquestion in https://github.com/Shopify/sarama/pull/2247
    • fix(protocol): tidyup DescribeGroupsResponse by @dnwe in https://github.com/Shopify/sarama/pull/2248
    • fix(consumer): range balance strategy not like reference by @njhartwell in https://github.com/Shopify/sarama/pull/2245

    :wrench: Maintenance

    • chore(ci): experiment with using tparse by @dnwe in https://github.com/Shopify/sarama/pull/2236
    • chore(deps): bump thirdparty dependencies to latest releases by @dnwe in https://github.com/Shopify/sarama/pull/2242

    New Contributors

    • @zxc111 made their first contribution in https://github.com/Shopify/sarama/pull/2240
    • @njhartwell made their first contribution in https://github.com/Shopify/sarama/pull/2245

    Full Changelog: https://github.com/Shopify/sarama/compare/v1.34.0...v1.34.1

• v1.34.0 (May 30, 2022)

    What's Changed

    :tada: New Features / Improvements

    • KIP-345: support static membership by @aiquestion in https://github.com/Shopify/sarama/pull/2230

    :bug: Fixes

    • fix: KIP-368 use receiver goroutine to process all sasl v1 responses by @k-wall in https://github.com/Shopify/sarama/pull/2234

    :wrench: Maintenance

    • chore(deps): bump module github.com/pierrec/lz4 to v4 by @dnwe in https://github.com/Shopify/sarama/pull/2231
    • chore(deps): bump golang.org/x/net digest to 2e3eb7b by @dnwe in https://github.com/Shopify/sarama/pull/2232

    New Contributors

    • @aiquestion made their first contribution in https://github.com/Shopify/sarama/pull/2230

    Full Changelog: https://github.com/Shopify/sarama/compare/v1.33.0...v1.34.0

• v1.33.0 (May 11, 2022)

    What's Changed

    :rotating_light: Breaking Changes

Note: with this change, the user of Sarama is required to use Go 1.13's errors.Is etc. (rather than ==) when testing errors returned by this library.

    • feat: make ErrOutOfBrokers wrap the underlying error that prevented connections to the brokers by @k-wall in https://github.com/Shopify/sarama/pull/2131
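In practice, a comparison like err == sarama.ErrOutOfBrokers becomes:

    if errors.Is(err, sarama.ErrOutOfBrokers) {
    	// the error now wraps the underlying dial/DNS failure
    	log.Println("no brokers reachable:", err)
    }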

    :tada: New Features / Improvements

    • feat(message): add UnmarshalText method to CompressionCodec by @vincentbernat in https://github.com/Shopify/sarama/pull/2172
    • KIP-368 : Allow SASL Connections to Periodically Re-Authenticate by @k-wall in https://github.com/Shopify/sarama/pull/2197
    • feat: add batched CreateACLs func to ClusterAdmin by @nkostoulas in https://github.com/Shopify/sarama/pull/2191

    :bug: Fixes

    • fix: TestRecordBatchDecoding failing sporadically by @k-wall in https://github.com/Shopify/sarama/pull/2154
    • feat(test): add an fvt for broker deadlock by @dnwe in https://github.com/Shopify/sarama/pull/2144
    • fix: avoid starvation in subscriptionManager by @dnwe in https://github.com/Shopify/sarama/pull/2109
    • fix: remove "Is your cluster reachable?" from msg by @dnwe in https://github.com/Shopify/sarama/pull/2165
    • fix: remove trailing fullstop from error strings by @dnwe in https://github.com/Shopify/sarama/pull/2166
    • fix: return underlying sasl error message by @dnwe in https://github.com/Shopify/sarama/pull/2164
    • fix: potential data race on a global variable by @pior in https://github.com/Shopify/sarama/pull/2171
    • fix: AdminClient | CreateACLs | check for error in response, return error if needed by @omris94 in https://github.com/Shopify/sarama/pull/2185
    • producer: ensure that the management message (fin) is never "leaked" by @niamster in https://github.com/Shopify/sarama/pull/2182
    • fix: prevent RefreshBrokers leaking old brokers by @k-wall in https://github.com/Shopify/sarama/pull/2203
    • fix: prevent RefreshController leaking controller by @k-wall in https://github.com/Shopify/sarama/pull/2204
    • fix: prevent AsyncProducer retryBatch from leaking by @k-wall in https://github.com/Shopify/sarama/pull/2208
    • fix: prevent metrics leak when authenticate fails by @Stephan14 in https://github.com/Shopify/sarama/pull/2205
    • fix: prevent deadlock between subscription manager and consumer goroutines by @niamster in https://github.com/Shopify/sarama/pull/2194
    • fix: prevent idempotent producer epoch exhaustion by @ladislavmacoun in https://github.com/Shopify/sarama/pull/2178
    • fix(test): mockbroker offsetResponse vers behavior by @dnwe in https://github.com/Shopify/sarama/pull/2213
    • fix: cope with OffsetsLoadInProgress on Join+Sync by @dnwe in https://github.com/Shopify/sarama/pull/2214
    • fix: make default MaxWaitTime 500ms by @dnwe in https://github.com/Shopify/sarama/pull/2227

    :package: Dependency updates

    • chore(deps): bump xdg-go/scram and klauspost/compress by @dnwe in https://github.com/Shopify/sarama/pull/2170

    :wrench: Maintenance

    • fix(test): skip TestReadOnlyAndAllCommittedMessages by @dnwe in https://github.com/Shopify/sarama/pull/2161
    • fix(test): remove t.Parallel() by @dnwe in https://github.com/Shopify/sarama/pull/2162
    • chore(ci): bump along to Go 1.17+1.18 and bump golangci-lint by @dnwe in https://github.com/Shopify/sarama/pull/2183
    • chore: switch to multi-arch compatible docker images by @dnwe in https://github.com/Shopify/sarama/pull/2210

    :heavy_plus_sign: Other Changes

    • Remediate a number go-routine leaks (mainly test issues) by @k-wall in https://github.com/Shopify/sarama/pull/2198
    • chore: retract v1.32.0 due to #2150 by @dnwe in https://github.com/Shopify/sarama/pull/2199
    • chore: bump functional test timeout to 12m by @dnwe in https://github.com/Shopify/sarama/pull/2200
    • fix(admin): make DeleteRecords err consistent by @dnwe in https://github.com/Shopify/sarama/pull/2226

    New Contributors

    • @k-wall made their first contribution in https://github.com/Shopify/sarama/pull/2154
    • @pior made their first contribution in https://github.com/Shopify/sarama/pull/2171
    • @omris94 made their first contribution in https://github.com/Shopify/sarama/pull/2185
    • @vincentbernat made their first contribution in https://github.com/Shopify/sarama/pull/2172
    • @niamster made their first contribution in https://github.com/Shopify/sarama/pull/2182
    • @ladislavmacoun made their first contribution in https://github.com/Shopify/sarama/pull/2178
    • @nkostoulas made their first contribution in https://github.com/Shopify/sarama/pull/2191

    Full Changelog: https://github.com/Shopify/sarama/compare/v1.32.0...v1.33.0

    Source code(tar.gz)
    Source code(zip)
  • v1.32.0(Feb 24, 2022)

    ⚠️ This release has been superseded by v1.33.0 and should not be used.

    • chore: retract v1.32.0 due to #2150 by @dnwe in https://github.com/Shopify/sarama/pull/2199

    What's Changed

    :bug: Fixes

    • Fix deadlock when closing Broker in brokerProducer by @slaunay in https://github.com/Shopify/sarama/pull/2133

    :package: Dependency updates

    • chore: refresh dependencies to latest by @dnwe in https://github.com/Shopify/sarama/pull/2159

    :wrench: Maintenance

    • fix: rework RebalancingMultiplePartitions test by @dnwe in https://github.com/Shopify/sarama/pull/2130
    • fix(test): use Sarama transactional producer by @dnwe in https://github.com/Shopify/sarama/pull/1939
    • chore: enable t.Parallel() wherever possible by @dnwe in https://github.com/Shopify/sarama/pull/2138

    :heavy_plus_sign: Other Changes

    • chore: restrict to 1 testbinary at once by @dnwe in https://github.com/Shopify/sarama/pull/2145
    • chore: restrict to 1 parallel test at once by @dnwe in https://github.com/Shopify/sarama/pull/2146
    • Remove myself from codeowners by @bai in https://github.com/Shopify/sarama/pull/2147
    • chore: add retractions for known bad versions by @dnwe in https://github.com/Shopify/sarama/pull/2160

    Full Changelog: https://github.com/Shopify/sarama/compare/v1.31.1...v1.32.0

    Source code(tar.gz)
    Source code(zip)
  • v1.31.1(Feb 1, 2022)

    What's Changed

    :tada: New Features / Improvements

    • feat: add method MockApiVersionsResponse.SetApiKeys by @wuhuizuo in https://github.com/Shopify/sarama/pull/2117
    • Expose the TLS connection state of a broker connection by @seveas in https://github.com/Shopify/sarama/pull/2051
    • feat: add methods to pause/resume consumer's consumption by @raulnegreiros in https://github.com/Shopify/sarama/pull/2005 (see the sketch below)
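
    The pause/resume methods from #2005 are exposed on both Consumer and PartitionConsumer. A minimal sketch of how they might be used, assuming a local broker at localhost:9092 and a placeholder topic name:

    ```go
    package main

    import (
        "log"
        "time"

        "github.com/Shopify/sarama"
    )

    func main() {
        consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
        if err != nil {
            log.Fatalln(err)
        }
        defer consumer.Close()

        pc, err := consumer.ConsumePartition("my-topic", 0, sarama.OffsetNewest) // placeholder topic
        if err != nil {
            log.Fatalln(err)
        }
        defer pc.Close()

        pc.Pause() // stop delivering fetched messages for this partition
        log.Println("paused:", pc.IsPaused())
        time.Sleep(time.Second)
        pc.Resume() // resume delivery

        // The parent Consumer can also pause/resume sets of partitions:
        consumer.Pause(map[string][]int32{"my-topic": {0}})
        consumer.ResumeAll()
    }
    ```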

    :bug: Fixes

    • fix: ensure heartbeats only stop after cleanup by @dnwe in https://github.com/Shopify/sarama/pull/2110

    :package: Dependency updates

    • Update klauspost/compress to 0.14 by @bai in https://github.com/Shopify/sarama/pull/2123

    :wrench: Maintenance

    • Fix typo by @mosceo in https://github.com/Shopify/sarama/pull/2113
    • Add Kafka 3.1.0 version number by @bai in https://github.com/Shopify/sarama/pull/2119
    • fix(test): make it simpler to re-use toxiproxy by @dnwe in https://github.com/Shopify/sarama/pull/2122
    • Add Kafka 3.1.0 to CI matrix, migrate to bitnami kafka image by @bai in https://github.com/Shopify/sarama/pull/2124
    • Populate missing kafka versions by @bai in https://github.com/Shopify/sarama/pull/2126
    • Backport missing changelog entries from 1.28 onwards by @bai in https://github.com/Shopify/sarama/pull/2127

    New Contributors

    • @wuhuizuo made their first contribution in https://github.com/Shopify/sarama/pull/2117
    • @seveas made their first contribution in https://github.com/Shopify/sarama/pull/2051
    • @raulnegreiros made their first contribution in https://github.com/Shopify/sarama/pull/2005

    Full Changelog: https://github.com/Shopify/sarama/compare/v1.31.0...v1.31.1

    Source code(tar.gz)
    Source code(zip)
  • v1.31.0(Jan 18, 2022)

    What's Changed

    :tada: New Features / Improvements

    • feat: expose IncrementalAlterConfigs API in admin.go by @fengyinqiao in https://github.com/Shopify/sarama/pull/2088
    • feat: allow AsyncProducer to have MaxOpenRequests inflight produce requests per broker by @xujianhai666 in https://github.com/Shopify/sarama/pull/1686
    • Support request pipelining in AsyncProducer by @slaunay in https://github.com/Shopify/sarama/pull/2094 (see the sketch below)
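
    Pipelining is governed by the existing Net.MaxOpenRequests setting. A minimal sketch, assuming a local broker and a placeholder topic; note that allowing more than one in-flight request per broker can reorder retried batches unless idempotence is enabled:

    ```go
    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        cfg := sarama.NewConfig()
        cfg.Net.MaxOpenRequests = 5 // up to 5 in-flight produce requests per broker
        cfg.Producer.RequiredAcks = sarama.WaitForAll
        cfg.Producer.Return.Successes = true

        producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
        if err != nil {
            log.Fatalln(err)
        }

        producer.Input() <- &sarama.ProducerMessage{Topic: "my-topic", Value: sarama.StringEncoder("hello")}
        select {
        case msg := <-producer.Successes():
            log.Printf("delivered to partition %d at offset %d", msg.Partition, msg.Offset)
        case err := <-producer.Errors():
            log.Println("delivery failed:", err)
        }

        if err := producer.Close(); err != nil {
            log.Println("close:", err)
        }
    }
    ```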

    :bug: Fixes

    • fix(test): add fluent interface for mocks where missing by @grongor in https://github.com/Shopify/sarama/pull/2080
    • fix(test): test for ConsumePartition with OffsetOldest by @grongor in https://github.com/Shopify/sarama/pull/2081
    • fix: set HWMO during creation of partitionConsumer (fix incorrect HWMO before first fetch) by @grongor in https://github.com/Shopify/sarama/pull/2082
    • fix: ignore non-nil but empty error strings in Describe/Alter client quotas responses by @agriffaut in https://github.com/Shopify/sarama/pull/2096
    • fix: skip over KIP-482 tagged fields by @dnwe in https://github.com/Shopify/sarama/pull/2107
    • fix: clear preferredReadReplica if broker shutdown by @dnwe in https://github.com/Shopify/sarama/pull/2108
    • fix(test): correct wrong offsets in mock Consumer by @grongor in https://github.com/Shopify/sarama/pull/2078
    • fix: correct bugs in DescribeGroupsResponse by @dnwe in https://github.com/Shopify/sarama/pull/2111

    :wrench: Maintenance

    • chore: bump runtime and test dependencies by @dnwe in https://github.com/Shopify/sarama/pull/2100

    :memo: Documentation

    • docs: refresh README.md for Kafka 3.0.0 by @dnwe in https://github.com/Shopify/sarama/pull/2099

    :heavy_plus_sign: Other Changes

    • Fix typo by @mosceo in https://github.com/Shopify/sarama/pull/2084

    New Contributors

    • @grongor made their first contribution in https://github.com/Shopify/sarama/pull/2080
    • @fengyinqiao made their first contribution in https://github.com/Shopify/sarama/pull/2088
    • @xujianhai666 made their first contribution in https://github.com/Shopify/sarama/pull/1686
    • @mosceo made their first contribution in https://github.com/Shopify/sarama/pull/2084

    Full Changelog: https://github.com/Shopify/sarama/compare/v1.30.1...v1.31.0

    Source code(tar.gz)
    Source code(zip)
  • v1.30.1(Dec 4, 2021)

    What's Changed

    :tada: New Features / Improvements

    • feat(zstd): pass level param through to compress/zstd encoder by @lizthegrey in https://github.com/Shopify/sarama/pull/2045 (see the sketch below)
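
    A minimal sketch of setting an explicit zstd level; the level value 3 is an arbitrary illustration, and zstd itself requires Kafka 2.1.0 or newer:

    ```go
    package example

    import "github.com/Shopify/sarama"

    func newZstdConfig() *sarama.Config {
        cfg := sarama.NewConfig()
        cfg.Version = sarama.V2_1_0_0 // zstd needs Kafka 2.1.0+
        cfg.Producer.Compression = sarama.CompressionZSTD
        cfg.Producer.CompressionLevel = 3 // now passed through to the zstd encoder
        return cfg
    }
    ```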

    :bug: Fixes

    • fix: set min-go-version to 1.16 by @troyanov in https://github.com/Shopify/sarama/pull/2048
    • logger: fix debug logs' formatting directives by @utrack in https://github.com/Shopify/sarama/pull/2054
    • fix: stuck on the batch with zero records length by @pachmu in https://github.com/Shopify/sarama/pull/2057
    • fix: only update preferredReadReplica if valid by @dnwe in https://github.com/Shopify/sarama/pull/2076

    :wrench: Maintenance

    • chore: add release notes configuration by @dnwe in https://github.com/Shopify/sarama/pull/2046
    • chore: confluent platform version bump by @lizthegrey in https://github.com/Shopify/sarama/pull/2070

    Notes

    • ℹ️ from Sarama 1.30.x onward the minimum version of Go toolchain required is 1.16.x

    New Contributors

    • @troyanov made their first contribution in https://github.com/Shopify/sarama/pull/2048
    • @lizthegrey made their first contribution in https://github.com/Shopify/sarama/pull/2045
    • @utrack made their first contribution in https://github.com/Shopify/sarama/pull/2054
    • @pachmu made their first contribution in https://github.com/Shopify/sarama/pull/2057

    Full Changelog: https://github.com/Shopify/sarama/compare/v1.30.0...v1.30.1

    Source code(tar.gz)
    Source code(zip)
  • v1.30.0(Sep 29, 2021)

    ⚠️ This release has been superseded by v1.30.1 and should not be used.

    Regression: enabling rack awareness causes severe throughput drops (#2071); fixed in v1.30.1 via #2076


    ℹ️ Note: from Sarama 1.30.0 the minimum version of Go toolchain required is 1.16.x


    New Features / Improvements

    • #1983 - @zifengyu - allow configuring the AllowAutoTopicCreation argument in metadata refresh (see the sketch after this list)
    • #2000 - @matzew - Using xdg-go module for SCRAM
    • #2003 - @gdm85 - feat: add counter metrics for consumer group join/sync and their failures
    • #1992 - @zhaomoran - feat: support SaslHandshakeRequest v0 for SCRAM
    • #2006 - @faillefer - Add support for DeleteOffsets operation
    • #1909 - @agriffaut - KIP-546 Client quota APIs
    • #1633 - @aldelucca1 - feat: allow balance strategies to provide initial state
    • #1275 - @dnwe - log: add a DebugLogger that proxies to Logger (see the sketch after this list)
    • #2018 - @dnwe - feat: use DebugLogger reference for goldenpath log
    • #2019 - @dnwe - feat: add logging & a metric for producer throttle
    • #2023 - @dnwe - feat: add Controller() to ClusterAdmin interface
    • #2025 - @dnwe - feat: support ApiVersionsRequest V3 protocol
    • #2028 - @dnwe - feat: send ApiVersionsRequest on broker open
    • #2034 - @bai - Add support for kafka 3.0.0
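
    A minimal sketch combining two of the configuration-facing items above (#1983's AllowAutoTopicCreation and #1275's DebugLogger); the log prefix is a placeholder:

    ```go
    package example

    import (
        "log"
        "os"

        "github.com/Shopify/sarama"
    )

    func newConfig() *sarama.Config {
        // Route Sarama's verbose debug output separately from its standard logs.
        sarama.DebugLogger = log.New(os.Stderr, "[sarama/debug] ", log.LstdFlags)

        cfg := sarama.NewConfig()
        // Don't implicitly create missing topics during metadata refreshes.
        cfg.Metadata.AllowAutoTopicCreation = false
        return cfg
    }
    ```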

    Fixes

    • #1990 - @doxsch - fix: correctly pass ValidateOnly through to CreatePartitionsRequest
    • #1988 - @LubergAlexander - fix: correct WithCustomFallbackPartitioner implementation
    • #2001 - @HurSungYun - docs: inform AsyncProducer Close pitfalls
    • #1973 - @qiangmzsx - fix: metrics still taking up too much memory when metrics.UseNilMetrics=true
    • #2007 - @bai - Add support for Go 1.17
    • #2009 - @dnwe - fix: enable nilerr linter and fix iferr checks
    • #2010 - @dnwe - chore: enable exportloopref and misspell linters
    • #2013 - @faillefer - fix(test): disable encoded response/request check when map contains multiple elements
    • #2015 - @bai - Change default branch to main
    • #1718 - @crivera-fastly - fix: correct the error handling in client.InitProducerID()
    • #1984 - @null-sleep - fix(test): bump confluentPlatformVersion from 6.1.1 to 6.2.0
    • #2016 - @dnwe - chore: replace deprecated Go calls
    • #2017 - @dnwe - chore: delete legacy vagrant script
    • #2020 - @dnwe - fix(test): remove testLogger from TrackLeader test
    • #2024 - @dnwe - chore: bump toxiproxy container to v2.1.5
    • #2033 - @bai - Update dependencies
    • #2031 - @gdm85 - docs: do not mention buffered messages in sync producer Close method
    • #2035 - @dnwe - chore: populate the missing kafka versions
    • #2038 - @dnwe - feat: add a fuzzing workflow to github actions

    New Contributors

    • @zifengyu made their first contribution in https://github.com/Shopify/sarama/pull/1983
    • @doxsch made their first contribution in https://github.com/Shopify/sarama/pull/1990
    • @LubergAlexander made their first contribution in https://github.com/Shopify/sarama/pull/1988
    • @HurSungYun made their first contribution in https://github.com/Shopify/sarama/pull/2001
    • @gdm85 made their first contribution in https://github.com/Shopify/sarama/pull/2003
    • @qiangmzsx made their first contribution in https://github.com/Shopify/sarama/pull/1973
    • @zhaomoran made their first contribution in https://github.com/Shopify/sarama/pull/1992
    • @faillefer made their first contribution in https://github.com/Shopify/sarama/pull/2006
    • @crivera-fastly made their first contribution in https://github.com/Shopify/sarama/pull/1718
    • @null-sleep made their first contribution in https://github.com/Shopify/sarama/pull/1984

    Full Changelog: https://github.com/Shopify/sarama/compare/v1.29.1...v1.30.0

    Source code(tar.gz)
    Source code(zip)
  • v1.29.1(Jun 24, 2021)

    New Features / Improvements

    • #1966 - @ajanikow - KIP-339: Add Incremental Config updates API (see the sketch below)
    • #1964 - @ajanikow - Add DelegationToken ResourceType
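
    A hedged sketch of the incremental config update API; the admin method and type names below reflect our reading of the ClusterAdmin interface, and the topic name and retention value are placeholders:

    ```go
    package example

    import "github.com/Shopify/sarama"

    func setRetention(admin sarama.ClusterAdmin) error {
        retention := "86400000" // one day, in milliseconds
        return admin.IncrementalAlterConfig(
            sarama.TopicResource,
            "my-topic", // assumed topic name
            map[string]sarama.IncrementalAlterConfigsEntry{
                "retention.ms": {
                    Operation: sarama.IncrementalAlterConfigsOperationSet,
                    Value:     &retention,
                },
            },
            false, // validateOnly
        )
    }
    ```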

    Fixes

    • #1962 - @hanxiaolin - fix(consumer): call interceptors when MaxProcessingTime expire
    • #1971 - @KerryJava - fix kafka-producer-performance throughput panic
    • #1968 - @dnwe - chore: bump golang.org/x versions
    • #1956 - @joewreschnig - Allow checking the entire ProducerMessage in the mock producers
    • #1963 - @dnwe - fix: ensure backoff timer is re-used
    • #1949 - @dnwe - fix: explicitly use uint64 for payload length
    Source code(tar.gz)
    Source code(zip)
  • v1.29.0(May 7, 2021)

    New Features / Improvements

    • #1917 - @arkady-emelyanov - KIP-554: Add Broker-side SCRAM Config API
    • #1869 - @wyndhblb - zstd: encode+decode performance improvements
    • #1541 - @izolight - add String, (Un)MarshalText for acl types.
    • #1921 - @bai - Add support for Kafka 2.8.0

    Fixes

    • #1936 - @dnwe - fix(consumer): follow preferred broker
    • #1933 - @ozzieba - Use gofork for encoding/asn1 to fix ASN errors during Kerberos authentication
    • #1929 - @celrenheit - Handle isolation level in Offset(Request|Response) and require stable offset in FetchOffset(Request|Response)
    • #1926 - @dnwe - fix: correct initial CodeQL findings
    • #1925 - @bai - Test out CodeQL
    • #1923 - @bestgopher - Remove redundant switch-case, fix doc typos
    • #1922 - @bai - Update go dependencies
    • #1898 - @mmaslankaprv - Parsing only known control batches value
    • #1887 - @withshubh - Fix: issues affecting code quality
    Source code(tar.gz)
    Source code(zip)
  • v1.28.0(Feb 15, 2021)

    Note that with this release we changed the RoundRobinBalancer strategy to match the Java client's behavior. See #1788 for details.
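
    Opting in is a one-line config change; a minimal sketch (the Kafka version constant is an assumption):

    ```go
    package example

    import "github.com/Shopify/sarama"

    func groupConfig() *sarama.Config {
        cfg := sarama.NewConfig()
        cfg.Version = sarama.V2_7_0_0 // assumed broker version
        // Opt in to the Java-compatible round robin assignor (#1788).
        cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
        return cfg
    }
    ```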

    • #1870 - @kvch - Update Kerberos library to latest major
    • #1876 - @bai - Update docs, reference pkg.go.dev
    • #1846 - @wclaeys - Do not ignore Consumer.Offsets.AutoCommit.Enable config on Close
    • #1747 - @XSAM - fix: mock sync producer does not handle the offset while sending messages
    • #1863 - @bai - Add support for Kafka 2.7.0 + update lz4 and klauspost/compress dependencies
    • #1788 - @kzinglzy - feat[balance_strategy]: announcing a new round robin balance strategy
    • #1862 - @bai - Fix CI setenv permissions issues
    • #1832 - @ilyakaznacheev - Update Godoc link to pkg.go.dev
    • #1822 - @danp - KIP-392: Allow consumers to fetch from closest replica
    Source code(tar.gz)
    Source code(zip)
  • v1.27.2(Oct 21, 2020)

    Improvements

    • #1750 - @krantideep95 - Adds missing mock responses for mocking consumer groups

    Fixes

    • #1817 - reverts #1785 - Add private method to Client interface to prevent implementation

    Source code(tar.gz)
    Source code(zip)
  • v1.27.1(Oct 7, 2020)

    Improvements

    • #1775 - @d1egoaz - Adds a Producer Interceptor example
    • #1781 - @justin-chen - Refresh brokers given list of seed brokers
    • #1784 - @justin-chen - Add randomize seed broker method
    • #1790 - @d1egoaz - Remove example binary
    • #1798 - @bai - Test against Go 1.15
    • #1785 - @justin-chen - Add private method to Client interface to prevent implementation
    • #1802 - @uvw - Support Go 1.13 error unwrapping

    Fixes

    • #1791 - @stanislavkozlovski - Bump default version to 1.0.0

    Source code(tar.gz)
    Source code(zip)
  • v1.27.0(Aug 11, 2020)

    Improvements

    • #1466 - @rubenvp8510 - Expose Kerberos fast negotiation configuration
    • #1695 - @KJTsanaktsidis - Use docker-compose to run the functional tests
    • #1699 - @wclaeys - Consumer group support for manually committing offsets (see the sketch below)
    • #1714 - @bai - Bump Go to version 1.14.3, golangci-lint to 1.27.0
    • #1726 - @d1egoaz - Include zstd on the functional tests
    • #1730 - @d1egoaz - KIP-42 Add producer and consumer interceptors
    • #1738 - @varun06 - Fixed variable names that clash with standard library package names
    • #1741 - @varun06 - Updated zstd dependency to latest v1.10.10
    • #1743 - @varun06 - Fixed declaration dependencies and other lint issues in the code base
    • #1763 - @alrs - Remove deprecated TLS options from test
    • #1769 - @bai - Add support for Kafka 2.6.0
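
    A minimal sketch of manual offset commits in a consumer group handler (the message-processing body is elided):

    ```go
    package example

    import "github.com/Shopify/sarama"

    type handler struct{}

    func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
    func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

    func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for msg := range claim.Messages() {
            // ... process msg ...
            sess.MarkMessage(msg, "")
            sess.Commit() // flush the marked offset immediately
        }
        return nil
    }

    func newConfig() *sarama.Config {
        cfg := sarama.NewConfig()
        cfg.Consumer.Offsets.AutoCommit.Enable = false // we commit manually instead
        return cfg
    }
    ```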

    Fixes

    • #1697 - @kvch - Use gofork for encoding/asn1 to fix ASN errors during Kerberos authentication
    • #1744 - @alrs - Fix isBalanced function signature

    Source code(tar.gz)
    Source code(zip)
  • v1.26.4(May 19, 2020)

  • v1.26.3(May 7, 2020)

  • v1.26.2(May 6, 2020)

    ⚠️ Known Issues

    This release has been marked as not ready for production and may be unstable; please use v1.26.4 instead.

    Improvements

    • #1560 - @iyacontrol - add sync pool for gzip 1-9
    • #1605 - @dnwe - feat: protocol support for V11 fetch w/ rackID
    • #1617 - @sladkoff / @dwi-di / @random-dwi - Add support for alter/list partition reassignments APIs
    • #1632 - @bai - Add support for Go 1.14
    • #1640 - @random-dwi - Feature/fix list partition reassignments
    • #1646 - @mimaison - Add DescribeLogDirs to admin client
    • #1667 - @bai - Add support for kafka 2.5.0

    Fixes

    • #1594 - @sladkoff - Sets ConfigEntry.Default flag in addition to the ConfigEntry.Source for Kafka versions > V1_1_0_0
    • #1601 - @alrs - fix: remove use of testing.T.FailNow() inside goroutine
    • #1602 - @d1egoaz - adds a note about consumer groups Consume method
    • #1607 - @darklore - Fix memory leak when Broker.Open and Broker.Close called repeatedly
    • #1613 - @wblakecaldwell - Updated "retrying" log message when BackoffFunc implemented
    • #1614 - @alrs - produce_response.go: Remove Unused Functions
    • #1619 - @alrs - tools/kafka-producer-performance: prune unused flag variables
    • #1639 - @agriffaut - Handle errors with no message but error code
    • #1643 - @kzinglzy - fix config.net.keepalive
    • #1644 - @KJTsanaktsidis - Fix brokers continually allocating new Session IDs
    • #1645 - @Stephan14 - Remove broker(s) which no longer exist in metadata
    • #1650 - @lavoiesl - Return the response error in heartbeatLoop
    • #1661 - @KJTsanaktsidis - Fix "broker received out of order sequence" when brokers die
    • #1666 - @KevinJCross - Bugfix: Allow TLS connections to work over socks proxy.
    Source code(tar.gz)
    Source code(zip)
  • v1.26.1(Feb 4, 2020)

    ⚠️ This release has been superseded by v1.26.4 and should not be used.

    Fetch requests will cause the Kafka broker to continuously allocate new Fetch sessions in its cache on every request. Fixed in v1.26.2 via https://github.com/Shopify/sarama/pull/1644


    Improvements

    • Add requests-in-flight metric (1539)
    • Fix misleading example for cluster admin (1595)
    • Replace Travis with GitHub Actions, linters housekeeping (1573)
    • Allow BalanceStrategy to provide custom assignment data (1592)

    Bug Fixes

    • Adds back Consumer.Offsets.CommitInterval to fix API (1590)
    • Fix error message s/CommitInterval/AutoCommit.Interval (1589)
    Source code(tar.gz)
    Source code(zip)
  • v1.26.0(Jan 24, 2020)

    ⚠️ Known Issues

    This release has been superseded by v1.26.4 and should not be used.

    Fetch requests will cause the Kafka broker to continuously allocate new Fetch sessions in its cache on every request. Fixed in v1.26.2 via https://github.com/Shopify/sarama/pull/1644


    New Features:

    • Enable zstd compression (1574,1582)
    • Support headers in tools kafka-console-producer (1549)

    Improvements:

    • Add SASL AuthIdentity to SASL frames (authzid) (1585).

    Bug Fixes:

    • Sending messages with ZStd compression enabled fails in multiple ways (1252).
    • Use the broker for any admin on BrokerConfig (1571).
    • Set DescribeConfigRequest Version field (1576).
    • ConsumerGroup flooding logs with client/metadata update req (1578).
    • MetadataRequest version in DescribeCluster (1580).
    • Fix deadlock in consumer group handleError (1581)
    • Fill in the Fetch{Request,Response} protocol (1582).
    • Retry topic request on ControllerNotAvailable (1586).
    Source code(tar.gz)
    Source code(zip)
  • v1.25.0(Jan 13, 2020)

    ⚠️ This release still contained known issues introduced in v1.24.1 and should not be used

    It is recommended to upgrade to v1.26.4 or roll back to v1.24.0.

    Known Issues

    • ConsumerGroup flooding logs with client/metadata update req (1544) introduced in v1.24.1
    • Unexpected user-specified time limit error (1562) introduced in v1.24.1

    New Features:

    • Support TLS protocol in kafka-producer-performance (1538).
    • Add support for kafka 2.4.0 (1552).

    Improvements:

    • Allow the Consumer to disable auto-commit offsets (1164).
    • Produce records with consistent timestamps (1455).

    Bug Fixes:

    • Fix incorrect SetTopicMetadata name mentions (1534).
    • Fix client.tryRefreshMetadata Println (1535).
    • Fix panic on calling updateMetadata on closed client (1531).
    • Fix possible faulty metrics in TestFuncProducing (1545).
    Source code(tar.gz)
    Source code(zip)
  • v1.24.1(Oct 31, 2019)

    ⚠️ This release introduced two new major regressions over v1.24.0 and should not be used

    Known Issues

    • ConsumerGroup flooding logs with client/metadata update req (1544)
    • Unexpected user-specified time limit error (1562)

    New Features:

    • Add DescribeLogDirs Request/Response pair (1520).

    Bug Fixes:

    • Fix ClusterAdmin returning invalid controller ID on DescribeCluster (1518).
    • Fix issue with consumergroup not rebalancing when new partition is added (1525).
    • Ensure consistent use of read/write deadlines (1529).
    Source code(tar.gz)
    Source code(zip)
  • v1.24.0(Oct 9, 2019)

    New Features:

    • Add sticky partition assignor (1416).
    • Switch from cgo zstd package to pure Go implementation (1477).

    Improvements:

    • Allow creating ClusterAdmin from client (1415).
    • Set KafkaVersion in ListAcls method (1452).
    • Set request version in CreateACL ClusterAdmin method (1458).
    • Set request version in DeleteACL ClusterAdmin method (1461).
    • Handle missed error codes on TopicMetaDataRequest and GroupCoordinatorRequest (1464).
    • Remove direct usage of gofork (1465).
    • Add support for Go 1.13 (1478).
    • Improve behavior of NewMockListAclsResponse (1481).

    Bug Fixes:

    • Fix race condition in consumergroup example (1434).
    • Fix brokerProducer goroutine leak (1442).
    • Use released version of lz4 library (1469).
    • Set correct version in MockDeleteTopicsResponse (1484).
    • Fix CLI help message typo (1494).

    Known Issues:

    • Please don't use Zstd, as it doesn't work right now. See https://github.com/Shopify/sarama/issues/1252.
    Source code(tar.gz)
    Source code(zip)
  • v1.23.1(Jul 22, 2019)

  • v1.23.0(Jul 3, 2019)

    New Features:

    • Add support for Kafka 2.3.0 (1418).
    • Add support for ListConsumerGroupOffsets v2 (1374).
    • Add support for DeleteConsumerGroup (1417).
    • Add support for SASLVersion configuration (1410).
    • Add Kerberos support (1366; see the sketch below).
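
    A minimal sketch of a Kerberos-enabled config; every path, principal, and realm below is a placeholder assumption:

    ```go
    package example

    import "github.com/Shopify/sarama"

    func kerberosConfig() *sarama.Config {
        cfg := sarama.NewConfig()
        cfg.Net.SASL.Enable = true
        cfg.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
        cfg.Net.SASL.GSSAPI = sarama.GSSAPIConfig{
            AuthType:           sarama.KRB5_KEYTAB_AUTH,
            KeyTabPath:         "/etc/security/kafka.keytab", // placeholder
            KerberosConfigPath: "/etc/krb5.conf",
            ServiceName:        "kafka",
            Username:           "kafka-client",
            Realm:              "EXAMPLE.COM",
        }
        return cfg
    }
    ```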

    Improvements:

    • Improve sasl_scram_client example (1406).
    • Fix shutdown and race-condition in consumer-group example (1404).
    • Add support for error codes 77—81 (1397).
    • Pool internal objects allocated per message (1385).
    • Reduce packet decoder allocations (1373).
    • Support timeout when fetching metadata (1359).

    Bug Fixes:

    • Fix fetch size integer overflow (1376).
    • Handle and log throttled FetchResponses (1383).
    • Refactor misspelled word Resouce to Resource (1368).
    Source code(tar.gz)
    Source code(zip)
  • v1.22.1(Apr 29, 2019)

    Improvements:

    • Use zstd 1.3.8 (1350).
    • Add support for SaslHandshakeRequest v1 (1354).

    Bug Fixes:

    • Fix V5 MetadataRequest nullable topics array (1353).
    • Use a different SCRAM client for each broker connection (1349).
    • Fix AllowAutoTopicCreation for MetadataRequest greater than v3 (1344).
    Source code(tar.gz)
    Source code(zip)
  • v1.22.0(Apr 9, 2019)

    New Features:

    • Add Offline Replicas Operation to Client (1318).
    • Allow using proxy when connecting to broker (1326).
    • Implement ReadCommitted (1307; see the sketch below).
    • Add support for Kafka 2.2.0 (1331).
    • Add SASL SCRAM-SHA-512 and SCRAM-SHA-256 mechanisms (1331; see the sketch below).
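
    A minimal sketch combining ReadCommitted with a SCRAM mechanism; the SCRAMClient here is a non-functional stub (see the repo's sasl_scram_client example for a real xdg-go/scram based implementation), and the credentials are placeholders:

    ```go
    package example

    import "github.com/Shopify/sarama"

    // scramClient stubs the sarama.SCRAMClient interface; a real client
    // performs the SCRAM conversation in Begin/Step/Done.
    type scramClient struct{}

    func (c *scramClient) Begin(user, pass, authzID string) error { return nil }
    func (c *scramClient) Step(challenge string) (string, error)  { return "", nil }
    func (c *scramClient) Done() bool                             { return true }

    func scramConfig() *sarama.Config {
        cfg := sarama.NewConfig()
        cfg.Version = sarama.V2_2_0_0
        cfg.Consumer.IsolationLevel = sarama.ReadCommitted // skip records from aborted transactions
        cfg.Net.SASL.Enable = true
        cfg.Net.SASL.User = "user" // placeholder credentials
        cfg.Net.SASL.Password = "secret"
        cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
        cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &scramClient{} }
        return cfg
    }
    ```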

    Improvements:

    • Unregister all broker metrics on broker stop (1232).
    • Add SCRAM authentication example (1303).
    • Add consumergroup examples (1304).
    • Expose consumer batch size metric (1296).
    • Add TLS options to console producer and consumer (1300).
    • Reduce client close bookkeeping (1297).
    • Satisfy error interface in create responses (1154).
    • Please lint gods (1346).

    Bug Fixes:

    • Fix multi consumer group instance crash (1338).
    • Update lz4 to latest version (1347).
    • Retry ErrNotCoordinatorForConsumer in new consumergroup session (1231).
    • Fix cleanup error handler (1332).
    • Fix race condition in PartitionConsumer (1156).
    Source code(tar.gz)
    Source code(zip)
  • v1.21.0(Feb 24, 2019)

    New Features:

    • Add CreateAclRequest, DescribeAclRequest, DeleteAclRequest (#1236).
    • Add DescribeTopic, DescribeConsumerGroup, ListConsumerGroups, ListConsumerGroupOffsets admin requests (#1178).
    • Implement SASL/OAUTHBEARER (#1240).

    Improvements:

    • Add Go mod support (#1282).
    • Add error codes 73—76 (#1239).
    • Add retry backoff function (#1160; see the sketch below).
    • Maintain metadata in the producer even when retries are disabled (#1189).
    • Include ReplicaAssignment in ListTopics (#1274).
    • Add producer performance tool (#1222).
    • Add support LogAppend timestamps (#1258).
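
    A minimal sketch of a custom retry backoff; the exponential schedule is an arbitrary illustration:

    ```go
    package example

    import (
        "time"

        "github.com/Shopify/sarama"
    )

    func retryConfig() *sarama.Config {
        cfg := sarama.NewConfig()
        // Replace the fixed Retry.Backoff duration with an exponential schedule.
        cfg.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
            return time.Duration(100<<retries) * time.Millisecond // 100ms, 200ms, 400ms, ...
        }
        return cfg
    }
    ```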

    Bug Fixes:

    • Fix potential deadlock when a heartbeat request fails (#1286).
    • Fix consuming compacted topic (#1227).
    • Set correct Kafka version for DescribeConfigsRequest v1 (#1277).
    • Update kafka test version (#1273).
    Source code(tar.gz)
    Source code(zip)
Pubsub controller using Kafka, based on Sarama. Easy control flow for action streaming and event-driven workloads.

Psub is a helper for building systems that use Kafka for streaming and event-driven workloads. Install with go get github.com/teng231/psub; there are 3 env variables for config

Te Nguyen 6 Sep 26, 2022
Higher level abstraction for Sarama.

kafka-do v0.1.5 kafka-do What Higher level abstraction for Sarama. Why We want to be able to write our kafka applications without making the same thin

seo.do 21 Sep 30, 2021
Apache Kafka Web UI for exploring messages, consumers, configurations and more with a focus on a good UI & UX.

Kowl - Apache Kafka Web UI Kowl (previously known as Kafka Owl) is a web application that helps you to explore messages in your Apache Kafka cluster a

CloudHut 2.7k Oct 1, 2022
Study project that uses Apache Kafka as syncing mechanism between two databases, with producers and consumers written in Go.

Kafka DB Sync Study project that uses Apache Kafka as syncing mechanisms between a monolith DB and a microservice. The main purpose of this project is

Diego Hordi 0 Dec 5, 2021
A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application.

Apache Kafka in 6 minutes A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application. In thi

bagher sohrabi 2 Oct 27, 2021
Confluent's Apache Kafka Golang client

Confluent's Golang Client for Apache KafkaTM confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform. Features: Hi

Confluent Inc. 3.6k Sep 27, 2022
Modern CLI for Apache Kafka, written in Go.

Kaf Kafka CLI inspired by kubectl & docker Install Install from source: go get -u github.com/birdayz/kaf/cmd/kaf Install binary: curl https://raw.git

Johannes Brüderl 1.7k Sep 23, 2022
CLI Tool to Stress Apache Kafka Clusters

Kafka Stress - Stress Test Tool for Kafka Clusters, Producers and Consumers Tuning Installation Docker docker pull fidelissauro/kafka-stress:latest d

Matheus Fidelis 25 Jul 28, 2022
franz-go - A complete Apache Kafka client written in Go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 2.8.0+. Producing, consuming, transacting, administrating, etc.

Travis Bischel 759 Sep 25, 2022
Testing Apache Kafka using Go.

Apache Kafka Go Testing Apache Kafka using Go. Instructions Provision the single node Kafka cluster using Docker: docker-compose -p apache-kafka-go up

Gabriel Kim 0 Dec 17, 2021
provider-kafka is a Crossplane Provider that is used to manage Kafka resources.

provider-kafka provider-kafka is a Crossplane Provider that is used to manage Kafka resources. Usage Create a provider secret containing a json like t

Crossplane Contrib 17 Sep 22, 2022
A CLI tool for interacting with Kafka through the Confluent Kafka Rest Proxy

kafkactl Table of contents kafkactl Table of contents Overview Build Development Overview kafkactl is a CLI tool to interact with Kafka through the Co

Alexandre Barone 0 Nov 1, 2021
Apache Pulsar Go Client Library

Apache Pulsar Go Client Library A Go client library for the Apache Pulsar project. Goal This project is developing a pure-Go client library for Pulsa

The Apache Software Foundation 499 Sep 27, 2022
Producer x Consumer example using Kafka library and Go.

Go - Kafka - Example Apache Kafka Commands First of all, run the docker-compose docker-compose up, then run docker exec -it kafka_kafka_1 bash Topics

Roberto Morel 0 Dec 8, 2021
franz-go contains a high performance, pure Go library for interacting with Kafka from 0.8.0 through 2.7.0+. Producing, consuming, transacting, administrating, etc.

franz-go - Apache Kafka client written in Go Franz-go is an all-encompassing Apache Kafka client fully written in Go. This library aims to provide every

Travis Bischel 761 Sep 29, 2022
kafka watcher for casbin library

Casbin Kafka Watcher Casbin watcher for Kafka This watcher library will enable users to dynamically change casbin policies through Kafka messages Infl

Aruna Prabashwara 3 May 8, 2021
Implementation of the NELI leader election protocol for Go and Kafka

goNELI Implementation of the NELI leader election protocol for Go and Kafka. goNELI encapsulates the 'fast' variation of the protocol, running in excl

Obsidian Dynamics 58 Jul 6, 2022
Kafka producer and consumer tool in protobuf format.

protokaf Kafka producer and consumer tool in protobuf format. Features Consume and produce messages using Protobuf protocol Trace messages with Jaeger

SberMarket Tech 119 Sep 29, 2022
ChizBroker is a fast and simple GRPC based implementation of kafka.

Chiz Broker: a broker for fun ChizBroker is a fast and simple GRPC based implementation of kafka. Features: Ready to be deployed on kubernetes Prometh

Sina Amininasab 41 Sep 7, 2022