From 3719a5d78792f2c1612b181fa860af6473bb278a Mon Sep 17 00:00:00 2001 From: Gergely Munkacsy Date: Fri, 28 Jun 2019 15:40:44 +0200 Subject: [PATCH 1/4] feat: add SASL auth support --- consumerBuilder.go | 16 ++++++++++++---- consumerOptions.go | 18 ++++++++++++++++++ internal/consumer/options.go | 5 ++++- 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/consumerBuilder.go b/consumerBuilder.go index d45031b..a5dd61a 100644 --- a/consumerBuilder.go +++ b/consumerBuilder.go @@ -24,7 +24,7 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/bsm/sarama-cluster" + cluster "github.com/bsm/sarama-cluster" "github.com/uber-go/kafka-client/internal/consumer" "github.com/uber-go/kafka-client/kafka" "github.com/uber-go/tally" @@ -76,9 +76,9 @@ func newConsumerBuilder( clusterSaramaClientMap: make(map[consumer.ClusterGroup]sarama.Client), clusterSaramaConsumerMap: make(map[consumer.ClusterGroup]consumer.SaramaConsumer), clusterTopicSaramaProducerMap: make(map[string]map[string]sarama.AsyncProducer), - msgCh: make(chan kafka.Message, consumerOptions.RcvBufferSize), - logger: logger.With(zap.String("consumergroup", config.GroupName)), - scope: scope.Tagged(map[string]string{"consumergroup": config.GroupName}), + msgCh: make(chan kafka.Message, consumerOptions.RcvBufferSize), + logger: logger.With(zap.String("consumergroup", config.GroupName)), + scope: scope.Tagged(map[string]string{"consumergroup": config.GroupName}), constructors: consumer.Constructors{ NewSaramaConsumer: consumer.NewSaramaConsumer, NewSaramaProducer: consumer.NewSaramaProducer, @@ -358,6 +358,14 @@ func buildSaramaConfig(options *consumer.Options) *cluster.Config { config.Consumer.Offsets.CommitInterval = options.OffsetCommitInterval config.Consumer.Offsets.Initial = options.OffsetPolicy config.Consumer.MaxProcessingTime = options.MaxProcessingTime + + if options.SASLEnabled { + config.Config.Net.SASL.Enable = true + config.Config.Net.SASL.Mechanism = sarama.SASLTypePlaintext + config.Config.Net.SASL.User = options.SASLUsername + config.Config.Net.SASL.Password = options.SASLPassword + + } return config } diff --git a/consumerOptions.go b/consumerOptions.go index 9b796b5..62c1c36 100644 --- a/consumerOptions.go +++ b/consumerOptions.go @@ -42,8 +42,26 @@ type ( clientIDOptions struct { clientID string } + + SASLOptions struct { + username string + password string + } ) +func WithSASLAuth(username, password string) ConsumerOption { + return &SASLOptions{ + username: username, + password: password, + } +} + +func (o *SASLOptions) apply(opts *consumer.Options) { + opts.SASLEnabled = true + opts.SASLUsername = o.username + opts.SASLPassword = o.password +} + // WithDLQTopics creates a range consumer for the specified consumer DLQ topics. func WithDLQTopics(topicList kafka.ConsumerTopicList) ConsumerOption { return &dlqTopicsOptions{ diff --git a/internal/consumer/options.go b/internal/consumer/options.go index 2fe0f48..2bb8f42 100644 --- a/internal/consumer/options.go +++ b/internal/consumer/options.go @@ -25,7 +25,7 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/bsm/sarama-cluster" + cluster "github.com/bsm/sarama-cluster" ) type ( @@ -44,6 +44,9 @@ type ( FetchDefaultBytes int32 OtherConsumerTopics []Topic TLSConfig *tls.Config // TLSConfig is the configuration to use for secure connections, not nil -> enable, nil -> disabled + SASLUsername string + SASLPassword string + SASLEnabled bool } ) From afbf71e6a1e94ac53744570c1a563b2c0a44f275 Mon Sep 17 00:00:00 2001 From: Gergely Munkacsy Date: Fri, 28 Jun 2019 17:09:44 +0200 Subject: [PATCH 2/4] fix: rename org name --- .travis.yml | 2 +- README.md | 10 +++++----- client.go | 2 +- client_test.go | 4 ++-- consumerBuilder.go | 4 ++-- consumerBuilder_test.go | 4 ++-- consumerOptions.go | 4 ++-- consumerOptions_test.go | 4 ++-- glide.yaml | 2 +- internal/consumer/ackMgr.go | 4 ++-- internal/consumer/clusterConsumer.go | 6 +++--- internal/consumer/clusterConsumer_test.go | 4 ++-- internal/consumer/dlq.go | 6 +++--- internal/consumer/dlq_test.go | 2 +- internal/consumer/message_test.go | 2 +- internal/consumer/mocks_test.go | 4 ++-- internal/consumer/multiClusterConsumer_test.go | 2 +- internal/consumer/multiclusterConsumer.go | 6 +++--- internal/consumer/partitionConsumer.go | 8 ++++---- internal/consumer/partitionConsumer_test.go | 2 +- internal/consumer/topicConsumer.go | 4 ++-- internal/consumer/types.go | 4 ++-- 22 files changed, 45 insertions(+), 45 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6f57037..ab107ab 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,7 @@ language: go sudo: false go: - 1.11 -go_import_path: github.com/uber-go/kafka-client +go_import_path: github.com/gig/kafka-client cache: directories: - vendor diff --git a/README.md b/README.md index 66fafcf..f3b3c77 100644 --- a/README.md +++ b/README.md @@ -12,11 +12,11 @@ A high level Go client library for Apache Kafka that provides the following prim This library is in alpha. APIs are subject to change, use at your own risk ## Contributing -If you are interested in contributing, please sign the [License Agreement](https://cla-assistant.io/uber-go/kafka-client) and see our [development guide](https://github.com/uber-go/kafka-client/blob/master/docs/DEVELOPMENT-GUIDE.md) +If you are interested in contributing, please sign the [License Agreement](https://cla-assistant.io/uber-go/kafka-client) and see our [development guide](https://github.com/gig/kafka-client/blob/master/docs/DEVELOPMENT-GUIDE.md) ## Installation -`go get -u github.com/uber-go/kafka-client` +`go get -u github.com/gig/kafka-client` ## Quick Start @@ -27,8 +27,8 @@ import ( "os" "os/signal" - "github.com/uber-go/kafka-client" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" ) @@ -100,7 +100,7 @@ func main() { ``` [mit-img]: http://img.shields.io/badge/License-MIT-blue.svg -[mit]: https://github.com/uber-go/kafka-client/blob/master/LICENSE +[mit]: https://github.com/gig/kafka-client/blob/master/LICENSE [ci-img]: https://img.shields.io/travis/uber-go/kafka-client/master.svg [ci]: https://travis-ci.org/uber-go/kafka-client/branches diff --git a/client.go b/client.go index 1d602ac..8eeb15b 100644 --- a/client.go +++ b/client.go @@ -23,7 +23,7 @@ package kafkaclient import ( "os" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/client_test.go b/client_test.go index 061c7a8..9b4111f 100644 --- a/client_test.go +++ b/client_test.go @@ -27,8 +27,8 @@ import ( "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" "github.com/stretchr/testify/assert" - "github.com/uber-go/kafka-client/internal/consumer" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/consumer" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/consumerBuilder.go b/consumerBuilder.go index a5dd61a..816012d 100644 --- a/consumerBuilder.go +++ b/consumerBuilder.go @@ -25,8 +25,8 @@ import ( "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" - "github.com/uber-go/kafka-client/internal/consumer" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/consumer" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/consumerBuilder_test.go b/consumerBuilder_test.go index bd66a1d..7447251 100644 --- a/consumerBuilder_test.go +++ b/consumerBuilder_test.go @@ -28,8 +28,8 @@ import ( "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" "github.com/stretchr/testify/suite" - "github.com/uber-go/kafka-client/internal/consumer" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/consumer" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/consumerOptions.go b/consumerOptions.go index 62c1c36..5899702 100644 --- a/consumerOptions.go +++ b/consumerOptions.go @@ -21,8 +21,8 @@ package kafkaclient import ( - "github.com/uber-go/kafka-client/internal/consumer" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/consumer" + "github.com/gig/kafka-client/kafka" ) type ( diff --git a/consumerOptions_test.go b/consumerOptions_test.go index bb6cacd..6068af8 100644 --- a/consumerOptions_test.go +++ b/consumerOptions_test.go @@ -24,8 +24,8 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/uber-go/kafka-client/internal/consumer" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/consumer" + "github.com/gig/kafka-client/kafka" ) func TestDLQConsumerOptions(t *testing.T) { diff --git a/glide.yaml b/glide.yaml index 1032e26..7ba944e 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,4 +1,4 @@ -package: github.com/uber-go/kafka-client +package: github.com/gig/kafka-client license: MIT import: - package: go.uber.org/zap diff --git a/internal/consumer/ackMgr.go b/internal/consumer/ackMgr.go index 42a1ba0..d5dbdab 100644 --- a/internal/consumer/ackMgr.go +++ b/internal/consumer/ackMgr.go @@ -25,8 +25,8 @@ import ( "sync" "time" - "github.com/uber-go/kafka-client/internal/list" - "github.com/uber-go/kafka-client/internal/metrics" + "github.com/gig/kafka-client/internal/list" + "github.com/gig/kafka-client/internal/metrics" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/internal/consumer/clusterConsumer.go b/internal/consumer/clusterConsumer.go index 78af419..88b5656 100644 --- a/internal/consumer/clusterConsumer.go +++ b/internal/consumer/clusterConsumer.go @@ -28,9 +28,9 @@ import ( "time" "github.com/bsm/sarama-cluster" - "github.com/uber-go/kafka-client/internal/metrics" - "github.com/uber-go/kafka-client/internal/util" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/metrics" + "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" "go.uber.org/zap/zapcore" diff --git a/internal/consumer/clusterConsumer_test.go b/internal/consumer/clusterConsumer_test.go index 7bff592..e284168 100644 --- a/internal/consumer/clusterConsumer_test.go +++ b/internal/consumer/clusterConsumer_test.go @@ -28,8 +28,8 @@ import ( "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" "github.com/stretchr/testify/suite" - "github.com/uber-go/kafka-client/internal/util" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/internal/consumer/dlq.go b/internal/consumer/dlq.go index 6614c0c..5061e93 100644 --- a/internal/consumer/dlq.go +++ b/internal/consumer/dlq.go @@ -26,9 +26,9 @@ import ( "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" - "github.com/uber-go/kafka-client/internal/metrics" - "github.com/uber-go/kafka-client/internal/util" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/metrics" + "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/internal/consumer/dlq_test.go b/internal/consumer/dlq_test.go index e0fb8f8..52e660e 100644 --- a/internal/consumer/dlq_test.go +++ b/internal/consumer/dlq_test.go @@ -27,7 +27,7 @@ import ( "time" "github.com/stretchr/testify/suite" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/internal/consumer/message_test.go b/internal/consumer/message_test.go index 489a4ef..dde2eb4 100644 --- a/internal/consumer/message_test.go +++ b/internal/consumer/message_test.go @@ -26,7 +26,7 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/suite" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/kafka" ) type MessageTestSuite struct { diff --git a/internal/consumer/mocks_test.go b/internal/consumer/mocks_test.go index 2d8653c..fc45361 100644 --- a/internal/consumer/mocks_test.go +++ b/internal/consumer/mocks_test.go @@ -28,8 +28,8 @@ import ( "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" - "github.com/uber-go/kafka-client/internal/util" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/kafka" "go.uber.org/zap/zapcore" ) diff --git a/internal/consumer/multiClusterConsumer_test.go b/internal/consumer/multiClusterConsumer_test.go index f6128b8..2e28c79 100644 --- a/internal/consumer/multiClusterConsumer_test.go +++ b/internal/consumer/multiClusterConsumer_test.go @@ -26,7 +26,7 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/suite" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/internal/consumer/multiclusterConsumer.go b/internal/consumer/multiclusterConsumer.go index 5650035..9e68b51 100644 --- a/internal/consumer/multiclusterConsumer.go +++ b/internal/consumer/multiclusterConsumer.go @@ -23,9 +23,9 @@ package consumer import ( "errors" "github.com/Shopify/sarama" - "github.com/uber-go/kafka-client/internal/metrics" - "github.com/uber-go/kafka-client/internal/util" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/metrics" + "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/internal/consumer/partitionConsumer.go b/internal/consumer/partitionConsumer.go index 56c41b7..5988f35 100644 --- a/internal/consumer/partitionConsumer.go +++ b/internal/consumer/partitionConsumer.go @@ -29,10 +29,10 @@ import ( "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" - "github.com/uber-go/kafka-client/internal/list" - "github.com/uber-go/kafka-client/internal/metrics" - "github.com/uber-go/kafka-client/internal/util" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/list" + "github.com/gig/kafka-client/internal/metrics" + "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" "go.uber.org/zap/zapcore" diff --git a/internal/consumer/partitionConsumer_test.go b/internal/consumer/partitionConsumer_test.go index fa91ea9..00c4f49 100644 --- a/internal/consumer/partitionConsumer_test.go +++ b/internal/consumer/partitionConsumer_test.go @@ -26,7 +26,7 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/suite" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/internal/consumer/topicConsumer.go b/internal/consumer/topicConsumer.go index 8f8a79d..ef8a2bc 100644 --- a/internal/consumer/topicConsumer.go +++ b/internal/consumer/topicConsumer.go @@ -24,8 +24,8 @@ import ( "sync" "github.com/bsm/sarama-cluster" - "github.com/uber-go/kafka-client/internal/metrics" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/metrics" + "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/internal/consumer/types.go b/internal/consumer/types.go index 5167555..77ef7a1 100644 --- a/internal/consumer/types.go +++ b/internal/consumer/types.go @@ -26,8 +26,8 @@ import ( "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" "github.com/golang/protobuf/proto" - "github.com/uber-go/kafka-client/internal/util" - "github.com/uber-go/kafka-client/kafka" + "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/kafka" "go.uber.org/zap/zapcore" ) From bd7d982e4a396baff487457a64a8d7ff8b4f52dc Mon Sep 17 00:00:00 2001 From: Ivaylo Ivanov Date: Fri, 28 Aug 2020 14:54:59 +0300 Subject: [PATCH 3/4] feat: sasl sha512 and errors channel for sarama cluster errors --- consumerBuilder.go | 48 ++++++++++++++++++++--- internal/consumer/clusterConsumer.go | 11 +++++- internal/consumer/multiclusterConsumer.go | 19 ++++++--- kafka/config.go | 3 +- kafka/interfaces.go | 2 + 5 files changed, 71 insertions(+), 12 deletions(-) diff --git a/consumerBuilder.go b/consumerBuilder.go index 816012d..3187e5c 100644 --- a/consumerBuilder.go +++ b/consumerBuilder.go @@ -21,6 +21,8 @@ package kafkaclient import ( + "crypto/sha512" + "hash" "time" "github.com/Shopify/sarama" @@ -28,9 +30,12 @@ import ( "github.com/gig/kafka-client/internal/consumer" "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" + "github.com/xdg/scram" "go.uber.org/zap" ) +var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() } + type ( consumerBuilder struct { clusterTopicsMap map[consumerCluster][]consumer.Topic @@ -63,6 +68,12 @@ type ( } ) +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + func newConsumerBuilder( config *kafka.ConsumerConfig, resolver kafka.NameResolver, @@ -124,6 +135,7 @@ func (c *consumerBuilder) Build() (kafka.Consumer, error) { func (c *consumerBuilder) build() (*consumer.MultiClusterConsumer, error) { // build TopicList per cluster + errorsC := make(chan error) for _, consumerTopic := range c.kafkaConfig.TopicList { // first, add TopicConsumer for original topic if topic is well defined. // disabling offset commit only applies for the original topic. @@ -191,6 +203,7 @@ func (c *consumerBuilder) build() (*consumer.MultiClusterConsumer, error) { Group: cluster.groupName, }] = consumer.NewClusterConsumer( cluster.name, + errorsC, saramaConsumer, topicConsumerMap, c.scope, @@ -204,6 +217,7 @@ func (c *consumerBuilder) build() (*consumer.MultiClusterConsumer, error) { c.kafkaConfig.TopicList, clusterConsumerMap, c.clusterSaramaClientMap, + errorsC, c.msgCh, c.scope, c.logger, @@ -325,6 +339,9 @@ func buildOptions(config *kafka.ConsumerConfig, consumerOpts ...ConsumerOption) opts.OffsetPolicy = config.Offsets.Initial.Offset } + opts.TLSConfig = config.TLSConfig + opts.SASLEnabled = config.SASLEnabled + // Apply optional consumer parameters that may be passed in. for _, cOpt := range consumerOpts { cOpt.apply(opts) @@ -360,15 +377,36 @@ func buildSaramaConfig(options *consumer.Options) *cluster.Config { config.Consumer.MaxProcessingTime = options.MaxProcessingTime if options.SASLEnabled { - config.Config.Net.SASL.Enable = true - config.Config.Net.SASL.Mechanism = sarama.SASLTypePlaintext - config.Config.Net.SASL.User = options.SASLUsername - config.Config.Net.SASL.Password = options.SASLPassword - + config.Net.SASL.Enable = true + config.Net.SASL.User = options.SASLUsername + config.Net.SASL.Password = options.SASLPassword + config.Net.SASL.Handshake = true + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } + config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 + config.Net.TLS.Enable = true + config.Net.TLS.Config = options.TLSConfig } return config } +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} + func topicToRetryTopic(topic kafka.ConsumerTopic) kafka.ConsumerTopic { return kafka.ConsumerTopic{ Topic: topic.RetryQ, diff --git a/internal/consumer/clusterConsumer.go b/internal/consumer/clusterConsumer.go index 88b5656..9f83155 100644 --- a/internal/consumer/clusterConsumer.go +++ b/internal/consumer/clusterConsumer.go @@ -27,7 +27,7 @@ import ( "sync" "time" - "github.com/bsm/sarama-cluster" + cluster "github.com/bsm/sarama-cluster" "github.com/gig/kafka-client/internal/metrics" "github.com/gig/kafka-client/internal/util" "github.com/gig/kafka-client/kafka" @@ -52,6 +52,7 @@ type ( metricsTicker *time.Ticker stopC chan struct{} doneC chan struct{} + errorsC chan error } ClusterGroup struct { @@ -63,6 +64,7 @@ type ( // NewClusterConsumer returns a new single cluster consumer. func NewClusterConsumer( cluster string, + errorsC chan error, saramaConsumer SaramaConsumer, consumerMap map[string]*TopicConsumer, scope tally.Scope, @@ -78,6 +80,7 @@ func NewClusterConsumer( metricsTicker: time.NewTicker(metricsInterval), stopC: make(chan struct{}), doneC: make(chan struct{}), + errorsC: errorsC, } } @@ -124,6 +127,11 @@ func (c *ClusterConsumer) Closed() <-chan struct{} { return c.doneC } +// Errors returns errors from the sarama cluster +func (c *ClusterConsumer) Errors() <-chan error { + return c.errorsC +} + // eventLoop is the main event loop for this consumer func (c *ClusterConsumer) eventLoop() { var n *cluster.Notification @@ -141,6 +149,7 @@ func (c *ClusterConsumer) eventLoop() { case err, ok := <-c.consumer.Errors(): if ok { c.logger.Warn("cluster consumer error", zap.Error(err)) + c.errorsC <- err } case _, ok := <-c.metricsTicker.C: if ok && n != nil { diff --git a/internal/consumer/multiclusterConsumer.go b/internal/consumer/multiclusterConsumer.go index 9e68b51..710bc89 100644 --- a/internal/consumer/multiclusterConsumer.go +++ b/internal/consumer/multiclusterConsumer.go @@ -22,6 +22,7 @@ package consumer import ( "errors" + "github.com/Shopify/sarama" "github.com/gig/kafka-client/internal/metrics" "github.com/gig/kafka-client/internal/util" @@ -39,6 +40,7 @@ type ( clusterToSaramaClientMap map[ClusterGroup]sarama.Client msgC chan kafka.Message doneC chan struct{} + errorsC chan error scope tally.Scope logger *zap.Logger lifecycle *util.RunLifecycle @@ -52,6 +54,7 @@ func NewMultiClusterConsumer( topics kafka.ConsumerTopicList, clusterConsumerMap map[ClusterGroup]*ClusterConsumer, saramaClients map[ClusterGroup]sarama.Client, + errorsC chan error, msgC chan kafka.Message, scope tally.Scope, logger *zap.Logger, @@ -61,11 +64,12 @@ func NewMultiClusterConsumer( topics: topics, clusterConsumerMap: clusterConsumerMap, clusterToSaramaClientMap: saramaClients, - msgC: msgC, - doneC: make(chan struct{}), - scope: scope, - logger: logger, - lifecycle: util.NewRunLifecycle(groupName + "-consumer"), + msgC: msgC, + errorsC: errorsC, + doneC: make(chan struct{}), + scope: scope, + logger: logger, + lifecycle: util.NewRunLifecycle(groupName + "-consumer"), } } @@ -122,6 +126,11 @@ func (c *MultiClusterConsumer) Closed() <-chan struct{} { return c.doneC } +// Errors returns errors from the sarama cluster +func (c *MultiClusterConsumer) Errors() <-chan error { + return c.errorsC +} + // Messages returns a channel to receive messages on. func (c *MultiClusterConsumer) Messages() <-chan kafka.Message { return c.msgC diff --git a/kafka/config.go b/kafka/config.go index e1e47e3..d6e1e32 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -96,10 +96,11 @@ type ( // routines handler functions the library will run. Default is 1. Concurrency int - // TLSConfig is the configuration to use for secure connections if // enabled (not nil) (defaults to disabled, nil). TLSConfig *tls.Config + + SASLEnabled bool } ) diff --git a/kafka/interfaces.go b/kafka/interfaces.go index 2e0a31b..0943354 100644 --- a/kafka/interfaces.go +++ b/kafka/interfaces.go @@ -37,6 +37,8 @@ type ( Start() error // Stop stops the consumer Stop() + // Errors returns errors from the sarama cluster + Errors() <-chan error // Closed returns a channel which will be closed after this consumer is completely shutdown Closed() <-chan struct{} // Messages return the message channel for this consumer From 7c27afbcde9e9fb491106bb9b74096a47488cf91 Mon Sep 17 00:00:00 2001 From: Ivaylo Ivanov Date: Fri, 28 Aug 2020 17:29:49 +0300 Subject: [PATCH 4/4] internal/ -> lib/ --- Gopkg.lock | 6 +++--- Makefile | 4 ++-- client_test.go | 2 +- consumerBuilder.go | 2 +- consumerBuilder_test.go | 2 +- consumerOptions.go | 2 +- consumerOptions_test.go | 2 +- glide.lock | 6 +++--- {internal => lib}/backoff/retry.go | 0 {internal => lib}/backoff/retry_test.go | 0 {internal => lib}/backoff/retrypolicy.go | 0 {internal => lib}/backoff/retrypolicy_test.go | 0 {internal => lib}/consumer/ackMgr.go | 4 ++-- {internal => lib}/consumer/clusterConsumer.go | 4 ++-- {internal => lib}/consumer/clusterConsumer_test.go | 2 +- {internal => lib}/consumer/dlq.go | 4 ++-- {internal => lib}/consumer/dlqMetadata.pb.go | 0 {internal => lib}/consumer/dlqMetadata.proto | 0 {internal => lib}/consumer/dlq_test.go | 0 {internal => lib}/consumer/limit.go | 0 {internal => lib}/consumer/message.go | 0 {internal => lib}/consumer/message_test.go | 0 {internal => lib}/consumer/mocks_test.go | 2 +- {internal => lib}/consumer/multiClusterConsumer_test.go | 0 {internal => lib}/consumer/multiclusterConsumer.go | 4 ++-- {internal => lib}/consumer/options.go | 0 {internal => lib}/consumer/partitionConsumer.go | 6 +++--- {internal => lib}/consumer/partitionConsumer_test.go | 0 {internal => lib}/consumer/topicConsumer.go | 2 +- {internal => lib}/consumer/types.go | 2 +- {internal => lib}/consumer/types_test.go | 0 {internal => lib}/list/list.go | 0 {internal => lib}/list/list_test.go | 0 {internal => lib}/metrics/defs.go | 0 {internal => lib}/util/lifecycle.go | 0 {internal => lib}/util/lifecycle_test.go | 0 {internal => lib}/util/testutil.go | 0 37 files changed, 28 insertions(+), 28 deletions(-) rename {internal => lib}/backoff/retry.go (100%) rename {internal => lib}/backoff/retry_test.go (100%) rename {internal => lib}/backoff/retrypolicy.go (100%) rename {internal => lib}/backoff/retrypolicy_test.go (100%) rename {internal => lib}/consumer/ackMgr.go (98%) rename {internal => lib}/consumer/clusterConsumer.go (98%) rename {internal => lib}/consumer/clusterConsumer_test.go (99%) rename {internal => lib}/consumer/dlq.go (98%) rename {internal => lib}/consumer/dlqMetadata.pb.go (100%) rename {internal => lib}/consumer/dlqMetadata.proto (100%) rename {internal => lib}/consumer/dlq_test.go (100%) rename {internal => lib}/consumer/limit.go (100%) rename {internal => lib}/consumer/message.go (100%) rename {internal => lib}/consumer/message_test.go (100%) rename {internal => lib}/consumer/mocks_test.go (99%) rename {internal => lib}/consumer/multiClusterConsumer_test.go (100%) rename {internal => lib}/consumer/multiclusterConsumer.go (98%) rename {internal => lib}/consumer/options.go (100%) rename {internal => lib}/consumer/partitionConsumer.go (99%) rename {internal => lib}/consumer/partitionConsumer_test.go (100%) rename {internal => lib}/consumer/topicConsumer.go (98%) rename {internal => lib}/consumer/types.go (99%) rename {internal => lib}/consumer/types_test.go (100%) rename {internal => lib}/list/list.go (100%) rename {internal => lib}/list/list_test.go (100%) rename {internal => lib}/metrics/defs.go (100%) rename {internal => lib}/util/lifecycle.go (100%) rename {internal => lib}/util/lifecycle_test.go (100%) rename {internal => lib}/util/testutil.go (100%) diff --git a/Gopkg.lock b/Gopkg.lock index be52f6b..683eb30 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -142,9 +142,9 @@ packages = [ ".", "buffer", - "internal/bufferpool", - "internal/color", - "internal/exit", + "lib/bufferpool", + "lib/color", + "lib/exit", "zapcore", ] pruneopts = "UT" diff --git a/Makefile b/Makefile index 436de47..827d871 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,9 @@ export GO15VENDOREXPERIMENT=1 BENCH_FLAGS ?= -cpuprofile=cpu.pprof -memprofile=mem.pprof -benchmem -PKGS ?= ./cmd/... ./internal/... ./kafka/... . +PKGS ?= ./cmd/... ./lib/... ./kafka/... . # Many Go tools take file globs or directories as arguments instead of packages. -PKG_FILES ?= *.go kafka internal/consumer internal/backoff internal/list internal/metrics internal/util +PKG_FILES ?= *.go kafka lib/consumer lib/backoff lib/list lib/metrics lib/util # The linting tools evolve with each Go version, so run them only on the latest # stable release. diff --git a/client_test.go b/client_test.go index 9b4111f..d2c12f7 100644 --- a/client_test.go +++ b/client_test.go @@ -27,7 +27,7 @@ import ( "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" "github.com/stretchr/testify/assert" - "github.com/gig/kafka-client/internal/consumer" + "github.com/gig/kafka-client/lib/consumer" "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" diff --git a/consumerBuilder.go b/consumerBuilder.go index 3187e5c..67f4b8f 100644 --- a/consumerBuilder.go +++ b/consumerBuilder.go @@ -27,7 +27,7 @@ import ( "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" - "github.com/gig/kafka-client/internal/consumer" + "github.com/gig/kafka-client/lib/consumer" "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "github.com/xdg/scram" diff --git a/consumerBuilder_test.go b/consumerBuilder_test.go index 7447251..ecd4ee2 100644 --- a/consumerBuilder_test.go +++ b/consumerBuilder_test.go @@ -28,7 +28,7 @@ import ( "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" "github.com/stretchr/testify/suite" - "github.com/gig/kafka-client/internal/consumer" + "github.com/gig/kafka-client/lib/consumer" "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" diff --git a/consumerOptions.go b/consumerOptions.go index 5899702..67ec330 100644 --- a/consumerOptions.go +++ b/consumerOptions.go @@ -21,7 +21,7 @@ package kafkaclient import ( - "github.com/gig/kafka-client/internal/consumer" + "github.com/gig/kafka-client/lib/consumer" "github.com/gig/kafka-client/kafka" ) diff --git a/consumerOptions_test.go b/consumerOptions_test.go index 6068af8..7c6ed7c 100644 --- a/consumerOptions_test.go +++ b/consumerOptions_test.go @@ -24,7 +24,7 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/gig/kafka-client/internal/consumer" + "github.com/gig/kafka-client/lib/consumer" "github.com/gig/kafka-client/kafka" ) diff --git a/glide.lock b/glide.lock index 4508903..9a156e7 100644 --- a/glide.lock +++ b/glide.lock @@ -43,9 +43,9 @@ imports: version: 35aad584952c3e7020db7b839f6b102de6271f89 subpackages: - buffer - - internal/bufferpool - - internal/color - - internal/exit + - lib/bufferpool + - lib/color + - lib/exit - zapcore testImports: - name: github.com/pmezard/go-difflib diff --git a/internal/backoff/retry.go b/lib/backoff/retry.go similarity index 100% rename from internal/backoff/retry.go rename to lib/backoff/retry.go diff --git a/internal/backoff/retry_test.go b/lib/backoff/retry_test.go similarity index 100% rename from internal/backoff/retry_test.go rename to lib/backoff/retry_test.go diff --git a/internal/backoff/retrypolicy.go b/lib/backoff/retrypolicy.go similarity index 100% rename from internal/backoff/retrypolicy.go rename to lib/backoff/retrypolicy.go diff --git a/internal/backoff/retrypolicy_test.go b/lib/backoff/retrypolicy_test.go similarity index 100% rename from internal/backoff/retrypolicy_test.go rename to lib/backoff/retrypolicy_test.go diff --git a/internal/consumer/ackMgr.go b/lib/consumer/ackMgr.go similarity index 98% rename from internal/consumer/ackMgr.go rename to lib/consumer/ackMgr.go index d5dbdab..719105c 100644 --- a/internal/consumer/ackMgr.go +++ b/lib/consumer/ackMgr.go @@ -25,8 +25,8 @@ import ( "sync" "time" - "github.com/gig/kafka-client/internal/list" - "github.com/gig/kafka-client/internal/metrics" + "github.com/gig/kafka-client/lib/list" + "github.com/gig/kafka-client/lib/metrics" "github.com/uber-go/tally" "go.uber.org/zap" ) diff --git a/internal/consumer/clusterConsumer.go b/lib/consumer/clusterConsumer.go similarity index 98% rename from internal/consumer/clusterConsumer.go rename to lib/consumer/clusterConsumer.go index 9f83155..77ed86b 100644 --- a/internal/consumer/clusterConsumer.go +++ b/lib/consumer/clusterConsumer.go @@ -28,8 +28,8 @@ import ( "time" cluster "github.com/bsm/sarama-cluster" - "github.com/gig/kafka-client/internal/metrics" - "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/lib/metrics" + "github.com/gig/kafka-client/lib/util" "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" diff --git a/internal/consumer/clusterConsumer_test.go b/lib/consumer/clusterConsumer_test.go similarity index 99% rename from internal/consumer/clusterConsumer_test.go rename to lib/consumer/clusterConsumer_test.go index e284168..48f7dbf 100644 --- a/internal/consumer/clusterConsumer_test.go +++ b/lib/consumer/clusterConsumer_test.go @@ -28,7 +28,7 @@ import ( "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" "github.com/stretchr/testify/suite" - "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/lib/util" "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" diff --git a/internal/consumer/dlq.go b/lib/consumer/dlq.go similarity index 98% rename from internal/consumer/dlq.go rename to lib/consumer/dlq.go index 5061e93..2a97c49 100644 --- a/internal/consumer/dlq.go +++ b/lib/consumer/dlq.go @@ -26,8 +26,8 @@ import ( "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" - "github.com/gig/kafka-client/internal/metrics" - "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/lib/metrics" + "github.com/gig/kafka-client/lib/util" "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" diff --git a/internal/consumer/dlqMetadata.pb.go b/lib/consumer/dlqMetadata.pb.go similarity index 100% rename from internal/consumer/dlqMetadata.pb.go rename to lib/consumer/dlqMetadata.pb.go diff --git a/internal/consumer/dlqMetadata.proto b/lib/consumer/dlqMetadata.proto similarity index 100% rename from internal/consumer/dlqMetadata.proto rename to lib/consumer/dlqMetadata.proto diff --git a/internal/consumer/dlq_test.go b/lib/consumer/dlq_test.go similarity index 100% rename from internal/consumer/dlq_test.go rename to lib/consumer/dlq_test.go diff --git a/internal/consumer/limit.go b/lib/consumer/limit.go similarity index 100% rename from internal/consumer/limit.go rename to lib/consumer/limit.go diff --git a/internal/consumer/message.go b/lib/consumer/message.go similarity index 100% rename from internal/consumer/message.go rename to lib/consumer/message.go diff --git a/internal/consumer/message_test.go b/lib/consumer/message_test.go similarity index 100% rename from internal/consumer/message_test.go rename to lib/consumer/message_test.go diff --git a/internal/consumer/mocks_test.go b/lib/consumer/mocks_test.go similarity index 99% rename from internal/consumer/mocks_test.go rename to lib/consumer/mocks_test.go index fc45361..9366c8b 100644 --- a/internal/consumer/mocks_test.go +++ b/lib/consumer/mocks_test.go @@ -28,7 +28,7 @@ import ( "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" - "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/lib/util" "github.com/gig/kafka-client/kafka" "go.uber.org/zap/zapcore" ) diff --git a/internal/consumer/multiClusterConsumer_test.go b/lib/consumer/multiClusterConsumer_test.go similarity index 100% rename from internal/consumer/multiClusterConsumer_test.go rename to lib/consumer/multiClusterConsumer_test.go diff --git a/internal/consumer/multiclusterConsumer.go b/lib/consumer/multiclusterConsumer.go similarity index 98% rename from internal/consumer/multiclusterConsumer.go rename to lib/consumer/multiclusterConsumer.go index 710bc89..03fcdec 100644 --- a/internal/consumer/multiclusterConsumer.go +++ b/lib/consumer/multiclusterConsumer.go @@ -24,8 +24,8 @@ import ( "errors" "github.com/Shopify/sarama" - "github.com/gig/kafka-client/internal/metrics" - "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/lib/metrics" + "github.com/gig/kafka-client/lib/util" "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" diff --git a/internal/consumer/options.go b/lib/consumer/options.go similarity index 100% rename from internal/consumer/options.go rename to lib/consumer/options.go diff --git a/internal/consumer/partitionConsumer.go b/lib/consumer/partitionConsumer.go similarity index 99% rename from internal/consumer/partitionConsumer.go rename to lib/consumer/partitionConsumer.go index 5988f35..1d460be 100644 --- a/internal/consumer/partitionConsumer.go +++ b/lib/consumer/partitionConsumer.go @@ -29,9 +29,9 @@ import ( "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" - "github.com/gig/kafka-client/internal/list" - "github.com/gig/kafka-client/internal/metrics" - "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/lib/list" + "github.com/gig/kafka-client/lib/metrics" + "github.com/gig/kafka-client/lib/util" "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" diff --git a/internal/consumer/partitionConsumer_test.go b/lib/consumer/partitionConsumer_test.go similarity index 100% rename from internal/consumer/partitionConsumer_test.go rename to lib/consumer/partitionConsumer_test.go diff --git a/internal/consumer/topicConsumer.go b/lib/consumer/topicConsumer.go similarity index 98% rename from internal/consumer/topicConsumer.go rename to lib/consumer/topicConsumer.go index ef8a2bc..cc7a0fe 100644 --- a/internal/consumer/topicConsumer.go +++ b/lib/consumer/topicConsumer.go @@ -24,7 +24,7 @@ import ( "sync" "github.com/bsm/sarama-cluster" - "github.com/gig/kafka-client/internal/metrics" + "github.com/gig/kafka-client/lib/metrics" "github.com/gig/kafka-client/kafka" "github.com/uber-go/tally" "go.uber.org/zap" diff --git a/internal/consumer/types.go b/lib/consumer/types.go similarity index 99% rename from internal/consumer/types.go rename to lib/consumer/types.go index 77ef7a1..adaa877 100644 --- a/internal/consumer/types.go +++ b/lib/consumer/types.go @@ -26,7 +26,7 @@ import ( "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" "github.com/golang/protobuf/proto" - "github.com/gig/kafka-client/internal/util" + "github.com/gig/kafka-client/lib/util" "github.com/gig/kafka-client/kafka" "go.uber.org/zap/zapcore" ) diff --git a/internal/consumer/types_test.go b/lib/consumer/types_test.go similarity index 100% rename from internal/consumer/types_test.go rename to lib/consumer/types_test.go diff --git a/internal/list/list.go b/lib/list/list.go similarity index 100% rename from internal/list/list.go rename to lib/list/list.go diff --git a/internal/list/list_test.go b/lib/list/list_test.go similarity index 100% rename from internal/list/list_test.go rename to lib/list/list_test.go diff --git a/internal/metrics/defs.go b/lib/metrics/defs.go similarity index 100% rename from internal/metrics/defs.go rename to lib/metrics/defs.go diff --git a/internal/util/lifecycle.go b/lib/util/lifecycle.go similarity index 100% rename from internal/util/lifecycle.go rename to lib/util/lifecycle.go diff --git a/internal/util/lifecycle_test.go b/lib/util/lifecycle_test.go similarity index 100% rename from internal/util/lifecycle_test.go rename to lib/util/lifecycle_test.go diff --git a/internal/util/testutil.go b/lib/util/testutil.go similarity index 100% rename from internal/util/testutil.go rename to lib/util/testutil.go