Skip to content
This repository was archived by the owner on Oct 2, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ language: go
sudo: false
go:
- 1.11
go_import_path: github.com/uber-go/kafka-client
go_import_path: github.com/gig/kafka-client
cache:
directories:
- vendor
Expand Down
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
export GO15VENDOREXPERIMENT=1

BENCH_FLAGS ?= -cpuprofile=cpu.pprof -memprofile=mem.pprof -benchmem
PKGS ?= ./cmd/... ./internal/... ./kafka/... .
PKGS ?= ./cmd/... ./lib/... ./kafka/... .
# Many Go tools take file globs or directories as arguments instead of packages.
PKG_FILES ?= *.go kafka internal/consumer internal/backoff internal/list internal/metrics internal/util
PKG_FILES ?= *.go kafka lib/consumer lib/backoff lib/list lib/metrics lib/util

# The linting tools evolve with each Go version, so run them only on the latest
# stable release.
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ A high level Go client library for Apache Kafka that provides the following prim
This library is in alpha. APIs are subject to change, use at your own risk

## Contributing
If you are interested in contributing, please sign the [License Agreement](https://cla-assistant.io/uber-go/kafka-client) and see our [development guide](https://github.com/uber-go/kafka-client/blob/master/docs/DEVELOPMENT-GUIDE.md)
If you are interested in contributing, please sign the [License Agreement](https://cla-assistant.io/uber-go/kafka-client) and see our [development guide](https://github.com/gig/kafka-client/blob/master/docs/DEVELOPMENT-GUIDE.md)

## Installation

`go get -u github.com/uber-go/kafka-client`
`go get -u github.com/gig/kafka-client`

## Quick Start

Expand All @@ -27,8 +27,8 @@ import (
"os"
"os/signal"

"github.com/uber-go/kafka-client"
"github.com/uber-go/kafka-client/kafka"
"github.com/gig/kafka-client"
"github.com/gig/kafka-client/kafka"
"github.com/uber-go/tally"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -100,7 +100,7 @@ func main() {
```

[mit-img]: http://img.shields.io/badge/License-MIT-blue.svg
[mit]: https://github.com/uber-go/kafka-client/blob/master/LICENSE
[mit]: https://github.com/gig/kafka-client/blob/master/LICENSE

[ci-img]: https://img.shields.io/travis/uber-go/kafka-client/master.svg
[ci]: https://travis-ci.org/uber-go/kafka-client/branches
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package kafkaclient
import (
"os"

"github.com/uber-go/kafka-client/kafka"
"github.com/gig/kafka-client/kafka"
"github.com/uber-go/tally"
"go.uber.org/zap"
)
Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"github.com/stretchr/testify/assert"
"github.com/uber-go/kafka-client/internal/consumer"
"github.com/uber-go/kafka-client/kafka"
"github.com/gig/kafka-client/lib/consumer"
"github.com/gig/kafka-client/kafka"
"github.com/uber-go/tally"
"go.uber.org/zap"
)
Expand Down
58 changes: 52 additions & 6 deletions consumerBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@
package kafkaclient

import (
"crypto/sha512"
"hash"
"time"

"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/uber-go/kafka-client/internal/consumer"
"github.com/uber-go/kafka-client/kafka"
cluster "github.com/bsm/sarama-cluster"
"github.com/gig/kafka-client/lib/consumer"
"github.com/gig/kafka-client/kafka"
"github.com/uber-go/tally"
"github.com/xdg/scram"
"go.uber.org/zap"
)

var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

type (
consumerBuilder struct {
clusterTopicsMap map[consumerCluster][]consumer.Topic
Expand Down Expand Up @@ -63,6 +68,12 @@ type (
}
)

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func newConsumerBuilder(
config *kafka.ConsumerConfig,
resolver kafka.NameResolver,
Expand All @@ -76,9 +87,9 @@ func newConsumerBuilder(
clusterSaramaClientMap: make(map[consumer.ClusterGroup]sarama.Client),
clusterSaramaConsumerMap: make(map[consumer.ClusterGroup]consumer.SaramaConsumer),
clusterTopicSaramaProducerMap: make(map[string]map[string]sarama.AsyncProducer),
msgCh: make(chan kafka.Message, consumerOptions.RcvBufferSize),
logger: logger.With(zap.String("consumergroup", config.GroupName)),
scope: scope.Tagged(map[string]string{"consumergroup": config.GroupName}),
msgCh: make(chan kafka.Message, consumerOptions.RcvBufferSize),
logger: logger.With(zap.String("consumergroup", config.GroupName)),
scope: scope.Tagged(map[string]string{"consumergroup": config.GroupName}),
constructors: consumer.Constructors{
NewSaramaConsumer: consumer.NewSaramaConsumer,
NewSaramaProducer: consumer.NewSaramaProducer,
Expand Down Expand Up @@ -124,6 +135,7 @@ func (c *consumerBuilder) Build() (kafka.Consumer, error) {

func (c *consumerBuilder) build() (*consumer.MultiClusterConsumer, error) {
// build TopicList per cluster
errorsC := make(chan error)
for _, consumerTopic := range c.kafkaConfig.TopicList {
// first, add TopicConsumer for original topic if topic is well defined.
// disabling offset commit only applies for the original topic.
Expand Down Expand Up @@ -191,6 +203,7 @@ func (c *consumerBuilder) build() (*consumer.MultiClusterConsumer, error) {
Group: cluster.groupName,
}] = consumer.NewClusterConsumer(
cluster.name,
errorsC,
saramaConsumer,
topicConsumerMap,
c.scope,
Expand All @@ -204,6 +217,7 @@ func (c *consumerBuilder) build() (*consumer.MultiClusterConsumer, error) {
c.kafkaConfig.TopicList,
clusterConsumerMap,
c.clusterSaramaClientMap,
errorsC,
c.msgCh,
c.scope,
c.logger,
Expand Down Expand Up @@ -325,6 +339,9 @@ func buildOptions(config *kafka.ConsumerConfig, consumerOpts ...ConsumerOption)
opts.OffsetPolicy = config.Offsets.Initial.Offset
}

opts.TLSConfig = config.TLSConfig
opts.SASLEnabled = config.SASLEnabled

// Apply optional consumer parameters that may be passed in.
for _, cOpt := range consumerOpts {
cOpt.apply(opts)
Expand Down Expand Up @@ -358,9 +375,38 @@ func buildSaramaConfig(options *consumer.Options) *cluster.Config {
config.Consumer.Offsets.CommitInterval = options.OffsetCommitInterval
config.Consumer.Offsets.Initial = options.OffsetPolicy
config.Consumer.MaxProcessingTime = options.MaxProcessingTime

if options.SASLEnabled {
config.Net.SASL.Enable = true
config.Net.SASL.User = options.SASLUsername
config.Net.SASL.Password = options.SASLPassword
config.Net.SASL.Handshake = true
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
config.Net.TLS.Enable = true
config.Net.TLS.Config = options.TLSConfig
}
return config
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}

func topicToRetryTopic(topic kafka.ConsumerTopic) kafka.ConsumerTopic {
return kafka.ConsumerTopic{
Topic: topic.RetryQ,
Expand Down
4 changes: 2 additions & 2 deletions consumerBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/stretchr/testify/suite"
"github.com/uber-go/kafka-client/internal/consumer"
"github.com/uber-go/kafka-client/kafka"
"github.com/gig/kafka-client/lib/consumer"
"github.com/gig/kafka-client/kafka"
"github.com/uber-go/tally"
"go.uber.org/zap"
)
Expand Down
22 changes: 20 additions & 2 deletions consumerOptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
package kafkaclient

import (
"github.com/uber-go/kafka-client/internal/consumer"
"github.com/uber-go/kafka-client/kafka"
"github.com/gig/kafka-client/lib/consumer"
"github.com/gig/kafka-client/kafka"
)

type (
Expand All @@ -42,8 +42,26 @@ type (
clientIDOptions struct {
clientID string
}

SASLOptions struct {
username string
password string
}
)

func WithSASLAuth(username, password string) ConsumerOption {
return &SASLOptions{
username: username,
password: password,
}
}

func (o *SASLOptions) apply(opts *consumer.Options) {
opts.SASLEnabled = true
opts.SASLUsername = o.username
opts.SASLPassword = o.password
}

// WithDLQTopics creates a range consumer for the specified consumer DLQ topics.
func WithDLQTopics(topicList kafka.ConsumerTopicList) ConsumerOption {
return &dlqTopicsOptions{
Expand Down
4 changes: 2 additions & 2 deletions consumerOptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/uber-go/kafka-client/internal/consumer"
"github.com/uber-go/kafka-client/kafka"
"github.com/gig/kafka-client/lib/consumer"
"github.com/gig/kafka-client/kafka"
)

func TestDLQConsumerOptions(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package: github.com/uber-go/kafka-client
package: github.com/gig/kafka-client
license: MIT
import:
- package: go.uber.org/zap
Expand Down
3 changes: 2 additions & 1 deletion kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ type (
// routines handler functions the library will run. Default is 1.
Concurrency int


// TLSConfig is the configuration to use for secure connections if
// enabled (not nil) (defaults to disabled, nil).
TLSConfig *tls.Config

SASLEnabled bool
}
)

Expand Down
2 changes: 2 additions & 0 deletions kafka/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type (
Start() error
// Stop stops the consumer
Stop()
// Errors returns errors from the sarama cluster
Errors() <-chan error
// Closed returns a channel which will be closed after this consumer is completely shutdown
Closed() <-chan struct{}
// Messages return the message channel for this consumer
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions internal/consumer/ackMgr.go → lib/consumer/ackMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"sync"
"time"

"github.com/uber-go/kafka-client/internal/list"
"github.com/uber-go/kafka-client/internal/metrics"
"github.com/gig/kafka-client/lib/list"
"github.com/gig/kafka-client/lib/metrics"
"github.com/uber-go/tally"
"go.uber.org/zap"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import (
"sync"
"time"

"github.com/bsm/sarama-cluster"
"github.com/uber-go/kafka-client/internal/metrics"
"github.com/uber-go/kafka-client/internal/util"
"github.com/uber-go/kafka-client/kafka"
cluster "github.com/bsm/sarama-cluster"
"github.com/gig/kafka-client/lib/metrics"
"github.com/gig/kafka-client/lib/util"
"github.com/gig/kafka-client/kafka"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -52,6 +52,7 @@ type (
metricsTicker *time.Ticker
stopC chan struct{}
doneC chan struct{}
errorsC chan error
}

ClusterGroup struct {
Expand All @@ -63,6 +64,7 @@ type (
// NewClusterConsumer returns a new single cluster consumer.
func NewClusterConsumer(
cluster string,
errorsC chan error,
saramaConsumer SaramaConsumer,
consumerMap map[string]*TopicConsumer,
scope tally.Scope,
Expand All @@ -78,6 +80,7 @@ func NewClusterConsumer(
metricsTicker: time.NewTicker(metricsInterval),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
errorsC: errorsC,
}
}

Expand Down Expand Up @@ -124,6 +127,11 @@ func (c *ClusterConsumer) Closed() <-chan struct{} {
return c.doneC
}

// Errors returns errors from the sarama cluster
func (c *ClusterConsumer) Errors() <-chan error {
return c.errorsC
}

// eventLoop is the main event loop for this consumer
func (c *ClusterConsumer) eventLoop() {
var n *cluster.Notification
Expand All @@ -141,6 +149,7 @@ func (c *ClusterConsumer) eventLoop() {
case err, ok := <-c.consumer.Errors():
if ok {
c.logger.Warn("cluster consumer error", zap.Error(err))
c.errorsC <- err
}
case _, ok := <-c.metricsTicker.C:
if ok && n != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/stretchr/testify/suite"
"github.com/uber-go/kafka-client/internal/util"
"github.com/uber-go/kafka-client/kafka"
"github.com/gig/kafka-client/lib/util"
"github.com/gig/kafka-client/kafka"
"github.com/uber-go/tally"
"go.uber.org/zap"
)
Expand Down
Loading