Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 23 additions & 35 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,31 +86,26 @@ Batch message consumers receive a list of messages and work as part of the `:bro
# optional
async_message_ack: false, # see "async message acknowledgement" below
start_with_earliest_message: true # default false
ssl: [
certfile: '/etc/kafka/ssl/cert.pem',
keyfile: '/etc/kafka/ssl/key.pem',
cacertfile: '/etc/kafka/ssl/ca.pem'
# See http://erlang.org/doc/man/ssl.html for other options
]
],
```

The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka then they will supercede this option. If omitted then your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages.

### Heroku Configuration

To configure a Kaffe Consumer for a Heroku Kafka compatible environment including SSL omit the `endpoint` and instead set `heroku_kafka_env: true`

```elixir
config :kaffe,
consumer: [
heroku_kafka_env: true,
topics: ["interesting-topic"],
consumer_group: "your-app-consumer-group",
message_handler: MessageProcessor
]
```
### Heroku Configuration

With that setting in place Kaffe will automatically pull required info from the following ENV variables:
To configure a Kaffe Consumer for a Heroku Kafka compatible environment, set the following OS environment variables:

- `KAFKA_URL`
- `KAFKA_CLIENT_CERT`
- `KAFKA_CLIENT_CERT_KEY`
- `KAFKA_TRUSTED_CERT` (not used yet)
- `KAFKA_URL`: `kafka://host:port` (default port 9092) or `kafka+ssl://host:port` (default port 9093)
- `KAFKA_CLIENT_CERT`: PEM encoded client certificate
- `KAFKA_CLIENT_CERT_KEY`: PEM encoded client private key
- `KAFKA_TRUSTED_CERT`: PEM encoded CA cert(s)

3. Add `Kaffe.Consumer` as a worker in your supervision tree

Expand Down Expand Up @@ -239,6 +234,12 @@ Configure your Kaffe Producer in your mix config

# optional
partition_strategy: :md5
ssl: [
certfile: '/etc/kafka/ssl/cert.pem',
keyfile: '/etc/kafka/ssl/key.pem',
cacertfile: '/etc/kafka/ssl/ca.pem'
# See http://erlang.org/doc/man/ssl.html for other options
]
]
```

Expand All @@ -252,25 +253,12 @@ You can also set any of the Brod producer configuration options in the `producer

### Heroku Configuration

To configure a Kaffe Producer for a Heroku Kafka compatible environment including SSL omit the `endpoint` and instead set `heroku_kafka_env: true`

```elixir
config :kaffe,
producer: [
heroku_kafka_env: true,
topics: ["kafka-topic"],

# optional
partition_strategy: :md5
]
```

With that setting in place Kaffe will automatically pull required info from the following ENV variables:
To configure a Kaffe Consumer for a Heroku Kafka compatible environment, set the following OS environment variables:

- `KAFKA_URL`
- `KAFKA_CLIENT_CERT`
- `KAFKA_CLIENT_CERT_KEY`
- `KAFKA_TRUSTED_CERT`
- `KAFKA_URL`: `kafka://host:port` (default port 9092) or `kafka+ssl://host:port` (default port 9093)
- `KAFKA_CLIENT_CERT`: PEM encoded client certificate
- `KAFKA_CLIENT_CERT_KEY`: PEM encoded client private key
- `KAFKA_TRUSTED_CERT`: PEM encoded CA cert(s)

### Producing to Kafka

Expand Down
93 changes: 45 additions & 48 deletions lib/kaffe/config.ex
Original file line number Diff line number Diff line change
@@ -1,67 +1,64 @@
defmodule Kaffe.Config do
def heroku_kafka_endpoints do
"KAFKA_URL"
|> System.get_env
|> heroku_kafka_endpoints
end

def heroku_kafka_endpoints(kafka_url) do
kafka_url
|> String.replace("kafka+ssl://", "")
|> String.replace("kafka://", "")
|> String.split(",")
|> Enum.map(&url_endpoint_to_tuple/1)
@doc "Get list of endpoints in brod format"
@spec endpoints(Keyword.t) :: list({atom, non_neg_integer})
def endpoints(config) do
parse_kafka_urls(System.get_env("KAFKA_URL")) || Keyword.get(config, :endpoints, [])
end

def url_endpoint_to_tuple(endpoint) do
[ip, port] = endpoint |> String.split(":")
{ip |> String.to_atom, port |> String.to_integer}
@doc "Parse Kafka URL(s) into list of endpoints"
@spec parse_kafka_urls(binary | nil) :: list({atom, non_neg_integer}) | nil
def parse_kafka_urls(nil), do: nil
def parse_kafka_urls(urls) do
for url <- String.split(urls, ","), do: parse_kafka_url(url)
end

def ssl_config do
ssl_config(client_cert(), client_cert_key())
end
@doc "Parse Kafka URL into {host, port}"
@spec parse_kafka_url(binary) :: {atom, non_neg_integer}
def parse_kafka_url(<<"kafka://", host :: binary>>), do: parse_kafka_host(host, 9092)
def parse_kafka_url(<<"kafka+ssl://", host ::binary>>), do: parse_kafka_host(host, 9093)

def ssl_config(_client_cert=nil, _client_cert_key=nil) do
[]
@spec parse_kafka_host(binary, non_neg_integer) :: {atom, non_neg_integer}
defp parse_kafka_host(host_port, default_port) do
case String.split(host_port, ":") do
[host, port] -> {String.to_atom(host), String.to_integer(port)}
[host] -> {String.to_atom(host), default_port}
end
end

def ssl_config(client_cert, client_cert_key) do
[
ssl: [
cert: client_cert,
key: client_cert_key,
]
]
@doc "Get ssl options from config and OS environment vars"
@spec ssl(Keyword.t) :: Keyword.t
def ssl(config) do
ssl_defaults = config[:ssl] || []
ssl_options(ssl_cert_option(System.get_env("KAFKA_CLIENT_CERT")) ++
ssl_key_option(System.get_env("KAFKA_CLIENT_CERT_KEY")) ++
ssl_cacerts_option(System.get_env("KAFKA_TRUSTED_CERT")) ++
ssl_defaults)
end

def client_cert do
case System.get_env("KAFKA_CLIENT_CERT") do
nil -> nil
cert -> extract_der(cert)
end
end
@spec ssl_options(Keyword.t) :: Keyword.t
defp ssl_options([]), do: []
defp ssl_options(options), do: [ssl: options]

def client_cert_key do
case System.get_env("KAFKA_CLIENT_CERT_KEY") do
nil -> nil
cert_key -> extract_type_and_der(cert_key)
end
@spec ssl_cert_option(binary | nil) :: Keyword.t
def ssl_cert_option(nil), do: []
def ssl_cert_option(pem) do
[{_type, der, _cypher_info} | _] = :public_key.pem_decode(pem)
[cert: der]
end

def decode_pem(pem) do
pem
|> :public_key.pem_decode
|> List.first
@spec ssl_key_option(binary | nil) :: Keyword.t
def ssl_key_option(nil), do: []
def ssl_key_option(pem) do
[{type, der, _cypher_info} | _] = :public_key.pem_decode(pem)
[key: {type, der}]
end

def extract_der(cert) do
{_type, der, _} = decode_pem(cert)
der
@spec ssl_cacerts_option(binary | nil) :: Keyword.t
def ssl_cacerts_option(nil), do: []
def ssl_cacerts_option(pem) do
certs = for {_type, der, _cypher_info} <- :public_key.pem_decode(pem), do: der
[cacerts: certs]
end

def extract_type_and_der(cert_key) do
{type, der_cert, _} = decode_pem(cert_key)
{type, der_cert}
end
end
37 changes: 9 additions & 28 deletions lib/kaffe/config/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,7 @@ defmodule Kaffe.Config.Consumer do

def async_message_ack, do: config_get(:async_message_ack, false)

def endpoints do
case heroku_kafka?() do
true -> Kaffe.Config.heroku_kafka_endpoints()
false -> config_get!(:endpoints)
end
end
def endpoints, do: Kaffe.Config.endpoints(config())

def consumer_group_config do
[
Expand All @@ -59,8 +54,7 @@ defmodule Kaffe.Config.Consumer do
end

def client_consumer_config do
default_client_consumer_config()
++ maybe_heroku_kafka_ssl()
default_client_consumer_config() ++ Kaffe.Config.ssl(config())
end

def default_client_consumer_config do
Expand All @@ -72,9 +66,10 @@ defmodule Kaffe.Config.Consumer do
end

def begin_offset do
case config_get(:start_with_earliest_message, false) do
true -> :earliest
false -> -1
if config_get(:start_with_earliest_message, false) do
:earliest
else
-1
end
end

Expand All @@ -86,24 +81,10 @@ defmodule Kaffe.Config.Consumer do
config_get(:worker_allocation_strategy, :worker_per_partition)
end

def maybe_heroku_kafka_ssl do
case heroku_kafka?() do
true -> Kaffe.Config.ssl_config()
false -> []
end
end
def config, do: Application.get_env(:kaffe, :consumer)

def heroku_kafka? do
config_get(:heroku_kafka_env, false)
end
def config_get!(key), do: Keyword.fetch!(config(), key)

def config_get!(key) do
Application.get_env(:kaffe, :consumer)
|> Keyword.fetch!(key)
end
def config_get(key, default), do: Keyword.get(config(), key, default)

def config_get(key, default) do
Application.get_env(:kaffe, :consumer)
|> Keyword.get(key, default)
end
end
32 changes: 6 additions & 26 deletions lib/kaffe/config/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,10 @@ defmodule Kaffe.Config.Producer do

def producer_topics, do: config_get!(:topics)

def endpoints do
case heroku_kafka?() do
true -> Kaffe.Config.heroku_kafka_endpoints()
false -> config_get!(:endpoints)
end
end
def endpoints, do: Kaffe.Config.endpoints(config())

def client_producer_config do
default_client_producer_config()
++ maybe_heroku_kafka_ssl()
end

def maybe_heroku_kafka_ssl do
case heroku_kafka?() do
true -> Kaffe.Config.ssl_config()
false -> []
end
default_client_producer_config() ++ Kaffe.Config.ssl(config())
end

def default_client_producer_config do
Expand All @@ -48,17 +35,10 @@ defmodule Kaffe.Config.Producer do
]
end

def heroku_kafka? do
config_get(:heroku_kafka_env, false)
end
def config, do: Application.get_env(:kaffe, :producer)

def config_get!(key) do
Application.get_env(:kaffe, :producer)
|> Keyword.fetch!(key)
end
def config_get!(key), do: Keyword.fetch!(config(), key)

def config_get(key, default), do: Keyword.get(config(), key, default)

def config_get(key, default) do
Application.get_env(:kaffe, :producer)
|> Keyword.get(key, default)
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ defmodule Kaffe.Mixfile do
[
{:brod, "~> 3.0"},
{:ex_doc, "~> 0.14", only: :dev, runtime: false},
{:meck, "~> 0.8.8", only: :test, runtime: false},
]
end

Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"ex_doc": {:hex, :ex_doc, "0.15.1", "d5f9d588fd802152516fccfdb96d6073753f77314fcfee892b15b6724ca0d596", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]},
"kafka_protocol": {:hex, :kafka_protocol, "1.1.0", "817c07a6339cbfb32d1f20a588353bf8d9a8944df296eb2e930360b83760c171", [:rebar, :rebar3], [{:snappyer, "1.2.1", [repo: "hexpm", hex: :snappyer, optional: false]}], "hexpm"},
"logfmt": {:hex, :logfmt, "3.2.0", "887a091adad28acc6e4d8b3d3bce177b934e7c61e7655c86946410f44aca6d84", [:mix], []},
"meck": {:hex, :meck, "0.8.9", "64c5c0bd8bcca3a180b44196265c8ed7594e16bcc845d0698ec6b4e577f48188", [], [], "hexpm"},
"metrix": {:git, "https://github.com/rwdaigle/metrix.git", "a6738df9346da0412ca68f82a24a67d2a32b066e", [branch: "master"]},
"snappyer": {:hex, :snappyer, "1.2.1", "06c5f5c8afe80ba38e94e1ca1bd9253de95d8f2c85b08783e8d0f63815580556", [:make, :rebar, :rebar3], [], "hexpm"},
"supervisor3": {:hex, :supervisor3, "1.1.5", "5f3c487a6eba23de0e64c06e93efa0eca06f40324a6412c1318c77aca6da8424", [:make, :rebar, :rebar3], []}}
1 change: 1 addition & 0 deletions test/data/ca.der
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
MIIE4jCCAsoCCQDYI7B+PTiZPDANBgkqhkiG9w0BAQsFADAzMQswCQYDVQQGEwJVUzEOMAwGA1UECgwFS2Fma2ExFDASBgNVBAMMC0NOPUthZmthIENBMB4XDTE3MTIwNjExMTQyOVoXDTE4MTIwNjExMTQyOVowMzELMAkGA1UEBhMCVVMxDjAMBgNVBAoMBUthZmthMRQwEgYDVQQDDAtDTj1LYWZrYSBDQTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBALzYrZmxwZbrIwh7dEzyD844X4dkJlVa2Hl+9LUrRFif1mbqkmcEhyHpdAfovV85OoJrp8FTp4GhXqOQM1qu1OoZdT9+x842h7F0kM524Ax9Z7nBzjrLBZaaTEOzaug6HDz/jT+NDqVrPPqLoft1hxG801FCHlmz2wRS+K/RvicbiC574PVw9lvVbZfsmUKARhj2Z81NByixKP1VFQrp6aAhwn6f44OKXs9i0twqypbA7LHw07QPedz6ebDtkQqr9Nz7LK41qXwUMm8lozgn+X2RNoRqyklT7xyp3i271huCO70mYK6gRccLjdjATBHsevngmeehr2jhr2IST8Xvfa5fYAku783/GZxE57sLTV+vzQ++Dube/xVX/bCp3lJ7rpbavdSkHX3Myw+BMiH5XPeshzaPXuNU0rTQtUNpRzBp4dcQRVsuHUCuJHA7PnmO/4KMW7EwLKRoemcHI1wPEAOLHDvgVl2kGBwVLrphw13aVpt2QG+r2pnfk5yGugoOKH2o+l/SIQzadGues7zvPhIC08hl5j8TE++gdSur9yWDfiU3mjubn1d7bmgnxzH67Yxj2iwMVV+5dtwHpD3UM8o8fCA1D7KA/UXLzFWoN8/tM6xmfeX+SnihjfchQk4OW1lUL7Yf1HQqexlK8BiD4e+k63MwhpKVXk84fzCa8y1BAgMBAAEwDQYJKoZIhvcNAQELBQADggIBADtmo5aPCoVwGGTEcM1HDx6le8BehOOBm0Hqkku21vsHyv+L65PGZ+1/1ilXOhsloaQngorxijXlEchaOJ6It8aev65m1+qovlLrJGXYMHOpRJ/cKfWDgKPCZ1hgbbtZ2UG4VUQqYklVfqSLPF7oO1PnVKamecQ+9PZpyPX+lHz+CN7gk1bw2jcjZoi8VLuD1/QQ1rwhRBgkkj+MArjzBaxO996VN54EhHNHdmGXc5hhun3USXMKniiGot/xUjVus0aSxZNRpScHstpScki36w/tT/e6hHKi59hqRV09FN5EjlAHRjQKxYruqzTUKG0q/f0Al2asIZ04+VRLbANdcdvNNDcwJRUZirasrEVxy9lP5LW4NtVHNOjmTZY1AEQTTlkvwo4ZczFVsmMaJ6FTJ6rYIC87sDf5USM/Yil0fjq7fO/oAdOVNUjArStaogFUWWuM/EFBeO5U72i2ohPz0rCl1R+GWdrmmdbnHdjIQB2Kj79Vnd/GnJcV4tZVqYbpBtp1zlWxlWo4sjVV1+45JxvGskWSIb0lVVSq/HOrKRq54zxFivCUoVR4PQC3a8qdx6o2/COxHfccNdfxBK4Vj2uX/1L2EmbEHl09duLdZH7THevaSz25eByX7eTLIRGIIyTbyTgz/RI6G3itW4ZdMG9CwuwTmi/FqO/Xoffmjo06
29 changes: 29 additions & 0 deletions test/data/ca.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-----BEGIN CERTIFICATE-----
MIIE4jCCAsoCCQDYI7B+PTiZPDANBgkqhkiG9w0BAQsFADAzMQswCQYDVQQGEwJV
UzEOMAwGA1UECgwFS2Fma2ExFDASBgNVBAMMC0NOPUthZmthIENBMB4XDTE3MTIw
NjExMTQyOVoXDTE4MTIwNjExMTQyOVowMzELMAkGA1UEBhMCVVMxDjAMBgNVBAoM
BUthZmthMRQwEgYDVQQDDAtDTj1LYWZrYSBDQTCCAiIwDQYJKoZIhvcNAQEBBQAD
ggIPADCCAgoCggIBALzYrZmxwZbrIwh7dEzyD844X4dkJlVa2Hl+9LUrRFif1mbq
kmcEhyHpdAfovV85OoJrp8FTp4GhXqOQM1qu1OoZdT9+x842h7F0kM524Ax9Z7nB
zjrLBZaaTEOzaug6HDz/jT+NDqVrPPqLoft1hxG801FCHlmz2wRS+K/RvicbiC57
4PVw9lvVbZfsmUKARhj2Z81NByixKP1VFQrp6aAhwn6f44OKXs9i0twqypbA7LHw
07QPedz6ebDtkQqr9Nz7LK41qXwUMm8lozgn+X2RNoRqyklT7xyp3i271huCO70m
YK6gRccLjdjATBHsevngmeehr2jhr2IST8Xvfa5fYAku783/GZxE57sLTV+vzQ++
Dube/xVX/bCp3lJ7rpbavdSkHX3Myw+BMiH5XPeshzaPXuNU0rTQtUNpRzBp4dcQ
RVsuHUCuJHA7PnmO/4KMW7EwLKRoemcHI1wPEAOLHDvgVl2kGBwVLrphw13aVpt2
QG+r2pnfk5yGugoOKH2o+l/SIQzadGues7zvPhIC08hl5j8TE++gdSur9yWDfiU3
mjubn1d7bmgnxzH67Yxj2iwMVV+5dtwHpD3UM8o8fCA1D7KA/UXLzFWoN8/tM6xm
feX+SnihjfchQk4OW1lUL7Yf1HQqexlK8BiD4e+k63MwhpKVXk84fzCa8y1BAgMB
AAEwDQYJKoZIhvcNAQELBQADggIBADtmo5aPCoVwGGTEcM1HDx6le8BehOOBm0Hq
kku21vsHyv+L65PGZ+1/1ilXOhsloaQngorxijXlEchaOJ6It8aev65m1+qovlLr
JGXYMHOpRJ/cKfWDgKPCZ1hgbbtZ2UG4VUQqYklVfqSLPF7oO1PnVKamecQ+9PZp
yPX+lHz+CN7gk1bw2jcjZoi8VLuD1/QQ1rwhRBgkkj+MArjzBaxO996VN54EhHNH
dmGXc5hhun3USXMKniiGot/xUjVus0aSxZNRpScHstpScki36w/tT/e6hHKi59hq
RV09FN5EjlAHRjQKxYruqzTUKG0q/f0Al2asIZ04+VRLbANdcdvNNDcwJRUZiras
rEVxy9lP5LW4NtVHNOjmTZY1AEQTTlkvwo4ZczFVsmMaJ6FTJ6rYIC87sDf5USM/
Yil0fjq7fO/oAdOVNUjArStaogFUWWuM/EFBeO5U72i2ohPz0rCl1R+GWdrmmdbn
HdjIQB2Kj79Vnd/GnJcV4tZVqYbpBtp1zlWxlWo4sjVV1+45JxvGskWSIb0lVVSq
/HOrKRq54zxFivCUoVR4PQC3a8qdx6o2/COxHfccNdfxBK4Vj2uX/1L2EmbEHl09
duLdZH7THevaSz25eByX7eTLIRGIIyTbyTgz/RI6G3itW4ZdMG9CwuwTmi/FqO/X
offmjo06
-----END CERTIFICATE-----
15 changes: 15 additions & 0 deletions test/data/cert.der
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
MIIDATCCAemgAwIBAgIEdz2C7zANBgkqhkiG9w0BAQsFADAxMQswCQYDVQQGEwJVUzEOMAwGA1UE
ChMFS2Fma2ExEjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0xNzEyMDgwNDA5MDlaFw0xODEyMDgwNDA5
MDlaMDExCzAJBgNVBAYTAlVTMQ4wDAYDVQQKEwVLYWZrYTESMBAGA1UEAxMJbG9jYWxob3N0MIIB
IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAlE3difFwZjIOgRtRHY1SKvTbBmn3N+3jfOYE
JUibWZNrEGJcUARRQpFGDs7svp9KLan2D7S2Ss4OvwnD4DmdLMqto6mYx/4Di2zF8GZOci7Qz03v
7pEpgKGzHz77SvpmVYK3lk6M+LvapH6Q6AlPDUieRLc4JWsDoMRDTG4/9lSsZZCP6WyF9yK+Xl0I
PsPvVmEjXzxPq/Lc5+9Cy4M+KIGlaXzGU0zE3xFwdlBUUmwml6SbFRVW7uwZEaCkqX6wZbFb4NQx
1qYGKhz034inrqtLqbFRHCDrGJO+yVparsFnHjbYzck0BLX9Xm2TOVCpavYD7YOc6FHnDlhl9kqQ
eQIDAQABoyEwHzAdBgNVHQ4EFgQUuKYCPpnTs+2pvBRzGEbRR3Bk0TgwDQYJKoZIhvcNAQELBQAD
ggEBABvrkVnc5wu9hjkdW+rEW2b3FqmUPinxvly8fe57OpWElq22jdgVpiaftMB8fDn4NVOdOU2R
QQ1/wG1/5K8CnHNFEvpNVyE7lo7qD5AvG8RXFlX6UoBX682icw6r+3NTDRiTTKJm4fpLT77dsBhK
HfurS8C2qfhcwBmcA3cgrPgrsxs6OxAbKr9HlJQmwYTSqhM2Yd7e06ew2tDeIfUl1GZAAp11/gtF
tumcWqON8L/qKYmJO9SQzexcjVYS3yXwdS0bMv5r/D2jmbFWw8jiqU+BSTE99c/xOxMvLbswuyyy
YhJQ2EQ4Z+ZnlvEDGM5uicSya/OiscxGYtq2BQ0k7xk=

Loading