Confluent's Apache Kafka Golang client

Overview

Confluent's Golang Client for Apache Kafka™

confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform.

Features:

  • High performance - confluent-kafka-go is a lightweight wrapper around librdkafka, a finely tuned C client.

  • Reliability - There are a lot of details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage this work across all of our clients (also confluent-kafka-python and confluent-kafka-dotnet).

  • Supported - Commercial support is offered by Confluent.

  • Future proof - Confluent, founded by the creators of Kafka, is building a streaming platform with Apache Kafka at its core. It's a high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.

The Golang bindings provide a high-level Producer and Consumer with support for the balanced consumer groups of Apache Kafka 0.9 and above.

See the API documentation for more information.

Examples

High-level balanced consumer

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	err = c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
	if err != nil {
		panic(err)
	}

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			// The client will automatically try to recover from all errors.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}
}

Producer

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}

	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)
}

More elaborate examples are available in the examples directory, including how to configure the Go client for use with Confluent Cloud.

Getting Started

Supports Go 1.11+ and librdkafka 1.4.0+.

Using Go Modules

Starting with Go 1.13, you can use Go Modules to install confluent-kafka-go.

Import the kafka package from GitHub in your code:

import "github.com/confluentinc/confluent-kafka-go/kafka"

Build your project:

go build ./...

If you are building for Alpine Linux (musl), -tags musl must be specified.

go build -tags musl ./...

A dependency on the latest stable version of confluent-kafka-go should be automatically added to your go.mod file.
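For reference, the resulting go.mod might look something like this (the module path and pinned version shown here are illustrative):

```
module example.com/myapp

go 1.13

require github.com/confluentinc/confluent-kafka-go v1.4.0
```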

Install the client

If Go modules can't be used, we recommend that you version-pin the confluent-kafka-go import to v1 using gopkg.in:

Manual install:

go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

Golang import:

import "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"

librdkafka

Prebuilt librdkafka binaries are included with the Go client and librdkafka does not need to be installed separately on the build or target system. The following platforms are supported by the prebuilt librdkafka binaries:

  • Mac OSX x64
  • glibc-based Linux x64 (e.g., RedHat, Debian, CentOS, Ubuntu, etc) - without GSSAPI/Kerberos support
  • musl-based Linux x64 (Alpine) - without GSSAPI/Kerberos support

When building your application for Alpine Linux (musl libc) you must pass -tags musl to go get, go build, etc.

CGO_ENABLED must NOT be set to 0 since the Go client is based on the C library librdkafka.

If GSSAPI/Kerberos authentication support is required you will need to install librdkafka separately, see the Installing librdkafka chapter below, and then build your Go application with -tags dynamic.

Installing librdkafka

If the bundled librdkafka build is not supported on your platform, or you need a librdkafka with GSSAPI/Kerberos support, you must install librdkafka manually on the build and target system using one of the following alternatives:

  • For Debian and Ubuntu based distros, install librdkafka-dev from the standard repositories or using Confluent's Deb repository.
  • For Redhat based distros, install librdkafka-devel using Confluent's YUM repository.
  • For MacOS X, install librdkafka from Homebrew. You may also need to brew install pkg-config if you don't already have it: brew install librdkafka pkg-config.
  • For Alpine: apk add librdkafka-dev pkgconf
  • confluent-kafka-go is not supported on Windows.
  • For source builds, see instructions below.

Build from source:

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install

After installing librdkafka you will need to build your Go application with -tags dynamic.

Note: If you use the master branch of the Go client, then you need to use the master branch of librdkafka.

confluent-kafka-go requires librdkafka v1.4.0 or later.

API Strands

There are two main API strands: function-based and channel-based.

Function Based Consumer

Messages, errors and events are polled through the consumer.Poll() function.

Pros:

  • More direct mapping to underlying librdkafka functionality.

Cons:

  • Makes it harder to read from multiple channels, but a go-routine easily solves that (see Cons in the channel-based consumer below about outdated events).
  • Slower than the channel consumer.

See examples/consumer_example
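The function-based consumer can be sketched as follows (a minimal example assuming a broker on localhost; error handling is trimmed for brevity):

```go
package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	c.SubscribeTopics([]string{"myTopic"}, nil)

	for {
		// Poll serves at most one message, error or event per call,
		// returning nil if the 100 ms timeout expires with no event.
		switch ev := c.Poll(100).(type) {
		case *kafka.Message:
			fmt.Printf("Message on %s: %s\n", ev.TopicPartition, string(ev.Value))
		case kafka.Error:
			fmt.Printf("Error: %v\n", ev)
		}
	}
}
```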

Channel Based Consumer (deprecated)

Deprecated: The channel based consumer is deprecated due to the channel issues mentioned below. Use the function based consumer.

Messages, errors and events are posted on the consumer.Events channel for the application to read.

Pros:

  • Possibly more Golang:ish
  • Makes reading from multiple channels easy
  • Fast

Cons:

  • Outdated events and messages may be consumed due to the buffering nature of channels. The extent is limited, but not remedied, by the Events channel buffer size (go.events.channel.size).

See examples/consumer_channel_example

Channel Based Producer

Application writes messages to the producer.ProduceChannel(). Delivery reports are emitted on the producer.Events or specified private channel.

Pros:

  • Go:ish
  • Proper channel backpressure if librdkafka internal queue is full.

Cons:

  • Double queueing: messages are first queued in the channel (size is configurable) and then inside librdkafka.

See examples/producer_channel_example
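A minimal sketch of the channel-based producer (assumes a broker on localhost; a single message and a single delivery report, to keep the shape visible):

```go
package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	topic := "myTopic"
	// Write the message to the producer's channel; the channel itself
	// applies backpressure when librdkafka's internal queue is full.
	p.ProduceChannel() <- &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello"),
	}

	// Read the delivery report back from the Events channel.
	e := <-p.Events()
	if m, ok := e.(*kafka.Message); ok {
		fmt.Printf("Delivered to %v\n", m.TopicPartition)
	}
}
```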

Function Based Producer

Application calls producer.Produce() to produce messages. Delivery reports are emitted on the producer.Events or specified private channel.

Pros:

  • Go:ish

Cons:

  • Produce() is a non-blocking call; if the internal librdkafka queue is full the call will fail.
  • Somewhat slower than the channel producer.

See examples/producer_example
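One way to deal with the full-queue failure mode mentioned above is to back off and retry. The helper below is a hypothetical sketch (produceWithRetry is not part of the client API): it retries Produce() only when the error code is kafka.ErrQueueFull, using Flush() to let delivery reports drain the queue in between attempts.

```go
package main

import "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"

// produceWithRetry retries Produce when librdkafka's internal queue
// is full, and returns immediately on any other error.
func produceWithRetry(p *kafka.Producer, msg *kafka.Message) error {
	for {
		err := p.Produce(msg, nil)
		if err == nil {
			return nil
		}
		kafkaErr, ok := err.(kafka.Error)
		if !ok || kafkaErr.Code() != kafka.ErrQueueFull {
			return err // some other error; give up
		}
		// Wait up to 100 ms for queued messages to be delivered,
		// freeing space in the internal queue before retrying.
		p.Flush(100)
	}
}
```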

License

Apache License v2.0

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-go. confluent-kafka-go has no affiliation with and is not endorsed by The Apache Software Foundation.

Developer Notes

See kafka/README

Contributions to the code, examples, documentation, et al. are very much appreciated.

Make your changes, run gofmt, tests, etc, push your branch, create a PR, and sign the CLA.

Comments
  • How to use Golang client on Windows


    Description

    How can I configure Windows in order for my Golang client to work? E.g., where should I place the librdkafka client?

    How to reproduce

    1. download librdkafka from https://www.nuget.org/packages/librdkafka.redist/ & unzip its contents
    2. go get -u github.com/confluentinc/confluent-kafka-go/kafka
    3. setup pkg-config for windows
    4. go run main.go results in
    Package rdkafka was not found in the pkg-config search path.
    Perhaps you should add the directory containing `rdkafka.pc'
    to the PKG_CONFIG_PATH environment variable
    No package 'rdkafka' found
    pkg-config: exit status 1
    
    opened by abhirockzz 93
  • Slow Producer and huge memory leak


    I am sending messages to Kafka using this code:

    	deliveryChan := make(chan kafka.Event)
    	topic:=viper.GetString("kafka.forwardtopic")
    	kf.Producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(bulk)}, deliveryChan)
    	e := <-deliveryChan
    	m := e.(*kafka.Message)
    
    	if m.TopicPartition.Error != nil {
    		logger.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
    	}
    	close(deliveryChan)
    

    However, this is extremely slow. Sometimes it takes a second or even two. I guess it hangs on:

    e := <-deliveryChan because it is waiting for the Kafka acknowledgement.

    So I tried the same without the channel because I don't really need the Kafka acknowledgement:

    	//deliveryChan := make(chan kafka.Event)
    	topic:=viper.GetString("kafka.forwardtopic")
    	kf.Producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(bulk)}, nil)
    	//e := <-deliveryChan
    	//m := e.(*kafka.Message)
    
    	//if m.TopicPartition.Error != nil {
    	//	logger.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
    	//}
    	//close(deliveryChan)
    

    But this creates a huge memory leak and my app crashes after a few minutes:

    question wait info 
    opened by groyee 55
  • Schema registry support


    I am not able to pass the URL for the schema registry in the config map. It would be great if somebody could point me to the right place. I looked at all the documentation. We are using Confluent Kafka and Schema Registry all over the place. In Java & Python we are able to pass it, but not in Go.

    enhancement question 
    opened by skyrocknroll 47
  • Crash on message retry enqueue


    Description

    We have a large-scale deployment of Kafka producers using confluent-kafka-go and have come across many occurrences of crashes associated with the following assertion failure: rd_kafka_assert(NULL, rkmq->rkmq_msg_cnt > 0) in rdkafka_msg.h. It appears to occur more often while we have under-replicated partitions or while we perform partition reassignment. The following stack trace has been found to be shared among all the crash occurrences:

    Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
    Program terminated with signal SIGABRT, Aborted.
    #0  runtime.raise () at /usr/local/go/src/runtime/sys_linux_amd64.s:159
    +------------------------------------------------------------------------------+
    | backtrace                                                                    |
    +------------------------------------------------------------------------------+
    #0  runtime.raise () at /usr/local/go/src/runtime/sys_linux_amd64.s:159
    #1  0x0000000000458d82 in runtime.raisebadsignal (c=0x7f0fb77fbd98, sig=6) at /usr/local/go/src/runtime/signal_unix.go:491
    #2  0x00000000004591a3 in runtime.badsignal (c=0x7f0fb77fbd98, sig=6) at /usr/local/go/src/runtime/signal_unix.go:600
    #3  0x0000000000458988 in runtime.sigtrampgo (ctx=0x7f0fb77fbe40, info=0x7f0fb77fbf70, sig=6) at /usr/local/go/src/runtime/signal_unix.go:297
    #4  0x0000000000471863 in runtime.sigtramp () at /usr/local/go/src/runtime/sys_linux_amd64.s:352
    #5  
    #6  0x00007f0fdaecdc37 in __GI_raise ([email protected]=6) at ../nptl/sysdeps/unix/sysv/linux/raise.c:56
    #7  0x00007f0fdaed1028 in __GI_abort () at abort.c:89
    #8  0x000000000066456a in rd_kafka_crash ([email protected]=0x7c15c3 "rdkafka_msg.h", [email protected]=263, [email protected]=0x7c1980 <__FUNCTION__.19041> "rd_kafka_msgq_deq", [email protected]=0x0, [email protected]=0x7c1708 "assert: rkmq->rkmq_msg_cnt > 0") at rdkafka.c:3102
    #9  0x000000000041a520 in rd_kafka_msgq_deq (do_count=, rkm=, rkmq=) at rdkafka_msg.h:263
    #10 0x0000000000683e09 in rd_kafka_msgq_deq (do_count=, rkm=, rkmq=) at rdkafka_msg.h:264
    #11 rd_kafka_msgq_age_scan (rkmq=0x1ba88, [email protected]=0x7f0ebc044a88, timedout=0x1bad5, [email protected]=0x7f0fb77fc5f0, now=6, [email protected]=2173403318391) at rdkafka_msg.c:577
    #12 0x0000000000687875 in rd_kafka_topic_scan_all ([email protected]=0x7f0fcc001400, now=2173403318391) at rdkafka_topic.c:1135
    #13 0x0000000000662e56 in rd_kafka_topic_scan_tmr_cb (rkts=, arg=) at rdkafka.c:1194
    #14 0x000000000068a448 in rd_kafka_timers_run ([email protected]=0x7f0fcc001c18, [email protected]=0) at rdkafka_timer.c:251
    #15 0x000000000066c8d7 in rd_kafka_thread_main ([email protected]=0x7f0fcc001400) at rdkafka.c:1270
    #16 0x00000000006cdb97 in _thrd_wrapper_function (aArg=) at tinycthread.c:624
    #17 0x00007f0fdb470184 in start_thread (arg=0x7f0fb77fe700) at pthread_create.c:312
    #18 0x00007f0fdaf94ffd in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111
    

    How to reproduce

    We tried and have not yet been successful reproducing in a lab environment. We'd like to know of any suggestions of changes in the librdkafka code such that it prints out things useful to diagnose this issue. Also, we briefly reviewed the later librdkafka commits and didn't find anything that looks like it addresses this crash, but if we misidentified this, we would be happy to rebuild and retest with the latest librdkafka commit in master.

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): confluent-kafka-go commit: 1112e2c08a15bce669a99cc35284a690fd2086b8 librdkafka: 0.11.4-RC1B
    • [x] Apache Kafka broker version: 0.11.0.1
    • [x] Client configuration: ConfigMap{...}
    			"bootstrap.servers":      brokerList,
    			"queue.buffering.max.ms": "500",
    			"compression.codec":      "gzip",
    			"batch.num.messages":     "256",
    			"security.protocol":      "ssl",
    			"queue.buffering.max.messages": 8000,
    			"broker.version.fallback":      "0.10.2.1",
    			"statistics.interval.ms":       60000,
    
    • [x] Operating system: ubuntu 14.04
    • [ ] Provide client logs (with "debug": ".." as necessary) We didn't know which debug setting would be useful for this so didn't enable any debug flag. Please let us know if there is a debug flag we should enable.
    • [ ] Provide broker log excerpts
    • [x] Critical issue
    bug librdkafka GREAT REPORT 
    opened by lintang0 39
  • Clang error when building with v1.4.0


    Description

    I upgraded to the latest version of the go client (v1.4.0) and ran brew upgrade librdkafka. But the client is looking for librdkafka in the package directory and it is not present.

    How to reproduce

    In an existing project that uses confluent-kafka-go, run go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka and go build ./...

    You should receive something similar to:

    clang: error: no such file or directory: '$GOHOME/src/github.com/your-user/your-service/vendor/gopkg.in/confluentinc/confluent-kafka-go.v1/kafka/librdkafka
    /librdkafka_darwin.a'
    

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): v1.4.0 (both)
    • [ n/a ] Apache Kafka broker version:
    • [ n/a ] Client configuration:
    • [x] Operating system: Mac OS X 10.15.4
    • [ n/a ] Provide client logs
    • [ n/a ] Provide broker log excerpts
    • [ ] Critical issue
    opened by jhuggart 36
  • Consumer.Close() never returns with multiple consumers in the same process


    I'm running 2 consumers on the same topic and the same consumer group, using Poll() to consume messages. After having consumed messages with both of them, calling Close() on any of the consumers blocks indefinitely, reproducible 100% of the time. Debug logs show things being stuck at "waiting for rebalance_cb"

    This is the gist with the go code to reproduce and all relevant information: https://gist.github.com/dtheodor/26821f951502aab2e325f860b459cbfc

    This could be the same issue as https://github.com/confluentinc/confluent-kafka-go/issues/65 but you are talking about using the Events channel there, which I am not.

    Calling Unassign() right before calling Close() allows Close() to return successfully, but having to call Unassign looks bogus since I never have to use Assign or Unassign in any other place, this is managed by the lib for me.

    bug librdkafka GREAT REPORT 
    opened by dtheodor 35
  • 1.4.0: Building apps fails on latest alpine, when using bundled librdkafka


    Description

    When using the 1.4.0 release, the bundled librdkafka fails to work as intended with the latest alpine.

    How to reproduce

    Build an application which uses github.com/confluentinc/confluent-kafka-go, in a container, using the alpine:3.11 image (which, at the time of writing, is actually alpine:3.11.5) as a base. Errors such as the following will be displayed:

     go: downloading github.com/confluentinc/confluent-kafka-go v1.4.0
     # github.com/confluentinc/confluent-kafka-go/kafka
     /usr/lib/gcc/x86_64-alpine-linux-musl/9.2.0/../../../../x86_64-alpine-linux-musl/bin/ld: /go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v1.4.0/kafka/librdkafka/librdkafka_glibc_linux.a(rdkafka_txnmgr.o): in function `rd_kafka_txn_set_fatal_error':
     (.text+0x141): undefined reference to `__strdup'
     /usr/lib/gcc/x86_64-alpine-linux-musl/9.2.0/../../../../x86_64-alpine-linux-musl/bin/ld: /go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v1.4.0/kafka/librdkafka/librdkafka_glibc_linux.a(rdkafka_txnmgr.o): in function `rd_kafka_txn_set_abortable_error':
     (.text+0x64f): undefined reference to `__strdup'
    ...
    

    Installing a fresh build of librdkafka 1.4.0 into the container and building the application with -tags dynamic resolves the issue.

    Could possibly be because the bundled librdkafka was built from a :edge release of alpine, instead of :latest?

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): 1.4.0, bundled.
    • [ ] Apache Kafka broker version: N/A
    • [ ] Client configuration: N/A
    • [x] Operating system: Alpine Linux
    • [ ] Provide client logs (with "debug": ".." as necessary): N/A
    • [ ] Provide broker log excerpts: N/A
    • [ ] Critical issue: No
    opened by bschofield 28
  • WIP: Avro/Schema-Registry


    This is a request for a review/comment of a WIP Schema Registry implementation.

    I have attempted to stick to the Java API as much as possible with some minor changes but I've run into a bit of a mental block on deserialization. I'm not sure how to best handle deserialization into specific types.

    I have toyed with a few implementations, but I think they, too, are less than ideal:

    1. take a new()interface{} function pointer upon instantiating the deserializer. This is similar to the way sync.Pool works

    2. Move Avro serialization/deserialization out of the Producer/Consumer opting instead to have a stateful marshaler/unmarshaler.

    3. The current implementation which takes an instance on instantiation and makes a zero copy then places the results in the Message.Object field which was added in the serde_support development branch to facilitate handling objects for serialization/deserialization.

    In the end I suspect the best way to move forward is option 2. This puts serialization/deserialization outside of the producer/consumer however which may not be what we want.

    All relevant code is located in the encoding/avro folder. Specifically serializer.go

    opened by rnpridgeon 23
  • librdkafka v0.11.4 does not exist


    Description

    The go get -u github.com/confluentinc/confluent-kafka-go/kafka is unable to install the client and produces the error

    ../github.com/confluentinc/confluent-kafka-go/kafka/00version.go:43:2: error: "confluent-kafka-go requires librdkafka v0.11.4 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
    #error "confluent-kafka-go requires librdkafka v0.11.4 or later. Install the latest version of librdkafka from Homebrew running `brew install librdkafka` or `brew upgrade librdkafka`"
     ^
    1 error generated.
    

    When brew install librdkafka is run, it fetches v0.11.3, which is the latest stable version released: https://github.com/edenhill/librdkafka/releases

    » brew info librdkafka                                                                                                                                                                             
    librdkafka: stable 0.11.3 (bottled), HEAD
    The Apache Kafka C/C++ library
    https://github.com/edenhill/librdkafka
    /usr/local/Cellar/librdkafka/0.11.3 (14 files, 1.9MB) *
      Poured from bottle on 2018-03-16 at 13:33:24
    From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/librdkafka.rb
    ==> Dependencies
    Build: pkg-config ✔
    Required: lzlib ✔, openssl ✔
    Recommended: lz4 ✔
    ==> Options
    --without-lz4
    	Build without lz4 support
    --HEAD
    	Install HEAD version
    
    • [ ] confluent-kafka-go version (0.11.0)
    • [ ] Operating system: Mac OSX
    opened by livnoorbrar 21
  • segmentation fault in rd_kafka_cgrp_handle_SyncGroup


    Description

    I'm trying to build a simple consumer that just logs messages, but am getting seg faults when running in Docker. It runs fine locally on my Mac.

    Here is the backtrace from the coredump.

    (gdb) bt
    #0  free (p=<optimized out>) at src/malloc/malloc.c:482
    #1  0x0000000000786126 in trim ([email protected]=0x1635990, n=<optimized out>) at src/malloc/malloc.c:317
    #2  0x00000000007864d4 in malloc (n=<optimized out>, [email protected]=85) at src/malloc/malloc.c:364
    #3  0x00000000007929fa in __strdup (s=0x7fb46b044120 "v1.requestor._requestorId.orgs._orgId.users._userId.tokens._tokenId.command._command") at src/string/strdup.c:8
    #4  0x0000000000746b17 in rd_strdup (s=0x7fb46b044120 "v1.requestor._requestorId.orgs._orgId.users._userId.tokens._tokenId.command._command") at rd.h:117
    #5  rd_kafka_topic_partition_list_add0 (rktparlist=0x16095c0, topic=0x7fb46b044120 "v1.requestor._requestorId.orgs._orgId.users._userId.tokens._tokenId.command._command", partition=5, _private=0x0)
        at rdkafka_partition.c:2573
    #6  0x000000000073ad84 in rd_kafka_cgrp_handle_SyncGroup ([email protected]=0x157e740, [email protected]=0x157f760, [email protected]=RD_KAFKA_RESP_ERR_NO_ERROR, [email protected]=0x7fb46b057e30)
        at rdkafka_cgrp.c:3243
    #7  0x0000000000727e4b in rd_kafka_handle_SyncGroup (rk=<optimized out>, rkb=0x157f760, err=<optimized out>, rkbuf=<optimized out>, request=0x15808c0, opaque=0x157e740) at rdkafka_request.c:1106
    #8  0x00000000007122fe in rd_kafka_buf_callback (rk=<optimized out>, rkb=<optimized out>, err=RD_KAFKA_RESP_ERR_NO_ERROR, response=0x16095e0, request=0x15808c0) at rdkafka_buf.c:448
    #9  0x00000000007132f7 in rd_kafka_buf_handle_op (rko=<optimized out>, err=<optimized out>) at rdkafka_buf.c:397
    #10 0x000000000071f175 in rd_kafka_op_handle_std (rk=<optimized out>, rkq=<optimized out>, rko=<optimized out>, cb_type=<optimized out>) at rdkafka_op.c:616
    #11 0x000000000071f1b5 in rd_kafka_op_handle (rk=0x157da40, rkq=0x7fb46b058110, rko=0x15e3ae0, cb_type=RD_KAFKA_Q_CB_CALLBACK, opaque=0x0, callback=0x0) at rdkafka_op.c:647
    #12 0x00000000007184dc in rd_kafka_q_serve (rkq=0x157e420, timeout_ms=<optimized out>, [email protected]=0, [email protected]=RD_KAFKA_Q_CB_CALLBACK, [email protected]=0x0, 
        [email protected]=0x0) at rdkafka_queue.c:467
    #13 0x00000000006f0484 in rd_kafka_thread_main ([email protected]=0x157da40) at rdkafka.c:1418
    #14 0x000000000074e637 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:633
    #15 0x0000000000793bdc in start (p=0x7fb46b059ae8) at src/thread/pthread_create.c:150
    #16 0x0000000000794b5e in __clone () at src/thread/x86_64/clone.s:21
    

    Here are the last few lines of the rdkafka debug output before the coredump

    %7|1535210152.560|ASSIGN|rdkafka#consumer-1| [thrd:main]:   v1.requestor._requestorId.orgs._orgId.customer-extensions._customerId [5]
    %7|1535210152.560|ASSIGNOR|rdkafka#consumer-1| [thrd:main]: Group "kafkalogger": "range" assignor run for 1 member(s)
    %7|1535210152.560|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "kafkalogger" changed join state wait-metadata -> wait-sync (v1, state up)
    %7|1535210152.561|SEND|rdkafka#consumer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/1: Sent SyncGroupRequest (v0, 31712 bytes @ 0, CorrId 7)
    %7|1535210152.561|BROADCAST|rdkafka#consumer-1| [thrd:kafka:9092/bootstrap]: Broadcasting state change
    %7|1535210152.570|RECV|rdkafka#consumer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/1: Received SyncGroupResponse (v0, 31580 bytes, CorrId 7, rtt 9.24ms)
    %7|1535210152.570|SYNCGROUP|rdkafka#consumer-1| [thrd:main]: SyncGroup response: Success (31574 bytes of MemberState data)
    

    How to reproduce

    I'm running against a single node kafka broker running kafka 1.1.0 in a docker image running on alpine linux 3.7. It has a lot of topics though (~300).

    The code I'm running is essentially the example consumer in the README of this repo.

    main.go

    package main
    
    import (
    	"fmt"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    func main() {
    
    	c, err := kafka.NewConsumer(&kafka.ConfigMap{
    		"bootstrap.servers": "kafka:9092",
    		"group.id":          "myGroup",
    		"auto.offset.reset": "earliest",
    	})
    
    	if err != nil {
    		panic(err)
    	}
    
    	c.SubscribeTopics([]string{"^v.*"}, nil)
    
    	for {
    		msg, err := c.ReadMessage(-1)
    		if err == nil {
    			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    		} else {
    			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
    			break
    		}
    	}
    
    	c.Close()
    }
    

    Dockerfile

    FROM golang:1.10-alpine3.8 as builder
    
    # librdkakfa
    RUN apk --update --no-cache add git openssh bash g++ make
    RUN git clone https://github.com/edenhill/librdkafka.git
    WORKDIR ./librdkafka
    RUN ./configure --prefix /usr && make && make install
    
    ARG gopath="/go"
    ENV GOPATH=${gopath}
    WORKDIR $GOPATH/src/github.com/gerad/kafkalogger/
    
    COPY . ./
    RUN go build -tags static_all -o kafkalogger .
    
    FROM alpine:3.8
    WORKDIR /kafkalogger/
    # Install CA certificates
    RUN apk --update --no-cache add ca-certificates gdb
    ENV USER gerad
    RUN addgroup -S $USER && adduser -S $USER $USER && chown $USER:$USER .
    USER gerad
    RUN ulimit -c unlimited
    COPY --from=builder /go/src/github.com/gerad/kafkalogger/kafkalogger .
    
    ENTRYPOINT ["./kafkalogger"]
    

    I see this behavior on master and the latest release v0.11.4

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): 722431 0.11.6-PRE2-10-gbfc821
    • [x] Apache Kafka broker version: 1.1.0
    • [x] Client configuration:
    ConfigMap{
      "bootstrap.servers": "kafka:9092",
      "group.id":          "myGroup",
      "auto.offset.reset": "earliest",
    }
    
    • [x] Operating system: Linux - Alpine 3.8
    • [x] Provide client logs (with "debug": ".." as necessary)
    • [x] Provide broker log excerpts
    [2018-08-25 15:43:00,013] INFO [GroupCoordinator 1]: Preparing to rebalance group myGroup with old generation 12 (__consumer_offsets-17) (kafka.coordinator.group.GroupCoordinator)
    [2018-08-25 15:43:03,016] INFO [GroupCoordinator 1]: Stabilized group myGroup generation 13 (__consumer_offsets-17) (kafka.coordinator.group.GroupCoordinator)
    [2018-08-25 15:43:03,053] INFO [GroupCoordinator 1]: Assignment received from leader for group myGroup for generation 13 (kafka.coordinator.group.GroupCoordinator)
    
    • [ ] Critical issue
    GREAT REPORT 
    opened by gerad 19
  • Make dynamic build use system librdkafka headers


    cgo adds -I${SRCDIR} to the header search path by default; therefore, using #include <librdkafka/rdkafka.h> can actually still find the bundled librdkafka header in preference to the system one even when -tags=dynamic is specified.

    This change instead makes the static build set a preprocessor macro USE_VENDORED_LIBRDKAFKA, which is used in a new header rdkafka_select.h to either import the bundled one (with a special name, and quotes), or the system one (with the <> import and the expected name).

    Fixes #514

    opened by KJTsanaktsidis 18
  • [DO NOT MERGE] librdkafka static bundle fixes required for next release


    THIS MUST NOT BE MERGED BEFORE THE NEXT IMPORT

    So what you do is, when it is time for the next librdkafka import or RC:

    $ git checkout master
    $ git pull --rebase origin master
    $ git fetch --tags
    $ git checkout impfix                # this PR
    $ cd kafka/librdkafka_vendor
    $ ./import.sh --develop /tmp/librdkafka-static-bundle-v2.0.0-RC2.tgz   # or whatever version

    If everything is ok, push it:

    $ git checkout import_v2.0.0-RC2     # or whatever version it is
    $ git push --dry-run origin import_v2.0.0-RC2

    Go to github, review, MERGE! Do not squash or rebase, MERGE!

    opened by edenhill 0
  • Offset reset callback is required when doing manual commits


    Description

    When auto.offset.reset is set to latest and enable.auto.offset.store is set to false, the reset offset should be committed to the broker in order to report the correct lag. When there is an offset reset, we should be notified of this event so that we can commit that offset to the broker. If we do not commit this offset to the broker, then lag is reported even though there isn't any lag.

    How to reproduce

    Checklist

    Please provide the following information:

    • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): v1.9.2
    • [x] Apache Kafka broker version: v2.3.1
    • [x] Client configuration: ConfigMap{ "bootstrap.servers": "<sensitive>", "debug": "consumer,cgrp", "group.id": "consumer_001", "enable.auto.commit": false, "enable.auto.offset.store": false, "session.timeout.ms": 10000, "auto.offset.reset": "latest", "max.poll.interval.ms": 15000, }
    • [x] Operating system: OSX 11.5.2
    • [x] Provide client logs (with "debug": ".." as necessary)
    Screenshot 2022-12-14 at 12 20 53 PM
    • [x] Broker lag metrics
    Screenshot 2022-12-14 at 12 21 26 PM Screenshot 2022-12-14 at 12 21 46 PM
    • [x] Critical issue

    As you can see, the offset was reset to the log-end offset because auto.offset.reset was set to latest, but this offset is not committed because we do not know which offset to store using the StoreOffsets method.

    opened by shubhang93 0
  • Question Regarding Kafka Protobuf Producer - Specifying Schemas and Versions

    Description

    Hi,

    I am new to Kafka Connect, Protobuf serialization, and this entire process in general. My current task is to evaluate the data flow between Kafka topics and inserts into TimescaleDB using the JDBC sink connector with Kafka Schema Registry. I have almost everything up and running and am trying to test the E2E flow using a sample producer.

    However, I had a few general questions on Kafka Protobuf Producer, an example is provided in this repo here: https://github.com/confluentinc/confluent-kafka-go/blob/5ba3caae52b04aaf7b4e22d5e30737b42ca948cd/schemaregistry/serde/protobuf/protobuf.go

    I was hoping someone could explain to me a few things here:

    In the referenced example above, only the schema URL is given. But let's say that you had multiple proto schema subjects available (example: proto.testrecord and proto.anotherrecord) in the Schema Registry. In this new example, we have the two schemas showing as:

    proto.testrecord

    message TestRecord {
        string cluster_name = 1;
        string id = 2;
        string hostname = 3;
        string metric = 4;
        int64 value = 5;
        string value_text = 6;
        int64 timestamp = 7;
    } 
    

    proto.anotherrecord

    message AnotherRecord{
        string source_name= 1;
        string map_id= 2;
        string hostname = 3;
        string metric_group = 4;
        int64 value = 5;
        string value_text = 6;
        int64 timestamp = 7;
    } 
    

    Let's say you also have two topics, and one (producer 1) should use the first schema subject while the second (producer 2) should use the second schema subject to validate/conform the data for inserts. Now let's say you are creating a producer for the first topic (producer 1) and it should use the proto.testrecord subject from the Schema Registry for serialization. How would you configure/tell the producer to use the correct schema subject and also the exact version (if multiple existed)? Or are you not supposed to specify those due to the way the process works?

    I noticed the repo example provided doesn't specify any of that information and I am trying to understand exactly how it knows or defaults to a certain subject and version.

    According to this: https://github.com/confluentinc/confluent-kafka-go/blob/5ba3caae52b04aaf7b4e22d5e30737b42ca948cd/schemaregistry/serde/config.go#L36, it looks like you can specify the schema ID (via the UseSchemaID variable), which tells it which registered schema to use, but I am unsure how to specify the version.
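
    For what it's worth, a hypothetical sketch of pinning a schema by ID when constructing the Protobuf serializer; the constructor and field names below are taken from the linked config.go and may differ between releases, and the registry URL and schema ID are made up:

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/schemaregistry"
	"github.com/confluentinc/confluent-kafka-go/schemaregistry/serde"
	"github.com/confluentinc/confluent-kafka-go/schemaregistry/serde/protobuf"
)

func main() {
	// Hypothetical registry endpoint.
	client, err := schemaregistry.NewClient(schemaregistry.NewConfig("http://localhost:8081"))
	if err != nil {
		panic(err)
	}

	conf := protobuf.NewSerializerConfig()
	conf.AutoRegisterSchemas = false // use what is already registered
	conf.UseSchemaID = 7             // hypothetical ID of a registered proto.testrecord version

	ser, err := protobuf.NewSerializer(client, serde.ValueSerde, conf)
	if err != nil {
		panic(err)
	}
	fmt.Println("serializer created:", ser != nil)
}
```

    A schema ID identifies one exact registered schema version, so pinning the ID is effectively pinning the version; absent that, the serializer derives the subject from the topic via the subject name strategy.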

    I have probably missed something in my reading and could use some help/discussion.

    Any help is appreciated, thanks!

    opened by pchang388 1
  • Add SeekPartitions wrapper for rd_kafka_seek_partitions function to support seeking multiple partitions at a time

    Description

    The Seek function only supports seeking one partition at a time. This is due to the underlying limitation of the rd_kafka_seek function also only seeking one partition at a time.

    However, the rd_kafka_seek function was marked as deprecated, with rd_kafka_seek_partitions being the new preferred alternative. This function takes a list of partitions.

    Please consider adding a new function named SeekPartitions or similar to the Go consumer, which wraps rd_kafka_seek_partitions. This will allow us to avoid having to write loops like this:

    	assigned, _ := consumer.Assignment()
    	for i := range assigned {
    		assigned[i].Offset = 0
    	}
    	for _, a := range assigned {
    		_ = consumer.Seek(a, 0)
    	}
    

    and instead write:

    	assigned, _ := consumer.Assignment()
    	for i := range assigned {
    		assigned[i].Offset = 0
    	}
    	_ = consumer.SeekPartitions(assigned, 0)
    

    Checklist

    Please provide the following information:

    • [X] confluent-kafka-go and librdkafka version (LibraryVersion()): master branch; no reference to rd_kafka_seek_partitions can be found, indicating that there is no existing wrapper function
    • [ ] Apache Kafka broker version: N/A
    • [ ] Client configuration: ConfigMap{...} N/A
    • [ ] Operating system: N/A
    • [ ] Provide client logs (with "debug": ".." as necessary): N/A
    • [ ] Provide broker log excerpts: N/A
    • [ ] Critical issue
    opened by james-johnston-thumbtack 2
  • Bug/producer consumer adminclient validity handling

    • Added a new isClosed field to the Producer struct, which acts as a flag indicating whether the Producer is open or closed.

    • Added a new verifyProducer() function, which checks whether the producer is valid by performing nil and closed checks, to avoid cgo segfaults and Go panics.

    opened by PrasanthV454 0
Releases(v1.9.2)
  • v1.9.2(Aug 2, 2022)

  • v1.9.1(Jul 7, 2022)

  • v1.9.0(Jun 21, 2022)

    v1.9.0 is a feature release:

    • OAUTHBEARER OIDC support
    • KIP-140 Admin API ACL support
    • Added MockCluster for functional testing of applications without the need for a real Kafka cluster (by @SourceFellows and @kkoehler, #729). See examples/mock_cluster.
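
    A minimal sketch of using the new MockCluster, assuming the v1.9.0 API (NewMockCluster, BootstrapServers); no real broker is required:

```go
package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	// Spin up a single-broker mock cluster in-process.
	mc, err := kafka.NewMockCluster(1)
	if err != nil {
		panic(err)
	}
	defer mc.Close()

	// Point an ordinary producer at the mock cluster's bootstrap servers.
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": mc.BootstrapServers(),
	})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	topic := "test"
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello mock"),
	}, nil)
	fmt.Println("produce queued:", err == nil)
	p.Flush(5000)
}
```

    See examples/mock_cluster in the repository for the canonical version.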

    Fixes

    • Fix Rebalance events behavior for static membership (@jliunyu, #757, #798).
    • Fix consumer close taking 10 seconds when there's no rebalance needed (@jliunyu, #757).

    confluent-kafka-go is based on librdkafka v1.9.0, see the librdkafka release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Source code(tar.gz)
    Source code(zip)
  • v1.8.2(Dec 14, 2021)

    confluent-kafka-go v1.8.2

    This is a maintenance release:

    • Bundles librdkafka v1.8.2
    • Check termination channel while reading delivery reports (by @zjj)
    • Added convenience method Consumer.StoreMessage() (@finncolman, #676)

    confluent-kafka-go is based on librdkafka v1.8.2, see the librdkafka release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Note: There were no confluent-kafka-go v1.8.0 and v1.8.1 releases.

    Source code(tar.gz)
    Source code(zip)
  • v1.7.0(May 11, 2021)

    confluent-kafka-go is based on librdkafka v1.7.0, see the librdkafka release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Enhancements

    • Experimental Windows support (by @neptoess).
    • The produced message headers are now available in the delivery report Message.Headers if the Producer's go.delivery.report.fields configuration property is set to include headers, e.g.: "go.delivery.report.fields": "key,value,headers". This comes at a performance cost and is thus disabled by default.
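
    For example, a hedged sketch of enabling headers in delivery reports (broker address is hypothetical):

```go
package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":         "localhost:9092", // hypothetical broker
		"go.delivery.report.fields": "key,value,headers",
	})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	topic := "myTopic"
	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("v"),
		Headers:        []kafka.Header{{Key: "h", Value: []byte("1")}},
	}, nil)

	// With the property above, the delivery report carries the headers too.
	e := <-p.Events()
	if m, ok := e.(*kafka.Message); ok {
		fmt.Printf("delivered %v, headers: %v\n", m.TopicPartition, m.Headers)
	}
}
```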

    Fixes

    • AdminClient.CreateTopics() previously did not accept the default value (-1) for ReplicationFactor without an explicit ReplicaAssignment; this is now fixed.
    Source code(tar.gz)
    Source code(zip)
  • v1.6.1(Mar 11, 2021)

    v1.6.1

    v1.6.1 is a feature release:

    • KIP-429: Incremental consumer rebalancing - see cooperative_consumer_example.go for an example of how to use the new incremental rebalancing consumer.
    • KIP-480: Sticky producer partitioner - increase throughput and decrease latency by sticking to a single random partition for some time.
    • KIP-447: Scalable transactional producer - a single transaction producer can now be used for multiple input partitions.
    • Add support for go.delivery.report.fields by @kevinconaway

    Fixes

    • For dynamically linked builds (-tags dynamic) there was previously a possible conflict between the bundled librdkafka headers and the system installed ones. This is now fixed. (@KJTsanaktsidis)

    confluent-kafka-go is based on and bundles librdkafka v1.6.1, see the librdkafka release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Source code(tar.gz)
    Source code(zip)
  • v1.5.2(Nov 5, 2020)

    confluent-kafka-go v1.5.2

    v1.5.2 is a maintenance release with the following fixes and enhancements:

    • Bundles librdkafka v1.5.2 - see release notes for all enhancements and fixes.
    • Documentation fixes

    confluent-kafka-go is based on librdkafka v1.5.2, see the librdkafka release notes for a complete list of changes, enhancements, fixes and upgrade considerations.

    Source code(tar.gz)
    Source code(zip)
  • v1.4.2(May 7, 2020)

    confluent-kafka-go v1.4.2

    v1.4.2 is a maintenance release:

    • The bundled librdkafka directory (kafka/librdkafka) is no longer pruned by Go mod vendor import.
    • Bundled librdkafka upgraded to v1.4.2, highlights:
      • System root CA certificates should now be picked up automatically on most platforms
      • Fix produce/consume hang after partition goes away and comes back, such as when a topic is deleted and re-created (regression in v1.3.0).

    librdkafka v1.4.2 changes

    See the librdkafka v1.4.2 release notes for changes to the bundled librdkafka included with the Go client.

    Source code(tar.gz)
    Source code(zip)
  • v1.4.0(Apr 8, 2020)

    confluent-kafka-go v1.4.0

    • Added Transactional Producer API and full Exactly-Once-Semantics (EOS) support.
    • A prebuilt version of the latest version of librdkafka is now bundled with the confluent-kafka-go client. A separate installation of librdkafka is NO LONGER REQUIRED or used.
    • Added support for sending client (librdkafka) logs to Logs() channel.
    • Added Consumer.Position() to retrieve the current consumer offsets.
    • The Error type now has additional attributes, such as IsRetriable() to deem if the errored operation can be retried. This is currently only exposed for the Transactional API.
    • Removed support for Go < 1.9

    Transactional API

    librdkafka and confluent-kafka-go now have complete Exactly-Once-Semantics (EOS) functionality, supporting the idempotent producer (since v1.0.0), a transaction-aware consumer (since v1.2.0) and full producer transaction support (in this release). This enables developers to create Exactly-Once applications with Apache Kafka.

    See the Transactions in Apache Kafka page for an introduction and check the transactions example for a complete transactional application example.
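
    A minimal transactional-producer sketch using the API added in this release (broker address and transactional.id are hypothetical):

```go
package main

import (
	"context"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",       // hypothetical broker
		"transactional.id":  "my-tx-producer",       // hypothetical ID, must be stable per producer
	})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	ctx := context.Background()
	// Fence any older producer instance with the same transactional.id.
	if err := p.InitTransactions(ctx); err != nil {
		panic(err)
	}
	if err := p.BeginTransaction(); err != nil {
		panic(err)
	}

	topic := "myTopic"
	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("in a transaction"),
	}, nil)

	// Commit atomically; abort on failure so consumers never see partial results.
	if err := p.CommitTransaction(ctx); err != nil {
		p.AbortTransaction(ctx)
	}
}
```

    The transactions example in the repository covers the full consume-transform-produce loop, including sending consumer offsets to the transaction.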

    Bundled librdkafka

    The confluent-kafka-go client now comes with batteries included, namely prebuilt versions of librdkafka for the most popular platforms; you will thus no longer need to install or manage librdkafka separately.

    Supported platforms are:

    • Mac OSX
    • glibc-based Linux x64 (e.g., RedHat, Debian, etc) - lacks Kerberos/GSSAPI support
    • musl-based Linux x64 (Alpine) - lacks Kerberos/GSSAPI support

    These prebuilt librdkafka builds include all features (e.g., SSL, compression), except that the Linux builds lack Kerberos/GSSAPI support due to libsasl2 dependencies. If you need Kerberos support, or you are running on a platform where the prebuilt librdkafka builds are not available (see above), you will need to install librdkafka separately (preferably through the Confluent APT and RPM repositories) and build your application with -tags dynamic to disable the built-in librdkafka and instead link your application dynamically to librdkafka.
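
    For example (assuming a Go module that imports the client):

```shell
# Default: link against the prebuilt librdkafka bundled with the client.
go build ./...

# Link dynamically against a separately installed librdkafka instead.
go build -tags dynamic ./...
```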

    librdkafka v1.4.0 changes

    Full librdkafka v1.4.0 release notes.

    Highlights:

    • KIP-98: Transactional Producer API
    • KIP-345: Static consumer group membership (by @rnpridgeon)
    • KIP-511: Report client software name and version to broker
    • SASL SCRAM security fixes.
    Source code(tar.gz)
    Source code(zip)
  • v1.3.0(Dec 17, 2019)

    confluent-kafka-go v1.3.0

    • Purge messages API (by @khorshuheng at GoJek).

    • ClusterID and ControllerID APIs.

    • Go Modules support.

    • Fixed memory leak on calls to NewAdminClient(). (discovered by @gabeysunda)

    • Requires librdkafka v1.3.0 or later

    librdkafka v1.3.0 changes

    Full librdkafka v1.3.0 release notes.

    • KIP-392: Fetch messages from closest replica/follower (by @mhowlett).
    • Experimental mock broker to make application and librdkafka development testing easier.
    • Fixed consumer_lag in stats when consuming from broker versions <0.11.0.0 (regression in librdkafka v1.2.0).
    Source code(tar.gz)
    Source code(zip)
  • v1.1.0(Jul 15, 2019)

    confluent-kafka-go v1.1.0

    • OAUTHBEARER SASL authentication (KIP-255) by Ron Dagostini (@rondagostino) at StateStreet.
    • Offset commit metadata (@damour, #353)
    • Requires librdkafka v1.1.0 or later

    Noteworthy librdkafka v1.1.0 changes

    Full librdkafka v1.1.0 release notes.

    • SASL OAUTHBEARER support (by @rondagostino at StateStreet)
    • In-memory SSL certificates (PEM, DER, PKCS#12) support (by @noahdav at Microsoft)
    • Pluggable broker SSL certificate verification callback (by @noahdav at Microsoft)
    • Use Windows Root/CA SSL Certificate Store (by @noahdav at Microsoft)
    • ssl.endpoint.identification.algorithm=https (off by default) to validate the broker hostname matches the certificate. Requires OpenSSL >= 1.0.2.
    • Improved GSSAPI/Kerberos ticket refresh

    Upgrade considerations

    • Windows SSL users will no longer need to specify a CA certificate file/directory (ssl.ca.location), librdkafka will load the CA certs by default from the Windows Root Certificate Store.
    • SSL peer (broker) certificate verification is now enabled by default (disable with enable.ssl.certificate.verification=false)
    • %{broker.name} is no longer supported in sasl.kerberos.kinit.cmd since kinit refresh is no longer executed per broker, but per client instance.

    SSL

    New configuration properties:

    • ssl.key.pem - client's private key as a string in PEM format
    • ssl.certificate.pem - client's public key as a string in PEM format
    • enable.ssl.certificate.verification - enable(default)/disable OpenSSL's builtin broker certificate verification.
    • ssl.endpoint.identification.algorithm - to verify the broker's hostname against its certificate (disabled by default).
    • The private key data is now securely cleared from memory after last use.
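
    A hypothetical configuration fragment using the new in-memory PEM properties (broker address and certificate contents are placeholders):

```go
cfg := &kafka.ConfigMap{
	"bootstrap.servers": "broker1:9093", // hypothetical broker
	"security.protocol": "ssl",
	// PEM strings passed directly instead of file paths (contents elided):
	"ssl.key.pem":         "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----",
	"ssl.certificate.pem": "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----",
	// Broker certificate verification is enabled by default as of this release;
	// shown here only for clarity.
	"enable.ssl.certificate.verification": true,
}
```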

    Enhancements

    • Bump message.timeout.ms max value from 15 minutes to 24 days (@sarkanyi)

    Fixes

    • SASL GSSAPI/Kerberos: Don't run kinit refresh for each broker, just per client instance.
    • SASL GSSAPI/Kerberos: Changed sasl.kerberos.kinit.cmd to first attempt ticket refresh, then acquire.
    • SASL: Proper locking on broker name acquisition.
    • Consumer: max.poll.interval.ms now correctly handles blocking poll calls, allowing a longer poll timeout than the max poll interval.
    Source code(tar.gz)
    Source code(zip)
  • v1.0.0(Mar 29, 2019)

    confluent-kafka-go v1.0.0

    This release adds support for librdkafka v1.0.0, featuring the EOS Idempotent Producer, Sparse connections, KIP-62 - max.poll.interval.ms support, zstd, and more.

    See the librdkafka v1.0.0 release notes for more information and upgrade considerations.

    Go client enhancements

    • Now requires librdkafka v1.0.0.
    • A new IsFatal() function has been added to kafka.Error to help the application differentiate between temporary and fatal errors. Fatal errors are currently only triggered by the idempotent producer.
    • Added kafka.NewError() to make it possible to create error objects from user code / unit test (Artem Yarulin)
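
    A sketch of differentiating fatal from temporary errors on a producer's event channel (broker address is hypothetical):

```go
package main

import (
	"log"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // hypothetical broker
	})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	for e := range p.Events() {
		switch ev := e.(type) {
		case kafka.Error:
			if ev.IsFatal() {
				// Fatal errors (currently only from the idempotent producer)
				// mean this producer instance can no longer be used.
				log.Fatalf("fatal error: %v", ev)
			}
			log.Printf("temporary error, client will retry: %v", ev)
		}
	}
}
```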

    Go client fixes

    • Deprecate the use of default.topic.config. Topic configuration should now be set on the standard ConfigMap.
    • Reject delivery.report.only.error=true on producer creation (#306)
    • Avoid use of "Deprecated: " prefix (#268)
    • PartitionEOF must now be explicitly enabled through enable.partition.eof

    Make sure to check out the Idempotent Producer example

    Source code(tar.gz)
    Source code(zip)
  • v0.11.6(Oct 25, 2018)

    Admin API

    This release adds support for the Topic Admin API (KIP-4):

    • Create and delete topics
    • Increase topic partition count
    • Read and modify broker and topic configuration
    • Requires librdkafka >= v0.11.6
    results, err := a.CreateTopics(
    	ctx,
    	// Multiple topics can be created simultaneously
    	// by providing additional TopicSpecification structs here.
    	[]kafka.TopicSpecification{{
    		Topic:             "mynewtopic",
    		NumPartitions:     20,
    		ReplicationFactor: 3}})
    

    More examples.

    Fixes and enhancements

    • Make sure plugins are set before other configuration options (#225, @dtheodor)
    • Fix metadata memory leak
    • Clone config before mutating it in NewProducer and NewConsumer (@vlad-alexandru-ionescu)
    • Enable Error events to be emitted from librdkafka errors, e.g., ErrAllBrokersDown, etc. (#200)
    Source code(tar.gz)
    Source code(zip)
  • v0.11.4(Mar 28, 2018)

    Announcements

    • This release drops support for Golang < 1.7

    • Requires librdkafka v0.11.4 or later

    Message header support

    Support for Kafka message headers has been added (requires broker version >= v0.11.0).

    When producing messages, pass a []kafka.Header list:

            err = p.Produce(&kafka.Message{
                    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                    Value:          []byte(value),
                    Headers:        []kafka.Header{{"myTestHeader", []byte("header values are binary")}},
            }, deliveryChan)
    

    Message headers are available to the consumer as Message.Headers:

    	msg, err := c.ReadMessage(-1)
    	if err != nil {
    		fmt.Printf("%% Consumer error: %v\n", err)
    		continue
    	}
    	fmt.Printf("%% Message on %s:\n%s\n", msg.TopicPartition, string(msg.Value))
    	if msg.Headers != nil {
    		fmt.Printf("%% Headers: %v\n", msg.Headers)
    	}
    
    

    Enhancements

    • Message Headers support
    • Close event channel when consumer is closed (#123 by @czchen)
    • Added ReadMessage() convenience method to Consumer
    • producer: Make events channel size configurable (@agis)
    • Added Consumer.StoreOffsets() (#72)
    • Added ConfigMap.Get() (#26)
    • Added Pause() and Resume() APIs
    • Added Consumer.Committed() API
    • Added OffsetsForTimes() API to Consumer and Producer

    Fixes

    • Static builds should now work on both OSX and Linux (#137, #99)
    • Update error constants from librdkafka
    • Enable produce.offset.report by default (unless overridden)
    • move test helpers that need testing pkg to _test.go file (@gwilym)
    • Build and run-time checking of librdkafka version (#88)
    • Remove gotos (@jadekler)
    • Fix Producer Value&Key slice referencing to avoid cgo pointer checking failures (#24)
    • Fix Go 1.10 build errors (drop pkg-config --static ..)
    Source code(tar.gz)
    Source code(zip)
  • v0.11.0(Jul 25, 2017)

    This is a minimal librdkafka version-synchronized release of the Go client.

    Changes:

    • Requires librdkafka v0.11.0 or later
    • Added stats events (#57)
    • Updated librdkafka error codes
    • Fix signal channel buffering in example (#66)
    Source code(tar.gz)
    Source code(zip)
Owner
Confluent Inc.
Real-time streams powered by Apache Kafka®

Related projects:

• franz-go - a feature-complete, pure Go library for interacting with Kafka 0.8.0 through 2.8.0+: producing, consuming, transacting, administrating (Travis Bischel, 865 stars, Dec 29, 2022)
• Sarama - an MIT-licensed Go client library for Apache Kafka 0.8 and later (Shopify, 9.5k stars, Jan 1, 2023)
• Kowl - Apache Kafka Web UI for exploring messages, consumers and configurations, with a focus on good UI & UX (CloudHut, 2.9k stars, Jan 3, 2023)
• Kaf - modern CLI for Apache Kafka, written in Go, inspired by kubectl & docker (Johannes Brüderl, 1.8k stars, Dec 31, 2022)
• Kafka Stress - CLI tool to stress-test Apache Kafka clusters, producers and consumers (Matheus Fidelis, 27 stars, Nov 13, 2022)
• Kafka DB Sync - study project that uses Apache Kafka as a syncing mechanism between two databases, with producers and consumers written in Go (Diego Hordi, Dec 5, 2021)
• Apache Kafka in 6 minutes - a quick introduction to how Apache Kafka works and differs from other messaging systems, using an example application (bagher sohrabi, 2 stars, Oct 27, 2021)
• Apache Kafka Go - testing Apache Kafka using Go (Gabriel Kim, Dec 17, 2021)
• provider-kafka - a Crossplane Provider used to manage Kafka resources (Crossplane Contrib, 18 stars, Oct 29, 2022)
• kafkactl - a CLI tool for interacting with Kafka through the Confluent Kafka Rest Proxy (Alexandre Barone, Nov 1, 2021)
• Apache Pulsar Go Client Library - a pure-Go client library for the Apache Pulsar project (The Apache Software Foundation, 525 stars, Jan 4, 2023)
• kafka-go - Go client libraries for Kafka, from Segment (Jan 10, 2022)
• Experiments using Go 1.18beta1's generic typings with the Segmentio kafka-go consumer client (Derek Chau, Jan 28, 2022)
• Jocko - Kafka implemented in Golang with built-in coordination (no ZooKeeper, single binary install, cloud native); a distributed commit log service wire-compatible with Kafka (Nash.io, 104 stars, Aug 9, 2021)
• Golang Kafka Example - sample Golang Kafka consumer and producer setup (Thrimal Avishka, Nov 9, 2021)
• Microservices Choreography - example of event-driven Go microservices choreography with Kafka (Muhammad Nasrul, Dec 2, 2021)
• Kafka lab with Golang - Full Cycle (Ricardo Santana, Jan 27, 2022)
• goNELI - implementation of the 'fast' variation of the NELI leader election protocol for Go and Kafka (Obsidian Dynamics, 59 stars, Dec 8, 2022)
• ChanBroker - a broker for goroutines, similar to Kafka, with Producer, Consumer (Subscriber) and Broker goroutine types (沉风, 61 stars, Aug 12, 2021)