Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 50 additions & 20 deletions lib/kaffe/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ defmodule Kaffe.Producer do
## public api
## -------------------------------------------------------------------------

def start_producer_client do
@kafka.start_client(config().endpoints, client_name(), config().producer_config)
def start_producer_client(client_name \\ client_name()) do
@kafka.start_client(config().endpoints, client_name, config().producer_config)
end

def stop_producer_client(client_name \\ client_name()) do
@kafka.stop_client(client_name)
end

@doc """
Expand All @@ -62,8 +66,12 @@ defmodule Kaffe.Producer do
* `:ok` on successfully producing each message
* `{:error, reason}` for any error
"""
def produce_to(client_name, topic, message_list, opts \\ []) do
produce_list(client_name, topic, message_list, partition_strategy_from(opts))
end

def produce(topic, message_list, opts \\ []) do
produce_list(topic, message_list, partition_strategy_from(opts))
produce_list(client_name(), topic, message_list, partition_strategy_from(opts))
end

@doc """
Expand All @@ -76,8 +84,12 @@ defmodule Kaffe.Producer do
* `:ok` on successfully producing each message
* `{:error, reason}` for any error
"""
def produce_sync_to(client_name, topic, message_list) when is_list(message_list) do
produce_list(client_name, topic, message_list, global_partition_strategy())
end

def produce_sync(topic, message_list) when is_list(message_list) do
produce_list(topic, message_list, global_partition_strategy())
produce_list(client_name(), topic, message_list, global_partition_strategy())
end

@doc """
Expand All @@ -91,9 +103,14 @@ defmodule Kaffe.Producer do
* `:ok` on successfully producing the message
* `{:error, reason}` for any error
"""
def produce_sync_to(client_name, key, value) do
topic = config().topics |> List.first()
produce_value(client_name, topic, key, value)
end

def produce_sync(key, value) do
topic = config().topics |> List.first()
produce_value(topic, key, value)
produce_value(client_name(), topic, key, value)
end

@doc """
Expand All @@ -106,24 +123,36 @@ defmodule Kaffe.Producer do
* `:ok` on successfully producing each message
* `{:error, reason}` for any error
"""
def produce_sync_to(client_name, topic, partition, message_list) when is_list(message_list) do
produce_list(client_name, topic, message_list, fn _, _, _, _ -> partition end)
end

def produce_sync(topic, partition, message_list) when is_list(message_list) do
produce_list(topic, message_list, fn _, _, _, _ -> partition end)
produce_list(client_name(), topic, message_list, fn _, _, _, _ -> partition end)
end

@doc """
Synchronously produce the `key`/`value` to `topic`

See `produce_sync/2` for returns.
"""
def produce_sync_to(client_name, topic, key, value) do
produce_value(client_name, topic, key, value)
end

def produce_sync(topic, key, value) do
produce_value(topic, key, value)
produce_value(client_name(), topic, key, value)
end

@doc """
Synchronously produce the given `key`/`value` to the `topic`/`partition`

See `produce_sync/2` for returns.
"""
def produce_sync_to(client_name, topic, partition, key, value) do
@kafka.produce_sync(client_name, topic, partition, key, value)
end

def produce_sync(topic, partition, key, value) do
@kafka.produce_sync(client_name(), topic, partition, key, value)
end
Expand All @@ -132,34 +161,35 @@ defmodule Kaffe.Producer do
## internal
## -------------------------------------------------------------------------

defp produce_list(topic, message_list, partition_strategy) when is_list(message_list) do
defp produce_list(client_name, topic, message_list, partition_strategy) when is_list(message_list) do
Logger.debug("event#produce_list topic=#{topic}")

message_list
|> add_timestamp
|> group_by_partition(topic, partition_strategy)
|> group_by_partition(client_name, topic, partition_strategy)
|> case do
messages = %{} -> produce_list_to_topic(messages, topic)
messages = %{} ->
produce_list_to_topic(client_name, messages, topic)

{:error, reason} ->
Logger.warn("Error while grouping by partition #{inspect(reason)}")
{:error, reason}
end
end

defp produce_value(topic, key, value) do
case @kafka.get_partitions_count(client_name(), topic) do
defp produce_value(client_name, topic, key, value) do
case @kafka.get_partitions_count(client_name, topic) do
{:ok, partitions_count} ->
partition = choose_partition(topic, partitions_count, key, value, global_partition_strategy())

Logger.debug(
"event#produce topic=#{topic} key=#{key} partitions_count=#{partitions_count} selected_partition=#{partition}"
)

@kafka.produce_sync(client_name(), topic, partition, key, value)
@kafka.produce_sync(client_name, topic, partition, key, value)

error ->
Logger.warn(
"event#produce topic=#{topic} key=#{key} error=#{inspect(error)}"
)
Logger.warn("event#produce topic=#{topic} key=#{key} error=#{inspect(error)}")

error
end
Expand All @@ -177,8 +207,8 @@ defmodule Kaffe.Producer do

defp add_timestamp_to_message({key, message}), do: {System.system_time(:millisecond), key, message}

defp group_by_partition(messages, topic, partition_strategy) do
with {:ok, partitions_count} <- @kafka.get_partitions_count(client_name(), topic) do
defp group_by_partition(messages, client_name, topic, partition_strategy) do
with {:ok, partitions_count} <- @kafka.get_partitions_count(client_name, topic) do
messages
|> Enum.group_by(fn
{_timestamp, key, message} ->
Expand All @@ -190,12 +220,12 @@ defmodule Kaffe.Producer do
end
end

defp produce_list_to_topic(message_list, topic) do
defp produce_list_to_topic(client_name, message_list, topic) do
message_list
|> Enum.reduce_while(:ok, fn {partition, messages}, :ok ->
Logger.debug("event#produce_list_to_topic topic=#{topic} partition=#{partition}")

case @kafka.produce_sync(client_name(), topic, partition, "ignored", messages) do
case @kafka.produce_sync(client_name, topic, partition, "ignored", messages) do
:ok -> {:cont, :ok}
{:error, _reason} = error -> {:halt, error}
end
Expand Down