franz-go contains a high performance, pure Go library for interacting with Kafka from 0.8.0 through 2.7.0+. Producing, consuming, transacting, administrating, etc.

Overview

franz-go - Apache Kafka client written in Go


Franz-go is an all-encompassing Apache Kafka client fully written in Go. This library aims to provide every Kafka feature from Apache Kafka v0.8.0 onward, with support for transactions, regex topic consuming, the latest partitioning strategies, data loss detection, closest replica fetching, and more. If a client KIP exists, this library aims to support it.

This library attempts to provide an intuitive API while interacting with Kafka the way Kafka expects (timeouts, etc.).

Features

  • Feature complete client (up to Kafka v2.7.0+)
  • Supported compression types: snappy, gzip, lz4 and zstd
  • SSL/TLS Support
  • Exactly once semantics / idempotent producing
  • Transactions support
  • All SASL mechanisms are supported (OAuthBearer, GSSAPI/Kerberos, SCRAM-SHA-256/512 and plain)
  • Supported Kafka versions >=0.8
  • Provides low level functionality (such as sending API requests) as well as high level functionality (e.g. consuming in groups)
  • Utilizes modern & idiomatic Go (support for contexts, variadic configuration options, ...)
  • Highly performant, see Performance (benchmarks will be added)
  • Written in pure Go (no wrapper lib for a C library or other bindings)
  • Ability to add detailed log messages or metrics using hooks

Getting started

Basic usage for producing and consuming Kafka messages looks like this:

seeds := []string{"localhost:9092"}
client, err := kgo.NewClient(kgo.SeedBrokers(seeds...))
if err != nil {
    panic(err)
}
defer client.Close()

ctx := context.Background()

// 1.) Producing a message
// All record production goes through Produce, and the callback can be used
// for synchronous or asynchronous production.
var wg sync.WaitGroup
wg.Add(1)
record := &kgo.Record{Topic: "foo", Value: []byte("bar")}
err = client.Produce(ctx, record, func(_ *kgo.Record, err error) {
        defer wg.Done()
        if err != nil {
                fmt.Printf("record had a produce error: %v\n", err)
        }
})
if err != nil {
        panic("we are unable to produce if the context is canceled, we have hit max buffered, " +
                "or if we are transactional and not in a transaction")
}
wg.Wait()

// 2.) Consuming messages from a topic
// Consuming can either be direct (no consumer group) or through a group. Below, we use a group.
client.AssignGroup("my-group-identifier", kgo.GroupTopics("foo"))
for {
        fetches := client.PollFetches(ctx)
        iter := fetches.RecordIter()
        for !iter.Done() {
                record := iter.Next()
                fmt.Println(string(record.Value))
        }
}
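The poll loop above silently drops per-partition fetch errors. A minimal sketch of surfacing them via the fetches.Errors accessor, assuming the same client and ctx as in the example above:

```go
// Check per-partition fetch errors before iterating records.
for {
        fetches := client.PollFetches(ctx)
        for _, e := range fetches.Errors() {
                // Each error carries the topic and partition it occurred on.
                fmt.Printf("fetch error on %s[%d]: %v\n", e.Topic, e.Partition, e.Err)
        }
        iter := fetches.RecordIter()
        for !iter.Done() {
                fmt.Println(string(iter.Next().Value))
        }
}
```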

Version Pinning

By default, the client issues an ApiVersions request when it connects to a broker and uses the maximum version that each broker supports for every request.

Kafka 0.10.0 introduced the ApiVersions request; if you are working with brokers older than that, you must use the kversion package together with the client's MaxVersions option to pin request versions.

It is also recommended to set MaxVersions to the version of your broker cluster. Until KIP-584 is implemented, if you do not pin a max version, this client may use newer features with some brokers but not others while you are in the middle of a rolling broker upgrade.
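As a sketch, pinning to a 2.7 cluster might look like the following; kversion's versioned constructor here is an assumption based on the package's documented purpose, and "localhost:9092" is a placeholder:

```go
package main

import (
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kversion"
)

func main() {
	// Pin request versions to at most what a 2.7 broker supports.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.MaxVersions(kversion.V2_7_0()),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()
}
```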

Metrics

Using hooks, you can attach to events happening within franz-go, allowing you to use your favorite metrics library and collect exactly the metrics you are interested in.
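As a sketch, a hook that counts bytes written to brokers could look like the following. The HookBrokerWrite interface name and OnBrokerWrite signature are assumptions based on recent franz-go versions and may differ in older releases:

```go
package main

import (
	"sync/atomic"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// bytesHook counts bytes written to brokers. The method signature below
// is an assumption from recent franz-go versions (kgo.HookBrokerWrite).
type bytesHook struct {
	written int64
}

func (h *bytesHook) OnBrokerWrite(meta kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
	atomic.AddInt64(&h.written, int64(bytesWritten))
}

func main() {
	hook := &bytesHook{}
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.WithHooks(hook),               // attach the hook to the client
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()
}
```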

Supported KIPs

Theoretically, this library supports every (non-Java-specific) client-facing KIP. Most are tested; some still need testing. Any KIP that simply adds or modifies protocol messages is supported through code generation.

  • KIP-12 (sasl & ssl; 0.9.0)
  • KIP-13 (throttling; supported but not obeyed)
  • KIP-31 (relative offsets in message set; 0.10.0)
  • KIP-32 (timestamps in message set v1; 0.10.0)
  • KIP-35 (adds ApiVersion; 0.10.0)
  • KIP-36 (rack aware replica assignment; 0.10.0)
  • KIP-40 (ListGroups and DescribeGroup v0; 0.9.0)
  • KIP-43 (sasl enhancements & handshake; 0.10.0)
  • KIP-54 (sticky group assignment)
  • KIP-62 (join group rebalance timeout, background thread heartbeats; v0.10.1)
  • KIP-74 (fetch response size limit; 0.10.1)
  • KIP-78 (cluster id in metadata; 0.10.1)
  • KIP-79 (list offset req/resp timestamp field; 0.10.1)
  • KIP-84 (sasl scram; 0.10.2)
  • KIP-98 (EOS; 0.11.0)
  • KIP-101 (offset for leader epoch introduced; broker usage yet; 0.11.0)
  • KIP-107 (delete records; 0.11.0)
  • KIP-108 (validate create topic; 0.10.2)
  • KIP-110 (zstd; 2.1.0)
  • KIP-112 (JBOD disk failure, protocol changes; 1.0.0)
  • KIP-113 (JBOD log dir movement, protocol additions; 1.0.0)
  • KIP-124 (request rate quotas; 0.11.0)
  • KIP-133 (describe & alter configs; 0.11.0)
  • KIP-152 (more sasl, introduce sasl authenticate; 1.0.0)
  • KIP-183 (elect preferred leaders; 2.2.0)
  • KIP-185 (idempotent is default; 1.0.0)
  • KIP-195 (create partitions request; 1.0.0)
  • KIP-207 (new error in list offset request; 2.2.0)
  • KIP-219 (throttling happens after response; 2.0.0)
  • KIP-226 (describe configs v1; 1.1.0)
  • KIP-227 (incremental fetch requests; 1.1.0)
  • KIP-229 (delete groups request; 1.1.0)
  • KIP-255 (oauth via sasl/oauthbearer; 2.0.0)
  • KIP-279 (leader / follower failover; changed offsets for leader epoch; 2.0.0)
  • KIP-320 (fetcher log truncation detection; 2.1.0)
  • KIP-322 (new error when delete topics is disabled; 2.1.0)
  • KIP-339 (incremental alter configs; 2.3.0)
  • KIP-341 (sticky group bug fix)
  • KIP-342 (oauth extensions; 2.1.0)
  • KIP-345 (static group membership, see KAFKA-8224)
  • KIP-360 (safe epoch bumping for UNKNOWN_PRODUCER_ID; 2.5.0)
  • KIP-368 (periodically reauth sasl; 2.2.0)
  • KIP-369 (always round robin produce partitioner; 2.4.0)
  • KIP-380 (inter-broker command changes; 2.2.0)
  • KIP-392 (fetch request from closest replica w/ rack; 2.2.0)
  • KIP-394 (require member.id for initial join; 2.2.0)
  • KIP-412 (dynamic log levels with incremental alter configs; 2.4.0)
  • KIP-429 (incremental rebalance, see KAFKA-8179; 2.4.0)
  • KIP-430 (include authorized ops in describe groups; 2.3.0)
  • KIP-447 (transaction changes to better support group changes; 2.5.0)
  • KIP-455 (admin replica reassignment; 2.4.0)
  • KIP-460 (admin leader election; 2.4.0)
  • KIP-464 (defaults for create topic; 2.4.0)
  • KIP-467 (produce response error change for per-record errors; 2.4.0)
  • KIP-480 (sticky partition producing; 2.4.0)
  • KIP-482 (tagged fields; KAFKA-8885; 2.4.0)
  • KIP-496 (offset delete admin command; 2.4.0)
  • KIP-497 (new API to alter ISR; 2.7.0)
  • KIP-498 (add max bound on reads; unimplemented in Kafka)
  • KIP-511 (add client name / version in apiversions req; 2.4.0)
  • KIP-518 (list groups by state; 2.6.0)
  • KIP-525 (create topics v5 returns configs; 2.4.0)
  • KIP-526 (reduce metadata lookups; done minus part 2, which we won't do)
  • KIP-546 (client quota APIs; 2.5.0)
  • KIP-554 (broker side SCRAM API; 2.7.0)
  • KIP-559 (protocol info in sync / join; 2.5.0)
  • KIP-569 (doc/type in describe configs; 2.6.0)
  • KIP-570 (leader epoch in stop replica; 2.6.0)
  • KIP-580 (exponential backoff; 2.6.0)
  • KIP-588 (producer recovery from txn timeout; 2.7.0)
  • KIP-590 (support for forwarding admin requests; 2.7.0)
  • KIP-595 (new APIs for raft protocol; 2.7.0)
  • KIP-599 (throttle create/delete topic/partition; 2.7.0)
  • KIP-700 (describe cluster; 2.8.0)
Issues
  • [Question] Recovering from INVALID_PRODUCER_EPOCH


    Hey @twmb, over the course of testing a transactional producer I've written I've encountered this error from EndTransaction after a rebalance:

    INVALID_PRODUCER_EPOCH: Producer attempted an operation with an old epoch.
    

    For a bit of context, this is from a consumer (currently Sarama, will be franz-go soon) that does the following:

    1. client.BeginTransaction
    2. Consumes a record from a topic
    3. Writes a bunch of records to a different topic (client.Send)
    4. If all goes well, client.EndTransaction(ctx, kgo.TryCommit)
    5. If something goes wrong in step 3 or 4, there's a client.AbortBufferedRecords followed by a client.EndTransaction(ctx, kgo.TryAbort)

    We're running multiple instances of these consumers, each reading from one or more partitions on the input topic and writing (randomly) to partitions on the output topic. These tasks can be relatively long running and rebalances may occur during any of those steps due to deploys, scaling, kafka operations, etc.

    What happens after a rebalance is they start throwing those kerr.InvalidProducerEpoch errors. A little digging suggests this is due to franz-go's handling of KIP-588 and that seems to make sense, but what isn't clear to me is how to recover from this situation. Right now, the producers just spin forever on that error - able to begin a transaction but not write records or end the transaction (though aborting still works).

    I remembered this doc from config.TransactionalID, but I'm not sure it applies to me since I'm not using a franz-go consumer.

        // Note that, unless using Kafka 2.5.0, a consumer group rebalance may be
        // problematic. Production should finish and be committed before the client
        // rejoins the group. It may be safer to use an eager group balancer and just
        // abort the transaction. Alternatively, any time a partition is revoked, you
        // could abort the transaction and reset offsets being consumed.

    So with that in mind and with the bold assumption that I'm doing this correctly in the first place, what is the proper way to recover from this situation with the franz-go API?

    opened by dcrodman 31
  • panic: runtime error: invalid memory address or nil pointer dereference


    Client version v0.6.9 Kafka Version: kafka 2.6 Connection Auth: MTLS connection Auto commit disabled

    Our Kafka dev cluster can be under high load, so no idea if its related

    panic: runtime error: invalid memory address or nil pointer dereference
    [signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x7d381f]
    
    goroutine 10119 [running]:
    github.com/twmb/franz-go/pkg/kgo.(*topicPartitions).load(...)
            /go/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/topics_and_partitions.go:73
    github.com/twmb/franz-go/pkg/kgo.(*consumer).assignPartitions(0xc0007326a0, 0xc0030022d0, 0xc00084d500)
            /go/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/consumer.go:357 +0x47f
    github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).fetchOffsets(0xc00084d4a0, 0xfc34a8, 0xc002bcce40, 0xc0024914a0, 0x0, 0x0)
            /go/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/consumer_group.go:1201 +0x4d7
    github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat.func3(0xc002bd7080, 0xc002554060, 0xc00084d4a0, 0xc0024914a0, 0xfc34a8, 0xc002bcce40)
            /go/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/consumer_group.go:750 +0x145
    created by github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat
            /go/pkg/mod/github.com/twmb/franz-go@v0.6.9/pkg/kgo/consumer_group.go:746 +0x387
    
    opened by owenhaynes 26
  • Possible record loss on unclean kill of consumer when using automatic offset commit


    Because the offset commit and consumption are done independently it is possible to have record loss on the consumer side if the consumer process has a kill -9 applied to it mid loop. You can reproduce this by producing 1000 records, then run up a consumer and sleep at the 500 record mark. Then issue a kill -9 on the consumer process. When the consumer starts back up the full 1000 records sometimes don't all get consumed. This is presumably because the offsets for these records have already been committed. When you run the Go confluent lib this does not happen because the offset commit occurs inside the Poll call itself, so there is no message loss.

    opened by finncolman 22
  • Getting "invalid short sasl lifetime millis" periodically with AWS MSK using SASL authentication

    I'm getting "invalid short sasl lifetime millis" periodically (after a few hours) with AWS MSK using SASL authentication.

    Looking at the code, it errors out when the lifetimeMillis is less than 5 sec but would re-authenticate if the time is greater than 5 sec.

    The periodical re-authentication works most of the time because AWS auth token expires every 15min and I only see this issue after a few hours. When it happens, lifetimeMillis become less than 5s. Could this be a clock sync issue? Any idea how I can prevent this error from happening?

    opened by sharonx 21
  • Manual commit multiple partitions consumer


    Slightly modified version of https://github.com/twmb/franz-go/tree/master/examples/goroutine_per_partition_consuming

    Using manual commits.

    During testing of this I noticed that when adding new consumers I see errors from err := cl.CommitRecords(context.Background(), recs...). The errors were mainly

    ILLEGAL_GENERATION: Specified group generation id is not valid.
    

    But also included

    REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.
    

    There did not seem to be a clear pattern in the errors

    opened by JacobSMoller 18
  • error="unknown attributes on uncompressed message 8"

    We are piloting this project and are running into this error when trying to consume from one of our brokers:

    unknown attributes on uncompressed message 8
    

    The error is being returned from fetches.Errors()

    This particular broker cluster has the following settings:

    version=2.2.1
    inter.broker.protocol.version: "2.2"
    log.message.format.version: "0.10.1"
    

    We do not see the same issue on another broker cluster with the following settings:

    version: 2.7.1
    inter.broker.protocol.version: 2.7.1
    log.message.format.version: "2.7"
    

    I've tried passing different versions to the client via kgo.MaxVersions to no avail

    Is there some other option that I should be setting here or is there any way to get more debug information about what is happening here?

    opened by kevinconaway 17
  • Direct Consumer Offset Management


    Most of our applications do not use consumer groups and are instead initialized with a static list of partitions to consume from. I'm having trouble adapting our current usage to this library

    How can "direct" consumers (those using kgo.ConsumePartitions) commit offsets? Currently in CommitOffsets, the function exits early if it detects that the consumer is not in "group" mode.

    Separately, it doesn't appear that there is an option for kgo.ConsumePartitions to resume from the last committed offset, is that accurate? I see that there is SetOffsets which could be used if there was a way to fetch the last committed offsets for a group ID / partition but I don't see one.

    opened by kevinconaway 15
  • Eventhub example / trouble committing


    Hey! Thanks for the great library.

    Do I have to consider specific things to work with EventHub? It seems that my offset commits have no effect. I do commit, and have tried various methods (all from the group example); however, once my consumer reconnects, I start over again (offsets were apparently not correctly committed).

    Excerpt from my setup:

    	opts := []kgo.Opt{
    		kgo.SeedBrokers(consumerCfg.Brokers...),
    		kgo.ConsumerGroup(consumerCfg.ConsumerGroup),
    		kgo.ConsumeTopics(iothubTopic),
    		// SASL Options
    		kgo.SASL(plain.Auth{
    			User: "$ConnectionString",
    			Pass: connString,
    		}.AsMechanism()),
    
    		// Configure TLS. Uses SystemCertPool for RootCAs by default.
    		kgo.Dialer(tlsDialer.DialContext),
    	}
    
    
    func consume(ctx context.Context, cl *kgo.Client, log *zap.Logger, processor *processor.Processor) error {
    	for {
    		fetches := cl.PollFetches(context.Background())
    		if fetches.IsClientClosed() {
    			return nil
    		}
    		fetches.EachError(func(t string, p int32, err error) {
    			log.Error("Error on partition", zap.String("topic", t), zap.Int32("partition", p), zap.Error(err))
    		})
    
    		var rs []*kgo.Record
    
    		var seen int
    		fetches.EachRecord(func(record *kgo.Record) {
    			processor.Process(context.Background(), record)
    			seen++
    			rs = append(rs, record)
    		})
    		if err := cl.CommitRecords(context.Background(), rs...); err != nil {
    			fmt.Printf("commit records failed: %v", err)
    			continue
    		}
    
    		// TODO pass to handler
    
    		//if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
    		//log.Error("Failed to commit offsets", zap.Error(err))
    		//		continue
    		//	}
    		log.Info("Processed records offsets", zap.Int("n", seen))
    	}
    }
    
    
    opened by birdayz 15
  • strange profile data


    487.41MB 32.71% 32.71%   487.41MB 32.71%  github.com/unistack-org/micro-broker-kgo/v3.(*subscriber).handleFetches.func2
      462.56MB 31.04% 63.75%   462.56MB 31.04%  github.com/twmb/franz-go/pkg/kgo.recordToRecord
      294.14MB 19.74% 83.48%   294.14MB 19.74%  github.com/twmb/franz-go/pkg/kgo.readRawRecords
      125.83MB  8.44% 91.93%   125.83MB  8.44%  github.com/twmb/franz-go/pkg/kgo.(*cursorOffsetNext).maybeKeepRecord (inline)
       59.51MB  3.99% 95.92%    59.51MB  3.99%  github.com/klauspost/compress/s2.Decode
       32.28MB  2.17% 98.09%    32.28MB  2.17%  github.com/unistack-org/micro-broker-kgo/v3.(*subscriber).handleFetches
       21.48MB  1.44% 99.53%    21.48MB  1.44%  github.com/twmb/franz-go/pkg/kgo.(*brokerCxn).readConn.func2
             0     0% 99.53%   942.04MB 63.21%  github.com/twmb/franz-go/pkg/kgo.(*cursorOffsetNext).processRecordBatch
             0     0% 99.53%   942.04MB 63.21%  github.com/twmb/franz-go/pkg/kgo.(*cursorOffsetNext).processRespPartition
             0     0% 99.53%    59.51MB  3.99%  github.com/twmb/franz-go/pkg/kgo.(*decompressor).decompress
             0     0% 99.53%   942.04MB 63.21%  github.com/twmb/franz-go/pkg/kgo.(*source).fetch.func3
             0     0% 99.53%   942.04MB 63.21%  github.com/twmb/franz-go/pkg/kgo.(*source).handleReqResp
             0     0% 99.53%    32.28MB  2.17%  github.com/unistack-org/micro-broker-kgo/v3.(*subscriber).run
             0     0% 99.53%   487.91MB 32.74%  golang.org/x/sync/errgroup.(*Group).Go.func1
    
    

    I'm not using any compression and pass NoCompression.

    opened by vtolstov 15
  • not all messages consumed from topic with many partitions


    I have a test topic with 33 partitions. I publish 600,000 messages to the topic and try to consume them via the PollFetches func. After consuming the messages and blocking on PollFetches, I check the lag in the partitions and get:

    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                               HOST            CLIENT-ID
    test            test            14         272719          272719          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            6          95404           95404           0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            17         330429          330429          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            20         231589          231591          2               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            29         252090          252090          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            23         173664          173664          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            24         212880          212880          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            11         173948          173948          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            10         84643           84643           0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            28         119807          119807          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            19         239245          239245          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            0          246418          246419          1               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            7          109678          109678          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            18         477203          477218          15              test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            22         146831          146831          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            25         223192          223192          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            5          230747          230748          1               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            8          345563          345564          1               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            1          291728          291728          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            26         280354          280895          541             test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            27         387977          387979          2               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            13         369057          369057          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            4          227867          227867          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            30         331319          331319          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            16         142291          142291          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            9          345443          345444          1               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            3          276085          276085          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            21         328815          328815          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            12         233569          233569          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            32         160723          160728          5               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            15         114914          114914          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            2          192850          192853          3               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    test            test            31         187689          187689          0               test-2ae9870c-08b0-40dd-96ee-ae3cd7390814 /172.18.0.1     test
    
    opened by vtolstov 14
  • errNotInTransaction is not a public error


    Hello -

    I'm curious why errNotInTransaction is not a public error. It is returned to the calling client in cases when a transaction has not been opened, and it would be nice to be able to check explicitly for this error type.

    opened by Zach-Johnson 13
  • Build failed when using go 1.15 and franz-go 1.6.0


    github.com/twmb/franz-go/pkg/kgo

    /root/go/pkg/mod/github.com/twmb/franz-go@v1.6.0/pkg/kgo/record_formatter.go:1588:13: undefined: io.ReadAll

    io.ReadAll not available in go 1.15

    opened by ly931003 2
  • Finer grained transaction control needed - Feature request


    Hi there. Love the project and am considering using it. However, the GroupTransactSession semantics seem to be a bit limiting for concurrent processing of partitions. I have a consumer application that interacts with external systems, and it seems there is no good way to exclude a partition from a group transaction.

    in the eos example given:

    		if err := sess.Begin(); err != nil {
    			// Similar to above, we only encounter errors here if
    			// we are not transactional or are already in a
    			// transaction. We should not hit this error.
    			die("unable to start transaction: %v", err)
    		}
    
    		e := kgo.AbortingFirstErrPromise(sess.Client())
    		fetches.EachRecord(func(r *kgo.Record) {
    			sess.Produce(ctx, kgo.StringRecord("eos "+string(r.Value)), e.Promise())
    		})
    		committed, err := sess.End(ctx, e.Err() == nil)
    		
    

    I could throw each record into a goroutine for its partition and then wait for all of them to finish before consuming the next set of records, but if one partition is running slow, the entire batch returned in fetches will block. This seems like both a throughput and a latency issue for my use case.

    The shorthand of what I would like to be able to do is something like this:

    fetches := client.PollFetches(ctx)
    
    fetches.EachPartition(func (p) {
       go func () {
           producerClient := transactionalProducerPool.Borrow()
           defer transactionalProducerPool.Release(producerClient)
           producerClient.BeginTransaction()
           process(p.Records)
           producerClient.Flush()
           producerClient.CommitOffsetsForTransaction(getHighestOffsets(p.Records))
           producerClient.EndTransaction()
       }()
    })
    

    The CommitOffsetsForTransaction method existed at one point as it is referenced in the documentation for EndTransaction. It seems that this method was made package private. Is there any way to export this method again (or some version of it)?

    opened by salsorrentino 4
Owner

Travis Bischel