Sarama is a Go library for Apache Kafka 0.8, and up.

Overview

sarama

Sarama is an MIT-licensed Go client library for Apache Kafka version 0.8 (and later).

Getting started

  • API documentation and examples are available via pkg.go.dev.
  • Mocks for testing are available in the mocks subpackage.
  • The examples directory contains more elaborate example applications.
  • The tools directory contains command line tools that can be useful for testing, diagnostics, and instrumentation.

You might also want to look at the Frequently Asked Questions.
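
For a quick feel of the API, here is a minimal sketch of producing a single message with the SyncProducer; the broker address and topic name are placeholders, and error handling is kept to the bare minimum (see the API documentation for the full picture):

    package main

    import (
    	"log"

    	"github.com/Shopify/sarama"
    )

    func main() {
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll
    	config.Producer.Return.Successes = true // required by the SyncProducer

    	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    	if err != nil {
    		log.Fatalln(err)
    	}
    	defer producer.Close()

    	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
    		Topic: "my-topic",
    		Value: sarama.StringEncoder("hello, sarama"),
    	})
    	if err != nil {
    		log.Fatalln(err)
    	}
    	log.Printf("message stored at partition %d, offset %d", partition, offset)
    }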

Compatibility and API stability

Sarama provides a "2 releases + 2 months" compatibility guarantee: we support the two latest stable releases of Kafka and Go, and we provide a two-month grace period for older releases. This means we currently officially support Go 1.14 through 1.15, and Kafka 2.5 through 2.7, although older releases are still likely to work.

Sarama follows semantic versioning and provides API stability via the gopkg.in service. You can import a version with a guaranteed stable API via http://gopkg.in/Shopify/sarama.v1. A changelog is available here.

Contributing

Issues
  • Implement a higher-level consumer group

    Related to my other PR #1083, this is another attempt, this time with a higher-level consumer API. I have been going back and forth on this one, I can't find a better way to make it flexible enough to support all the use cases and comprehensible at the same time. I would really appreciate some feedback.

    opened by dim 83
  • OffsetManager Implementation

    There are still some TODOs sprinkled throughout, but not all of them necessarily need to be fixed before we ship this (it's already pretty close to the minimum viable implementation). The only critical missing piece is implementing AsyncClose for the PartitionOffsetManager.

    Regardless, it's at the state where it can be opened up for comments. This should completely supersede #379.

    opened by eapache 55
  • Lockless multiproducer

    CC @sirupsen @wvanbergen @burke @fw42 @graemej

    Simple non-batching synchronous tests are working. Needs batching tests, but as Burke mentioned yesterday they are very messy with the current framework.

    opened by eapache 51
  • New producer design [beta]

    @wvanbergen @burke @Sirupsen @graemej and whoever else might be interested.

    This is an idea for a different API for the producer that is somewhat more channel/goroutine based. I was thinking about alternative architectures after reviewing the PR which adds the channels returning failed/successful messages. I started playing with things (after also reviewing the original multiproducer PRs) and came up with this.

    It ended up being basically a complete rewrite, so reviewing the diff won't be too helpful, just read the new producer.go straight. It's very much a work-in-progress, I'm more interested in architectural thoughts right now than code nits. It seems to pass the simple tests, but that's primarily accidental, shrug.

    Misc. notes on things that might cause questions:

    • User communication (input of new messages, receipt of errors) is all done with channels now, so the common idiom I expect would be a select on those two channels to avoid blocking (see the sketch after this list).
    • It doesn't have a "synchronous" mode, instead the separate SimpleProducer uses the AckSuccesses flag to effectively imitate that.
    • In the normal success path, a message ends up passing through five goroutines before being bundled into a message and passed to the Broker object to put on the wire. Not sure if this will impact performance noticeably, but it means that if any intermediate step blocks (i.e. a metadata request for a new leader for a topic) then all other messages not directly impacted will continue processing, which is nice (assuming you configure a large enough channel buffer).
    • I delayed actually converting the Key/Value elements to bytes until the very last moment. This seems cleaner, and also potentially simplifies the addition of streaming messages which somebody asked for recently.
    • Hopefully the retry path is easier to reason about than the old one, as it is exactly the same as the normal path now (the message gets resubmitted on exactly the same channel as the user submits messages on, it just has an extra flag set). It should still preserve ordering correctly though (see the leaderDispatcher logic for this).
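
    For context, this channel-based design is what eventually became Sarama's AsyncProducer, and the select idiom described in the first bullet looks roughly like this against the current API (a sketch; the broker address and topic are placeholders):

    package main

    import (
    	"log"
    	"os"
    	"os/signal"

    	"github.com/Shopify/sarama"
    )

    func main() {
    	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, sarama.NewConfig())
    	if err != nil {
    		log.Fatalln(err)
    	}
    	defer producer.AsyncClose()

    	signals := make(chan os.Signal, 1)
    	signal.Notify(signals, os.Interrupt)

    	msg := &sarama.ProducerMessage{Topic: "my-topic", Value: sarama.StringEncoder("testing 123")}
    	var enqueued, produceErrors int

    ProducerLoop:
    	for {
    		select {
    		case producer.Input() <- msg: // submit new work without blocking on errors
    			enqueued++
    		case err := <-producer.Errors(): // collect delivery failures as they arrive
    			log.Println("failed to produce message:", err)
    			produceErrors++
    		case <-signals: // stop on Ctrl-C
    			break ProducerLoop
    		}
    	}

    	log.Printf("enqueued %d messages; %d errors", enqueued, produceErrors)
    }
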
    opened by eapache 47
  • panic: sync: negative WaitGroup counter

    Seeing this occasionally in production:

    Revision: 23d523386ce0c886e56c9faf1b9c78b07e5b8c90

    panic: sync: negative WaitGroup counter
    
    goroutine 444 [running]:
    sync.(*WaitGroup).Add(0xc208e6c510, 0xffffffffffffffff)
            /usr/src/go/src/sync/waitgroup.go:67 +0x96
    sync.(*WaitGroup).Done(0xc208e6c510)
            /usr/src/go/src/sync/waitgroup.go:85 +0x31
    github.com/Shopify/sarama.(*asyncProducer).returnSuccesses(0xc208e6c4d0, 0xc29ffe9600, 0x3f, 0x40)
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:753 +0xd3
    github.com/Shopify/sarama.(*asyncProducer).flusher(0xc208e6c4d0, 0xc20814c540, 0xc2080b9800)
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:560 +0xafe
    github.com/Shopify/sarama.func·006()
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:430 +0x43
    github.com/Shopify/sarama.withRecover(0xc20802b220)
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/utils.go:42 +0x3a
    created by github.com/Shopify/sarama.(*asyncProducer).messageAggregator
            /opt/goproj/Godeps/_workspace/src/github.com/Shopify/sarama/async_producer.go:430 +0x205
    
    bug :-( producer 
    opened by cep21 38
  • Not seeing all messages when syncProducer called from goroutines

    Versions

    Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

    Sarama Version: 1.15.0
    Kafka Version: 0.11.0
    Go Version: 1.9.2 darwin/amd64

    Configuration

    What configuration values are you using for Sarama and Kafka?

    Sarama configuration

    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 10
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    config.Version = sarama.V0_11_0_0
    

    Kafka configuration

    broker.id=0
    delete.topic.enable=true
    listeners=PLAINTEXT://localhost:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/usr/local/Cellar/kafka/kafka-log-1
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    auto.create.topics.enable=false 
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    
    Logs

    When filing an issue please provide logs from Sarama and Kafka if at all possible. You can set sarama.Logger to a log.Logger to capture Sarama debug output.

    sarama: client.go:115: Initializing new client
    sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: broker.go:146: Connected to broker at localhost:9092 (unregistered)
    sarama: client.go:429: client/brokers registered new broker #0 at localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:635: client/metadata retrying after 250ms... (3 attempts remaining)
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:635: client/metadata retrying after 250ms... (2 attempts remaining)
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:635: client/metadata retrying after 250ms... (1 attempts remaining)
    sarama: client.go:646: client/metadata fetching metadata for all topics from broker localhost:9092
    sarama: client.go:655: client/metadata found some partitions to be leaderless
    sarama: client.go:161: Successfully initialized new client
    sarama: config.go:351: ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    sarama: async_producer.go:601: producer/broker/0 starting up
    sarama: async_producer.go:612: producer/broker/0 state change to [open] on people/0
    sarama: broker.go:144: Connected to broker at localhost:9092 (registered as #0)
    
    Problem Description

    When calling SendMessage on a single instance of syncProducer from multiple goroutines, some messages seem to fail to be produced to Kafka. I've looked at what ends up on the stream using Apache's kafka-console-consumer and it shows only a fraction of the messages, anywhere from half of them down to none. I wrote my own consumer using sarama and saw the same issue; however, I get the error message below back from sarama. I want to use syncProducer because I need to guarantee that messages will be published to the stream in the order that they're received by my application. Maybe I've just implemented it wrong, but right now I'm out of ideas and I'm hoping someone on here can help me out.

    sarama: consumer.go:755: consumer/broker/0 abandoned subscription to people/0 because kafka: response did not contain all the expected topic/partition blocks
    Error: kafka: error while consuming people/0: kafka: response did not contain all the expected topic/partition blocks
    sarama: consumer.go:345: consumer/people/0 finding new broker
    sarama: client.go:644: client/metadata fetching metadata for [people] from broker localhost:9092
    sarama: consumer.go:711: consumer/broker/0 added subscription to people/0
    

    Here's how I created my topic: bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic people

    I'm running a single broker on my local machine. I've written a sample program that can reproduce the issue. It's also worth noting that none of the calls to sendMessage() are returning errors when I run the code.

    main.go

    package main
    
    import (
    	"bytes"
    	"fmt"
    	"log"
    	"strconv"
    	"sync"
    	"syncProducer/streamer"
    
    	"github.com/Shopify/sarama"
    	"github.com/linkedin/goavro"
    	uuid "github.com/satori/go.uuid"
    )
    
    const personSchema = `{
    	"type":"record",
    	"name":"Person",
    	"namespace":"com.example.people",
    	"fields":[
    		{
    			"name":"Name",
    			"type":"string"
    		},
    		{
    			"name":"Address",
    			"type":"string"
    		},{
    			"name":"City",
    			"type":"string"
    		},
    		{
    			"name":"State",
    			"type":"string"
    		},
    		{
    			"name":"ZIP",
    			"type":"long"
    		}
    	]
    }`
    
    var (
    	personCodec *goavro.Codec
    	buf         bytes.Buffer
    )
    
    type (
    	person struct {
    		Name    string
    		Address string
    		City    string
    		State   string
    		ZIP     int64
    	}
    )
    
    func main() {
    	var err error
    	personCodec, err = goavro.NewCodec(personSchema)
    	if err != nil {
    		panic(err)
    	}
    
    	producer, err := newSyncProducer()
    	if err != nil {
    		panic(err)
    	}
    	streamer := streamer.New(producer)
    
    	// Create 10 avro message bodies
    	var people [][]byte
    	for i := 1; i < 11; i++ {
    		aPerson := person{
    			Name:    "Bob #" + strconv.Itoa(i),
    			Address: strconv.Itoa(i) + " Main St.",
    			City:    "SomeTown",
    			State:   "CA",
    			ZIP:     90210,
    		}
    		data, err := convertToAvro(aPerson)
    		if err != nil {
    			panic("Could not convert aPerson " + strconv.Itoa(i) + " to avro.")
    		}
    		people = append(people, data)
    	}
    
    	errc := make(chan error, 10)
    
    	var wg sync.WaitGroup
    	// Send messages
    	for _, person := range people {
    		wg.Add(1)
    		go func(person []byte, c chan error, wg *sync.WaitGroup) {
    			uuid := uuid.NewV4().String()
    			err := streamer.SendActivity("people", uuid, "CreatePerson", person, nil)
    			c <- err
    			wg.Done()
    		}(person, errc, &wg)
    	}
    
    	wg.Wait()
    	close(errc)
    	fmt.Println("Completed!")
    	for i := range errc {
    		fmt.Println(i)
    		if i != nil {
    			fmt.Printf("Exit: %v\n", i)
    		}
    	}
    
    	fmt.Print(&buf)
    }
    
    func convertToAvro(aPerson person) ([]byte, error) {
    	data, err := personCodec.BinaryFromNative(nil, map[string]interface{}{
    		"Name":    aPerson.Name,
    		"Address": aPerson.Address,
    		"City":    aPerson.City,
    		"State":   aPerson.State,
    		"ZIP":     aPerson.ZIP,
    	})
    	if err != nil {
    		return nil, err
    	}
    
    	return data, nil
    }
    
    func newSyncProducer() (sarama.SyncProducer, error) {
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
    	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
    	config.Producer.Return.Successes = true          // Required when using syncproducer
    	config.Producer.Return.Errors = true
    	config.Version = sarama.V0_11_0_0
    
    	sarama.Logger = log.New(&buf, "sarama: ", log.Lshortfile)
    
    	return sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    }
    

    streamer.go

    package streamer
    
    import (
    	"github.com/Shopify/sarama"
    	"github.com/pkg/errors"
    )
    
    const (
    	MessageTypeHeaderKey = "message-type"
    	MessageIDHeaderKey = "message-id"
    )
    
    type (
    	// Metadata contains metadata for a given activity.
    	Metadata map[string][]string
    
    	// Streamer handles streaming activities to a topic.
    	Streamer struct {
    		producer sarama.SyncProducer
    	}
    )
    
    var (
    	// ErrNoSubjects denotes that no subjects were provided.
    	ErrNoSubjects = errors.New("At least one subject is required")
    )
    
    // New creates a new streamer.
    func New(producer sarama.SyncProducer) *Streamer {
    	return &Streamer{
    		producer: producer,
    	}
    }
    
    // SendActivity encapsulates the provided metadata and data in a message and sends it to a topic.
    func (s *Streamer) SendActivity(topic string, messageID string, messageHeaderValue string, data []byte, metadata Metadata) error {
    	_, _, err := s.producer.SendMessage(&sarama.ProducerMessage{
    		Topic: topic,
    		Key:   sarama.StringEncoder(messageID),
    		Value: sarama.ByteEncoder(data),
    		Headers: []sarama.RecordHeader{
    			sarama.RecordHeader{
    				Key:   []byte(MessageIDHeaderKey),
    				Value: []byte(messageID),
    			},
    			sarama.RecordHeader{
    				Key:   []byte(MessageTypeHeaderKey),
    				Value: []byte(messageHeaderValue),
    			},
    		},
    	})
    	if err != nil {
    		return errors.Wrapf(err, "Error sending message to topic %s for ID %s", topic, messageID)
    	}
    
    	return nil
    }
    
    bug :-( producer 
    opened by kenschneider18 36
  • Plans to support upcoming Kafka v0.11 ?

    Any plans so far? 0.11 is about to be released fairly soon, adding support for message headers.

    Thanks.

    enhancement 
    opened by dvsekhvalnov 32
  • MultiConsumer

    @wvanbergen @snormore @mkobetic (and anyone else who cares)

    The current consumer works just fine, but is limited to fetching just a single partition of a single topic at once. You can run multiple consumers, but this has several issues:

    • They can't use the same client, or else the broker connection has contention problems (brokers handle requests on a single connection serially, so if it has to wait for MaxWaitTime before responding, this introduces severe latency when multiplexing busy and non-busy topics). Using separate clients works, but generates a lot of extra connections and metadata traffic.
    • You don't get messages from multiple partitions batched into a single response, when that makes sense for efficiency.

    The ideal solution is to have a consumer capable of fetching from multiple topic/partitions at once. This has been at the back of my mind for a while now, so I already have a design ready; if the design makes sense to y'all, then somebody just needs to write it :)

    While the user currently specifies a single topic/partition to the constructor, they should now be able to specify a set of topic/partitions. Since some of the configuration that is currently a singleton actually needs to be per-partition (OffsetMethod and OffsetValue at least), this would probably be of type map[string]map[int32]*ConsumerPartitionConfig. I considered permitting the dynamic adding/removing of partitions to the set, but I don't see a strong use case and it complicated a bunch of things.
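
    A rough sketch of the configuration shape this paragraph describes; every name here is illustrative only and is not the API Sarama ended up shipping:

    // Package consumersketch illustrates the proposed per-partition configuration;
    // none of these identifiers exist in Sarama itself.
    package consumersketch

    // OffsetMethod stands in for the existing offset-method enum.
    type OffsetMethod int

    // ConsumerPartitionConfig holds the settings that, per the proposal, can no
    // longer be singletons because they need to vary per partition.
    type ConsumerPartitionConfig struct {
    	OffsetMethod OffsetMethod
    	OffsetValue  int64
    }

    // PartitionSet is the topic -> partition -> config map described above.
    type PartitionSet map[string]map[int32]*ConsumerPartitionConfig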

    Events are returned to the user exactly the same way they are now, over a single channel. I considered a separate channel per topic/partition but it complicated the base case, and the events already contain information on topic/partition so it's not hard for the user to dispatch appropriately if they really want to.

    The constructor starts up one "controller" goroutine, which starts up and manages one goroutine per broker-that-has-a-partition-we-care-about and is responsible for (initially) dispatching each topic/partition to the appropriate broker's goroutine. The broker goroutine looks a lot like the current fetchMessages method with a few tweaks:

    • Some minor work needed to handle multiple blocks in the requests and responses.
    • When a topic/partition is reassigned to a new broker, that topic/partition gets returned to the controller via a channel; the goroutine tracks how many it is "responsible" for and exits if that reaches 0.
    • Similarly when a broker goes down hard, all topic/partitions are returned to the controller for re-dispatching and the goroutine exits.

    I expect the success case to be fairly straightforward - as always, the complexity will come when reasoning through the failure cases and ensuring that topic/partitions are redispatched correctly, messages are not accidentally skipped in that case, etc. etc.

    When the consumer is closed, it signals the controller which cleans up its children before exiting.

    Thoughts?

    enhancement consumer 
    opened by eapache 31
  • Better Broker Connection Management

    Following up to #7, #9, #10, #13 our connection management still has problems.

    First some things to know about Kafka that complicate the issue:

    • Broker metadata requests do not seem required to return all the brokers they know about (it is not documented which ones they do return, but I suspect it is only the subset of brokers that are leading a partition whose data is also in the response).
    • The user only specifies (host, port) pairs while Kafka metadata returns (host, port, brokerId) triples, with no guarantee that the host matches the user-specified hostname, and with no apparent way to query a broker's own brokerId.

    Now some of the issues with our current implementation:

    • If the user provides three addresses, only the last of which is valid, we wait for the first two connections to completely fail when fetching metadata before we try the third. Since the TCP connection timeout can be very long (many minutes) this can take a long time. Trying them all in parallel and using the first valid one would be better.
    • Once we've had an address fail, we discard it forever. Over the lifetime of a long-running cluster, all of the nodes may be down at one point or another, meaning that eventually we may 'run out' of nodes and abort.

    Thoughts on potential future design:

    • We should connect to brokers lazily, ie only when we actually want to talk to them, not just when we know they exist.
    • Since there is only ever one leader for a partition, if the connection fails it doesn't make sense to just try another broker. Therefore the caller should decide to proceed, which means that the client's leader function shouldn't guarantee that the broker it returns is connected, only connecting.
    • However, when fetching metadata, any broker will do, so the any function should guarantee that the broker it returns is actually connected. Walking through bad brokers to find a good one could be slow, but connecting to all the brokers just to pick one and drop the others is gross. We could try to connect to one, and if it hasn't succeeded in 5 seconds, start another, and so on (see the sketch after this list). That should cover the common case, since if it hasn't responded after 5 seconds it probably won't, and if it does then we'll drop it right away, oh well. More importantly, we won't spawn hundreds of connections if the first broker does respond normally.
    • User-provided addresses should be managed separately (which they already are) since they don't have brokerIds. They're only useful for metadata requests, so they should be disconnected entirely 99% of the time. If one of them fails, we should probably just mark it as recently-failed, and retry it after some time has passed. Only if all user-supplied nodes have recently failed (and we have no metadata from the cluster itself) should we abort somehow.
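
    A sketch of what that staggered dialling could look like in plain Go; this is a hypothetical helper for illustration, not Sarama code, and the broker addresses in main are placeholders:

    package main

    import (
    	"errors"
    	"log"
    	"net"
    	"time"
    )

    // firstBroker dials the addresses one at a time, starting the next attempt
    // whenever the stagger interval passes (or an attempt fails) without a
    // success. The first connection to succeed is returned; late winners are closed.
    func firstBroker(addrs []string, stagger time.Duration) (net.Conn, error) {
    	if len(addrs) == 0 {
    		return nil, errors.New("no broker addresses supplied")
    	}

    	type result struct {
    		conn net.Conn
    		err  error
    	}
    	results := make(chan result, len(addrs))
    	dial := func(addr string) {
    		conn, err := net.DialTimeout("tcp", addr, 30*time.Second)
    		results <- result{conn, err}
    	}

    	go dial(addrs[0])
    	started, failed := 1, 0
    	ticker := time.NewTicker(stagger)
    	defer ticker.Stop()

    	for {
    		select {
    		case r := <-results:
    			if r.err == nil {
    				// Winner: close any stragglers that connect later.
    				go func(pending int) {
    					for i := 0; i < pending; i++ {
    						if late := <-results; late.err == nil {
    							late.conn.Close()
    						}
    					}
    				}(started - failed - 1)
    				return r.conn, nil
    			}
    			failed++
    			if failed == len(addrs) {
    				return nil, r.err // every address failed
    			}
    			if started < len(addrs) { // a failure is as good a reason as the ticker to try the next address
    				go dial(addrs[started])
    				started++
    			}
    		case <-ticker.C:
    			if started < len(addrs) {
    				go dial(addrs[started])
    				started++
    			}
    		}
    	}
    }

    func main() {
    	conn, err := firstBroker([]string{"kafka1:9092", "kafka2:9092", "kafka3:9092"}, 5*time.Second)
    	if err != nil {
    		log.Fatalln(err)
    	}
    	defer conn.Close()
    	log.Println("connected to", conn.RemoteAddr())
    }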

    Open Questions:

    • If a user-supplied broker is used to satisfy a call to any, when should it be disconnected? The extraBroker field in client currently is a hack to work around this, and it still leaves one connection floating around for no reason. Presumably disconnectBroker could be used and made smarter? However, if a metadata-supplied broker is used, calling disconnectBroker will pull it out from underneath other code that could be using it as a leader. Do we need reference-counting?
    • There is currently no way to tell that a Broker is 'Connecting'; you can only tell that the lock is currently held. This means that if two concurrent leader calls would return the same broker and both call Open, the second one will block when we don't want it to. Do we need a second lock in the brokers? That gets really messy...
    enhancement client 
    opened by eapache 29
  • Memory leak sometimes when restarting kafka partitions

    I know this isn't a lot to go on, but hope you can give me pointers to debug this.

    Restarting kafka brokers occasionally produces ever-increasing memory usage until the go process dies OOM. This isn't consistent, but it especially happens when we have to restart multiple kafka nodes at once. When I run pprof/heap I see the memory usage by kafka messages but cannot tell where inside the kafka code the memory is being held onto :( It only happens occasionally and only on our production tier. I also don't see increased goroutine numbers. We're running with 256 brokers and about 12 kafka hosts, using the kafka revision just after you fixed the WaitGroup bug. The log has messages like

    Failed to connect to XYZ: getsocopt: connection refused
    client/brokers deregistering XYZ
    client/brokers deregistered broker #-1 at kafkaXYZ:9092
    ...

    Even when kafka is stable, and the messages go away, memory usage still increases forever until we OOM. A restart and things work great.

    Here is the config we use

        conf.Net.DialTimeout = time.Second * 30
    
        conf.ChannelBufferSize = 256
    
        conf.Producer.RequiredAcks = sarama.WaitForLocal
        conf.Producer.Return.Errors = true
        conf.Producer.Return.Successes = true
        conf.Producer.Partitioner = partitioner
        conf.Producer.Flush.Frequency = time.Millisecond * 250
        conf.Producer.Flush.Messages = 3000
        conf.Producer.Flush.Bytes = 83886080
    

    The biggest issue for us is that even when kafka recovers, memory usage still increases forever in a straight line upward.

    When kafka failures happen, is it possible that multiple buffers could be created internally that end up buffering points?

    Note we also don't see increases in messages on the error channel.

    producer 
    opened by cep21 26
  • Error building docker with Shopify Sarama

    I'm building a docker image with the Shopify Sarama library. When installing Sarama on the golang:1.13 base image it gives me the following error. I have tried different base image versions (1.17, 1.16, 1.9, etc.), but get the same error. Previously this image build was working fine. The error appeared after I updated the Docker version on my Mac; currently I'm using Docker v20.10.8.

     => ERROR [4/9] RUN go get github.com/Shopify/sarama                                                                                                                       134.1s
    ------
     > [4/9] RUN go get github.com/Shopify/sarama:
    #7 133.3 # github.com/Shopify/sarama
    #7 133.3 src/github.com/Shopify/sarama/config.go:678:37: undefined: io.Discard
    #7 133.3 src/github.com/Shopify/sarama/decompress.go:43:10: undefined: io.ReadAll
    #7 133.3 src/github.com/Shopify/sarama/decompress.go:55:10: undefined: io.ReadAll
    #7 133.3 src/github.com/Shopify/sarama/sarama.go:89:29: undefined: io.Discard
    

    Following is the Dockerfile

    FROM golang:1.13
    
    MAINTAINER Eranga Bandara ([email protected])
    
    # install dependencies
    RUN go get gopkg.in/mgo.v2
    RUN	go get github.com/gorilla/mux
    RUN go get github.com/Shopify/sarama
    RUN go get github.com/wvanbergen/kafka/consumergroup
    RUN go get github.com/gorilla/handlers
    
    # copy app
    ADD . /app
    WORKDIR /app
    
    # build
    RUN go build -o build/gateway src/*.go
    
    # server running port
    EXPOSE 8751
    
    # .keys volume
    VOLUME ["/app/.keys"]
    
    ENTRYPOINT ["/app/docker-entrypoint.sh"]
    
    opened by erangaeb 0
  • fix:bugFix add retry for offset commit

    Fixes #1562 #1758. The timeout when committing the offset is most likely caused by a rebalance triggered by a broker restart or by scaling the cluster up or down, so a retry strategy with jitter needs to be added.
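
    A sketch of the kind of backoff-with-jitter delay argued for here; the helper and its parameters are illustrative and are not part of this PR:

    package main

    import (
    	"fmt"
    	"math/rand"
    	"time"
    )

    // backoffWithJitter grows the delay exponentially with the attempt number,
    // caps it at max, and then picks a random point in [0, delay) so that many
    // consumers retrying against a rebalancing broker do not all wake up at once.
    func backoffWithJitter(attempt int, base, max time.Duration) time.Duration {
    	delay := base << attempt
    	if delay <= 0 || delay > max { // the <= 0 case guards against shift overflow
    		delay = max
    	}
    	return time.Duration(rand.Int63n(int64(delay)))
    }

    func main() {
    	for attempt := 0; attempt < 5; attempt++ {
    		fmt.Printf("attempt %d: retry after %v\n", attempt, backoffWithJitter(attempt, 100*time.Millisecond, 5*time.Second))
    	}
    }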

    opened by HobbyBear 0
  • feat(test): add a simple fuzzing example

    Explore what's possible with the fuzzer.

    opened by dnwe 0
  • Use acks=all and idempotence=true by default, as per Kafka 3.0.0 release

    Problem Description

    Today's Kafka 3.0.0 release enables acks=all and idempotence=true in the Java client by default, see also:

    • https://blogs.apache.org/kafka/entry/what-s-new-in-apache6
    • https://downloads.apache.org/kafka/3.0.0/RELEASE_NOTES.html

    Shall Sarama also follow suit, since the user expectation is that this type of consumer/producer consistency is enabled by default?
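
    For comparison, opting in to the equivalent behaviour with Sarama today looks roughly like this (a sketch; note that Sarama's idempotent producer additionally requires a single in-flight request and a 0.11+ protocol version, which is why those are set as well):

    package main

    import (
    	"log"

    	"github.com/Shopify/sarama"
    )

    func main() {
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll // acks=all
    	config.Producer.Idempotent = true                // enable.idempotence=true
    	config.Net.MaxOpenRequests = 1                   // required by Sarama's idempotent producer
    	config.Version = sarama.V0_11_0_0                // idempotence needs the 0.11+ record format

    	if err := config.Validate(); err != nil {
    		log.Fatalln(err)
    	}
    	log.Println("config is valid")
    }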

    opened by gdm85 1
  • alibabacloud recommends against using Sarama?

    I saw from Alibaba Cloud that the Sarama library is not recommended by them. Do we have a plan to fix these three problems?

    source: https://www.alibabacloud.com/help/faq-detail/266782.htm

    Quote:

    The following known problems exist with the Go client that is developed with the Sarama library:

    1. When a partition is added to a topic, the Go client developed with the Sarama library cannot detect the new partition or consume messages from the partition. Only after the client is restarted, it can consume messages from the new partition.

    2. When the client subscribes to more than two topics at the same time, the client may fail to consume messages from some partitions.

    3. If the policy for resetting consumer offsets of the Go client developed with the Sarama library is set to Oldest(earliest), the client may start to consume messages from the earliest offset when the client breaks down or the broker version is upgraded. This is because the out_of_range class is implemented in the client.

    opened by KeenCN 1
  • Goroutine leaks in zstd initialization

    Versions

    Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

    | Sarama | Kafka | Go     |
    |--------|-------|--------|
    | 1.29.1 | n/a   | 1.17.1 |

    Configuration

    What configuration values are you using for Sarama and Kafka?

    n/a

    Logs

    When filing an issue please provide logs from Sarama and Kafka if at all possible. You can set sarama.Logger to a log.Logger to capture Sarama debug output.


     Goroutine 29 in state chan receive, with github.com/klauspost/compress/zstd.(*blockDec).startDecoder on top of the stack:
    goroutine 29 [chan receive]:
    github.com/klauspost/compress/zstd.(*blockDec).startDecoder(0xc00020fa00)
            /Users/pkw/go/pkg/mod/github.com/klauspost/[email protected]/zstd/blockdec.go:212 +0xec
    created by github.com/klauspost/compress/zstd.newBlockDec
            /Users/pkw/go/pkg/mod/github.com/klauspost/[email protected]/zstd/blockdec.go:118 +0x1b1
    
     Goroutine 30 in state chan receive, with github.com/klauspost/compress/zstd.(*blockDec).startDecoder on top of the stack:
    goroutine 30 [chan receive]:
    github.com/klauspost/compress/zstd.(*blockDec).startDecoder(0xc00020fad0)
            /Users/pkw/go/pkg/mod/github.com/klauspost/[email protected]/zstd/blockdec.go:212 +0xec
    created by github.com/klauspost/compress/zstd.newBlockDec
            /Users/pkw/go/pkg/mod/github.com/klauspost/[email protected]/zstd/blockdec.go:118 +0x1b1
    
    

    Problem Description

    The fix for https://github.com/Shopify/sarama/pull/1869 changed from initializing the zstd variables on demand (with sync.Once) to in init. Behind the scenes, this causes goroutines to be started (and calling .Close() on Sarama resources doesn't propagate through to calling .Close() on the zstd instances).

    This causes tests that verify no leaked goroutines (i.e. using https://github.com/uber-go/goleak) to start to fail if they use Sarama.
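
    For reference, the kind of leak check that starts failing is typically a goleak guard in TestMain, roughly like this (a sketch; the package name is a placeholder):

    package mypackage

    import (
    	"testing"

    	"go.uber.org/goleak"
    )

    // TestMain fails the package's tests if any goroutines are still running when
    // they finish; the zstd decoder goroutines started from Sarama's init are what
    // show up here.
    func TestMain(m *testing.M) {
    	goleak.VerifyTestMain(m)
    }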

    opened by pkwarren 3
  • Making bytes for hash configurable in the hashPartitioner

    Make the bytes used for hashing configurable. We have a use case where we want to partition data based on the prefix of the key, in a prefix::user_id pattern, so that all data sharing a prefix is partitioned together. This is to be used with log-compacted topics, where we want the last state for each prefix::user_id to always be kept in the topic and the downstream consumers want all of the data sharing the defined prefix.
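
    A sketch of the idea using Sarama's existing Partitioner interface; the prefixPartitioner type is illustrative and is not part of Sarama or of this PR:

    package main

    import (
    	"bytes"
    	"hash/fnv"

    	"github.com/Shopify/sarama"
    )

    // prefixPartitioner hashes only the "prefix" part of a "prefix::user_id" key so
    // that every record sharing a prefix lands on the same partition.
    type prefixPartitioner struct{}

    func (prefixPartitioner) Partition(msg *sarama.ProducerMessage, numPartitions int32) (int32, error) {
    	if msg.Key == nil {
    		return 0, nil // keyless messages all go to partition 0 in this sketch
    	}
    	key, err := msg.Key.Encode()
    	if err != nil {
    		return 0, err
    	}
    	// Keep only the bytes before "::"; fall back to the whole key otherwise.
    	if i := bytes.Index(key, []byte("::")); i >= 0 {
    		key = key[:i]
    	}
    	h := fnv.New32a()
    	h.Write(key)
    	return int32(h.Sum32() % uint32(numPartitions)), nil
    }

    // RequiresConsistency is true so that equal prefixes always map to the same partition.
    func (prefixPartitioner) RequiresConsistency() bool { return true }

    func main() {
    	config := sarama.NewConfig()
    	config.Producer.Partitioner = func(topic string) sarama.Partitioner {
    		return prefixPartitioner{}
    	}
    	_ = config // a real program would now build a producer with this config
    }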

    cla-needed 
    opened by abhinavsolan 0
  • GSSAPI Kerberos Authentication: wrong Token ID

    Versions

    | Sarama | Kafka          | Go     |
    |--------|----------------|--------|
    | 1.29.1 | Cloudera Kafka | 1.17.1 |

    Configuration

    Sarama setup

    config := sarama.NewConfig()
    config.Producer.RequiredAcks     = sarama.WaitForAll
    config.Producer.Retry.Max        = 10
    config.Producer.Return.Successes = true
    
    config.Net.SASL.Enable                    = true
    config.Net.SASL.Mechanism                 = sarama.SASLTypeGSSAPI
    config.Net.SASL.GSSAPI.ServiceName        = "kafka"
    config.Net.SASL.GSSAPI.AuthType           = sarama.KRB5_KEYTAB_AUTH
    config.Net.SASL.GSSAPI.KeyTabPath         = "./username.keytab"
    config.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
    config.Net.SASL.GSSAPI.Username           = "<USER>"
    config.Net.SASL.GSSAPI.Realm              = "<REALM>"
    config.Net.SASL.GSSAPI.DisablePAFXFAST    = true
    

    Kafka setup is unknown

    Logs

    [sarama] 2021/09/14 14:17:50 Initializing new client
    [sarama] 2021/09/14 14:17:50 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    [sarama] 2021/09/14 14:17:50 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
    [sarama] 2021/09/14 14:17:50 client/metadata fetching metadata for all topics from broker <BROKER>:9092
    [sarama] 2021/09/14 14:17:51 Error while performing GSSAPI Kerberos Authentication: wrong Token ID. Expected 0504, was 6030
    [sarama] 2021/09/14 14:17:51 Closed connection to broker <BROKER>:9092
    

    Problem Description

    For some reason it looks like sarama has difficulties connecting to our team's Kafka using a keytab.

    I've seen a similar issue raised (#1400), but it was closed with a suggested solution that unfortunately didn't work for me.

    I've tried different options for krb5.conf and the keytab file.

    Option 1:

    permitted_enctypes   = rc4-hmac
    default_tgs_enctypes = rc4-hmac
    default_tkt_enctypes = rc4-hmac
    default_etypes       = arcfour-hmac-md5
    default_etypes_des   = des-cbc-crc
    

    With a keytab encrypted with RC4-HMAC, this gives the error I outlined above.

    Option 2:

    # These options were suggested in #1366 (which was mentioned in #1400 )
    default_tgs_enctypes = aes256-cts-hmac-sha1-96
    default_tkt_enctypes = aes256-cts-hmac-sha1-96
    permitted_enctypes   = aes256-cts-hmac-sha1-96
    

    With a keytab encrypted with RC4-HMAC, this gives the error:

    [sarama] Kerberos client error:
    [Root cause: Encrypting_Error] KRBMessage_Handling_Error: AS Exchange Error: failed setting AS_REQ PAData for pre-authentication required < Encrypting_Error: error getting key from credentials: matching key not found in keytab. Looking for [<USER>] realm: <REALM> kvno: 0 etype: 18 
    

    which I believe means that there is a mismatch in encryption, so I tried a keytab encrypted with AES256-CTS-HMAC-SHA1-96, and it gives this error:

    [sarama] client/metadata got error from broker -1 while fetching metadata:
    [Root cause: KDC_Error] KDC_Error: TGS Exchange Error: kerberos error response from KDC when requesting for kafka/<BROKER>: KRB Error: (14) KDC_ERR_ETYPE_NOSUPP KDC has no support for encryption type
    

    which I believe means that our Kerberos system is not set up to accept AES256-CTS-HMAC-SHA1-96 at the TGS step.

    So I tried changing default_tkt_enctypes = aes256-cts-hmac-sha1-96 to default_tkt_enctypes = rc4-hmac, which returned the initial error (about the wrong Token ID).

    Any suggestions on how to get it working?

    opened by MikhailMS 0
  • close graceful

    If the consumer is in the middle of executing its consumption logic when the system is shut down, how can sarama be used to let the in-flight logic complete before the system restarts?
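
    One common pattern is sketched below (not an official answer from the maintainers; the topic, group name, and Kafka version are placeholders): cancel the context passed to Consume on SIGINT/SIGTERM, let the ConsumeClaim loops drain, and only then close the group.

    package main

    import (
    	"context"
    	"log"
    	"os"
    	"os/signal"
    	"sync"
    	"syscall"

    	"github.com/Shopify/sarama"
    )

    // handler is a minimal ConsumerGroupHandler; ConsumeClaim returns once the
    // session is cancelled, which is what lets a shutdown finish cleanly.
    type handler struct{}

    func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
    func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

    func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    	for msg := range claim.Messages() {
    		// ... do the actual consumption work for msg here ...
    		sess.MarkMessage(msg, "") // only mark the message once the work is done
    	}
    	return nil
    }

    func main() {
    	config := sarama.NewConfig()
    	config.Version = sarama.V2_1_0_0 // assumption: pick the version matching your brokers

    	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
    	if err != nil {
    		log.Fatalln(err)
    	}

    	ctx, cancel := context.WithCancel(context.Background())
    	var wg sync.WaitGroup
    	wg.Add(1)
    	go func() {
    		defer wg.Done()
    		for ctx.Err() == nil {
    			if err := group.Consume(ctx, []string{"example-topic"}, handler{}); err != nil {
    				log.Println("consume error:", err)
    			}
    		}
    	}()

    	// On SIGINT/SIGTERM: cancel the context, wait for the in-flight
    	// ConsumeClaim loops to drain, then close the group.
    	sig := make(chan os.Signal, 1)
    	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    	<-sig
    	cancel()
    	wg.Wait()
    	if err := group.Close(); err != nil {
    		log.Println("close error:", err)
    	}
    }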

    opened by HobbyBear 1
  • consumer/broker/0 abandoned subscription to xxx because consuming was taking too long

    Versions

    Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.

    | Sarama  | Kafka | Go   |
    |---------|-------|------|
    | v1.27.2 | 2.12  | 1.16 |

    Configuration

    What configuration values are you using for Sarama and Kafka?

    Logs

    When filing an issue please provide logs from Sarama and Kafka if at all possible. You can set sarama.Logger to a log.Logger to capture Sarama debug output.


    [sarama] 2021/09/13 10:28:10 consumer/broker/0 added subscription to dns_saas/0
    [sarama] 2021/09/13 10:28:10 consumer/broker/0 added subscription to dns_saas/3
    [sarama] 2021/09/13 10:28:10 consumer/broker/0 added subscription to dns_saas/2
    [sarama] 2021/09/13 10:28:11 consumer/broker/0 abandoned subscription to dns_saas/2 because consuming was taking too long
    [sarama] 2021/09/13 10:28:11 consumer/broker/0 abandoned subscription to dns_saas/1 because consuming was taking too long
    [sarama] 2021/09/13 10:28:11 consumer/broker/0 abandoned subscription to dns_saas/0 because consuming was taking too long
    [sarama] 2021/09/13 10:28:11 consumer/broker/0 abandoned subscription to dns_saas/3 because consuming was taking too long
    

    Problem Description

    I saw that the consumer is very slow.

    opened by IIvyPy 0
Releases (v1.29.1)
  • v1.29.1(Jun 24, 2021)

    New Features / Improvements

    • #1966 - @ajanikow - KIP-339: Add Incremental Config updates API
    • #1964 - @ajanikow - Add DelegationToken ResourceType

    Fixes

    • #1962 - @hanxiaolin - fix(consumer): call interceptors when MaxProcessingTime expire
    • #1971 - @KerryJava - fix kafka-producer-performance throughput panic
    • #1968 - @dnwe - chore: bump golang.org/x versions
    • #1956 - @joewreschnig - Allow checking the entire ProducerMessage in the mock producers
    • #1963 - @dnwe - fix: ensure backoff timer is re-used
    • #1949 - @dnwe - fix: explicitly use uint64 for payload length
    Source code(tar.gz)
    Source code(zip)
  • v1.29.0(May 7, 2021)

    New Features / Improvements

    • #1917 - @arkady-emelyanov - KIP-554: Add Broker-side SCRAM Config API
    • #1869 - @wyndhblb - zstd: encode+decode performance improvements
    • #1541 - @izolight - add String, (Un)MarshalText for acl types.
    • #1921 - @bai - Add support for Kafka 2.8.0

    Fixes

    • #1936 - @dnwe - fix(consumer): follow preferred broker
    • #1933 - @ozzieba - Use gofork for encoding/asn1 to fix ASN errors during Kerberos authentication
    • #1929 - @celrenheit - Handle isolation level in Offset(Request|Response) and require stable offset in FetchOffset(Request|Response)
    • #1926 - @dnwe - fix: correct initial CodeQL findings
    • #1925 - @bai - Test out CodeQL
    • #1923 - @bestgopher - Remove redundant switch-case, fix doc typos
    • #1922 - @bai - Update go dependencies
    • #1898 - @mmaslankaprv - Parsing only known control batches value
    • #1887 - @withshubh - Fix: issues affecting code quality
    Source code(tar.gz)
    Source code(zip)
  • v1.28.0(Feb 15, 2021)

    Note that with this release we change RoundRobinBalancer strategy to match Java client behavior. See #1788 for details.

    • #1870 - @kvch - Update Kerberos library to latest major
    • #1876 - @bai - Update docs, reference pkg.go.dev
    • #1846 - @wclaeys - Do not ignore Consumer.Offsets.AutoCommit.Enable config on Close
    • #1747 - @XSAM - fix: mock sync producer does not handle the offset while sending messages
    • #1863 - @bai - Add support for Kafka 2.7.0 + update lz4 and klauspost/compress dependencies
    • #1788 - @kzinglzy - feat[balance_strategy]: announcing a new round robin balance strategy
    • #1862 - @bai - Fix CI setenv permissions issues
    • #1832 - @ilyakaznacheev - Update Godoc link to pkg.go.dev
    • #1822 - @danp - KIP-392: Allow consumers to fetch from closest replica
    Source code(tar.gz)
    Source code(zip)
  • v1.27.2(Oct 21, 2020)

    Improvements

    • #1750 - @krantideep95 - Adds missing mock responses for mocking consumer group

    Fixes

    • #1817 - reverts #1785 - Add private method to Client interface to prevent implementation

    Source code(tar.gz)
    Source code(zip)
  • v1.27.1(Oct 7, 2020)

    Improvements

    • #1775 - @d1egoaz - Adds a Producer Interceptor example
    • #1781 - @justin-chen - Refresh brokers given list of seed brokers
    • #1784 - @justin-chen - Add randomize seed broker method
    • #1790 - @d1egoaz - remove example binary
    • #1798 - @bai - Test against Go 1.15
    • #1785 - @justin-chen - Add private method to Client interface to prevent implementation
    • #1802 - @uvw - Support Go 1.13 error unwrapping

    Fixes

    • #1791 - @stanislavkozlovski - bump default version to 1.0.0

    Source code(tar.gz)
    Source code(zip)
  • v1.27.0(Aug 11, 2020)

    Improvements

    • #1466 - @rubenvp8510 - Expose kerberos fast negotiation configuration
    • #1695 - @KJTsanaktsidis - Use docker-compose to run the functional tests
    • #1699 - @wclaeys - Consumer group support for manually committing offsets
    • #1714 - @bai - Bump Go to version 1.14.3, golangci-lint to 1.27.0
    • #1726 - @d1egoaz - Include zstd on the functional tests
    • #1730 - @d1egoaz - KIP-42 Add producer and consumer interceptors
    • #1738 - @varun06 - fixed variable names that are named same as some std lib package names
    • #1741 - @varun06 - updated zstd dependency to latest v1.10.10
    • #1743 - @varun06 - Fixed declaration dependencies and other lint issues in code base
    • #1763 - @alrs - remove deprecated tls options from test
    • #1769 - @bai - Add support for Kafka 2.6.0

    Fixes

    • #1697 - @kvch - Use gofork for encoding/asn1 to fix ASN errors during Kerberos authentication
    • #1744 - @alrs - Fix isBalanced Function Signature

    Source code(tar.gz)
    Source code(zip)
  • v1.26.4(May 19, 2020)

  • v1.26.3(May 7, 2020)

  • v1.26.2(May 6, 2020)

    ⚠️ Known Issues

    This release has been marked as not ready for production and may be unstable, please use v1.26.4.

    Improvements

    • #1560 - @iyacontrol - add sync pool for gzip 1-9
    • #1605 - @dnwe - feat: protocol support for V11 fetch w/ rackID
    • #1617 - @sladkoff / @dwi-di / @random-dwi - Add support for alter/list partition reassignements APIs
    • #1632 - @bai - Add support for Go 1.14
    • #1640 - @random-dwi - Feature/fix list partition reassignments
    • #1646 - @mimaison - Add DescribeLogDirs to admin client
    • #1667 - @bai - Add support for kafka 2.5.0

    Fixes

    • #1594 - @sladkoff - Sets ConfigEntry.Default flag in addition to the ConfigEntry.Source for Kafka versions > V1_1_0_0
    • #1601 - @alrs - fix: remove use of testing.T.FailNow() inside goroutine
    • #1602 - @d1egoaz - adds a note about consumer groups Consume method
    • #1607 - @darklore - Fix memory leak when Broker.Open and Broker.Close called repeatedly
    • #1613 - @wblakecaldwell - Updated "retrying" log message when BackoffFunc implemented
    • #1614 - @alrs - produce_response.go: Remove Unused Functions
    • #1619 - @alrs - tools/kafka-producer-performance: prune unused flag variables
    • #1639 - @agriffaut - Handle errors with no message but error code
    • #1643 - @kzinglzy - fix config.net.keepalive
    • #1644 - @KJTsanaktsidis - Fix brokers continually allocating new Session IDs
    • #1645 - @Stephan14 - Remove broker(s) which no longer exist in metadata
    • #1650 - @lavoiesl - Return the response error in heartbeatLoop
    • #1661 - @KJTsanaktsidis - Fix "broker received out of order sequence" when brokers die
    • #1666 - @KevinJCross - Bugfix: Allow TLS connections to work over socks proxy.
    Source code(tar.gz)
    Source code(zip)
  • v1.26.1(Feb 4, 2020)

    ⚠️ This release has been superseded by v1.26.4 and should not be used.

    Fetch requests will cause the Kafka broker to continuously allocate new Fetch sessions in its cache on every request. Fixed in v1.26.2 via https://github.com/Shopify/sarama/pull/1644


    Improvements

    • Add requests-in-flight metric (1539)
    • Fix misleading example for cluster admin (1595)
    • Replace Travis with GitHub Actions, linters housekeeping (1573)
    • Allow BalanceStrategy to provide custom assignment data (1592)

    Bug Fixes

    • Adds back Consumer.Offsets.CommitInterval to fix API (1590)
    • Fix error message s/CommitInterval/AutoCommit.Interval (1589)
    Source code(tar.gz)
    Source code(zip)
  • v1.26.0(Jan 24, 2020)

    ⚠️ Known Issues

    This release has been superseded by v1.26.4 and should not be used.

    Fetch requests will cause the Kafka broker to continuously allocate new Fetch sessions in its cache on every request. Fixed in v1.26.2 via https://github.com/Shopify/sarama/pull/1644


    New Features:

    • Enable zstd compression (1574,1582)
    • Support headers in tools kafka-console-producer (1549)

    Improvements:

    • Add SASL AuthIdentity to SASL frames (authzid) (1585).

    Bug Fixes:

    • Sending messages with ZStd compression enabled fails in multiple ways (1252).
    • Use the broker for any admin on BrokerConfig (1571).
    • Set DescribeConfigRequest Version field (1576).
    • ConsumerGroup flooding logs with client/metadata update req (1578).
    • MetadataRequest version in DescribeCluster (1580).
    • Fix deadlock in consumer group handleError (1581)
    • Fill in the Fetch{Request,Response} protocol (1582).
    • Retry topic request on ControllerNotAvailable (1586).
    Source code(tar.gz)
    Source code(zip)
  • v1.25.0(Jan 13, 2020)

    ⚠️ This release still contained known issues introduced in v1.24.1 and should not be used

    Recommended to upgrade to v1.26.4 or rollback to v1.24.0

    Known Issues

    • ConsumerGroup flooding logs with client/metadata update req (1544) introduced in v1.24.1
    • Unexpected user-specified time limit error (1562) introduced in v1.24.1

    New Features:

    • Support TLS protocol in kafka-producer-performance (1538).
    • Add support for kafka 2.4.0 (1552).

    Improvements:

    • Allow the Consumer to disable auto-commit offsets (1164).
    • Produce records with consistent timestamps (1455).

    Bug Fixes:

    • Fix incorrect SetTopicMetadata name mentions (1534).
    • Fix client.tryRefreshMetadata Println (1535).
    • Fix panic on calling updateMetadata on closed client (1531).
    • Fix possible faulty metrics in TestFuncProducing (1545).
    Source code(tar.gz)
    Source code(zip)
  • v1.24.1(Oct 31, 2019)

    ⚠️ This release introduced two new major regressions over v1.24.0 and should not be used

    Known Issues

    • ConsumerGroup flooding logs with client/metadata update req (1544)
    • Unexpected user-specified time limit error (1562)

    New Features:

    • Add DescribeLogDirs Request/Response pair (1520).

    Bug Fixes:

    • Fix ClusterAdmin returning invalid controller ID on DescribeCluster (1518).
    • Fix issue with consumergroup not rebalancing when new partition is added (1525).
    • Ensure consistent use of read/write deadlines (1529).
    Source code(tar.gz)
    Source code(zip)
  • v1.24.0(Oct 9, 2019)

    New Features:

    • Add sticky partition assignor (1416).
    • Switch from cgo zstd package to pure Go implementation (1477).

    Improvements:

    • Allow creating ClusterAdmin from client (1415).
    • Set KafkaVersion in ListAcls method (1452).
    • Set request version in CreateACL ClusterAdmin method (1458).
    • Set request version in DeleteACL ClusterAdmin method (1461).
    • Handle missed error codes on TopicMetaDataRequest and GroupCoordinatorRequest (1464).
    • Remove direct usage of gofork (1465).
    • Add support for Go 1.13 (1478).
    • Improve behavior of NewMockListAclsResponse (1481).

    Bug Fixes:

    • Fix race condition in consumergroup example (1434).
    • Fix brokerProducer goroutine leak (1442).
    • Use released version of lz4 library (1469).
    • Set correct version in MockDeleteTopicsResponse (1484).
    • Fix CLI help message typo (1494).

    Known Issues:

    • Please don't use Zstd, as it doesn't work right now. See https://github.com/Shopify/sarama/issues/1252.
    Source code(tar.gz)
    Source code(zip)
  • v1.23.1(Jul 22, 2019)

  • v1.23.0(Jul 2, 2019)

    New Features:

    • Add support for Kafka 2.3.0 (1418).
    • Add support for ListConsumerGroupOffsets v2 (1374).
    • Add support for DeleteConsumerGroup (1417).
    • Add support for SASLVersion configuration (1410).
    • Add kerberos support (1366).

    Improvements:

    • Improve sasl_scram_client example (1406).
    • Fix shutdown and race-condition in consumer-group example (1404).
    • Add support for error codes 77—81 (1397).
    • Pool internal objects allocated per message (1385).
    • Reduce packet decoder allocations (1373).
    • Support timeout when fetching metadata (1359).

    Bug Fixes:

    • Fix fetch size integer overflow (1376).
    • Handle and log throttled FetchResponses (1383).
    • Refactor misspelled word Resouce to Resource (1368).
    Source code(tar.gz)
    Source code(zip)
  • v1.22.1(Apr 29, 2019)

    Improvements:

    • Use zstd 1.3.8 (1350).
    • Add support for SaslHandshakeRequest v1 (1354).

    Bug Fixes:

    • Fix V5 MetadataRequest nullable topics array (1353).
    • Use a different SCRAM client for each broker connection (1349).
    • Fix AllowAutoTopicCreation for MetadataRequest greater than v3 (1344).
    Source code(tar.gz)
    Source code(zip)
  • v1.22.0(Apr 9, 2019)

    New Features:

    • Add Offline Replicas Operation to Client (1318).
    • Allow using proxy when connecting to broker (1326).
    • Implement ReadCommitted (1307).
    • Add support for Kafka 2.2.0 (1331).
    • Add SASL SCRAM-SHA-512 and SCRAM-SHA-256 mechanisms (1331).

    Improvements:

    • Unregister all broker metrics on broker stop (1232).
    • Add SCRAM authentication example (1303).
    • Add consumergroup examples (1304).
    • Expose consumer batch size metric (1296).
    • Add TLS options to console producer and consumer (1300).
    • Reduce client close bookkeeping (1297).
    • Satisfy error interface in create responses (1154).
    • Please lint gods (1346).

    Bug Fixes:

    • Fix multi consumer group instance crash (1338).
    • Update lz4 to latest version (1347).
    • Retry ErrNotCoordinatorForConsumer in new consumergroup session (1231).
    • Fix cleanup error handler (1332).
    • Fix race condition in PartitionConsumer (1156).
    Source code(tar.gz)
    Source code(zip)
  • v1.21.0(Feb 24, 2019)

    New Features:

    • Add CreateAclRequest, DescribeAclRequest, DeleteAclRequest (#1236).
    • Add DescribeTopic, DescribeConsumerGroup, ListConsumerGroups, ListConsumerGroupOffsets admin requests (#1178).
    • Implement SASL/OAUTHBEARER (#1240).

    Improvements:

    • Add Go mod support (#1282).
    • Add error codes 73—76 (#1239).
    • Add retry backoff function (#1160).
    • Maintain metadata in the producer even when retries are disabled (#1189).
    • Include ReplicaAssignment in ListTopics (#1274).
    • Add producer performance tool (#1222).
    • Add support LogAppend timestamps (#1258).

    Bug Fixes:

    • Fix potential deadlock when a heartbeat request fails (#1286).
    • Fix consuming compacted topic (#1227).
    • Set correct Kafka version for DescribeConfigsRequest v1 (#1277).
    • Update kafka test version (#1273).
    Source code(tar.gz)
    Source code(zip)
  • v1.20.1(Jan 10, 2019)

    New Features:

    • Add optional replica id in offset request (1100).

    Improvements:

    • Implement DescribeConfigs Request + Response v1 & v2 (1230).
    • Reuse compression objects (1185).
    • Switch from png to svg for GoDoc link in README (1243).
    • Fix typo in deprecation notice for FetchResponseBlock.Records (1242).
    • Fix typos in consumer metadata response file (1244).

    Bug Fixes:

    • Revert to individual msg retries for non-idempotent (1203).
    • Respect MaxMessageBytes limit for uncompressed messages (1141).
    Source code(tar.gz)
    Source code(zip)
  • v1.20.0(Dec 10, 2018)

    New Features:

    • Add support for zstd compression (#1170).
    • Add support for Idempotent Producer (#1152).
    • Add support for Kafka 2.1.0 (#1229).
    • Add support for OffsetCommit request/response pairs versions v1 to v5 (#1201).
    • Add support for OffsetFetch request/response pair up to version v5 (#1198).

    Improvements:

    • Export broker's Rack setting (#1173).
    • Always use latest patch version of Go on CI (#1202).
    • Add error codes 61 to 72 (#1195).

    Bug Fixes:

    • Fix build without cgo (#1182).
    • Fix go vet suggestion in consumer group file (#1209).
    • Fix typos in code and comments (#1228).
    Source code(tar.gz)
    Source code(zip)
  • v1.19.0(Sep 27, 2018)

    New Features:

    • Implement a higher-level consumer group (#1099).

    Improvements:

    • Add support for Go 1.11 (#1176).

    Bug Fixes:

    • Fix encoding of MetadataResponse with version 2 and higher (#1174).
    • Fix race condition in mock async producer (#1174).
    Source code(tar.gz)
    Source code(zip)
  • v1.18.0(Sep 7, 2018)

    New Features:

    • Make Partitioner.RequiresConsistency vary per-message (#1112).
    • Add customizable partitioner (#1118).
    • Add ClusterAdmin support for CreateTopic, DeleteTopic, CreatePartitions, DeleteRecords, DescribeConfig, AlterConfig, CreateACL, ListAcls, DeleteACL (#1055).

    Improvements:

    • Add support for Kafka 2.0.0 (#1149).
    • Allow setting LocalAddr when dialing an address to support multi-homed hosts (#1123).
    • Simpler offset management (#1127).

    Bug Fixes:

    • Fix mutation of ProducerMessage.MetaData when producing to Kafka (#1110).
    • Fix consumer block when response did not contain all the expected topic/partition blocks (#1086).
    • Fix consumer block when response contains only control messages (#1115).
    • Add timeout config for ClusterAdmin requests (#1142).
    • Add version check when producing message with headers (#1117).
    • Fix MetadataRequest for empty list of topics (#1132).
    • Fix producer topic metadata on-demand fetch when topic error happens in metadata response (#1125).
    Source code(tar.gz)
    Source code(zip)
  • v1.17.0(May 30, 2018)

    New Features:

    • Add support for gzip compression levels (#1044).
    • Add support for Metadata request/response pairs versions v1 to v5 (#1047, #1069).
    • Add versioning to JoinGroup request/response pairs (#1098).
    • Add support for CreatePartitions, DeleteGroups, DeleteRecords request/response pairs (#1065, #1096, #1027).
    • Add Controller() method to Client interface (#1063).

    Improvements:

    • ConsumerMetadataReq/Resp has been migrated to FindCoordinatorReq/Resp (#1010).
    • Expose missing protocol parts: msgSet and recordBatch (#1049).
    • Add support for v1 DeleteTopics Request (#1052).
    • Add support for Go 1.10 (#1064).
    • Claim support for Kafka 1.1.0 (#1073).

    Bug Fixes:

    • Fix FindCoordinatorResponse.encode to allow nil Coordinator (#1050, #1051).
    • Clear all metadata when we have the latest topic info (#1033).
    • Make PartitionConsumer.Close idempotent (#1092).
    Source code(tar.gz)
    Source code(zip)
  • v1.16.0(Feb 12, 2018)

    New Features:

    • Add support for the Create/Delete Topics request/response pairs (#1007, #1008).
    • Add support for the Describe/Create/Delete ACL request/response pairs (#1009).
    • Add support for the five transaction-related request/response pairs (#1016).

    Improvements:

    • Permit setting version on mock producer responses (#999).
    • Add NewMockBrokerListener helper for testing TLS connections (#1019).
    • Changed the default value for Consumer.Fetch.Default from 32KiB to 1MiB which results in much higher throughput in most cases (#1024).
    • Reuse the time.Ticker across fetch requests in the PartitionConsumer to reduce CPU and memory usage when processing many partitions (#1028).
    • Assign relative offsets to messages in the producer to save the brokers a recompression pass (#1002, #1015).

    Bug Fixes:

    • Fix producing uncompressed batches with the new protocol format (#1032).
    • Fix consuming compacted topics with the new protocol format (#1005).
    • Fix consuming topics with a mix of protocol formats (#1021).
    • Fix consuming when the broker includes multiple batches in a single response (#1022).
    • Fix detection of PartialTrailingMessage when the partial message was truncated before the magic value indicating its version (#1030).
    • Fix expectation-checking in the mock of SyncProducer.SendMessages (#1035).
    Source code(tar.gz)
    Source code(zip)
  • v1.15.0(Dec 8, 2017)

    New Features:

    • Claim official support for Kafka 1.0, though it did already work (#984).
    • Helper methods for Kafka version numbers to/from strings (#989); see the sketch after this list.
    • Implement CreatePartitions request/response (#985).
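
    A small sketch of the version helpers; the version string is arbitrary:

```go
package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    // Parse a Kafka version string and feed it into the client configuration.
    version, err := sarama.ParseKafkaVersion("1.0.0")
    if err != nil {
        log.Fatalln("invalid Kafka version:", err)
    }

    config := sarama.NewConfig()
    config.Version = version
    log.Println("configured for Kafka", version.String())
}
```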

    Improvements:

    • Add error codes 45-60 (#986).

    Bug Fixes:

    • Fix slow consuming for certain Kafka 0.11/1.0 configurations (#982).
    • Correctly determine when a FetchResponse contains the new message format (#990).
    • Fix producing with multiple headers (#996).
    • Fix handling of truncated record batches (#998).
    • Fix leaking metrics when closing brokers (#991).
  • v1.14.0 (Nov 13, 2017)

    New Features:

    • Add support for the new Kafka 0.11 record-batch format, including the wire protocol and the necessary behavioural changes in the producer and consumer. Transactions and idempotency are not yet supported, but producing and consuming should work with all the existing bells and whistles (batching, compression, etc) as well as the new custom headers. Thanks to Vlad Hanciuta of Arista Networks for this work. Part of (#901).
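
    The custom headers mentioned above attach key/value byte pairs to each produced record; a minimal sketch, assuming a broker that speaks the 0.11 format and using placeholder topic and header names:

```go
package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V0_11_0_0 // record headers need the 0.11 record-batch format
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalln("failed to create producer:", err)
    }
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "example-topic",
        Value: sarama.StringEncoder("hello"),
        Headers: []sarama.RecordHeader{
            {Key: []byte("trace-id"), Value: []byte("abc123")},
        },
    }
    if _, _, err := producer.SendMessage(msg); err != nil {
        log.Fatalln("failed to send message:", err)
    }
}
```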

    Bug Fixes:

    • Fix encoding of ProduceResponse versions in test (#970).
    • Return partial replicas list when we have it (#975).
  • v1.13.0 (Oct 4, 2017)

    New Features:

    • Support for FetchRequest version 3 (#905).
    • Permit setting version on mock FetchResponses (#939).
    • Add a configuration option to support storing only minimal metadata for extremely large clusters (#937).
    • Add PartitionOffsetManager.ResetOffset for backtracking tracked offsets (#932).
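
    ResetOffset, unlike MarkOffset, allows rewinding a tracked offset; roughly as below, where the group, topic, partition, and target offset are placeholders:

```go
package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
    if err != nil {
        log.Fatalln("failed to create client:", err)
    }
    defer client.Close()

    om, err := sarama.NewOffsetManagerFromClient("example-group", client)
    if err != nil {
        log.Fatalln("failed to create offset manager:", err)
    }
    defer om.Close()

    pom, err := om.ManagePartition("example-topic", 0)
    if err != nil {
        log.Fatalln("failed to manage partition:", err)
    }
    defer pom.Close()

    // Rewind the tracked offset to 42; MarkOffset only ever moves it forward.
    pom.ResetOffset(42, "rewound by operator")
}
```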

    Improvements:

    • Provide the block-level timestamp when consuming compressed messages (#885).
    • Client.Replicas and Client.InSyncReplicas now respect the order returned by the broker, which can be meaningful (#930).
    • Use a Ticker to reduce consumer timer overhead at the cost of higher variance in the actual timeout (#933).

    Bug Fixes:

    • Gracefully handle messages with negative timestamps (#907).
    • Raise a proper error when encountering an unknown message version (#940).
  • v1.12.0 (May 8, 2017)

    New Features:

    • Added support for the ApiVersions request and response pair, and Kafka version 0.10.2 (#867). Note that you still need to specify the Kafka version in the Sarama configuration for the time being.
    • Added a Brokers method to the Client which returns the complete set of active brokers (#813).
    • Added an InSyncReplicas method to the Client which returns the set of all in-sync broker IDs for the given partition, now that the Kafka versions for which this was misleading are no longer in our supported set (#872).
    • Added a NewCustomHashPartitioner method which allows constructing a hash partitioner with a custom hash method in case the default (FNV-1a) is not suitable (#837, #841).
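
    NewCustomHashPartitioner takes a hash.Hash32 constructor; the sketch below swaps the default FNV-1a for CRC-32 purely as an illustration, with placeholder broker and topic names:

```go
package main

import (
    "hash/crc32"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    // Use CRC-32 (IEEE) instead of the default FNV-1a when hashing message keys.
    config.Producer.Partitioner = sarama.NewCustomHashPartitioner(crc32.NewIEEE)
    config.Producer.Return.Successes = true // required by the SyncProducer

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalln("failed to create producer:", err)
    }
    defer producer.Close()

    partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
        Topic: "example-topic",
        Key:   sarama.StringEncoder("user-42"),
        Value: sarama.StringEncoder("hello"),
    })
    if err != nil {
        log.Fatalln("failed to send message:", err)
    }
    log.Printf("stored at partition %d, offset %d", partition, offset)
}
```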

    Improvements:

    • Recognize more Kafka error codes (#859).

    Bug Fixes:

    • Fix an issue where decoding a malformed FetchRequest would not return the correct error (#818).
    • Respect ordering of group protocols in JoinGroupRequests. This fix is transparent if you're using the AddGroupProtocol or AddGroupProtocolMetadata helpers; otherwise you will need to switch from the GroupProtocols field (now deprecated) to use OrderedGroupProtocols (#812).
    • Fix an alignment-related issue with atomics on 32-bit architectures (#859).
  • v1.11.0 (Dec 20, 2016)

    New Features:

    • Metrics! Thanks to Sébastien Launay for all his work on this feature (#701, #746, #766).
    • Add support for LZ4 compression (#786).
    • Add support for ListOffsetRequest v1 and Kafka 0.10.1 (#775).
    • Added a HighWaterMarks method to the Consumer which aggregates the HighWaterMarkOffset values of its child topic/partitions (#769).
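
    HighWaterMarks aggregates the latest high water mark offset observed for each child topic/partition; a sketch with placeholder topic and partition that reads one message first so a mark has actually been observed:

```go
package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
    if err != nil {
        log.Fatalln("failed to create consumer:", err)
    }
    defer consumer.Close()

    pc, err := consumer.ConsumePartition("example-topic", 0, sarama.OffsetOldest)
    if err != nil {
        log.Fatalln("failed to consume partition:", err)
    }
    defer pc.Close()

    msg := <-pc.Messages() // read one message so a high water mark has been seen
    log.Println("consumed offset", msg.Offset)

    // Map of topic -> partition -> high water mark offset.
    for topic, partitions := range consumer.HighWaterMarks() {
        for partition, hwm := range partitions {
            log.Printf("%s/%d high water mark: %d", topic, partition, hwm)
        }
    }
}
```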

    Bug Fixes:

    • Fixed producing when using timestamps, compression and Kafka 0.10 (#759).
    • Added missing decoder methods to DescribeGroups response (#756).
    • Fix producer shutdown when Return.Errors is disabled (#787).
    • Don't mutate configuration in SyncProducer (#790).
    • Fix crash on SASL initialization failure (#795).
franz-go contains a high performance, pure Go library for interacting with Kafka from 0.8.0 through 2.7.0+. Producing, consuming, transacting, administrating, etc.

franz-go - Apache Kafka client written in Go Franz-go is an all-encompassing Apache Kafka client fully written in Go. This library aims to provide every

Travis Bischel 348 Sep 23, 2021
Confluent's Apache Kafka Golang client

Confluent's Golang Client for Apache Kafka™ confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform. Features: Hi

Confluent Inc. 2.9k Sep 22, 2021
Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

Black Square Media 989 Sep 15, 2021
Higher level abstraction for Sarama.

kafka-do v0.1.5 kafka-do What Higher level abstraction for Sarama. Why We want to be able to write our kafka applications without making the same thin

seo.do 18 Sep 21, 2021
kafka watcher for casbin library

Casbin Kafka Watcher Casbin watcher for kafka. This watcher library will enable users to dynamically change casbin policies through kafka messages Infl

Aruna Prabashwara 3 May 8, 2021
Apache Kafka Web UI for exploring messages, consumers, configurations and more with a focus on a good UI & UX.

Kowl - Apache Kafka Web UI Kowl (previously known as Kafka Owl) is a web application that helps you to explore messages in your Apache Kafka cluster a

CloudHut 1.6k Sep 23, 2021
Implementation of the NELI leader election protocol for Go and Kafka

goNELI Implementation of the NELI leader election protocol for Go and Kafka. goNELI encapsulates the 'fast' variation of the protocol, running in excl

Obsidian Dynamics 51 Jun 13, 2021
Easy to use distributed event bus similar to Kafka

chukcha Easy to use distributed event bus similar to Kafka. The event bus is designed to be used as a persistent intermediate storage buffer for any k

Yuriy Nasretdinov 28 Sep 20, 2021
Kafka implemented in Golang with built-in coordination (No ZooKeeper, single binary install, Cloud Native)

Jocko Distributed commit log service in Go that is wire compatible with Kafka. Created by @travisjeffery, continued by nash. Goals: Protocol compatibl

Nash.io 104 Aug 9, 2021
pubsub controller using kafka, based on sarama. Easy control flow for action streaming, event driven.

Psub is a helper for creating systems that use kafka for streaming and event-driven design. Install: go get github.com/teng231/psub; it has 3 env variables for config

Te Nguyen 3 Sep 15, 2021
Apache Pulsar Go Client Library

Apache Pulsar Go Client Library A Go client library for the Apache Pulsar project. Goal This project is developing a pure-Go client library for Pulsa

The Apache Software Foundation 350 Sep 19, 2021
Go gRPC Kafka CQRS microservices with tracing

Golang CQRS Kafka gRPC Postgresql MongoDB Redis microservices example. Full list of what has been used: Kafka as message broker, gRPC Go implemen

Alexander 18 Sep 13, 2021
Declarative streaming ETL for mundane tasks, written in Go

Benthos is a high performance and resilient stream processor, able to connect various sources and sinks in a range of brokering patterns and perform h

Ashley Jeffs 3.4k Sep 16, 2021
Open source Observability Platform. 👉 SigNoz helps developers find issues in their deployed applications & solve them quickly

SigNoz SigNoz is an open source observability platform. SigNoz uses distributed tracing to gain visibility into your systems and powers data using Kafk

SigNoz 4.7k Sep 23, 2021
Modern CLI for Apache Kafka, written in Go.

Kaf Kafka CLI inspired by kubectl & docker Install Install from source: go get -u github.com/birdayz/kaf/cmd/kaf Install binary: curl https://raw.git

Johannes Brüderl 1.4k Sep 23, 2021
ChanBroker, a Broker for goroutines, is similar to kafka

Introduction chanbroker, a Broker for goroutines, is similar to kafka. chanbroker has three types of goroutines: Producer, Consumer (Subscriber), Broker

沉风 61 Aug 12, 2021
Build event-driven and event streaming applications with ease

Commander Commander is a Go library for writing event-driven applications, enabling event sourcing, RPC over messages, SAGAs, bidirectional streamin

Jeroen Rinzema 55 Aug 4, 2021
Simple, high-performance event streaming broker

Styx Styx is a simple and high-performance event streaming broker. It aims to provide teams of all sizes with a simple to operate, disk-persisted publ

Dataptive 27 Aug 25, 2021