Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,9 @@ It's possible that your topic and system are entirely ok with losing some messag

`Kaffe.Producer` handles producing messages to Kafka and will automatically select the topic partitions per message or can be given a function to call to determine the partition per message. Kaffe automatically inserts a Kafka timestamp with each message.

Configure your Kaffe Producer in your mix config. For all options, see `Kaffe.Config.Producer`.
Configure your Kaffe Producer(s) in your mix config. For all options, see `Kaffe.Config.Producer`.

Single Producer configuration:

```elixir
config :kaffe,
Expand All @@ -299,6 +301,31 @@ config :kaffe,
]
```

Multiple Producers configuration:

```elixir
config :kaffe,
producers: %{
"producer_1" => [
endpoints: [kafka1: 9092], # [hostname: port]
topics: ["kafka-topic"],

# optional
partition_strategy: :md5,
ssl: true,
sasl: %{
mechanism: :plain,
login: System.get_env("KAFFE_PRODUCER_USER"),
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
}
],
"producer_2" => [
endpoints: [kafka2: 9092], # [hostname: port]
topics: ["another-kafka-topic"]
]
}
```

The `partition_strategy` setting can be one of:

- `:md5`: (default) provides even and deterministic distribution of the messages over the available partitions based on an MD5 hash of the key
Expand Down Expand Up @@ -347,6 +374,10 @@ There are several ways to produce:
Kaffe.Producer.produce_sync("topic", [{"key1", "value1"}, {"key2", "value2"}])
```

```elixir
# Target a specific producer by the key it was given in the :producers config map
Kaffe.Producer.produce_sync_with_client("producer_1", "topic", [{"key1", "value1"}, {"key2", "value2"}])
```

- `topic`/`partition`/`message_list` - Produce each message in the list to the given `topic`/`partition`.

Each item in the list is a tuple of the key and value: `{key, value}`.
Expand All @@ -355,24 +386,40 @@ There are several ways to produce:
Kaffe.Producer.produce_sync("topic", 2, [{"key1", "value1"}, {"key2", "value2"}])
```

```elixir
Kaffe.Producer.produce_sync_with_client("producer_1", "topic", 2, [{"key1", "value1"}, {"key2", "value2"}])
```

- `key`/`value` - The key/value will be produced to the first topic given to the producer when it was started. The partition will be selected with the chosen strategy or given function.

```elixir
Kaffe.Producer.produce_sync("key", "value")
```

```elixir
Kaffe.Producer.produce_sync_with_client("producer_1", "key", "value")
```

- `topic`/`key`/`value` - The key/value will be produced to the given topic.

```elixir
Kaffe.Producer.produce_sync("whitelist", "key", "value")
```

```elixir
Kaffe.Producer.produce_sync_with_client("producer_1", "whitelist", "key", "value")
```

- `topic`/`partition`/`key`/`value` - The key/value will be produced to the given topic/partition.

```elixir
Kaffe.Producer.produce_sync("whitelist", 2, "key", "value")
```

```elixir
Kaffe.Producer.produce_sync_with_client("producer_1", "whitelist", 2, "key", "value")
```

**NOTE**: With this approach Kaffe will not calculate the next partition since it assumes you're taking over that job by giving it a specific partition.

## Testing
Expand Down
20 changes: 11 additions & 9 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ config :kaffe,
}
]
},
producer: [
endpoints: [kafka: 9092],
topics: ["kaffe-test"],
sasl: %{
mechanism: :plain,
login: System.get_env("KAFFE_PRODUCER_USER"),
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
}
]
producers: %{
"producer_name" => [
endpoints: [kafka: 9092],
topics: ["kaffe-test"],
sasl: %{
mechanism: :plain,
login: System.get_env("KAFFE_PRODUCER_USER"),
password: System.get_env("KAFFE_PRODUCER_PASSWORD")
}
]
}
13 changes: 10 additions & 3 deletions lib/kaffe.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,16 @@ defmodule Kaffe do

Logger.debug("event#start=#{__MODULE__}")

if Application.get_env(:kaffe, :producer) do
Logger.debug("event#start_producer_client=#{__MODULE__}")
Kaffe.Producer.start_producer_client()
Kaffe.Config.Producer.maybe_set_producers_env!()

if producers = Application.get_env(:kaffe, :producers) do
producers
|> Map.keys()
|> Enum.each(fn config_key ->
Logger.debug("event#start_producer_client=#{__MODULE__}_#{config_key}")
config = Kaffe.Config.Producer.configuration(config_key)
Kaffe.Producer.start_producer_client(config)
end)
end

children = []
Expand Down
138 changes: 100 additions & 38 deletions lib/kaffe/config/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,78 +58,140 @@ defmodule Kaffe.Config.Producer do

import Kaffe.Config, only: [heroku_kafka_endpoints: 0, parse_endpoints: 1]

def configuration do
require Logger

@default_producer_config_key "producer"

def configuration(config_key) do
%{
endpoints: endpoints(),
producer_config: client_producer_config(),
client_name: config_get(:client_name, :kaffe_producer_client),
topics: producer_topics(),
partition_strategy: config_get(:partition_strategy, :md5)
endpoints: endpoints(config_key),
producer_config: client_producer_config(config_key),
client_name: client_name(config_key),
topics: producer_topics(config_key),
partition_strategy: partition_strategy(config_key)
}
end

def producer_topics, do: config_get!(:topics)
def producer_topics(config_key), do: config_get!(config_key, :topics)

def endpoints do
if heroku_kafka?() do
def endpoints(config_key) do
if heroku_kafka?(config_key) do
heroku_kafka_endpoints()
else
parse_endpoints(config_get!(:endpoints))
parse_endpoints(config_get!(config_key, :endpoints))
end
end

def client_producer_config do
default_client_producer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options() ++ ssl_options()
# Builds the full brod client configuration for `config_key`:
# the defaults, plus Heroku-Kafka SSL, SASL and plain SSL options.
def client_producer_config(config_key) do
  Enum.concat([
    default_client_producer_config(config_key),
    maybe_heroku_kafka_ssl(config_key),
    sasl_options(config_key),
    ssl_options(config_key)
  ])
end

# The brod client name for this producer; defaults to an atom derived
# from the config key (config keys come from app config, so the atom set is bounded).
def client_name(config_key) do
  fallback = :"kaffe_producer_client_#{config_key}"
  config_get(config_key, :client_name, fallback)
end

# Partition selection strategy for this producer; `:md5` when not configured.
def partition_strategy(config_key), do: config_get(config_key, :partition_strategy, :md5)

def sasl_options do
:sasl
|> config_get(%{})
# SASL client options for this producer; empty map (no SASL) when not configured.
def sasl_options(config_key) do
  sasl = config_get(config_key, :sasl, %{})
  Kaffe.Config.sasl_config(sasl)
end

def maybe_heroku_kafka_ssl do
case heroku_kafka?() do
# SSL options required by Heroku Kafka, or an empty list when this
# producer is not configured for the Heroku Kafka environment.
def maybe_heroku_kafka_ssl(config_key) do
  if heroku_kafka?(config_key) do
    Kaffe.Config.ssl_config()
  else
    []
  end
end

def ssl_options do
:ssl
|> config_get(false)
# Plain SSL options for this producer; disabled (`false`) when not configured.
def ssl_options(config_key) do
  ssl = config_get(config_key, :ssl, false)
  Kaffe.Config.ssl_config(ssl)
end

def default_client_producer_config do
def default_client_producer_config(config_key) do
[
auto_start_producers: config_get(:auto_start_producers, true),
allow_topic_auto_creation: config_get(:allow_topic_auto_creation, false),
auto_start_producers: config_get(config_key, :auto_start_producers, true),
allow_topic_auto_creation: config_get(config_key, :allow_topic_auto_creation, false),
default_producer_config: [
required_acks: config_get(:required_acks, -1),
ack_timeout: config_get(:ack_timeout, 1000),
partition_buffer_limit: config_get(:partition_buffer_limit, 512),
partition_onwire_limit: config_get(:partition_onwire_limit, 1),
max_batch_size: config_get(:max_batch_size, 1_048_576),
max_retries: config_get(:max_retries, 3),
retry_backoff_ms: config_get(:retry_backoff_ms, 500),
compression: config_get(:compression, :no_compression),
min_compression_batch_size: config_get(:min_compression_batch_size, 1024)
required_acks: config_get(config_key, :required_acks, -1),
ack_timeout: config_get(config_key, :ack_timeout, 1000),
partition_buffer_limit: config_get(config_key, :partition_buffer_limit, 512),
partition_onwire_limit: config_get(config_key, :partition_onwire_limit, 1),
max_batch_size: config_get(config_key, :max_batch_size, 1_048_576),
max_retries: config_get(config_key, :max_retries, 3),
retry_backoff_ms: config_get(config_key, :retry_backoff_ms, 500),
compression: config_get(config_key, :compression, :no_compression),
min_compression_batch_size: config_get(config_key, :min_compression_batch_size, 1024)
]
]
end

def heroku_kafka? do
config_get(:heroku_kafka_env, false)
# Whether this producer is configured for the Heroku Kafka environment.
def heroku_kafka?(config_key), do: config_get(config_key, :heroku_kafka_env, false)

def config_get!(key) do
Application.get_env(:kaffe, :producer)
# Fetches `key` from the producer config identified by `config_key`,
# raising (KeyError) when either the producer or the key is absent.
def config_get!(config_key, key) do
  producer = Map.fetch!(Application.get_env(:kaffe, :producers), config_key)
  Keyword.fetch!(producer, key)
end

def config_get(key, default) do
Application.get_env(:kaffe, :producer)
# Fetches `key` from the producer config identified by `config_key`,
# falling back to `default` when the key is absent.
# Still raises when the producer itself is unknown.
def config_get(config_key, key, default) do
  producer = Map.fetch!(Application.get_env(:kaffe, :producers), config_key)
  Keyword.get(producer, key, default)
end

# All producer names (keys) currently present in the :producers config map.
def list_config_keys do
  Map.keys(Application.get_env(:kaffe, :producers))
end

@doc """
Backward compatibility shim for the legacy single-producer configuration.

When only `:kaffe, :producer` is set, copies it into `:kaffe, :producers`
under the `#{@default_producer_config_key}` key so the rest of the code can
deal exclusively with the multi-producer map. Raises when both
`:kaffe, :producer` and `:kaffe, :producers` are configured.
"""
@spec maybe_set_producers_env!() :: :ok
def maybe_set_producers_env! do
  single = Application.get_env(:kaffe, :producer) || []
  multiple = Application.get_env(:kaffe, :producers) || %{}

  cond do
    # Both styles configured at once is ambiguous — refuse to start.
    not Enum.empty?(single) and not Enum.empty?(multiple) ->
      raise("""
      FOUND SINGLE PRODUCER AND MULTIPLE PRODUCERS CONFIG:

      Delete `:kaffe, :producers` or `:kaffe, :producer` configuration.
      """)

    # Legacy single-producer config only: promote it to the map form.
    not Enum.empty?(single) ->
      producers = %{@default_producer_config_key => single}

      Logger.info("""
      Configuration for single producer is specified in :kaffe, :producer.

      To ensure backward compatibility :kaffe, :producers was set to a map \
      with default producer name as the key and single producer config as the value:

      config :kaffe, producers: #{inspect(producers)}
      """)

      Application.put_env(:kaffe, :producers, producers)

    true ->
      :ok
  end

  :ok
end
end
Loading