From 00ec5ce8fb61123a86a9b7d008dc7b8635b68cd7 Mon Sep 17 00:00:00 2001 From: Jake Morrison Date: Fri, 8 Dec 2017 14:25:03 +0800 Subject: [PATCH 1/5] Add parens to silence warnings in Elixir 1.5 --- lib/kaffe/config/consumer.ex | 36 ++++++++++++++++++------------------ lib/kaffe/config/producer.ex | 18 +++++++++--------- lib/kaffe/consumer.ex | 2 +- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/lib/kaffe/config/consumer.ex b/lib/kaffe/config/consumer.ex index 5fb924f..6be102a 100644 --- a/lib/kaffe/config/consumer.ex +++ b/lib/kaffe/config/consumer.ex @@ -1,16 +1,16 @@ defmodule Kaffe.Config.Consumer do def configuration do %{ - endpoints: endpoints, - subscriber_name: subscriber_name, - consumer_group: consumer_group, - topics: topics, - group_config: consumer_group_config, - consumer_config: client_consumer_config, - message_handler: message_handler, - async_message_ack: async_message_ack, - rebalance_delay_ms: rebalance_delay_ms, - max_bytes: max_bytes, + endpoints: endpoints(), + subscriber_name: subscriber_name(), + consumer_group: consumer_group(), + topics: topics(), + group_config: consumer_group_config(), + consumer_config: client_consumer_config(), + message_handler: message_handler(), + async_message_ack: async_message_ack(), + rebalance_delay_ms: rebalance_delay_ms(), + max_bytes: max_bytes(), subscriber_retries: subscriber_retries(), subscriber_retry_delay_ms: subscriber_retry_delay_ms(), offset_reset_policy: offset_reset_policy(), @@ -20,7 +20,7 @@ defmodule Kaffe.Config.Consumer do def consumer_group, do: config_get!(:consumer_group) - def subscriber_name, do: config_get(:subscriber_name, consumer_group) |> String.to_atom + def subscriber_name, do: config_get(:subscriber_name, consumer_group()) |> String.to_atom def topics, do: config_get!(:topics) @@ -29,8 +29,8 @@ defmodule Kaffe.Config.Consumer do def async_message_ack, do: config_get(:async_message_ack, false) def endpoints do - case heroku_kafka? do - true -> Kaffe.Config.heroku_kafka_endpoints + case heroku_kafka?() do + true -> Kaffe.Config.heroku_kafka_endpoints() false -> config_get!(:endpoints) end end @@ -59,15 +59,15 @@ defmodule Kaffe.Config.Consumer do end def client_consumer_config do - default_client_consumer_config - ++ maybe_heroku_kafka_ssl + default_client_consumer_config() + ++ maybe_heroku_kafka_ssl() end def default_client_consumer_config do [ auto_start_producers: false, allow_topic_auto_creation: false, - begin_offset: begin_offset, + begin_offset: begin_offset(), ] end @@ -87,8 +87,8 @@ defmodule Kaffe.Config.Consumer do end def maybe_heroku_kafka_ssl do - case heroku_kafka? do - true -> Kaffe.Config.ssl_config + case heroku_kafka?() do + true -> Kaffe.Config.ssl_config() false -> [] end end diff --git a/lib/kaffe/config/producer.ex b/lib/kaffe/config/producer.ex index 6e670d0..183ddea 100644 --- a/lib/kaffe/config/producer.ex +++ b/lib/kaffe/config/producer.ex @@ -1,10 +1,10 @@ defmodule Kaffe.Config.Producer do def configuration do %{ - endpoints: endpoints, - producer_config: client_producer_config, + endpoints: endpoints(), + producer_config: client_producer_config(), client_name: config_get(:client_name, :kaffe_producer_client), - topics: producer_topics, + topics: producer_topics(), partition_strategy: config_get(:partition_strategy, :md5), } end @@ -12,20 +12,20 @@ defmodule Kaffe.Config.Producer do def producer_topics, do: config_get!(:topics) def endpoints do - case heroku_kafka? do - true -> Kaffe.Config.heroku_kafka_endpoints + case heroku_kafka?() do + true -> Kaffe.Config.heroku_kafka_endpoints() false -> config_get!(:endpoints) end end def client_producer_config do - default_client_producer_config - ++ maybe_heroku_kafka_ssl + default_client_producer_config() + ++ maybe_heroku_kafka_ssl() end def maybe_heroku_kafka_ssl do - case heroku_kafka? do - true -> Kaffe.Config.ssl_config + case heroku_kafka?() do + true -> Kaffe.Config.ssl_config() false -> [] end end diff --git a/lib/kaffe/consumer.ex b/lib/kaffe/consumer.ex index de3639f..0eb8f2a 100644 --- a/lib/kaffe/consumer.ex +++ b/lib/kaffe/consumer.ex @@ -132,7 +132,7 @@ defmodule Kaffe.Consumer do end def handle_message(topic, partition, msg, %{async: true, message_handler: handler} = state) do - :ok = apply(handler, :handle_message, [self, compile_message(msg, topic, partition)]) + :ok = apply(handler, :handle_message, [self(), compile_message(msg, topic, partition)]) {:ok, state} end From 25d0dfb3df11ce67b70a6a6f8c42f02494d41606 Mon Sep 17 00:00:00 2001 From: Jake Morrison Date: Fri, 8 Dec 2017 21:26:43 +0800 Subject: [PATCH 2/5] Support file based ssl config --- README.md | 57 +++++++++------------- lib/kaffe/config.ex | 93 +++++++++++++++++------------------- lib/kaffe/config/consumer.ex | 37 ++++---------- lib/kaffe/config/producer.ex | 32 +++---------- mix.exs | 1 + mix.lock | 1 + test/data/ca.der | 1 + test/data/ca.pem | 29 +++++++++++ test/data/cert.der | 15 ++++++ test/data/cert.pem | 20 ++++++++ test/data/key.der | 22 +++++++++ test/data/key.pem | 30 ++++++++++++ test/kaffe/config_test.exs | 78 ++++++++++++++++++++++++------ test/support/send_message.ex | 2 +- 14 files changed, 267 insertions(+), 151 deletions(-) create mode 100644 test/data/ca.der create mode 100644 test/data/ca.pem create mode 100644 test/data/cert.der create mode 100644 test/data/cert.pem create mode 100644 test/data/key.der create mode 100644 test/data/key.pem diff --git a/README.md b/README.md index e9f789e..d5cca8c 100644 --- a/README.md +++ b/README.md @@ -84,31 +84,27 @@ Batch message consumers receive a list of messages and work as part of the `:bro # optional async_message_ack: false, # see "async message acknowledgement" below start_with_earliest_message: true # default false + ssl: [ + certfile: '/etc/kafka/ssl/cert.pem', + keyfile: '/etc/kafka/ssl/key.pem', + cacertfile: '/etc/kafka/ssl/ca.pem' + # See http://erlang.org/doc/man/ssl.html for other options + ] ], ``` The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka then they will supercede this option. If omitted then your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages. - ### Heroku Configuration - To configure a Kaffe Consumer for a Heroku Kafka compatible environment including SSL omit the `endpoint` and instead set `heroku_kafka_env: true` - ```elixir - config :kaffe, - consumer: [ - heroku_kafka_env: true, - topics: ["interesting-topic"], - consumer_group: "your-app-consumer-group", - message_handler: MessageProcessor - ] - ``` + ### Heroku Configuration - With that setting in place Kaffe will automatically pull required info from the following ENV variables: + To configure a Kaffe Consumer for a Heroku Kafka compatible environment, set the following OS environment variables: - - `KAFKA_URL` - - `KAFKA_CLIENT_CERT` - - `KAFKA_CLIENT_CERT_KEY` - - `KAFKA_TRUSTED_CERT` (not used yet) + - `KAFKA_URL`: `kafka://host:port` (default port 9092) or `kafka+ssl://host:port` (default port 9093) + - `KAFKA_CLIENT_CERT`: PEM encoded client certificate + - `KAFKA_CLIENT_CERT_KEY`: PEM encoded client private key + - `KAFKA_TRUSTED_CERT`: PEM encoded CA cert(s) 3. Add `Kaffe.Consumer` as a worker in your supervision tree @@ -237,6 +233,12 @@ Configure your Kaffe Producer in your mix config # optional partition_strategy: :md5 + ssl: [ + certfile: '/etc/kafka/ssl/cert.pem', + keyfile: '/etc/kafka/ssl/key.pem', + cacertfile: '/etc/kafka/ssl/ca.pem' + # See http://erlang.org/doc/man/ssl.html for other options + ] ] ``` @@ -250,25 +252,12 @@ You can also set any of the Brod producer configuration options in the `producer ### Heroku Configuration -To configure a Kaffe Producer for a Heroku Kafka compatible environment including SSL omit the `endpoint` and instead set `heroku_kafka_env: true` - - ```elixir - config :kaffe, - producer: [ - heroku_kafka_env: true, - topics: ["kafka-topic"], - - # optional - partition_strategy: :md5 - ] - ``` - -With that setting in place Kaffe will automatically pull required info from the following ENV variables: + To configure a Kaffe Consumer for a Heroku Kafka compatible environment, set the following OS environment variables: - - `KAFKA_URL` - - `KAFKA_CLIENT_CERT` - - `KAFKA_CLIENT_CERT_KEY` - - `KAFKA_TRUSTED_CERT` + - `KAFKA_URL`: `kafka://host:port` (default port 9092) or `kafka+ssl://host:port` (default port 9093) + - `KAFKA_CLIENT_CERT`: PEM encoded client certificate + - `KAFKA_CLIENT_CERT_KEY`: PEM encoded client private key + - `KAFKA_TRUSTED_CERT`: PEM encoded CA cert(s) ### Producing to Kafka diff --git a/lib/kaffe/config.ex b/lib/kaffe/config.ex index ad60a7d..2684bf3 100644 --- a/lib/kaffe/config.ex +++ b/lib/kaffe/config.ex @@ -1,67 +1,64 @@ defmodule Kaffe.Config do - def heroku_kafka_endpoints do - "KAFKA_URL" - |> System.get_env - |> heroku_kafka_endpoints - end - def heroku_kafka_endpoints(kafka_url) do - kafka_url - |> String.replace("kafka+ssl://", "") - |> String.replace("kafka://", "") - |> String.split(",") - |> Enum.map(&url_endpoint_to_tuple/1) + @doc "Get list of endpoints in brod format" + @spec endpoints(Keyword.t) :: list({atom, non_neg_integer}) + def endpoints(config) do + parse_kafka_urls(System.get_env("KAFKA_URL")) || Keyword.get(config, :endpoints, []) end - def url_endpoint_to_tuple(endpoint) do - [ip, port] = endpoint |> String.split(":") - {ip |> String.to_atom, port |> String.to_integer} + @doc "Parse Kafka URL(s) into list of endpoints" + @spec parse_kafka_urls(binary | nil) :: list({atom, non_neg_integer}) | nil + def parse_kafka_urls(nil), do: nil + def parse_kafka_urls(urls) do + for url <- String.split(urls, ","), do: parse_kafka_url(url) end - def ssl_config do - ssl_config(client_cert, client_cert_key) - end + @doc "Parse Kafka URL into {host, port}" + @spec parse_kafka_url(binary) :: {atom, non_neg_integer} + def parse_kafka_url(<<"kafka://", host :: binary>>), do: parse_kafka_host(host, 9092) + def parse_kafka_url(<<"kafka+ssl://", host ::binary>>), do: parse_kafka_host(host, 9093) - def ssl_config(_client_cert=nil, _client_cert_key=nil) do - [] + @spec parse_kafka_host(binary, non_neg_integer) :: {atom, non_neg_integer} + defp parse_kafka_host(host_port, default_port) do + case String.split(host_port, ":") do + [host, port] -> {String.to_atom(host), String.to_integer(port)} + [host] -> {String.to_atom(host), default_port} + end end - def ssl_config(client_cert, client_cert_key) do - [ - ssl: [ - cert: client_cert, - key: client_cert_key, - ] - ] + @doc "Get ssl options from config and OS environment vars" + @spec ssl(Keyword.t) :: Keyword.t + def ssl(config) do + ssl_defaults = config[:ssl] || [] + ssl_options(ssl_cert_option(System.get_env("KAFKA_CLIENT_CERT")) ++ + ssl_key_option(System.get_env("KAFKA_CLIENT_CERT_KEY")) ++ + ssl_cacerts_option(System.get_env("KAFKA_TRUSTED_CERT")) ++ + ssl_defaults) end - def client_cert do - case System.get_env("KAFKA_CLIENT_CERT") do - nil -> nil - cert -> extract_der(cert) - end - end + @spec ssl_options(Keyword.t) :: Keyword.t + defp ssl_options([]), do: [] + defp ssl_options(options), do: [ssl: options] - def client_cert_key do - case System.get_env("KAFKA_CLIENT_CERT_KEY") do - nil -> nil - cert_key -> extract_type_and_der(cert_key) - end + @spec ssl_cert_option(binary | nil) :: Keyword.t + def ssl_cert_option(nil), do: [] + def ssl_cert_option(pem) do + [{_type, der, _cypher_info} | _] = :public_key.pem_decode(pem) + [cert: der] end - def decode_pem(pem) do - pem - |> :public_key.pem_decode - |> List.first + @spec ssl_key_option(binary | nil) :: Keyword.t + def ssl_key_option(nil), do: [] + def ssl_key_option(pem) do + [{type, der, _cypher_info} | _] = :public_key.pem_decode(pem) + [key: {type, der}] end - def extract_der(cert) do - {_type, der, _} = decode_pem(cert) - der + @spec ssl_cacerts_option(binary | nil) :: Keyword.t + def ssl_cacerts_option(nil), do: [] + def ssl_cacerts_option(pem) do + certs = for {_type, der, _cypher_info} <- :public_key.pem_decode(pem), do: der + [cacerts: certs] end - def extract_type_and_der(cert_key) do - {type, der_cert, _} = decode_pem(cert_key) - {type, der_cert} - end end diff --git a/lib/kaffe/config/consumer.ex b/lib/kaffe/config/consumer.ex index 6be102a..904868b 100644 --- a/lib/kaffe/config/consumer.ex +++ b/lib/kaffe/config/consumer.ex @@ -28,12 +28,7 @@ defmodule Kaffe.Config.Consumer do def async_message_ack, do: config_get(:async_message_ack, false) - def endpoints do - case heroku_kafka?() do - true -> Kaffe.Config.heroku_kafka_endpoints() - false -> config_get!(:endpoints) - end - end + def endpoints, do: Kaffe.Config.endpoints(config()) def consumer_group_config do [ @@ -59,8 +54,7 @@ defmodule Kaffe.Config.Consumer do end def client_consumer_config do - default_client_consumer_config() - ++ maybe_heroku_kafka_ssl() + default_client_consumer_config() ++ Kaffe.Config.ssl(config()) end def default_client_consumer_config do @@ -72,9 +66,10 @@ defmodule Kaffe.Config.Consumer do end def begin_offset do - case config_get(:start_with_earliest_message, false) do - true -> :earliest - false -> -1 + if config_get(:start_with_earliest_message, false) do + :earliest + else + -1 end end @@ -86,24 +81,10 @@ defmodule Kaffe.Config.Consumer do config_get(:worker_allocation_strategy, :worker_per_partition) end - def maybe_heroku_kafka_ssl do - case heroku_kafka?() do - true -> Kaffe.Config.ssl_config() - false -> [] - end - end + def config, do: Application.get_env(:kaffe, :consumer) - def heroku_kafka? do - config_get(:heroku_kafka_env, false) - end + def config_get!(key), do: Keyword.fetch!(config(), key) - def config_get!(key) do - Application.get_env(:kaffe, :consumer) - |> Keyword.fetch!(key) - end + def config_get(key, default), do: Keyword.get(config(), key, default) - def config_get(key, default) do - Application.get_env(:kaffe, :consumer) - |> Keyword.get(key, default) - end end diff --git a/lib/kaffe/config/producer.ex b/lib/kaffe/config/producer.ex index 183ddea..226a103 100644 --- a/lib/kaffe/config/producer.ex +++ b/lib/kaffe/config/producer.ex @@ -11,23 +11,10 @@ defmodule Kaffe.Config.Producer do def producer_topics, do: config_get!(:topics) - def endpoints do - case heroku_kafka?() do - true -> Kaffe.Config.heroku_kafka_endpoints() - false -> config_get!(:endpoints) - end - end + def endpoints, do: Kaffe.Config.endpoints(config()) def client_producer_config do - default_client_producer_config() - ++ maybe_heroku_kafka_ssl() - end - - def maybe_heroku_kafka_ssl do - case heroku_kafka?() do - true -> Kaffe.Config.ssl_config() - false -> [] - end + default_client_producer_config() ++ Kaffe.Config.ssl(config()) end def default_client_producer_config do @@ -48,17 +35,10 @@ defmodule Kaffe.Config.Producer do ] end - def heroku_kafka? do - config_get(:heroku_kafka_env, false) - end + def config, do: Application.get_env(:kaffe, :producer) - def config_get!(key) do - Application.get_env(:kaffe, :producer) - |> Keyword.fetch!(key) - end + def config_get!(key), do: Keyword.fetch!(config(), key) + + def config_get(key, default), do: Keyword.get(config(), key, default) - def config_get(key, default) do - Application.get_env(:kaffe, :producer) - |> Keyword.get(key, default) - end end diff --git a/mix.exs b/mix.exs index 27622a7..52e7de3 100644 --- a/mix.exs +++ b/mix.exs @@ -27,6 +27,7 @@ defmodule Kaffe.Mixfile do [ {:brod, "~> 3.0"}, {:ex_doc, "~> 0.14", only: :dev, runtime: false}, + {:meck, "~> 0.8.8", only: :test, runtime: false}, ] end diff --git a/mix.lock b/mix.lock index 354d013..0dc1d78 100644 --- a/mix.lock +++ b/mix.lock @@ -3,6 +3,7 @@ "ex_doc": {:hex, :ex_doc, "0.15.1", "d5f9d588fd802152516fccfdb96d6073753f77314fcfee892b15b6724ca0d596", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]}, "kafka_protocol": {:hex, :kafka_protocol, "1.1.0", "817c07a6339cbfb32d1f20a588353bf8d9a8944df296eb2e930360b83760c171", [:rebar, :rebar3], [{:snappyer, "1.2.1", [repo: "hexpm", hex: :snappyer, optional: false]}], "hexpm"}, "logfmt": {:hex, :logfmt, "3.2.0", "887a091adad28acc6e4d8b3d3bce177b934e7c61e7655c86946410f44aca6d84", [:mix], []}, + "meck": {:hex, :meck, "0.8.9", "64c5c0bd8bcca3a180b44196265c8ed7594e16bcc845d0698ec6b4e577f48188", [], [], "hexpm"}, "metrix": {:git, "https://github.com/rwdaigle/metrix.git", "a6738df9346da0412ca68f82a24a67d2a32b066e", [branch: "master"]}, "snappyer": {:hex, :snappyer, "1.2.1", "06c5f5c8afe80ba38e94e1ca1bd9253de95d8f2c85b08783e8d0f63815580556", [:make, :rebar, :rebar3], [], "hexpm"}, "supervisor3": {:hex, :supervisor3, "1.1.5", "5f3c487a6eba23de0e64c06e93efa0eca06f40324a6412c1318c77aca6da8424", [:make, :rebar, :rebar3], []}} diff --git a/test/data/ca.der b/test/data/ca.der new file mode 100644 index 0000000..e997d1e --- /dev/null +++ b/test/data/ca.der @@ -0,0 +1 @@ +MIIE4jCCAsoCCQDYI7B+PTiZPDANBgkqhkiG9w0BAQsFADAzMQswCQYDVQQGEwJVUzEOMAwGA1UECgwFS2Fma2ExFDASBgNVBAMMC0NOPUthZmthIENBMB4XDTE3MTIwNjExMTQyOVoXDTE4MTIwNjExMTQyOVowMzELMAkGA1UEBhMCVVMxDjAMBgNVBAoMBUthZmthMRQwEgYDVQQDDAtDTj1LYWZrYSBDQTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBALzYrZmxwZbrIwh7dEzyD844X4dkJlVa2Hl+9LUrRFif1mbqkmcEhyHpdAfovV85OoJrp8FTp4GhXqOQM1qu1OoZdT9+x842h7F0kM524Ax9Z7nBzjrLBZaaTEOzaug6HDz/jT+NDqVrPPqLoft1hxG801FCHlmz2wRS+K/RvicbiC574PVw9lvVbZfsmUKARhj2Z81NByixKP1VFQrp6aAhwn6f44OKXs9i0twqypbA7LHw07QPedz6ebDtkQqr9Nz7LK41qXwUMm8lozgn+X2RNoRqyklT7xyp3i271huCO70mYK6gRccLjdjATBHsevngmeehr2jhr2IST8Xvfa5fYAku783/GZxE57sLTV+vzQ++Dube/xVX/bCp3lJ7rpbavdSkHX3Myw+BMiH5XPeshzaPXuNU0rTQtUNpRzBp4dcQRVsuHUCuJHA7PnmO/4KMW7EwLKRoemcHI1wPEAOLHDvgVl2kGBwVLrphw13aVpt2QG+r2pnfk5yGugoOKH2o+l/SIQzadGues7zvPhIC08hl5j8TE++gdSur9yWDfiU3mjubn1d7bmgnxzH67Yxj2iwMVV+5dtwHpD3UM8o8fCA1D7KA/UXLzFWoN8/tM6xmfeX+SnihjfchQk4OW1lUL7Yf1HQqexlK8BiD4e+k63MwhpKVXk84fzCa8y1BAgMBAAEwDQYJKoZIhvcNAQELBQADggIBADtmo5aPCoVwGGTEcM1HDx6le8BehOOBm0Hqkku21vsHyv+L65PGZ+1/1ilXOhsloaQngorxijXlEchaOJ6It8aev65m1+qovlLrJGXYMHOpRJ/cKfWDgKPCZ1hgbbtZ2UG4VUQqYklVfqSLPF7oO1PnVKamecQ+9PZpyPX+lHz+CN7gk1bw2jcjZoi8VLuD1/QQ1rwhRBgkkj+MArjzBaxO996VN54EhHNHdmGXc5hhun3USXMKniiGot/xUjVus0aSxZNRpScHstpScki36w/tT/e6hHKi59hqRV09FN5EjlAHRjQKxYruqzTUKG0q/f0Al2asIZ04+VRLbANdcdvNNDcwJRUZirasrEVxy9lP5LW4NtVHNOjmTZY1AEQTTlkvwo4ZczFVsmMaJ6FTJ6rYIC87sDf5USM/Yil0fjq7fO/oAdOVNUjArStaogFUWWuM/EFBeO5U72i2ohPz0rCl1R+GWdrmmdbnHdjIQB2Kj79Vnd/GnJcV4tZVqYbpBtp1zlWxlWo4sjVV1+45JxvGskWSIb0lVVSq/HOrKRq54zxFivCUoVR4PQC3a8qdx6o2/COxHfccNdfxBK4Vj2uX/1L2EmbEHl09duLdZH7THevaSz25eByX7eTLIRGIIyTbyTgz/RI6G3itW4ZdMG9CwuwTmi/FqO/Xoffmjo06 \ No newline at end of file diff --git a/test/data/ca.pem b/test/data/ca.pem new file mode 100644 index 0000000..c848b32 --- /dev/null +++ b/test/data/ca.pem @@ -0,0 +1,29 @@ +-----BEGIN CERTIFICATE----- +MIIE4jCCAsoCCQDYI7B+PTiZPDANBgkqhkiG9w0BAQsFADAzMQswCQYDVQQGEwJV +UzEOMAwGA1UECgwFS2Fma2ExFDASBgNVBAMMC0NOPUthZmthIENBMB4XDTE3MTIw +NjExMTQyOVoXDTE4MTIwNjExMTQyOVowMzELMAkGA1UEBhMCVVMxDjAMBgNVBAoM +BUthZmthMRQwEgYDVQQDDAtDTj1LYWZrYSBDQTCCAiIwDQYJKoZIhvcNAQEBBQAD +ggIPADCCAgoCggIBALzYrZmxwZbrIwh7dEzyD844X4dkJlVa2Hl+9LUrRFif1mbq +kmcEhyHpdAfovV85OoJrp8FTp4GhXqOQM1qu1OoZdT9+x842h7F0kM524Ax9Z7nB +zjrLBZaaTEOzaug6HDz/jT+NDqVrPPqLoft1hxG801FCHlmz2wRS+K/RvicbiC57 +4PVw9lvVbZfsmUKARhj2Z81NByixKP1VFQrp6aAhwn6f44OKXs9i0twqypbA7LHw +07QPedz6ebDtkQqr9Nz7LK41qXwUMm8lozgn+X2RNoRqyklT7xyp3i271huCO70m +YK6gRccLjdjATBHsevngmeehr2jhr2IST8Xvfa5fYAku783/GZxE57sLTV+vzQ++ +Dube/xVX/bCp3lJ7rpbavdSkHX3Myw+BMiH5XPeshzaPXuNU0rTQtUNpRzBp4dcQ +RVsuHUCuJHA7PnmO/4KMW7EwLKRoemcHI1wPEAOLHDvgVl2kGBwVLrphw13aVpt2 +QG+r2pnfk5yGugoOKH2o+l/SIQzadGues7zvPhIC08hl5j8TE++gdSur9yWDfiU3 +mjubn1d7bmgnxzH67Yxj2iwMVV+5dtwHpD3UM8o8fCA1D7KA/UXLzFWoN8/tM6xm +feX+SnihjfchQk4OW1lUL7Yf1HQqexlK8BiD4e+k63MwhpKVXk84fzCa8y1BAgMB +AAEwDQYJKoZIhvcNAQELBQADggIBADtmo5aPCoVwGGTEcM1HDx6le8BehOOBm0Hq +kku21vsHyv+L65PGZ+1/1ilXOhsloaQngorxijXlEchaOJ6It8aev65m1+qovlLr +JGXYMHOpRJ/cKfWDgKPCZ1hgbbtZ2UG4VUQqYklVfqSLPF7oO1PnVKamecQ+9PZp +yPX+lHz+CN7gk1bw2jcjZoi8VLuD1/QQ1rwhRBgkkj+MArjzBaxO996VN54EhHNH +dmGXc5hhun3USXMKniiGot/xUjVus0aSxZNRpScHstpScki36w/tT/e6hHKi59hq +RV09FN5EjlAHRjQKxYruqzTUKG0q/f0Al2asIZ04+VRLbANdcdvNNDcwJRUZiras +rEVxy9lP5LW4NtVHNOjmTZY1AEQTTlkvwo4ZczFVsmMaJ6FTJ6rYIC87sDf5USM/ +Yil0fjq7fO/oAdOVNUjArStaogFUWWuM/EFBeO5U72i2ohPz0rCl1R+GWdrmmdbn +HdjIQB2Kj79Vnd/GnJcV4tZVqYbpBtp1zlWxlWo4sjVV1+45JxvGskWSIb0lVVSq +/HOrKRq54zxFivCUoVR4PQC3a8qdx6o2/COxHfccNdfxBK4Vj2uX/1L2EmbEHl09 +duLdZH7THevaSz25eByX7eTLIRGIIyTbyTgz/RI6G3itW4ZdMG9CwuwTmi/FqO/X +offmjo06 +-----END CERTIFICATE----- diff --git a/test/data/cert.der b/test/data/cert.der new file mode 100644 index 0000000..218b2da --- /dev/null +++ b/test/data/cert.der @@ -0,0 +1,15 @@ +MIIDATCCAemgAwIBAgIEdz2C7zANBgkqhkiG9w0BAQsFADAxMQswCQYDVQQGEwJVUzEOMAwGA1UE +ChMFS2Fma2ExEjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0xNzEyMDgwNDA5MDlaFw0xODEyMDgwNDA5 +MDlaMDExCzAJBgNVBAYTAlVTMQ4wDAYDVQQKEwVLYWZrYTESMBAGA1UEAxMJbG9jYWxob3N0MIIB +IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAlE3difFwZjIOgRtRHY1SKvTbBmn3N+3jfOYE +JUibWZNrEGJcUARRQpFGDs7svp9KLan2D7S2Ss4OvwnD4DmdLMqto6mYx/4Di2zF8GZOci7Qz03v +7pEpgKGzHz77SvpmVYK3lk6M+LvapH6Q6AlPDUieRLc4JWsDoMRDTG4/9lSsZZCP6WyF9yK+Xl0I +PsPvVmEjXzxPq/Lc5+9Cy4M+KIGlaXzGU0zE3xFwdlBUUmwml6SbFRVW7uwZEaCkqX6wZbFb4NQx +1qYGKhz034inrqtLqbFRHCDrGJO+yVparsFnHjbYzck0BLX9Xm2TOVCpavYD7YOc6FHnDlhl9kqQ +eQIDAQABoyEwHzAdBgNVHQ4EFgQUuKYCPpnTs+2pvBRzGEbRR3Bk0TgwDQYJKoZIhvcNAQELBQAD +ggEBABvrkVnc5wu9hjkdW+rEW2b3FqmUPinxvly8fe57OpWElq22jdgVpiaftMB8fDn4NVOdOU2R +QQ1/wG1/5K8CnHNFEvpNVyE7lo7qD5AvG8RXFlX6UoBX682icw6r+3NTDRiTTKJm4fpLT77dsBhK +HfurS8C2qfhcwBmcA3cgrPgrsxs6OxAbKr9HlJQmwYTSqhM2Yd7e06ew2tDeIfUl1GZAAp11/gtF +tumcWqON8L/qKYmJO9SQzexcjVYS3yXwdS0bMv5r/D2jmbFWw8jiqU+BSTE99c/xOxMvLbswuyyy +YhJQ2EQ4Z+ZnlvEDGM5uicSya/OiscxGYtq2BQ0k7xk= + diff --git a/test/data/cert.pem b/test/data/cert.pem new file mode 100644 index 0000000..ddb7289 --- /dev/null +++ b/test/data/cert.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDATCCAemgAwIBAgIEdz2C7zANBgkqhkiG9w0BAQsFADAxMQswCQYDVQQGEwJV +UzEOMAwGA1UEChMFS2Fma2ExEjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0xNzEyMDgw +NDA5MDlaFw0xODEyMDgwNDA5MDlaMDExCzAJBgNVBAYTAlVTMQ4wDAYDVQQKEwVL +YWZrYTESMBAGA1UEAxMJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAlE3difFwZjIOgRtRHY1SKvTbBmn3N+3jfOYEJUibWZNrEGJcUARR +QpFGDs7svp9KLan2D7S2Ss4OvwnD4DmdLMqto6mYx/4Di2zF8GZOci7Qz03v7pEp +gKGzHz77SvpmVYK3lk6M+LvapH6Q6AlPDUieRLc4JWsDoMRDTG4/9lSsZZCP6WyF +9yK+Xl0IPsPvVmEjXzxPq/Lc5+9Cy4M+KIGlaXzGU0zE3xFwdlBUUmwml6SbFRVW +7uwZEaCkqX6wZbFb4NQx1qYGKhz034inrqtLqbFRHCDrGJO+yVparsFnHjbYzck0 +BLX9Xm2TOVCpavYD7YOc6FHnDlhl9kqQeQIDAQABoyEwHzAdBgNVHQ4EFgQUuKYC +PpnTs+2pvBRzGEbRR3Bk0TgwDQYJKoZIhvcNAQELBQADggEBABvrkVnc5wu9hjkd +W+rEW2b3FqmUPinxvly8fe57OpWElq22jdgVpiaftMB8fDn4NVOdOU2RQQ1/wG1/ +5K8CnHNFEvpNVyE7lo7qD5AvG8RXFlX6UoBX682icw6r+3NTDRiTTKJm4fpLT77d +sBhKHfurS8C2qfhcwBmcA3cgrPgrsxs6OxAbKr9HlJQmwYTSqhM2Yd7e06ew2tDe +IfUl1GZAAp11/gtFtumcWqON8L/qKYmJO9SQzexcjVYS3yXwdS0bMv5r/D2jmbFW +w8jiqU+BSTE99c/xOxMvLbswuyyyYhJQ2EQ4Z+ZnlvEDGM5uicSya/OiscxGYtq2 +BQ0k7xk= +-----END CERTIFICATE----- + diff --git a/test/data/key.der b/test/data/key.der new file mode 100644 index 0000000..b152686 --- /dev/null +++ b/test/data/key.der @@ -0,0 +1,22 @@ +TAyeJz2yJV6dXrCcLSg82S+Vpe4v5x2g7+cL5X7VR++NlBACRK8UPrfoJUqt/c6tX+bvgAqqzySn +f1v6n8ORhK0W7nYFdzK04w8YzbFOHm2wgtgwM2Im3jDKzmSBjpLELl3QyjBGcn6JKTQCa9GzA2DR +rD9L7YY54/WQKWM7YCkmIZq+OMCnEYx3houNOM7VlMdOPLJrhph9CRZqwVYYR2Ibu8yA63wOX2bv +hqAcHHp93q/aqq3L8oBDqiCuau8EnXXrXYOx2jKovGTO5cdKTp7o63h4ABx9yelwkxWRQjPpHob0 +4vFE5UCAiUotYCW0JNTeH3X+EWPPXBSLmSLR+Pb9/teLsLvNoy5LzlPps4oBZmzwbpp3+pEvmly+ +EeIxcpMw4zhvWyr3pGPfAVfJcCGl68idR/YtFTxwuSCFKUQG9LccAIbEBTeJGabTaDVSmucLfxom +GsYnujwAXsacC1uEAQdCdl/0eB64VQqWayyQypL1nAimrvdjJXjhFEFyvKciYd2kmJfJVI1jixEW +QsmVdZmCi86ImyzWik9eQT2AvtA8Sn2nZFUXzlZ086c22UVd3NNVn+LcZWHrT55CzPij5i4HradS +P+eoWtsQwPhQb4W42f/v2tahSxDe6h2vvGabdfjE91E5wIhmhDMmjTOLGPi2Ckm61GmbqE74ZKOQ +9uQtWVmSfUPsYmCn0iFo0hjpEqRu/8Q0KhRiZWS85CpuK2wtBLaGEVduqHvjBgO21e81NGrTZkkL +tr3H+SMXhI4iOcJIytWDF9yDBGrUjBd3in3T5Jq2HvAh+mWssPKA9yRMF28nIuur9fY2UgMNrDMD +watev2GlyV8JrhtfAyhVl0ZJuqp9IVcusemklCiwPBn+OxFP/gayCdA40kxaDwOQa9iYy+fVMyuP +f+9A3cw5hZ8M5DAE78g6S+vICB+JpPt2XA/6GfoV69xZFmJLl9cKyacFNjomnKZ8C77glVgSOvnu +EwWAoOYTkNFHH5gLAhoKErcnmvfoFMiBRuzhGAmKYKYDSYNXblgvAErOyCsKnEk17B+Q71Us/uvY +F/lSnMPlTS7n1SONJbiEGQvoIGU7JriWqtI+yg9joec1TFGE9r+7GF1FaUQy9JliXWcTz1+Qovj0 +yrCOw6JyaIucYRstzNZINbwhlLC8cp+ipNsT23vvixaAXthqmQlpgSaVWggDS3Oz8/dSkPoFctKC +EHfb5zA7MeJnVVV6N2KWuqFvdgMwpOmBwG0qEBS0jiDyMsisf/7ri6mztUC8YFYF9b3XPU3ZQ2c1 +ozdzC08Gyf6yieNKfPHcGi+Lf/UN5Stx+jLjVl7kAybTj6kcQY04RdG7tkjHR69lKLbzMj+Y5a3c +U3vrC3Nc6hC2yrCz3lSU6Hwxq0VCs2hZugFbBvPEaoHki/qQxv35OnyCFWvs+qNitEgBSS4OBirQ +zGD37GMwX9rzXRYapSiLi2SclTrcTqsFCJXElqsnxlg8pp+KX9pyHagPSPuDaE7/ZOsVzgzBDtMR +dL6ohytKXc29umTc3BCIb9vPzsx10Dw8+/pf/xgSAwny4/DJlRgivKx2SFKO3wx5C131JsO9QLTO +ceb9zAv9m+mAT6ER9e1wLGy0L/mK5TqXTc3G diff --git a/test/data/key.pem b/test/data/key.pem new file mode 100644 index 0000000..51fc857 --- /dev/null +++ b/test/data/key.pem @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFDjBABgkqhkiG9w0BBQ0wMzAbBgkqhkiG9w0BBQwwDgQImLd1sCBnPeUCAggA +MBQGCCqGSIb3DQMHBAjNJxIKccy26ASCBMhMDJ4nPbIlXp1esJwtKDzZL5Wl7i/n +HaDv5wvlftVH742UEAJErxQ+t+glSq39zq1f5u+ACqrPJKd/W/qfw5GErRbudgV3 +MrTjDxjNsU4ebbCC2DAzYibeMMrOZIGOksQuXdDKMEZyfokpNAJr0bMDYNGsP0vt +hjnj9ZApYztgKSYhmr44wKcRjHeGi404ztWUx048smuGmH0JFmrBVhhHYhu7zIDr +fA5fZu+GoBwcen3er9qqrcvygEOqIK5q7wSddetdg7HaMqi8ZM7lx0pOnujreHgA +HH3J6XCTFZFCM+kehvTi8UTlQICJSi1gJbQk1N4fdf4RY89cFIuZItH49v3+14uw +u82jLkvOU+mzigFmbPBumnf6kS+aXL4R4jFykzDjOG9bKvekY98BV8lwIaXryJ1H +9i0VPHC5IIUpRAb0txwAhsQFN4kZptNoNVKa5wt/GiYaxie6PABexpwLW4QBB0J2 +X/R4HrhVCpZrLJDKkvWcCKau92MleOEUQXK8pyJh3aSYl8lUjWOLERZCyZV1mYKL +zoibLNaKT15BPYC+0DxKfadkVRfOVnTzpzbZRV3c01Wf4txlYetPnkLM+KPmLget +p1I/56ha2xDA+FBvhbjZ/+/a1qFLEN7qHa+8Zpt1+MT3UTnAiGaEMyaNM4sY+LYK +SbrUaZuoTvhko5D25C1ZWZJ9Q+xiYKfSIWjSGOkSpG7/xDQqFGJlZLzkKm4rbC0E +toYRV26oe+MGA7bV7zU0atNmSQu2vcf5IxeEjiI5wkjK1YMX3IMEatSMF3eKfdPk +mrYe8CH6Zayw8oD3JEwXbyci66v19jZSAw2sMwPBq16/YaXJXwmuG18DKFWXRkm6 +qn0hVy6x6aSUKLA8Gf47EU/+BrIJ0DjSTFoPA5Br2JjL59UzK49/70DdzDmFnwzk +MATvyDpL68gIH4mk+3ZcD/oZ+hXr3FkWYkuX1wrJpwU2OiacpnwLvuCVWBI6+e4T +BYCg5hOQ0UcfmAsCGgoStyea9+gUyIFG7OEYCYpgpgNJg1duWC8ASs7IKwqcSTXs +H5DvVSz+69gX+VKcw+VNLufVI40luIQZC+ggZTsmuJaq0j7KD2Oh5zVMUYT2v7sY +XUVpRDL0mWJdZxPPX5Ci+PTKsI7DonJoi5xhGy3M1kg1vCGUsLxyn6Kk2xPbe++L +FoBe2GqZCWmBJpVaCANLc7Pz91KQ+gVy0oIQd9vnMDsx4mdVVXo3Ypa6oW92AzCk +6YHAbSoQFLSOIPIyyKx//uuLqbO1QLxgVgX1vdc9TdlDZzWjN3MLTwbJ/rKJ40p8 +8dwaL4t/9Q3lK3H6MuNWXuQDJtOPqRxBjThF0bu2SMdHr2UotvMyP5jlrdxTe+sL +c1zqELbKsLPeVJTofDGrRUKzaFm6AVsG88RqgeSL+pDG/fk6fIIVa+z6o2K0SAFJ +Lg4GKtDMYPfsYzBf2vNdFhqlKIuLZJyVOtxOqwUIlcSWqyfGWDymn4pf2nIdqA9I ++4NoTv9k6xXODMEO0xF0vqiHK0pdzb26ZNzcEIhv28/OzHXQPDz7+l//GBIDCfLj +8MmVGCK8rHZIUo7fDHkLXfUmw71AtM5x5v3MC/2b6YBPoRH17XAsbLQv+YrlOpdN +zcY= +-----END ENCRYPTED PRIVATE KEY----- diff --git a/test/kaffe/config_test.exs b/test/kaffe/config_test.exs index 838dddb..11a9c8d 100644 --- a/test/kaffe/config_test.exs +++ b/test/kaffe/config_test.exs @@ -1,29 +1,79 @@ defmodule Kaffe.ConfigTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false + + describe "endpoints/1" do + test "parses urls to correct format" do + assert Kaffe.Config.parse_kafka_url("kafka+ssl://192.168.1.100:9096") == {:"192.168.1.100", 9096} + assert Kaffe.Config.parse_kafka_url("kafka+ssl://192.168.1.100") == {:"192.168.1.100", 9093} + assert Kaffe.Config.parse_kafka_url("kafka+ssl://kafka.example.com") == {:"kafka.example.com", 9093} + + assert Kaffe.Config.parse_kafka_url("kafka://192.168.1.100:9096") == {:"192.168.1.100", 9096} + assert Kaffe.Config.parse_kafka_url("kafka://192.168.1.100") == {:"192.168.1.100", 9092} + + assert Kaffe.Config.parse_kafka_urls(nil) == nil - describe "heroku_kafka_endpoints/1" do - test "transforms endpoints into the correct format" do kafka_url = "kafka+ssl://192.168.1.100:9096,kafka+ssl://192.168.1.101:9096,kafka+ssl://192.168.1.102:9096" expected = [{:"192.168.1.100", 9096}, {:"192.168.1.101", 9096}, {:"192.168.1.102", 9096}] + assert Kaffe.Config.parse_kafka_urls(kafka_url) == expected + end - assert Kaffe.Config.heroku_kafka_endpoints(kafka_url) == expected + test "gets endpoint data properly" do + assert Kaffe.Config.endpoints([endpoints: [kafka: 9096]]) == [kafka: 9096] end - end - describe "ssl_config/2" do - test "ssl_config returns an empty list when cert and key are nil" do - assert Kaffe.Config.ssl_config(nil, nil) == [] + test "gets endpoint data from OS environment" do + :meck.new(System) + :meck.expect(System, :get_env, fn("KAFKA_URL") -> + "kafka+ssl://192.168.1.100:9096,kafka+ssl://192.168.1.101:9096,kafka+ssl://192.168.1.102:9096" + end) + + assert Kaffe.Config.endpoints([]) == [{:"192.168.1.100", 9096}, {:"192.168.1.101", 9096}, {:"192.168.1.102", 9096}] + + :meck.unload() end + end + + describe "ssl/1" do + test "ssl returns Erlang SSL module config from OS environment vars" do + cert_pem = File.read!("./test/data/cert.pem") + # System.put_env("KAFKA_CLIENT_CERT", cert_pem) - test "ssl_config returns Erlang SSL module config with cert and key" do - client_cert = "not really a cert" - client_cert_key = "not really a cert key" - assert Kaffe.Config.ssl_config(client_cert, client_cert_key) == [ + key_pem = File.read!("./test/data/key.pem") + # System.put_env("KAFKA_CLIENT_CERT_KEY", key_pem) + + ca_pem = File.read!("./test/data/ca.pem") + # System.put_env("KAFKA_TRUSTED_CERT", ca_pem) + + cert_der_base64 = File.read!("./test/data/cert.der") + {:ok, cert_der} = Base.decode64(cert_der_base64, ignore: :whitespace) + + key_der_base64 = File.read!("./test/data/key.der") + {:ok, key_der} = Base.decode64(key_der_base64, ignore: :whitespace) + + ca_der_base64 = File.read!("./test/data/ca.der") + {:ok, ca_der} = Base.decode64(ca_der_base64, ignore: :whitespace) + + assert Kaffe.Config.ssl_cert_option(cert_pem) == [cert: cert_der] + assert Kaffe.Config.ssl_key_option(key_pem) == [key: {:"PrivateKeyInfo", key_der}] + assert Kaffe.Config.ssl_cacerts_option(ca_pem) == [cacerts: [ca_der]] + + :meck.new(System) + :meck.expect(System, :get_env, + fn + ("KAFKA_CLIENT_CERT") -> cert_pem + ("KAFKA_CLIENT_CERT_KEY") -> key_pem + ("KAFKA_TRUSTED_CERT") -> ca_pem + end) + + assert Kaffe.Config.ssl([]) == [ ssl: [ - cert: client_cert, - key: client_cert_key, + cert: cert_der, + key: {:"PrivateKeyInfo", key_der}, + cacerts: [ca_der], ] ] + + :meck.unload() end end end diff --git a/test/support/send_message.ex b/test/support/send_message.ex index 6ae6da7..537575d 100644 --- a/test/support/send_message.ex +++ b/test/support/send_message.ex @@ -1,6 +1,6 @@ defmodule SendMessage do def handle_message(message) do - send self, message + send self(), message :ok end From 1680b34ac6679edddd13d0ab72c833ea60dca0e1 Mon Sep 17 00:00:00 2001 From: Jake Morrison Date: Fri, 8 Dec 2017 21:32:57 +0800 Subject: [PATCH 3/5] Format markdown --- README.md | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index d5cca8c..176c68b 100644 --- a/README.md +++ b/README.md @@ -96,7 +96,6 @@ Batch message consumers receive a list of messages and work as part of the `:bro The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka then they will supercede this option. If omitted then your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages. - ### Heroku Configuration To configure a Kaffe Consumer for a Heroku Kafka compatible environment, set the following OS environment variables: @@ -252,12 +251,12 @@ You can also set any of the Brod producer configuration options in the `producer ### Heroku Configuration - To configure a Kaffe Consumer for a Heroku Kafka compatible environment, set the following OS environment variables: +To configure a Kaffe Consumer for a Heroku Kafka compatible environment, set the following OS environment variables: - - `KAFKA_URL`: `kafka://host:port` (default port 9092) or `kafka+ssl://host:port` (default port 9093) - - `KAFKA_CLIENT_CERT`: PEM encoded client certificate - - `KAFKA_CLIENT_CERT_KEY`: PEM encoded client private key - - `KAFKA_TRUSTED_CERT`: PEM encoded CA cert(s) +- `KAFKA_URL`: `kafka://host:port` (default port 9092) or `kafka+ssl://host:port` (default port 9093) +- `KAFKA_CLIENT_CERT`: PEM encoded client certificate +- `KAFKA_CLIENT_CERT_KEY`: PEM encoded client private key +- `KAFKA_TRUSTED_CERT`: PEM encoded CA cert(s) ### Producing to Kafka From aad8a7a2bb6fcd642f935f5c0670ad78b3ebe056 Mon Sep 17 00:00:00 2001 From: Jake Morrison Date: Fri, 8 Dec 2017 21:26:43 +0800 Subject: [PATCH 4/5] Support file based ssl config --- README.md | 57 +++++++++------------- lib/kaffe/config.ex | 93 +++++++++++++++++------------------- lib/kaffe/config/consumer.ex | 37 ++++---------- lib/kaffe/config/producer.ex | 32 +++---------- mix.exs | 1 + mix.lock | 1 + test/data/ca.der | 1 + test/data/ca.pem | 29 +++++++++++ test/data/cert.der | 15 ++++++ test/data/cert.pem | 20 ++++++++ test/data/key.der | 22 +++++++++ test/data/key.pem | 30 ++++++++++++ test/kaffe/config_test.exs | 78 ++++++++++++++++++++++++------ 13 files changed, 266 insertions(+), 150 deletions(-) create mode 100644 test/data/ca.der create mode 100644 test/data/ca.pem create mode 100644 test/data/cert.der create mode 100644 test/data/cert.pem create mode 100644 test/data/key.der create mode 100644 test/data/key.pem diff --git a/README.md b/README.md index bb6f408..ebecf62 100644 --- a/README.md +++ b/README.md @@ -86,31 +86,27 @@ Batch message consumers receive a list of messages and work as part of the `:bro # optional async_message_ack: false, # see "async message acknowledgement" below start_with_earliest_message: true # default false + ssl: [ + certfile: '/etc/kafka/ssl/cert.pem', + keyfile: '/etc/kafka/ssl/key.pem', + cacertfile: '/etc/kafka/ssl/ca.pem' + # See http://erlang.org/doc/man/ssl.html for other options + ] ], ``` The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka then they will supercede this option. If omitted then your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages. - ### Heroku Configuration - To configure a Kaffe Consumer for a Heroku Kafka compatible environment including SSL omit the `endpoint` and instead set `heroku_kafka_env: true` - ```elixir - config :kaffe, - consumer: [ - heroku_kafka_env: true, - topics: ["interesting-topic"], - consumer_group: "your-app-consumer-group", - message_handler: MessageProcessor - ] - ``` + ### Heroku Configuration - With that setting in place Kaffe will automatically pull required info from the following ENV variables: + To configure a Kaffe Consumer for a Heroku Kafka compatible environment, set the following OS environment variables: - - `KAFKA_URL` - - `KAFKA_CLIENT_CERT` - - `KAFKA_CLIENT_CERT_KEY` - - `KAFKA_TRUSTED_CERT` (not used yet) + - `KAFKA_URL`: `kafka://host:port` (default port 9092) or `kafka+ssl://host:port` (default port 9093) + - `KAFKA_CLIENT_CERT`: PEM encoded client certificate + - `KAFKA_CLIENT_CERT_KEY`: PEM encoded client private key + - `KAFKA_TRUSTED_CERT`: PEM encoded CA cert(s) 3. Add `Kaffe.Consumer` as a worker in your supervision tree @@ -239,6 +235,12 @@ Configure your Kaffe Producer in your mix config # optional partition_strategy: :md5 + ssl: [ + certfile: '/etc/kafka/ssl/cert.pem', + keyfile: '/etc/kafka/ssl/key.pem', + cacertfile: '/etc/kafka/ssl/ca.pem' + # See http://erlang.org/doc/man/ssl.html for other options + ] ] ``` @@ -252,25 +254,12 @@ You can also set any of the Brod producer configuration options in the `producer ### Heroku Configuration -To configure a Kaffe Producer for a Heroku Kafka compatible environment including SSL omit the `endpoint` and instead set `heroku_kafka_env: true` - - ```elixir - config :kaffe, - producer: [ - heroku_kafka_env: true, - topics: ["kafka-topic"], - - # optional - partition_strategy: :md5 - ] - ``` - -With that setting in place Kaffe will automatically pull required info from the following ENV variables: + To configure a Kaffe Consumer for a Heroku Kafka compatible environment, set the following OS environment variables: - - `KAFKA_URL` - - `KAFKA_CLIENT_CERT` - - `KAFKA_CLIENT_CERT_KEY` - - `KAFKA_TRUSTED_CERT` + - `KAFKA_URL`: `kafka://host:port` (default port 9092) or `kafka+ssl://host:port` (default port 9093) + - `KAFKA_CLIENT_CERT`: PEM encoded client certificate + - `KAFKA_CLIENT_CERT_KEY`: PEM encoded client private key + - `KAFKA_TRUSTED_CERT`: PEM encoded CA cert(s) ### Producing to Kafka diff --git a/lib/kaffe/config.ex b/lib/kaffe/config.ex index 74ea583..2684bf3 100644 --- a/lib/kaffe/config.ex +++ b/lib/kaffe/config.ex @@ -1,67 +1,64 @@ defmodule Kaffe.Config do - def heroku_kafka_endpoints do - "KAFKA_URL" - |> System.get_env - |> heroku_kafka_endpoints - end - def heroku_kafka_endpoints(kafka_url) do - kafka_url - |> String.replace("kafka+ssl://", "") - |> String.replace("kafka://", "") - |> String.split(",") - |> Enum.map(&url_endpoint_to_tuple/1) + @doc "Get list of endpoints in brod format" + @spec endpoints(Keyword.t) :: list({atom, non_neg_integer}) + def endpoints(config) do + parse_kafka_urls(System.get_env("KAFKA_URL")) || Keyword.get(config, :endpoints, []) end - def url_endpoint_to_tuple(endpoint) do - [ip, port] = endpoint |> String.split(":") - {ip |> String.to_atom, port |> String.to_integer} + @doc "Parse Kafka URL(s) into list of endpoints" + @spec parse_kafka_urls(binary | nil) :: list({atom, non_neg_integer}) | nil + def parse_kafka_urls(nil), do: nil + def parse_kafka_urls(urls) do + for url <- String.split(urls, ","), do: parse_kafka_url(url) end - def ssl_config do - ssl_config(client_cert(), client_cert_key()) - end + @doc "Parse Kafka URL into {host, port}" + @spec parse_kafka_url(binary) :: {atom, non_neg_integer} + def parse_kafka_url(<<"kafka://", host :: binary>>), do: parse_kafka_host(host, 9092) + def parse_kafka_url(<<"kafka+ssl://", host ::binary>>), do: parse_kafka_host(host, 9093) - def ssl_config(_client_cert=nil, _client_cert_key=nil) do - [] + @spec parse_kafka_host(binary, non_neg_integer) :: {atom, non_neg_integer} + defp parse_kafka_host(host_port, default_port) do + case String.split(host_port, ":") do + [host, port] -> {String.to_atom(host), String.to_integer(port)} + [host] -> {String.to_atom(host), default_port} + end end - def ssl_config(client_cert, client_cert_key) do - [ - ssl: [ - cert: client_cert, - key: client_cert_key, - ] - ] + @doc "Get ssl options from config and OS environment vars" + @spec ssl(Keyword.t) :: Keyword.t + def ssl(config) do + ssl_defaults = config[:ssl] || [] + ssl_options(ssl_cert_option(System.get_env("KAFKA_CLIENT_CERT")) ++ + ssl_key_option(System.get_env("KAFKA_CLIENT_CERT_KEY")) ++ + ssl_cacerts_option(System.get_env("KAFKA_TRUSTED_CERT")) ++ + ssl_defaults) end - def client_cert do - case System.get_env("KAFKA_CLIENT_CERT") do - nil -> nil - cert -> extract_der(cert) - end - end + @spec ssl_options(Keyword.t) :: Keyword.t + defp ssl_options([]), do: [] + defp ssl_options(options), do: [ssl: options] - def client_cert_key do - case System.get_env("KAFKA_CLIENT_CERT_KEY") do - nil -> nil - cert_key -> extract_type_and_der(cert_key) - end + @spec ssl_cert_option(binary | nil) :: Keyword.t + def ssl_cert_option(nil), do: [] + def ssl_cert_option(pem) do + [{_type, der, _cypher_info} | _] = :public_key.pem_decode(pem) + [cert: der] end - def decode_pem(pem) do - pem - |> :public_key.pem_decode - |> List.first + @spec ssl_key_option(binary | nil) :: Keyword.t + def ssl_key_option(nil), do: [] + def ssl_key_option(pem) do + [{type, der, _cypher_info} | _] = :public_key.pem_decode(pem) + [key: {type, der}] end - def extract_der(cert) do - {_type, der, _} = decode_pem(cert) - der + @spec ssl_cacerts_option(binary | nil) :: Keyword.t + def ssl_cacerts_option(nil), do: [] + def ssl_cacerts_option(pem) do + certs = for {_type, der, _cypher_info} <- :public_key.pem_decode(pem), do: der + [cacerts: certs] end - def extract_type_and_der(cert_key) do - {type, der_cert, _} = decode_pem(cert_key) - {type, der_cert} - end end diff --git a/lib/kaffe/config/consumer.ex b/lib/kaffe/config/consumer.ex index 6be102a..904868b 100644 --- a/lib/kaffe/config/consumer.ex +++ b/lib/kaffe/config/consumer.ex @@ -28,12 +28,7 @@ defmodule Kaffe.Config.Consumer do def async_message_ack, do: config_get(:async_message_ack, false) - def endpoints do - case heroku_kafka?() do - true -> Kaffe.Config.heroku_kafka_endpoints() - false -> config_get!(:endpoints) - end - end + def endpoints, do: Kaffe.Config.endpoints(config()) def consumer_group_config do [ @@ -59,8 +54,7 @@ defmodule Kaffe.Config.Consumer do end def client_consumer_config do - default_client_consumer_config() - ++ maybe_heroku_kafka_ssl() + default_client_consumer_config() ++ Kaffe.Config.ssl(config()) end def default_client_consumer_config do @@ -72,9 +66,10 @@ defmodule Kaffe.Config.Consumer do end def begin_offset do - case config_get(:start_with_earliest_message, false) do - true -> :earliest - false -> -1 + if config_get(:start_with_earliest_message, false) do + :earliest + else + -1 end end @@ -86,24 +81,10 @@ defmodule Kaffe.Config.Consumer do config_get(:worker_allocation_strategy, :worker_per_partition) end - def maybe_heroku_kafka_ssl do - case heroku_kafka?() do - true -> Kaffe.Config.ssl_config() - false -> [] - end - end + def config, do: Application.get_env(:kaffe, :consumer) - def heroku_kafka? do - config_get(:heroku_kafka_env, false) - end + def config_get!(key), do: Keyword.fetch!(config(), key) - def config_get!(key) do - Application.get_env(:kaffe, :consumer) - |> Keyword.fetch!(key) - end + def config_get(key, default), do: Keyword.get(config(), key, default) - def config_get(key, default) do - Application.get_env(:kaffe, :consumer) - |> Keyword.get(key, default) - end end diff --git a/lib/kaffe/config/producer.ex b/lib/kaffe/config/producer.ex index 183ddea..226a103 100644 --- a/lib/kaffe/config/producer.ex +++ b/lib/kaffe/config/producer.ex @@ -11,23 +11,10 @@ defmodule Kaffe.Config.Producer do def producer_topics, do: config_get!(:topics) - def endpoints do - case heroku_kafka?() do - true -> Kaffe.Config.heroku_kafka_endpoints() - false -> config_get!(:endpoints) - end - end + def endpoints, do: Kaffe.Config.endpoints(config()) def client_producer_config do - default_client_producer_config() - ++ maybe_heroku_kafka_ssl() - end - - def maybe_heroku_kafka_ssl do - case heroku_kafka?() do - true -> Kaffe.Config.ssl_config() - false -> [] - end + default_client_producer_config() ++ Kaffe.Config.ssl(config()) end def default_client_producer_config do @@ -48,17 +35,10 @@ defmodule Kaffe.Config.Producer do ] end - def heroku_kafka? do - config_get(:heroku_kafka_env, false) - end + def config, do: Application.get_env(:kaffe, :producer) - def config_get!(key) do - Application.get_env(:kaffe, :producer) - |> Keyword.fetch!(key) - end + def config_get!(key), do: Keyword.fetch!(config(), key) + + def config_get(key, default), do: Keyword.get(config(), key, default) - def config_get(key, default) do - Application.get_env(:kaffe, :producer) - |> Keyword.get(key, default) - end end diff --git a/mix.exs b/mix.exs index a036feb..1130ac6 100644 --- a/mix.exs +++ b/mix.exs @@ -27,6 +27,7 @@ defmodule Kaffe.Mixfile do [ {:brod, "~> 3.0"}, {:ex_doc, "~> 0.14", only: :dev, runtime: false}, + {:meck, "~> 0.8.8", only: :test, runtime: false}, ] end diff --git a/mix.lock b/mix.lock index 354d013..0dc1d78 100644 --- a/mix.lock +++ b/mix.lock @@ -3,6 +3,7 @@ "ex_doc": {:hex, :ex_doc, "0.15.1", "d5f9d588fd802152516fccfdb96d6073753f77314fcfee892b15b6724ca0d596", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]}, "kafka_protocol": {:hex, :kafka_protocol, "1.1.0", "817c07a6339cbfb32d1f20a588353bf8d9a8944df296eb2e930360b83760c171", [:rebar, :rebar3], [{:snappyer, "1.2.1", [repo: "hexpm", hex: :snappyer, optional: false]}], "hexpm"}, "logfmt": {:hex, :logfmt, "3.2.0", "887a091adad28acc6e4d8b3d3bce177b934e7c61e7655c86946410f44aca6d84", [:mix], []}, + "meck": {:hex, :meck, "0.8.9", "64c5c0bd8bcca3a180b44196265c8ed7594e16bcc845d0698ec6b4e577f48188", [], [], "hexpm"}, "metrix": {:git, "https://github.com/rwdaigle/metrix.git", "a6738df9346da0412ca68f82a24a67d2a32b066e", [branch: "master"]}, "snappyer": {:hex, :snappyer, "1.2.1", "06c5f5c8afe80ba38e94e1ca1bd9253de95d8f2c85b08783e8d0f63815580556", [:make, :rebar, :rebar3], [], "hexpm"}, "supervisor3": {:hex, :supervisor3, "1.1.5", "5f3c487a6eba23de0e64c06e93efa0eca06f40324a6412c1318c77aca6da8424", [:make, :rebar, :rebar3], []}} diff --git a/test/data/ca.der b/test/data/ca.der new file mode 100644 index 0000000..e997d1e --- /dev/null +++ b/test/data/ca.der @@ -0,0 +1 @@ +MIIE4jCCAsoCCQDYI7B+PTiZPDANBgkqhkiG9w0BAQsFADAzMQswCQYDVQQGEwJVUzEOMAwGA1UECgwFS2Fma2ExFDASBgNVBAMMC0NOPUthZmthIENBMB4XDTE3MTIwNjExMTQyOVoXDTE4MTIwNjExMTQyOVowMzELMAkGA1UEBhMCVVMxDjAMBgNVBAoMBUthZmthMRQwEgYDVQQDDAtDTj1LYWZrYSBDQTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBALzYrZmxwZbrIwh7dEzyD844X4dkJlVa2Hl+9LUrRFif1mbqkmcEhyHpdAfovV85OoJrp8FTp4GhXqOQM1qu1OoZdT9+x842h7F0kM524Ax9Z7nBzjrLBZaaTEOzaug6HDz/jT+NDqVrPPqLoft1hxG801FCHlmz2wRS+K/RvicbiC574PVw9lvVbZfsmUKARhj2Z81NByixKP1VFQrp6aAhwn6f44OKXs9i0twqypbA7LHw07QPedz6ebDtkQqr9Nz7LK41qXwUMm8lozgn+X2RNoRqyklT7xyp3i271huCO70mYK6gRccLjdjATBHsevngmeehr2jhr2IST8Xvfa5fYAku783/GZxE57sLTV+vzQ++Dube/xVX/bCp3lJ7rpbavdSkHX3Myw+BMiH5XPeshzaPXuNU0rTQtUNpRzBp4dcQRVsuHUCuJHA7PnmO/4KMW7EwLKRoemcHI1wPEAOLHDvgVl2kGBwVLrphw13aVpt2QG+r2pnfk5yGugoOKH2o+l/SIQzadGues7zvPhIC08hl5j8TE++gdSur9yWDfiU3mjubn1d7bmgnxzH67Yxj2iwMVV+5dtwHpD3UM8o8fCA1D7KA/UXLzFWoN8/tM6xmfeX+SnihjfchQk4OW1lUL7Yf1HQqexlK8BiD4e+k63MwhpKVXk84fzCa8y1BAgMBAAEwDQYJKoZIhvcNAQELBQADggIBADtmo5aPCoVwGGTEcM1HDx6le8BehOOBm0Hqkku21vsHyv+L65PGZ+1/1ilXOhsloaQngorxijXlEchaOJ6It8aev65m1+qovlLrJGXYMHOpRJ/cKfWDgKPCZ1hgbbtZ2UG4VUQqYklVfqSLPF7oO1PnVKamecQ+9PZpyPX+lHz+CN7gk1bw2jcjZoi8VLuD1/QQ1rwhRBgkkj+MArjzBaxO996VN54EhHNHdmGXc5hhun3USXMKniiGot/xUjVus0aSxZNRpScHstpScki36w/tT/e6hHKi59hqRV09FN5EjlAHRjQKxYruqzTUKG0q/f0Al2asIZ04+VRLbANdcdvNNDcwJRUZirasrEVxy9lP5LW4NtVHNOjmTZY1AEQTTlkvwo4ZczFVsmMaJ6FTJ6rYIC87sDf5USM/Yil0fjq7fO/oAdOVNUjArStaogFUWWuM/EFBeO5U72i2ohPz0rCl1R+GWdrmmdbnHdjIQB2Kj79Vnd/GnJcV4tZVqYbpBtp1zlWxlWo4sjVV1+45JxvGskWSIb0lVVSq/HOrKRq54zxFivCUoVR4PQC3a8qdx6o2/COxHfccNdfxBK4Vj2uX/1L2EmbEHl09duLdZH7THevaSz25eByX7eTLIRGIIyTbyTgz/RI6G3itW4ZdMG9CwuwTmi/FqO/Xoffmjo06 \ No newline at end of file diff --git a/test/data/ca.pem b/test/data/ca.pem new file mode 100644 index 0000000..c848b32 --- /dev/null +++ b/test/data/ca.pem @@ -0,0 +1,29 @@ +-----BEGIN CERTIFICATE----- +MIIE4jCCAsoCCQDYI7B+PTiZPDANBgkqhkiG9w0BAQsFADAzMQswCQYDVQQGEwJV +UzEOMAwGA1UECgwFS2Fma2ExFDASBgNVBAMMC0NOPUthZmthIENBMB4XDTE3MTIw +NjExMTQyOVoXDTE4MTIwNjExMTQyOVowMzELMAkGA1UEBhMCVVMxDjAMBgNVBAoM +BUthZmthMRQwEgYDVQQDDAtDTj1LYWZrYSBDQTCCAiIwDQYJKoZIhvcNAQEBBQAD +ggIPADCCAgoCggIBALzYrZmxwZbrIwh7dEzyD844X4dkJlVa2Hl+9LUrRFif1mbq +kmcEhyHpdAfovV85OoJrp8FTp4GhXqOQM1qu1OoZdT9+x842h7F0kM524Ax9Z7nB +zjrLBZaaTEOzaug6HDz/jT+NDqVrPPqLoft1hxG801FCHlmz2wRS+K/RvicbiC57 +4PVw9lvVbZfsmUKARhj2Z81NByixKP1VFQrp6aAhwn6f44OKXs9i0twqypbA7LHw +07QPedz6ebDtkQqr9Nz7LK41qXwUMm8lozgn+X2RNoRqyklT7xyp3i271huCO70m +YK6gRccLjdjATBHsevngmeehr2jhr2IST8Xvfa5fYAku783/GZxE57sLTV+vzQ++ +Dube/xVX/bCp3lJ7rpbavdSkHX3Myw+BMiH5XPeshzaPXuNU0rTQtUNpRzBp4dcQ +RVsuHUCuJHA7PnmO/4KMW7EwLKRoemcHI1wPEAOLHDvgVl2kGBwVLrphw13aVpt2 +QG+r2pnfk5yGugoOKH2o+l/SIQzadGues7zvPhIC08hl5j8TE++gdSur9yWDfiU3 +mjubn1d7bmgnxzH67Yxj2iwMVV+5dtwHpD3UM8o8fCA1D7KA/UXLzFWoN8/tM6xm +feX+SnihjfchQk4OW1lUL7Yf1HQqexlK8BiD4e+k63MwhpKVXk84fzCa8y1BAgMB +AAEwDQYJKoZIhvcNAQELBQADggIBADtmo5aPCoVwGGTEcM1HDx6le8BehOOBm0Hq +kku21vsHyv+L65PGZ+1/1ilXOhsloaQngorxijXlEchaOJ6It8aev65m1+qovlLr +JGXYMHOpRJ/cKfWDgKPCZ1hgbbtZ2UG4VUQqYklVfqSLPF7oO1PnVKamecQ+9PZp +yPX+lHz+CN7gk1bw2jcjZoi8VLuD1/QQ1rwhRBgkkj+MArjzBaxO996VN54EhHNH +dmGXc5hhun3USXMKniiGot/xUjVus0aSxZNRpScHstpScki36w/tT/e6hHKi59hq +RV09FN5EjlAHRjQKxYruqzTUKG0q/f0Al2asIZ04+VRLbANdcdvNNDcwJRUZiras +rEVxy9lP5LW4NtVHNOjmTZY1AEQTTlkvwo4ZczFVsmMaJ6FTJ6rYIC87sDf5USM/ +Yil0fjq7fO/oAdOVNUjArStaogFUWWuM/EFBeO5U72i2ohPz0rCl1R+GWdrmmdbn +HdjIQB2Kj79Vnd/GnJcV4tZVqYbpBtp1zlWxlWo4sjVV1+45JxvGskWSIb0lVVSq +/HOrKRq54zxFivCUoVR4PQC3a8qdx6o2/COxHfccNdfxBK4Vj2uX/1L2EmbEHl09 +duLdZH7THevaSz25eByX7eTLIRGIIyTbyTgz/RI6G3itW4ZdMG9CwuwTmi/FqO/X +offmjo06 +-----END CERTIFICATE----- diff --git a/test/data/cert.der b/test/data/cert.der new file mode 100644 index 0000000..218b2da --- /dev/null +++ b/test/data/cert.der @@ -0,0 +1,15 @@ +MIIDATCCAemgAwIBAgIEdz2C7zANBgkqhkiG9w0BAQsFADAxMQswCQYDVQQGEwJVUzEOMAwGA1UE +ChMFS2Fma2ExEjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0xNzEyMDgwNDA5MDlaFw0xODEyMDgwNDA5 +MDlaMDExCzAJBgNVBAYTAlVTMQ4wDAYDVQQKEwVLYWZrYTESMBAGA1UEAxMJbG9jYWxob3N0MIIB +IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAlE3difFwZjIOgRtRHY1SKvTbBmn3N+3jfOYE +JUibWZNrEGJcUARRQpFGDs7svp9KLan2D7S2Ss4OvwnD4DmdLMqto6mYx/4Di2zF8GZOci7Qz03v +7pEpgKGzHz77SvpmVYK3lk6M+LvapH6Q6AlPDUieRLc4JWsDoMRDTG4/9lSsZZCP6WyF9yK+Xl0I +PsPvVmEjXzxPq/Lc5+9Cy4M+KIGlaXzGU0zE3xFwdlBUUmwml6SbFRVW7uwZEaCkqX6wZbFb4NQx +1qYGKhz034inrqtLqbFRHCDrGJO+yVparsFnHjbYzck0BLX9Xm2TOVCpavYD7YOc6FHnDlhl9kqQ +eQIDAQABoyEwHzAdBgNVHQ4EFgQUuKYCPpnTs+2pvBRzGEbRR3Bk0TgwDQYJKoZIhvcNAQELBQAD +ggEBABvrkVnc5wu9hjkdW+rEW2b3FqmUPinxvly8fe57OpWElq22jdgVpiaftMB8fDn4NVOdOU2R +QQ1/wG1/5K8CnHNFEvpNVyE7lo7qD5AvG8RXFlX6UoBX682icw6r+3NTDRiTTKJm4fpLT77dsBhK +HfurS8C2qfhcwBmcA3cgrPgrsxs6OxAbKr9HlJQmwYTSqhM2Yd7e06ew2tDeIfUl1GZAAp11/gtF +tumcWqON8L/qKYmJO9SQzexcjVYS3yXwdS0bMv5r/D2jmbFWw8jiqU+BSTE99c/xOxMvLbswuyyy +YhJQ2EQ4Z+ZnlvEDGM5uicSya/OiscxGYtq2BQ0k7xk= + diff --git a/test/data/cert.pem b/test/data/cert.pem new file mode 100644 index 0000000..ddb7289 --- /dev/null +++ b/test/data/cert.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDATCCAemgAwIBAgIEdz2C7zANBgkqhkiG9w0BAQsFADAxMQswCQYDVQQGEwJV +UzEOMAwGA1UEChMFS2Fma2ExEjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0xNzEyMDgw +NDA5MDlaFw0xODEyMDgwNDA5MDlaMDExCzAJBgNVBAYTAlVTMQ4wDAYDVQQKEwVL +YWZrYTESMBAGA1UEAxMJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAlE3difFwZjIOgRtRHY1SKvTbBmn3N+3jfOYEJUibWZNrEGJcUARR +QpFGDs7svp9KLan2D7S2Ss4OvwnD4DmdLMqto6mYx/4Di2zF8GZOci7Qz03v7pEp +gKGzHz77SvpmVYK3lk6M+LvapH6Q6AlPDUieRLc4JWsDoMRDTG4/9lSsZZCP6WyF +9yK+Xl0IPsPvVmEjXzxPq/Lc5+9Cy4M+KIGlaXzGU0zE3xFwdlBUUmwml6SbFRVW +7uwZEaCkqX6wZbFb4NQx1qYGKhz034inrqtLqbFRHCDrGJO+yVparsFnHjbYzck0 +BLX9Xm2TOVCpavYD7YOc6FHnDlhl9kqQeQIDAQABoyEwHzAdBgNVHQ4EFgQUuKYC +PpnTs+2pvBRzGEbRR3Bk0TgwDQYJKoZIhvcNAQELBQADggEBABvrkVnc5wu9hjkd +W+rEW2b3FqmUPinxvly8fe57OpWElq22jdgVpiaftMB8fDn4NVOdOU2RQQ1/wG1/ +5K8CnHNFEvpNVyE7lo7qD5AvG8RXFlX6UoBX682icw6r+3NTDRiTTKJm4fpLT77d +sBhKHfurS8C2qfhcwBmcA3cgrPgrsxs6OxAbKr9HlJQmwYTSqhM2Yd7e06ew2tDe +IfUl1GZAAp11/gtFtumcWqON8L/qKYmJO9SQzexcjVYS3yXwdS0bMv5r/D2jmbFW +w8jiqU+BSTE99c/xOxMvLbswuyyyYhJQ2EQ4Z+ZnlvEDGM5uicSya/OiscxGYtq2 +BQ0k7xk= +-----END CERTIFICATE----- + diff --git a/test/data/key.der b/test/data/key.der new file mode 100644 index 0000000..b152686 --- /dev/null +++ b/test/data/key.der @@ -0,0 +1,22 @@ +TAyeJz2yJV6dXrCcLSg82S+Vpe4v5x2g7+cL5X7VR++NlBACRK8UPrfoJUqt/c6tX+bvgAqqzySn +f1v6n8ORhK0W7nYFdzK04w8YzbFOHm2wgtgwM2Im3jDKzmSBjpLELl3QyjBGcn6JKTQCa9GzA2DR +rD9L7YY54/WQKWM7YCkmIZq+OMCnEYx3houNOM7VlMdOPLJrhph9CRZqwVYYR2Ibu8yA63wOX2bv +hqAcHHp93q/aqq3L8oBDqiCuau8EnXXrXYOx2jKovGTO5cdKTp7o63h4ABx9yelwkxWRQjPpHob0 +4vFE5UCAiUotYCW0JNTeH3X+EWPPXBSLmSLR+Pb9/teLsLvNoy5LzlPps4oBZmzwbpp3+pEvmly+ +EeIxcpMw4zhvWyr3pGPfAVfJcCGl68idR/YtFTxwuSCFKUQG9LccAIbEBTeJGabTaDVSmucLfxom +GsYnujwAXsacC1uEAQdCdl/0eB64VQqWayyQypL1nAimrvdjJXjhFEFyvKciYd2kmJfJVI1jixEW +QsmVdZmCi86ImyzWik9eQT2AvtA8Sn2nZFUXzlZ086c22UVd3NNVn+LcZWHrT55CzPij5i4HradS +P+eoWtsQwPhQb4W42f/v2tahSxDe6h2vvGabdfjE91E5wIhmhDMmjTOLGPi2Ckm61GmbqE74ZKOQ +9uQtWVmSfUPsYmCn0iFo0hjpEqRu/8Q0KhRiZWS85CpuK2wtBLaGEVduqHvjBgO21e81NGrTZkkL +tr3H+SMXhI4iOcJIytWDF9yDBGrUjBd3in3T5Jq2HvAh+mWssPKA9yRMF28nIuur9fY2UgMNrDMD +watev2GlyV8JrhtfAyhVl0ZJuqp9IVcusemklCiwPBn+OxFP/gayCdA40kxaDwOQa9iYy+fVMyuP +f+9A3cw5hZ8M5DAE78g6S+vICB+JpPt2XA/6GfoV69xZFmJLl9cKyacFNjomnKZ8C77glVgSOvnu +EwWAoOYTkNFHH5gLAhoKErcnmvfoFMiBRuzhGAmKYKYDSYNXblgvAErOyCsKnEk17B+Q71Us/uvY +F/lSnMPlTS7n1SONJbiEGQvoIGU7JriWqtI+yg9joec1TFGE9r+7GF1FaUQy9JliXWcTz1+Qovj0 +yrCOw6JyaIucYRstzNZINbwhlLC8cp+ipNsT23vvixaAXthqmQlpgSaVWggDS3Oz8/dSkPoFctKC +EHfb5zA7MeJnVVV6N2KWuqFvdgMwpOmBwG0qEBS0jiDyMsisf/7ri6mztUC8YFYF9b3XPU3ZQ2c1 +ozdzC08Gyf6yieNKfPHcGi+Lf/UN5Stx+jLjVl7kAybTj6kcQY04RdG7tkjHR69lKLbzMj+Y5a3c +U3vrC3Nc6hC2yrCz3lSU6Hwxq0VCs2hZugFbBvPEaoHki/qQxv35OnyCFWvs+qNitEgBSS4OBirQ +zGD37GMwX9rzXRYapSiLi2SclTrcTqsFCJXElqsnxlg8pp+KX9pyHagPSPuDaE7/ZOsVzgzBDtMR +dL6ohytKXc29umTc3BCIb9vPzsx10Dw8+/pf/xgSAwny4/DJlRgivKx2SFKO3wx5C131JsO9QLTO +ceb9zAv9m+mAT6ER9e1wLGy0L/mK5TqXTc3G diff --git a/test/data/key.pem b/test/data/key.pem new file mode 100644 index 0000000..51fc857 --- /dev/null +++ b/test/data/key.pem @@ -0,0 +1,30 @@ +-----BEGIN ENCRYPTED PRIVATE KEY----- +MIIFDjBABgkqhkiG9w0BBQ0wMzAbBgkqhkiG9w0BBQwwDgQImLd1sCBnPeUCAggA +MBQGCCqGSIb3DQMHBAjNJxIKccy26ASCBMhMDJ4nPbIlXp1esJwtKDzZL5Wl7i/n +HaDv5wvlftVH742UEAJErxQ+t+glSq39zq1f5u+ACqrPJKd/W/qfw5GErRbudgV3 +MrTjDxjNsU4ebbCC2DAzYibeMMrOZIGOksQuXdDKMEZyfokpNAJr0bMDYNGsP0vt +hjnj9ZApYztgKSYhmr44wKcRjHeGi404ztWUx048smuGmH0JFmrBVhhHYhu7zIDr +fA5fZu+GoBwcen3er9qqrcvygEOqIK5q7wSddetdg7HaMqi8ZM7lx0pOnujreHgA +HH3J6XCTFZFCM+kehvTi8UTlQICJSi1gJbQk1N4fdf4RY89cFIuZItH49v3+14uw +u82jLkvOU+mzigFmbPBumnf6kS+aXL4R4jFykzDjOG9bKvekY98BV8lwIaXryJ1H +9i0VPHC5IIUpRAb0txwAhsQFN4kZptNoNVKa5wt/GiYaxie6PABexpwLW4QBB0J2 +X/R4HrhVCpZrLJDKkvWcCKau92MleOEUQXK8pyJh3aSYl8lUjWOLERZCyZV1mYKL +zoibLNaKT15BPYC+0DxKfadkVRfOVnTzpzbZRV3c01Wf4txlYetPnkLM+KPmLget +p1I/56ha2xDA+FBvhbjZ/+/a1qFLEN7qHa+8Zpt1+MT3UTnAiGaEMyaNM4sY+LYK +SbrUaZuoTvhko5D25C1ZWZJ9Q+xiYKfSIWjSGOkSpG7/xDQqFGJlZLzkKm4rbC0E +toYRV26oe+MGA7bV7zU0atNmSQu2vcf5IxeEjiI5wkjK1YMX3IMEatSMF3eKfdPk +mrYe8CH6Zayw8oD3JEwXbyci66v19jZSAw2sMwPBq16/YaXJXwmuG18DKFWXRkm6 +qn0hVy6x6aSUKLA8Gf47EU/+BrIJ0DjSTFoPA5Br2JjL59UzK49/70DdzDmFnwzk +MATvyDpL68gIH4mk+3ZcD/oZ+hXr3FkWYkuX1wrJpwU2OiacpnwLvuCVWBI6+e4T +BYCg5hOQ0UcfmAsCGgoStyea9+gUyIFG7OEYCYpgpgNJg1duWC8ASs7IKwqcSTXs +H5DvVSz+69gX+VKcw+VNLufVI40luIQZC+ggZTsmuJaq0j7KD2Oh5zVMUYT2v7sY +XUVpRDL0mWJdZxPPX5Ci+PTKsI7DonJoi5xhGy3M1kg1vCGUsLxyn6Kk2xPbe++L +FoBe2GqZCWmBJpVaCANLc7Pz91KQ+gVy0oIQd9vnMDsx4mdVVXo3Ypa6oW92AzCk +6YHAbSoQFLSOIPIyyKx//uuLqbO1QLxgVgX1vdc9TdlDZzWjN3MLTwbJ/rKJ40p8 +8dwaL4t/9Q3lK3H6MuNWXuQDJtOPqRxBjThF0bu2SMdHr2UotvMyP5jlrdxTe+sL +c1zqELbKsLPeVJTofDGrRUKzaFm6AVsG88RqgeSL+pDG/fk6fIIVa+z6o2K0SAFJ +Lg4GKtDMYPfsYzBf2vNdFhqlKIuLZJyVOtxOqwUIlcSWqyfGWDymn4pf2nIdqA9I ++4NoTv9k6xXODMEO0xF0vqiHK0pdzb26ZNzcEIhv28/OzHXQPDz7+l//GBIDCfLj +8MmVGCK8rHZIUo7fDHkLXfUmw71AtM5x5v3MC/2b6YBPoRH17XAsbLQv+YrlOpdN +zcY= +-----END ENCRYPTED PRIVATE KEY----- diff --git a/test/kaffe/config_test.exs b/test/kaffe/config_test.exs index 838dddb..11a9c8d 100644 --- a/test/kaffe/config_test.exs +++ b/test/kaffe/config_test.exs @@ -1,29 +1,79 @@ defmodule Kaffe.ConfigTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false + + describe "endpoints/1" do + test "parses urls to correct format" do + assert Kaffe.Config.parse_kafka_url("kafka+ssl://192.168.1.100:9096") == {:"192.168.1.100", 9096} + assert Kaffe.Config.parse_kafka_url("kafka+ssl://192.168.1.100") == {:"192.168.1.100", 9093} + assert Kaffe.Config.parse_kafka_url("kafka+ssl://kafka.example.com") == {:"kafka.example.com", 9093} + + assert Kaffe.Config.parse_kafka_url("kafka://192.168.1.100:9096") == {:"192.168.1.100", 9096} + assert Kaffe.Config.parse_kafka_url("kafka://192.168.1.100") == {:"192.168.1.100", 9092} + + assert Kaffe.Config.parse_kafka_urls(nil) == nil - describe "heroku_kafka_endpoints/1" do - test "transforms endpoints into the correct format" do kafka_url = "kafka+ssl://192.168.1.100:9096,kafka+ssl://192.168.1.101:9096,kafka+ssl://192.168.1.102:9096" expected = [{:"192.168.1.100", 9096}, {:"192.168.1.101", 9096}, {:"192.168.1.102", 9096}] + assert Kaffe.Config.parse_kafka_urls(kafka_url) == expected + end - assert Kaffe.Config.heroku_kafka_endpoints(kafka_url) == expected + test "gets endpoint data properly" do + assert Kaffe.Config.endpoints([endpoints: [kafka: 9096]]) == [kafka: 9096] end - end - describe "ssl_config/2" do - test "ssl_config returns an empty list when cert and key are nil" do - assert Kaffe.Config.ssl_config(nil, nil) == [] + test "gets endpoint data from OS environment" do + :meck.new(System) + :meck.expect(System, :get_env, fn("KAFKA_URL") -> + "kafka+ssl://192.168.1.100:9096,kafka+ssl://192.168.1.101:9096,kafka+ssl://192.168.1.102:9096" + end) + + assert Kaffe.Config.endpoints([]) == [{:"192.168.1.100", 9096}, {:"192.168.1.101", 9096}, {:"192.168.1.102", 9096}] + + :meck.unload() end + end + + describe "ssl/1" do + test "ssl returns Erlang SSL module config from OS environment vars" do + cert_pem = File.read!("./test/data/cert.pem") + # System.put_env("KAFKA_CLIENT_CERT", cert_pem) - test "ssl_config returns Erlang SSL module config with cert and key" do - client_cert = "not really a cert" - client_cert_key = "not really a cert key" - assert Kaffe.Config.ssl_config(client_cert, client_cert_key) == [ + key_pem = File.read!("./test/data/key.pem") + # System.put_env("KAFKA_CLIENT_CERT_KEY", key_pem) + + ca_pem = File.read!("./test/data/ca.pem") + # System.put_env("KAFKA_TRUSTED_CERT", ca_pem) + + cert_der_base64 = File.read!("./test/data/cert.der") + {:ok, cert_der} = Base.decode64(cert_der_base64, ignore: :whitespace) + + key_der_base64 = File.read!("./test/data/key.der") + {:ok, key_der} = Base.decode64(key_der_base64, ignore: :whitespace) + + ca_der_base64 = File.read!("./test/data/ca.der") + {:ok, ca_der} = Base.decode64(ca_der_base64, ignore: :whitespace) + + assert Kaffe.Config.ssl_cert_option(cert_pem) == [cert: cert_der] + assert Kaffe.Config.ssl_key_option(key_pem) == [key: {:"PrivateKeyInfo", key_der}] + assert Kaffe.Config.ssl_cacerts_option(ca_pem) == [cacerts: [ca_der]] + + :meck.new(System) + :meck.expect(System, :get_env, + fn + ("KAFKA_CLIENT_CERT") -> cert_pem + ("KAFKA_CLIENT_CERT_KEY") -> key_pem + ("KAFKA_TRUSTED_CERT") -> ca_pem + end) + + assert Kaffe.Config.ssl([]) == [ ssl: [ - cert: client_cert, - key: client_cert_key, + cert: cert_der, + key: {:"PrivateKeyInfo", key_der}, + cacerts: [ca_der], ] ] + + :meck.unload() end end end From 41482a6dea07155c96c3f029fddbd71028e2c07a Mon Sep 17 00:00:00 2001 From: Jake Morrison Date: Fri, 8 Dec 2017 21:32:57 +0800 Subject: [PATCH 5/5] Format markdown --- README.md | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index ebecf62..28d9134 100644 --- a/README.md +++ b/README.md @@ -98,7 +98,6 @@ Batch message consumers receive a list of messages and work as part of the `:bro The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka then they will supercede this option. If omitted then your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages. - ### Heroku Configuration To configure a Kaffe Consumer for a Heroku Kafka compatible environment, set the following OS environment variables: @@ -254,12 +253,12 @@ You can also set any of the Brod producer configuration options in the `producer ### Heroku Configuration - To configure a Kaffe Consumer for a Heroku Kafka compatible environment, set the following OS environment variables: +To configure a Kaffe Consumer for a Heroku Kafka compatible environment, set the following OS environment variables: - - `KAFKA_URL`: `kafka://host:port` (default port 9092) or `kafka+ssl://host:port` (default port 9093) - - `KAFKA_CLIENT_CERT`: PEM encoded client certificate - - `KAFKA_CLIENT_CERT_KEY`: PEM encoded client private key - - `KAFKA_TRUSTED_CERT`: PEM encoded CA cert(s) +- `KAFKA_URL`: `kafka://host:port` (default port 9092) or `kafka+ssl://host:port` (default port 9093) +- `KAFKA_CLIENT_CERT`: PEM encoded client certificate +- `KAFKA_CLIENT_CERT_KEY`: PEM encoded client private key +- `KAFKA_TRUSTED_CERT`: PEM encoded CA cert(s) ### Producing to Kafka