diff --git a/README.md b/README.md index 664d552..9b31220 100644 --- a/README.md +++ b/README.md @@ -280,7 +280,9 @@ It's possible that your topic and system are entirely ok with losing some messag `Kaffe.Producer` handles producing messages to Kafka and will automatically select the topic partitions per message or can be given a function to call to determine the partition per message. Kaffe automatically inserts a Kafka timestamp with each message. -Configure your Kaffe Producer in your mix config. For all options, see `Kaffe.Config.Producer`. +Configure your Kaffe Producer(s) in your mix config. For all options, see `Kaffe.Config.Producer`. + +Single Producer configuration: ```elixir config :kaffe, @@ -299,6 +301,31 @@ config :kaffe, ] ``` +Multiple Producers configuration: + +```elixir +config :kaffe, + producers: %{ + "producer_1" => [ + endpoints: [kafka1: 9092], # [hostname: port] + topics: ["kafka-topic"], + + # optional + partition_strategy: :md5, + ssl: true, + sasl: %{ + mechanism: :plain, + login: System.get_env("KAFFE_PRODUCER_USER"), + password: System.get_env("KAFFE_PRODUCER_PASSWORD") + } + ], + "producer_2" => [ + endpoints: [kafka2: 9092], # [hostname: port] + topics: ["another-kafka-topic"] + ] + } +``` + The `partition_strategy` setting can be one of: - `:md5`: (default) provides even and deterministic distribution of the messages over the available partitions based on an MD5 hash of the key @@ -347,6 +374,10 @@ There are several ways to produce: Kaffe.Producer.produce_sync("topic", [{"key1", "value1"}, {"key2", "value2"}]) ``` + ```elixir + Kaffe.Producer.produce_sync_with_client("producer_1", "topic", [{"key1", "value1"}, {"key2", "value2"}]) + ``` + - `topic`/`partition`/`message_list` - Produce each message in the list to the given `topic`/`partition`. Each item in the list is a tuple of the key and value: `{key, value}`. @@ -355,24 +386,40 @@ There are several ways to produce: Kaffe.Producer.produce_sync("topic", 2, [{"key1", "value1"}, {"key2", "value2"}]) ``` + ```elixir + Kaffe.Producer.produce_sync_with_client("producer_1", "topic", 2, [{"key1", "value1"}, {"key2", "value2"}]) + ``` + - `key`/`value` - The key/value will be produced to the first topic given to the producer when it was started. The partition will be selected with the chosen strategy or given function. ```elixir Kaffe.Producer.produce_sync("key", "value") ``` + ```elixir + Kaffe.Producer.produce_sync_with_client("producer_1", "key", "value") + ``` + - `topic`/`key`/`value` - The key/value will be produced to the given topic. ```elixir Kaffe.Producer.produce_sync("whitelist", "key", "value") ``` + ```elixir + Kaffe.Producer.produce_sync_with_client("producer_1", "whitelist", "key", "value") + ``` + - `topic`/`partition`/`key`/`value` - The key/value will be produced to the given topic/partition. ```elixir Kaffe.Producer.produce_sync("whitelist", 2, "key", "value") ``` + ```elixir + Kaffe.Producer.produce_sync_with_client("producer_1", "whitelist", 2, "key", "value") + ``` + **NOTE**: With this approach Kaffe will not calculate the next partition since it assumes you're taking over that job by giving it a specific partition. ## Testing diff --git a/config/test.exs b/config/test.exs index 2c9356a..f069722 100644 --- a/config/test.exs +++ b/config/test.exs @@ -25,12 +25,14 @@ config :kaffe, } ] }, - producer: [ - endpoints: [kafka: 9092], - topics: ["kaffe-test"], - sasl: %{ - mechanism: :plain, - login: System.get_env("KAFFE_PRODUCER_USER"), - password: System.get_env("KAFFE_PRODUCER_PASSWORD") - } - ] + producers: %{ + "producer_name" => [ + endpoints: [kafka: 9092], + topics: ["kaffe-test"], + sasl: %{ + mechanism: :plain, + login: System.get_env("KAFFE_PRODUCER_USER"), + password: System.get_env("KAFFE_PRODUCER_PASSWORD") + } + ] + } diff --git a/lib/kaffe.ex b/lib/kaffe.ex index d5155c8..d235453 100644 --- a/lib/kaffe.ex +++ b/lib/kaffe.ex @@ -14,9 +14,16 @@ defmodule Kaffe do Logger.debug("event#start=#{__MODULE__}") - if Application.get_env(:kaffe, :producer) do - Logger.debug("event#start_producer_client=#{__MODULE__}") - Kaffe.Producer.start_producer_client() + Kaffe.Config.Producer.maybe_set_producers_env!() + + if producers = Application.get_env(:kaffe, :producers) do + producers + |> Map.keys() + |> Enum.each(fn config_key -> + Logger.debug("event#start_producer_client=#{__MODULE__}_#{config_key}") + config = Kaffe.Config.Producer.configuration(config_key) + Kaffe.Producer.start_producer_client(config) + end) end children = [] diff --git a/lib/kaffe/config/producer.ex b/lib/kaffe/config/producer.ex index f818131..87166e8 100644 --- a/lib/kaffe/config/producer.ex +++ b/lib/kaffe/config/producer.ex @@ -58,78 +58,140 @@ defmodule Kaffe.Config.Producer do import Kaffe.Config, only: [heroku_kafka_endpoints: 0, parse_endpoints: 1] - def configuration do + require Logger + + @default_producer_config_key "producer" + + def configuration(config_key) do %{ - endpoints: endpoints(), - producer_config: client_producer_config(), - client_name: config_get(:client_name, :kaffe_producer_client), - topics: producer_topics(), - partition_strategy: config_get(:partition_strategy, :md5) + endpoints: endpoints(config_key), + producer_config: client_producer_config(config_key), + client_name: client_name(config_key), + topics: producer_topics(config_key), + partition_strategy: partition_strategy(config_key) } end - def producer_topics, do: config_get!(:topics) + def producer_topics(config_key), do: config_get!(config_key, :topics) - def endpoints do - if heroku_kafka?() do + def endpoints(config_key) do + if heroku_kafka?(config_key) do heroku_kafka_endpoints() else - parse_endpoints(config_get!(:endpoints)) + parse_endpoints(config_get!(config_key, :endpoints)) end end - def client_producer_config do - default_client_producer_config() ++ maybe_heroku_kafka_ssl() ++ sasl_options() ++ ssl_options() + def client_producer_config(config_key) do + default_client_producer_config(config_key) ++ + maybe_heroku_kafka_ssl(config_key) ++ + sasl_options(config_key) ++ + ssl_options(config_key) + end + + def client_name(config_key) do + config_get(config_key, :client_name, :"kaffe_producer_client_#{config_key}") + end + + def partition_strategy(config_key) do + config_get(config_key, :partition_strategy, :md5) end - def sasl_options do - :sasl - |> config_get(%{}) + def sasl_options(config_key) do + config_key + |> config_get(:sasl, %{}) |> Kaffe.Config.sasl_config() end - def maybe_heroku_kafka_ssl do - case heroku_kafka?() do + def maybe_heroku_kafka_ssl(config_key) do + case heroku_kafka?(config_key) do true -> Kaffe.Config.ssl_config() false -> [] end end - def ssl_options do - :ssl - |> config_get(false) + def ssl_options(config_key) do + config_key + |> config_get(:ssl, false) |> Kaffe.Config.ssl_config() end - def default_client_producer_config do + def default_client_producer_config(config_key) do [ - auto_start_producers: config_get(:auto_start_producers, true), - allow_topic_auto_creation: config_get(:allow_topic_auto_creation, false), + auto_start_producers: config_get(config_key, :auto_start_producers, true), + allow_topic_auto_creation: config_get(config_key, :allow_topic_auto_creation, false), default_producer_config: [ - required_acks: config_get(:required_acks, -1), - ack_timeout: config_get(:ack_timeout, 1000), - partition_buffer_limit: config_get(:partition_buffer_limit, 512), - partition_onwire_limit: config_get(:partition_onwire_limit, 1), - max_batch_size: config_get(:max_batch_size, 1_048_576), - max_retries: config_get(:max_retries, 3), - retry_backoff_ms: config_get(:retry_backoff_ms, 500), - compression: config_get(:compression, :no_compression), - min_compression_batch_size: config_get(:min_compression_batch_size, 1024) + required_acks: config_get(config_key, :required_acks, -1), + ack_timeout: config_get(config_key, :ack_timeout, 1000), + partition_buffer_limit: config_get(config_key, :partition_buffer_limit, 512), + partition_onwire_limit: config_get(config_key, :partition_onwire_limit, 1), + max_batch_size: config_get(config_key, :max_batch_size, 1_048_576), + max_retries: config_get(config_key, :max_retries, 3), + retry_backoff_ms: config_get(config_key, :retry_backoff_ms, 500), + compression: config_get(config_key, :compression, :no_compression), + min_compression_batch_size: config_get(config_key, :min_compression_batch_size, 1024) ] ] end - def heroku_kafka? do - config_get(:heroku_kafka_env, false) + def heroku_kafka?(config_key) do + config_get(config_key, :heroku_kafka_env, false) end - def config_get!(key) do - Application.get_env(:kaffe, :producer) + def config_get!(config_key, key) do + :kaffe + |> Application.get_env(:producers) + |> Map.fetch!(config_key) |> Keyword.fetch!(key) end - def config_get(key, default) do - Application.get_env(:kaffe, :producer) + def config_get(config_key, key, default) do + :kaffe + |> Application.get_env(:producers) + |> Map.fetch!(config_key) |> Keyword.get(key, default) end + + def list_config_keys do + :kaffe + |> Application.get_env(:producers) + |> Map.keys() + end + + @doc """ + Sets :kaffe, :producers application env if :kaffe, :producer is present. + + Provides backward compatibility between single producer and multiple producers. + `#{@default_producer_config_key}` config key is used for multiple producers config. + """ + @spec maybe_set_producers_env!() :: :ok + def maybe_set_producers_env! do + single_config = Application.get_env(:kaffe, :producer) || [] + multiple_config = Application.get_env(:kaffe, :producers) || %{} + + if !Enum.empty?(single_config) and !Enum.empty?(multiple_config) do + raise(""" + FOUND SINGLE PRODUCER AND MULTIPLE PRODUCERS CONFIG: + + Delete `:kaffe, :producers` or `:kaffe, :producer` configuration. + """) + end + + if !Enum.empty?(single_config) and Enum.empty?(multiple_config) do + multiple_config = %{@default_producer_config_key => single_config} + + Logger.info(""" + Configuration for single producer is specified in :kaffe, :producer. + + To ensure backward compatibility :kaffe, :producers was set to a map \ + with default producer name as the key and single producer config as the value: + + config :kaffe, producers: #{inspect(multiple_config)} + """) + + Application.put_env(:kaffe, :producers, multiple_config) + end + + :ok + end end diff --git a/lib/kaffe/producer.ex b/lib/kaffe/producer.ex index 376d012..2ded867 100644 --- a/lib/kaffe/producer.ex +++ b/lib/kaffe/producer.ex @@ -9,6 +9,8 @@ defmodule Kaffe.Producer do Currently only synchronous production is supported. """ + alias Kaffe.Config.Producer, as: ProducerConfig + @kafka Application.compile_env(:kaffe, :kafka_mod, :brod) @typedoc """ @@ -30,14 +32,28 @@ defmodule Kaffe.Producer do """ @type headers :: [{key :: binary(), value :: binary()}] + @typedoc """ + The method by which a partition in a topic is selected for sending a message + """ + @type partition_strategy :: + :md5 + | :random + | (topic :: binary(), partitions_count :: integer(), key :: binary(), value :: binary() -> + partition :: integer()) + + @typedoc """ + The name of the producer in the configuration that will be used to send the message + """ + @type config_key :: atom() | binary() + require Logger ## ------------------------------------------------------------------------- ## public api ## ------------------------------------------------------------------------- - def start_producer_client do - @kafka.start_client(config().endpoints, client_name(), config().producer_config) + def start_producer_client(config) do + @kafka.start_client(config.endpoints, config.client_name, config.producer_config) end @doc """ @@ -52,8 +68,10 @@ defmodule Kaffe.Producer do * `:ok` on successfully producing each message * `{:error, reason}` for any error """ + @spec produce(binary(), list(message() | message_object()), partition_strategy: partition_strategy()) :: + :ok | {:error, any()} def produce(topic, message_list, opts \\ []) do - produce_list(topic, message_list, partition_strategy_from(opts)) + produce_with_client(first_config_key(), topic, message_list, opts) end @doc """ @@ -71,13 +89,14 @@ defmodule Kaffe.Producer do * `:ok` on successfully producing each message * `{:error, reason}` for any error """ + @spec produce_sync(topic :: binary(), message_list :: list(message() | message_object())) :: :ok | {:error, any()} def produce_sync(topic, message_list) when is_list(message_list) do - produce_list(topic, message_list, global_partition_strategy()) + produce_sync_with_client(first_config_key(), topic, message_list) end + @spec produce_sync(key :: binary(), value :: binary()) :: :ok | {:error, any()} def produce_sync(key, value) do - topic = config().topics |> List.first() - produce_value(topic, key, value) + produce_sync_with_client(first_config_key(), key, value) end @doc """ @@ -92,12 +111,15 @@ defmodule Kaffe.Producer do * `:ok` on successfully producing each message * `{:error, reason}` for any error """ + @spec produce_sync(topic :: binary(), partition :: integer(), message_list :: list(message() | message_object())) :: + :ok | {:error, any()} def produce_sync(topic, partition, message_list) when is_list(message_list) do - produce_list(topic, message_list, fn _, _, _, _ -> partition end) + produce_sync_with_client(first_config_key(), topic, partition, message_list) end + @spec produce_sync(topic :: binary(), key :: binary(), value :: binary()) :: :ok | {:error, any()} def produce_sync(topic, key, value) do - produce_value(topic, key, value) + produce_sync_with_client(first_config_key(), topic, key, value) end @doc """ @@ -105,23 +127,126 @@ defmodule Kaffe.Producer do See `produce_sync/2` for returns. """ + @spec produce_sync(topic :: binary(), partition :: integer(), key :: binary(), value :: binary()) :: + :ok | {:error, any()} def produce_sync(topic, partition, key, value) do - @kafka.produce_sync(client_name(), topic, partition, key, value) + produce_sync_with_client(first_config_key(), topic, partition, key, value) + end + + @doc """ + Synchronously produce the `messages_list` to `topic` using the specified client + + - `messages_list` must be a list of type `message()` or `message_object()` + - `opts` may include the partition strategy to use, + `partition_strategy: :md5`, or `:random` or a function. + + Returns: + + * `:ok` on successfully producing each message + * `{:error, reason}` for any error + """ + @spec produce_with_client( + config_key :: config_key(), + topic :: binary(), + list(message() | message_object()), + partition_strategy: partition_strategy() + ) :: :ok | {:error, any()} + def produce_with_client(config_key, topic, message_list, opts \\ []) do + produce_list(config_key, topic, message_list, partition_strategy_from(config_key, opts)) + end + + @doc """ + Synchronously produce the `message_list` to `topic` using the specified client + + `messages` must be a list of type `message()` or `message_object()` + + Alternatively, synchronously produce the given `key`/`value` to the first Kafka topic. + + This is a simpler way to produce if you've only given Producer a single topic + for production and don't want to specify the topic for each call. + + Returns: + + * `:ok` on successfully producing each message + * `{:error, reason}` for any error + """ + @spec produce_sync_with_client( + config_key :: config_key(), + topic :: binary(), + message_list :: list(message() | message_object()) + ) :: :ok | {:error, any()} + def produce_sync_with_client(config_key, topic, message_list) when is_list(message_list) do + produce_list(config_key, topic, message_list, global_partition_strategy(config_key)) + end + + @spec produce_sync_with_client(config_key :: config_key(), key :: binary(), value :: binary()) :: + :ok | {:error, any()} + def produce_sync_with_client(config_key, key, value) do + topic = config(config_key).topics |> List.first() + produce_value(config_key, topic, key, value) + end + + @doc """ + Synchronously produce the `message_list` to `topic`/`partition` using the specified client + + `message_list` must be a list of type `message()` or `message_object()` + + Alternatively, synchronously produce the `key`/`value` to `topic` + + Returns: + + * `:ok` on successfully producing each message + * `{:error, reason}` for any error + """ + @spec produce_sync_with_client( + config_key :: config_key(), + topic :: binary(), + partition :: integer(), + message_list :: list(message() | message_object()) + ) :: :ok | {:error, any()} + def produce_sync_with_client(config_key, topic, partition, message_list) when is_list(message_list) do + produce_list(config_key, topic, message_list, fn _, _, _, _ -> partition end) + end + + @spec produce_sync_with_client( + config_key :: config_key(), + topic :: binary(), + key :: binary(), + value :: binary() + ) :: :ok | {:error, any()} + def produce_sync_with_client(config_key, topic, key, value) do + produce_value(config_key, topic, key, value) + end + + @doc """ + Synchronously produce the given `key`/`value` to the `topic`/`partition` using the specified client + + See `produce_sync/2` for returns. + """ + @spec produce_sync_with_client( + config_key :: config_key(), + topic :: binary(), + partition :: integer(), + key :: binary(), + value :: binary() + ) :: :ok | {:error, any()} + def produce_sync_with_client(config_key, topic, partition, key, value) do + @kafka.produce_sync(client_name(config_key), topic, partition, key, value) end ## ------------------------------------------------------------------------- ## internal ## ------------------------------------------------------------------------- - defp produce_list(topic, message_list, partition_strategy) when is_list(message_list) do - Logger.debug("event#produce_list topic=#{topic}") + defp produce_list(config_key, topic, message_list, partition_strategy) when is_list(message_list) do + Logger.debug("event#produce_list config_key=#{config_key} topic=#{topic}") message_list - |> add_timestamp - |> group_by_partition(topic, partition_strategy) + |> add_timestamp() + |> group_by_partition(config_key, topic, partition_strategy) |> case do messages = %{} -> - produce_list_to_topic(messages, topic) + produce_list_to_topic(config_key, messages, topic) {:error, reason} -> Logger.warning("Error while grouping by partition #{inspect(reason)}") @@ -129,19 +254,19 @@ defmodule Kaffe.Producer do end end - defp produce_value(topic, key, value) do - case @kafka.get_partitions_count(client_name(), topic) do + defp produce_value(config_key, topic, key, value) do + case @kafka.get_partitions_count(client_name(config_key), topic) do {:ok, partitions_count} -> - partition = choose_partition(topic, partitions_count, key, value, global_partition_strategy()) + partition = choose_partition(topic, partitions_count, key, value, global_partition_strategy(config_key)) Logger.debug( - "event#produce topic=#{topic} key=#{key} partitions_count=#{partitions_count} selected_partition=#{partition}" + "event#produce config_key=#{config_key} topic=#{topic} key=#{key} partitions_count=#{partitions_count} selected_partition=#{partition}" ) - @kafka.produce_sync(client_name(), topic, partition, key, value) + @kafka.produce_sync(client_name(config_key), topic, partition, key, value) error -> - Logger.warning("event#produce topic=#{topic} key=#{key} error=#{inspect(error)}") + Logger.warning("event#produce config_key=#{config_key} topic=#{topic} key=#{key} error=#{inspect(error)}") error end @@ -159,8 +284,8 @@ defmodule Kaffe.Producer do defp add_timestamp_to_message({key, message}), do: {System.system_time(:millisecond), key, message} - defp group_by_partition(messages, topic, partition_strategy) do - with {:ok, partitions_count} <- @kafka.get_partitions_count(client_name(), topic) do + defp group_by_partition(messages, config_key, topic, partition_strategy) do + with {:ok, partitions_count} <- @kafka.get_partitions_count(client_name(config_key), topic) do messages |> Enum.group_by(fn {_timestamp, key, message} -> @@ -172,22 +297,22 @@ defmodule Kaffe.Producer do end end - defp produce_list_to_topic(message_list, topic) do + defp produce_list_to_topic(config_key, message_list, topic) do message_list |> Enum.reduce_while(:ok, fn {partition, messages}, :ok -> - Logger.debug("event#produce_list_to_topic topic=#{topic} partition=#{partition}") + Logger.debug("event#produce_list_to_topic config_key=#{config_key} topic=#{topic} partition=#{partition}") - case @kafka.produce_sync(client_name(), topic, partition, "ignored", messages) do + case @kafka.produce_sync(client_name(config_key), topic, partition, "ignored", messages) do :ok -> {:cont, :ok} {:error, _reason} = error -> {:halt, error} end end) end - defp partition_strategy_from(opts) do + defp partition_strategy_from(config_key, opts) do case Keyword.fetch(opts, :partition_strategy) do {:ok, partition_strategy} -> partition_strategy - :error -> global_partition_strategy() + :error -> global_partition_strategy(config_key) end end @@ -203,15 +328,19 @@ defmodule Kaffe.Producer do fun.(topic, partitions_count, key, value) end - defp client_name do - config().client_name + defp client_name(config_key) do + ProducerConfig.client_name(config_key) + end + + defp global_partition_strategy(config_key) do + ProducerConfig.partition_strategy(config_key) end - defp global_partition_strategy do - config().partition_strategy + defp config(config_key) do + ProducerConfig.configuration(config_key) end - defp config do - Kaffe.Config.Producer.configuration() + defp first_config_key do + ProducerConfig.list_config_keys() |> List.first() end end diff --git a/test/kaffe/config/producer_test.exs b/test/kaffe/config/producer_test.exs index 03fa30d..f04992f 100644 --- a/test/kaffe/config/producer_test.exs +++ b/test/kaffe/config/producer_test.exs @@ -1,14 +1,18 @@ defmodule Kaffe.Config.ProducerTest do use ExUnit.Case, async: true + @default_producer "producer_name" + describe "configuration/0" do test "correct settings are extracted" do + config = Application.get_env(:kaffe, :producers) + no_sasl_config = - :kaffe - |> Application.get_env(:producer) - |> Keyword.delete(:sasl) + config + |> pop_in([@default_producer, :sasl]) + |> elem(1) - Application.put_env(:kaffe, :producer, no_sasl_config) + Application.put_env(:kaffe, :producers, no_sasl_config) expected = %{ endpoints: [{~c"kafka", 9092}], @@ -28,19 +32,19 @@ defmodule Kaffe.Config.ProducerTest do ] ], topics: ["kaffe-test"], - client_name: :kaffe_producer_client, + client_name: :kaffe_producer_client_producer_name, partition_strategy: :md5 } - assert Kaffe.Config.Producer.configuration() == expected + on_exit(fn -> + Application.put_env(:kaffe, :producers, config) + end) + + assert Kaffe.Config.Producer.configuration(@default_producer) == expected end test "correct settings with sasl plain are extracted" do - config = Application.get_env(:kaffe, :producer) - sasl = Keyword.get(config, :sasl) - sasl_config = Keyword.put(config, :sasl, %{mechanism: :plain, login: "Alice", password: "ecilA"}) - - Application.put_env(:kaffe, :producer, sasl_config) + update_producer_config(@default_producer, :sasl, %{mechanism: :plain, login: "Alice", password: "ecilA"}) expected = %{ endpoints: [{~c"kafka", 9092}], @@ -61,22 +65,183 @@ defmodule Kaffe.Config.ProducerTest do sasl: {:plain, "Alice", "ecilA"} ], topics: ["kaffe-test"], - client_name: :kaffe_producer_client, + client_name: :kaffe_producer_client_producer_name, + partition_strategy: :md5 + } + + assert Kaffe.Config.Producer.configuration(@default_producer) == expected + end + + test "correct settings are extracted for different producer clients" do + config = Application.get_env(:kaffe, :producers) + + multiple_producers_config = %{ + "producer_1" => [ + endpoints: [kafka1: 9092], + topics: ["kaffe-test-1"], + sasl: %{mechanism: :plain, login: "Alice", password: "ecilA"}, + ssl: true + ], + "producer_2" => [ + endpoints: [kafka2: 9092], + topics: ["kaffe-test-2"], + compression: :zstd + ] + } + + Application.put_env(:kaffe, :producers, multiple_producers_config) + + expected_producer_config_1 = %{ + endpoints: [{~c"kafka1", 9092}], + producer_config: [ + auto_start_producers: true, + allow_topic_auto_creation: false, + default_producer_config: [ + required_acks: -1, + ack_timeout: 1000, + partition_buffer_limit: 512, + partition_onwire_limit: 1, + max_batch_size: 1_048_576, + max_retries: 3, + retry_backoff_ms: 500, + compression: :no_compression, + min_compression_batch_size: 1024 + ], + sasl: {:plain, "Alice", "ecilA"}, + ssl: true + ], + topics: ["kaffe-test-1"], + client_name: :kaffe_producer_client_producer_1, + partition_strategy: :md5 + } + + expected_producer_config_2 = %{ + endpoints: [{~c"kafka2", 9092}], + producer_config: [ + auto_start_producers: true, + allow_topic_auto_creation: false, + default_producer_config: [ + required_acks: -1, + ack_timeout: 1000, + partition_buffer_limit: 512, + partition_onwire_limit: 1, + max_batch_size: 1_048_576, + max_retries: 3, + retry_backoff_ms: 500, + compression: :zstd, + min_compression_batch_size: 1024 + ] + ], + topics: ["kaffe-test-2"], + client_name: :kaffe_producer_client_producer_2, partition_strategy: :md5 } on_exit(fn -> - Application.put_env(:kaffe, :producer, Keyword.put(config, :sasl, sasl)) + Application.put_env(:kaffe, :producers, config) end) - assert Kaffe.Config.Producer.configuration() == expected + assert Kaffe.Config.Producer.configuration("producer_1") == expected_producer_config_1 + assert Kaffe.Config.Producer.configuration("producer_2") == expected_producer_config_2 + end + end + + describe "maybe_set_producers_env!/0" do + test "returns :ok if producers are not configured" do + producers_config = Application.get_env(:kaffe, :producers) + + Application.delete_env(:kaffe, :producers) + Application.delete_env(:kaffe, :producer) + + on_exit(fn -> Application.put_env(:kaffe, :producers, producers_config) end) + + assert :ok == Kaffe.Config.Producer.maybe_set_producers_env!() + + assert is_nil(Application.get_env(:kaffe, :producer)) + assert is_nil(Application.get_env(:kaffe, :producers)) + end + + test "returns :ok if producers are configured with the :producers key" do + producers_config = Application.get_env(:kaffe, :producers) + assert is_map(producers_config) + + Application.delete_env(:kaffe, :producer) + + assert :ok == Kaffe.Config.Producer.maybe_set_producers_env!() + + assert is_nil(Application.get_env(:kaffe, :producer)) + assert Application.get_env(:kaffe, :producers) == producers_config + end + + test "returns :ok and sets :kaffe, :producers if producer is configured with the :producer key" do + producers_config = Application.get_env(:kaffe, :producers) + producer_config = producers_config |> Map.values() |> List.first() + + Application.delete_env(:kaffe, :producers) + Application.put_env(:kaffe, :producer, producer_config) + + on_exit(fn -> + Application.put_env(:kaffe, :producers, producers_config) + Application.delete_env(:kaffe, :producer) + end) + + assert :ok == Kaffe.Config.Producer.maybe_set_producers_env!() + + assert Application.get_env(:kaffe, :producer) == producer_config + assert Application.get_env(:kaffe, :producers) == %{"producer" => producer_config} + end + + test "logs message if :kaffe, :producers was set with configuration from :kaffe, :producer" do + producers_config = Application.get_env(:kaffe, :producers) + producer_config = producers_config |> Map.values() |> List.first() + + Application.delete_env(:kaffe, :producers) + Application.put_env(:kaffe, :producer, producer_config) + + on_exit(fn -> + Application.put_env(:kaffe, :producers, producers_config) + Application.delete_env(:kaffe, :producer) + end) + + logs = + ExUnit.CaptureLog.capture_log(fn -> + assert :ok == Kaffe.Config.Producer.maybe_set_producers_env!() + end) + + assert logs =~ """ + Configuration for single producer is specified in :kaffe, :producer. + + To ensure backward compatibility :kaffe, :producers was set to a map \ + with default producer name as the key and single producer config as the value: + + config :kaffe, producers: %{\"producer\" => #{inspect(producer_config)}} + """ + end + + test "raises error is both :producer and :producers keys are present" do + producers_config = Application.get_env(:kaffe, :producers) + producer_config = producers_config |> Map.values() |> List.first() + + Application.put_env(:kaffe, :producer, producer_config) + + on_exit(fn -> + Application.delete_env(:kaffe, :producer) + end) + + expected_message = """ + FOUND SINGLE PRODUCER AND MULTIPLE PRODUCERS CONFIG: + + Delete `:kaffe, :producers` or `:kaffe, :producer` configuration. + """ + + assert_raise RuntimeError, expected_message, fn -> + Kaffe.Config.Producer.maybe_set_producers_env!() + end end end test "string endpoints parsed correctly" do - config = Application.get_env(:kaffe, :producer) - endpoints = Keyword.get(config, :endpoints) - Application.put_env(:kaffe, :producer, Keyword.put(config, :endpoints, "kafka:9092,localhost:9092")) + update_producer_config(@default_producer, :endpoints, "kafka:9092,localhost:9092") expected = %{ endpoints: [{~c"kafka", 9092}, {~c"localhost", 9092}], @@ -96,21 +261,15 @@ defmodule Kaffe.Config.ProducerTest do ] ], topics: ["kaffe-test"], - client_name: :kaffe_producer_client, + client_name: :kaffe_producer_client_producer_name, partition_strategy: :md5 } - on_exit(fn -> - Application.put_env(:kaffe, :producer, Keyword.put(config, :endpoints, endpoints)) - end) - - assert Kaffe.Config.Producer.configuration() == expected + assert Kaffe.Config.Producer.configuration(@default_producer) == expected end test "adds ssl when true" do - config = Application.get_env(:kaffe, :producer) - ssl = Keyword.get(config, :ssl) - Application.put_env(:kaffe, :producer, Keyword.put(config, :ssl, true)) + update_producer_config(@default_producer, :ssl, true) expected = %{ endpoints: [{~c"kafka", 9092}], @@ -131,14 +290,16 @@ defmodule Kaffe.Config.ProducerTest do ssl: true ], topics: ["kaffe-test"], - client_name: :kaffe_producer_client, + client_name: :kaffe_producer_client_producer_name, partition_strategy: :md5 } - on_exit(fn -> - Application.put_env(:kaffe, :producer, Keyword.put(config, :ssl, ssl)) - end) + assert Kaffe.Config.Producer.configuration(@default_producer) == expected + end - assert Kaffe.Config.Producer.configuration() == expected + defp update_producer_config(config_key, key, value) do + producers_config = Application.get_env(:kaffe, :producers) + Application.put_env(:kaffe, :producers, put_in(producers_config, [config_key, key], value)) + on_exit(fn -> Application.put_env(:kaffe, :producers, producers_config) end) end end diff --git a/test/kaffe/producer_test.exs b/test/kaffe/producer_test.exs index 3d87399..d0143e1 100644 --- a/test/kaffe/producer_test.exs +++ b/test/kaffe/producer_test.exs @@ -3,10 +3,13 @@ defmodule Kaffe.ProducerTest do alias Kaffe.Producer + @default_client_config_key "producer_name" + @default_client_name :kaffe_producer_client_producer_name + setup do Process.register(self(), :test_case) - update_producer_config(:topics, ["topic", "topic2"]) - update_producer_config(:partition_strategy, :md5) + update_producer_config(@default_client_config_key, :topics, ["topic", "topic2"]) + update_producer_config(@default_client_config_key, :partition_strategy, :md5) TestBrod.set_produce_response(:ok) :ok end @@ -14,17 +17,33 @@ defmodule Kaffe.ProducerTest do describe "produce_sync" do test "(key, value) produces a message to the first configured topic" do :ok = Producer.produce_sync("key8", "value") - assert_receive [:produce_sync, "topic", 17, "key8", "value"] + assert_receive [:produce_sync, @default_client_name, "topic", 17, "key8", "value"] end test "(topic, message_list) produces messages to the specific topic" do :ok = Producer.produce_sync("topic2", [{"key8", "value1"}, {"key12", "value2"}]) - assert_receive [:produce_sync, "topic2", 17, "ignored", [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}]] + + assert_receive [ + :produce_sync, + @default_client_name, + "topic2", + 17, + "ignored", + [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}] + ] end test "(topic, message_list, partition_strategy) produces messages to the specific topic" do :ok = Producer.produce("topic2", [{"key8", "value1"}, {"key12", "value2"}], partition_strategy: :md5) - assert_receive [:produce_sync, "topic2", 17, "ignored", [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}]] + + assert_receive [ + :produce_sync, + @default_client_name, + "topic2", + 17, + "ignored", + [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}] + ] end test "(topic, message_list, partition_strategy) produces messages to the specific topic and partition" do @@ -33,26 +52,34 @@ defmodule Kaffe.ProducerTest do partition_strategy: fn _topic, _partitions_count, _key, _value -> 19 end ) - assert_receive [:produce_sync, "topic2", 19, "ignored", [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}]] + assert_receive [ + :produce_sync, + @default_client_name, + "topic2", + 19, + "ignored", + [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}] + ] end test "(topic, key, value) produces a message to the specific topic" do :ok = Producer.produce_sync("topic2", "key8", "value") - assert_receive [:produce_sync, "topic2", 17, "key8", "value"] + assert_receive [:produce_sync, @default_client_name, "topic2", 17, "key8", "value"] end test "(topic, partition, key, value) produces a message to the specific topic/partition" do partition = 99 :ok = Producer.produce_sync("topic2", partition, "key", "value") - assert_receive [:produce_sync, "topic2", ^partition, "key", "value"] + assert_receive [:produce_sync, @default_client_name, "topic2", ^partition, "key", "value"] end - test "(topic, partition, key, message_list) produces a list of messages to the specific topic/parition" do + test "(topic, partition, key, message_list) produces a list of messages to the specific topic/partition" do partition = 99 :ok = Producer.produce_sync("topic2", partition, [{"key8", "value1"}, {"key12", "value2"}]) assert_receive [ :produce_sync, + @default_client_name, "topic2", ^partition, "ignored", @@ -66,22 +93,162 @@ defmodule Kaffe.ProducerTest do end end + describe "produce_sync_with_client" do + setup do + current_producers_config = Application.get_env(:kaffe, :producers) + + producers_config = %{ + "producer_1" => [ + endpoints: [kafka1: 9092], + topics: ["topic", "topic2"], + partition_strategy: :md5 + ], + "producer_2" => [ + endpoints: [kafka2: 9092], + topics: ["topic"], + partition_strategy: :md5 + ] + } + + Application.put_env(:kaffe, :producers, producers_config) + on_exit(fn -> Application.put_env(:kaffe, :producers, current_producers_config) end) + + producers = + producers_config + |> Map.keys() + |> Enum.map(&{&1, Kaffe.Config.Producer.configuration(&1)}) + + %{producers: producers} + end + + test "(key, value) produces a message to the first configured topic with specified client", %{producers: producers} do + Enum.each(producers, fn {config_key, %{client_name: client_name, topics: [topic | _]}} -> + :ok = Producer.produce_sync_with_client(config_key, "key8", "value") + assert_receive [:produce_sync, ^client_name, ^topic, 17, "key8", "value"] + end) + end + + test "(topic, message_list) produces messages to the specific topic with specified client", %{producers: producers} do + Enum.each(producers, fn {config_key, %{client_name: client_name}} -> + :ok = Producer.produce_sync_with_client(config_key, "topic2", [{"key8", "value1"}, {"key12", "value2"}]) + + assert_receive [ + :produce_sync, + ^client_name, + "topic2", + 17, + "ignored", + [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}] + ] + end) + end + + test "(topic, message_list, partition_strategy) produces messages to the specific topic with specified client", + %{producers: producers} do + Enum.each(producers, fn {config_key, %{client_name: client_name}} -> + :ok = + Producer.produce_with_client(config_key, "topic2", [{"key8", "value1"}, {"key12", "value2"}], + partition_strategy: :md5 + ) + + assert_receive [ + :produce_sync, + ^client_name, + "topic2", + 17, + "ignored", + [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}] + ] + end) + end + + test "(topic, message_list, partition_strategy) produces messages to the specific topic and partition with specified client", + %{producers: producers} do + Enum.each(producers, fn {config_key, %{client_name: client_name}} -> + :ok = + Producer.produce_with_client(config_key, "topic2", [{"key8", "value1"}, {"key12", "value2"}], + partition_strategy: fn _topic, _partitions_count, _key, _value -> 19 end + ) + + assert_receive [ + :produce_sync, + ^client_name, + "topic2", + 19, + "ignored", + [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}] + ] + end) + end + + test "(topic, key, value) produces a message to the specific topic with specified client", %{producers: producers} do + Enum.each(producers, fn {config_key, %{client_name: client_name}} -> + :ok = Producer.produce_sync_with_client(config_key, "topic2", "key8", "value") + assert_receive [:produce_sync, ^client_name, "topic2", 17, "key8", "value"] + end) + end + + test "(topic, partition, key, value) produces a message to the specific topic/partition with specified client", + %{producers: producers} do + Enum.each(producers, fn {config_key, %{client_name: client_name}} -> + partition = 99 + :ok = Producer.produce_sync_with_client(config_key, "topic2", partition, "key", "value") + assert_receive [:produce_sync, ^client_name, "topic2", ^partition, "key", "value"] + end) + end + + test "(topic, partition, key, message_list) produces a list of messages to the specific topic/partition with specified client", + %{producers: producers} do + Enum.each(producers, fn {config_key, %{client_name: client_name}} -> + partition = 99 + + :ok = + Producer.produce_sync_with_client(config_key, "topic2", partition, [{"key8", "value1"}, {"key12", "value2"}]) + + assert_receive [ + :produce_sync, + ^client_name, + "topic2", + ^partition, + "ignored", + [{_ts1, "key8", "value1"}, {_ts2, "key12", "value2"}] + ] + end) + end + + test "passes through the result", %{producers: producers} do + Enum.each(producers, fn {config_key, _} -> + TestBrod.set_produce_response(response = {:error, {:producer_down, :noproc}}) + assert response == Producer.produce_sync_with_client(config_key, "key", "value") + end) + end + + test "raises an error if a non-existent config key is passed" do + invalid_config_key = :unknown_producer + config = Application.get_env(:kaffe, :producers) + + assert_raise KeyError, "key #{inspect(invalid_config_key)} not found in: #{inspect(config, pretty: true)}", fn -> + :ok = Producer.produce_sync_with_client(invalid_config_key, "key8", "value") + end + end + end + describe "partition selection" do test "random" do - update_producer_config(:partition_strategy, :random) + update_producer_config(@default_client_config_key, :partition_strategy, :random) :ok = Producer.produce_sync("topic2", "key", "value") - assert_receive [:produce_sync, "topic2", random_partition, "key", "value"] + assert_receive [:produce_sync, @default_client_name, "topic2", random_partition, "key", "value"] assert 0 <= random_partition && random_partition <= 32 :ok = Producer.produce_sync("topic2", "key", "value") - assert_receive [:produce_sync, "topic2", random_partition, "key", "value"] + assert_receive [:produce_sync, @default_client_name, "topic2", random_partition, "key", "value"] assert 0 <= random_partition && random_partition <= 32 :ok = Producer.produce_sync("topic2", "key", "value") - assert_receive [:produce_sync, "topic2", random_partition, "key", "value"] + assert_receive [:produce_sync, @default_client_name, "topic2", random_partition, "key", "value"] assert 0 <= random_partition && random_partition <= 32 end @@ -89,21 +256,21 @@ defmodule Kaffe.ProducerTest do test "md5" do System.put_env("KAFFE_PRODUCER_USER", "Alice") System.put_env("KAFFE_PRODUCER_PASSWORD", "ecilA") - update_producer_config(:partition_strategy, :md5) + update_producer_config(@default_client_config_key, :partition_strategy, :md5) :ok = Producer.produce_sync("topic2", "key1", "value") - assert_receive [:produce_sync, "topic2", partition1, "key1", "value"] + assert_receive [:produce_sync, @default_client_name, "topic2", partition1, "key1", "value"] assert 0 <= partition1 && partition1 <= 32, "The partition should be in the range" :ok = Producer.produce_sync("topic2", "key1", "value") - assert_receive [:produce_sync, "topic2", ^partition1, "key1", "value"] + assert_receive [:produce_sync, @default_client_name, "topic2", ^partition1, "key1", "value"] # "Should receive the same partition for the same key" :ok = Producer.produce_sync("topic2", "key2", "value") - assert_receive [:produce_sync, "topic2", partition2, "key2", "value"] + assert_receive [:produce_sync, @default_client_name, "topic2", partition2, "key2", "value"] assert partition1 != partition2, "Partitions should vary" @@ -118,26 +285,27 @@ defmodule Kaffe.ProducerTest do 0 end - update_producer_config(:partition_strategy, choose_partition) + update_producer_config(@default_client_config_key, :partition_strategy, choose_partition) :ok = Producer.produce_sync("topic", "key", "value") - assert_receive [:produce_sync, "topic", 0, "key", "value"] + assert_receive [:produce_sync, @default_client_name, "topic", 0, "key", "value"] :ok = Producer.produce_sync("topic", "key", "value") - assert_receive [:produce_sync, "topic", 0, "key", "value"] + assert_receive [:produce_sync, @default_client_name, "topic", 0, "key", "value"] end test "does not use a selection strategy when given a direct partition" do :ok = Producer.produce_sync("topic", 0, "key", "value") - assert_receive [:produce_sync, "topic", 0, "key", "value"] + assert_receive [:produce_sync, @default_client_name, "topic", 0, "key", "value"] :ok = Producer.produce_sync("topic", 0, "key", "value") - assert_receive [:produce_sync, "topic", 0, "key", "value"] + assert_receive [:produce_sync, @default_client_name, "topic", 0, "key", "value"] end end - defp update_producer_config(key, value) do - producer_config = Application.get_env(:kaffe, :producer) - Application.put_env(:kaffe, :producer, put_in(producer_config, [key], value)) + defp update_producer_config(config_key, key, value) do + producers_config = Application.get_env(:kaffe, :producers) + Application.put_env(:kaffe, :producers, put_in(producers_config, [config_key, key], value)) + on_exit(fn -> Application.put_env(:kaffe, :producers, producers_config) end) end end diff --git a/test/support/test_brod.ex b/test/support/test_brod.ex index 5c40a54..74cec00 100644 --- a/test/support/test_brod.ex +++ b/test/support/test_brod.ex @@ -22,8 +22,8 @@ defmodule TestBrod do {:ok, Process.whereis(TestBrod)} end - def produce_sync(_client, topic, partition, key, value) do - GenServer.call(TestBrod, {:produce_sync, topic, partition, key, value}) + def produce_sync(client, topic, partition, key, value) do + GenServer.call(TestBrod, {:produce_sync, client, topic, partition, key, value}) end def get_partitions_count(_client, _topic), do: {:ok, @test_partition_count} @@ -36,8 +36,8 @@ defmodule TestBrod do {:ok, %{produce_response: :ok}} end - def handle_call({:produce_sync, topic, partition, key, value}, _from, state) do - send(:test_case, [:produce_sync, topic, partition, key, value]) + def handle_call({:produce_sync, client, topic, partition, key, value}, _from, state) do + send(:test_case, [:produce_sync, client, topic, partition, key, value]) {:reply, state.produce_response, state} end