Skip to content

Commit c83632e

Browse files
committed
initial client lb implementation
1 parent 4feee64 commit c83632e

File tree

10 files changed

+213
-3
lines changed

10 files changed

+213
-3
lines changed

lib/grpc/client/resolver.ex

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
defmodule GRPC.Client.Resolver do
2+
@moduledoc """
3+
Behaviour for gRPC client resolvers.
4+
"""
5+
@type service_config :: GRPC.Client.ServiceConfig.t() | nil
6+
7+
@callback resolve(String.t()) ::
8+
{:ok, %{addresses: list(map()), service_config: service_config()}}
9+
| {:error, term()}
10+
11+
@behaviour GRPC.Client.Resolver
12+
13+
@doc """
14+
Resolves a gRPC target string into a list of connection endpoints and an optional ServiceConfig.
15+
16+
The `target` string can use one of the supported URI schemes:
17+
18+
* `dns://[authority/]host[:port]` – resolves via DNS; looks up both A/AAAA records and optional `_grpc_config.<host>` TXT record.
19+
* `ipv4:addr[:port][,addr[:port],...]` – uses a fixed list of IPv4 addresses.
20+
* `ipv6:[addr][:port][,[addr][:port],...]` – uses a fixed list of IPv6 addresses.
21+
* `unix:/absolute_path` – connects via Unix domain socket.
22+
* `unix-abstract:name` – connects via abstract Unix socket (Linux only).
23+
* `vsock:cid:port` – connects via VSOCK (Linux only).
24+
* `xds:///name` – resolves via xDS control plane (Envoy/Istio/Traffic Director).
25+
26+
If no scheme is specified, `dns` is assumed. Default ports:
27+
28+
* `dns`, `ipv4`, `ipv6` → 50051
29+
* `xds` → 443
30+
31+
Returns:
32+
33+
* `{:ok, %{addresses: list(map()), service_config: GRPC.Client.ServiceConfig.t() | nil}}` on success
34+
* `{:error, reason}` on failure
35+
36+
Each `address` map includes at least:
37+
38+
* `:address` – host, IP, or socket path
39+
* `:port` – TCP port (if applicable)
40+
* additional fields may be present depending on the scheme (e.g., `:socket`, `:cid` for vsock).
41+
42+
This function abstracts the resolution mechanism, allowing the gRPC client to obtain endpoints and service configuration regardless of the underlying target type.
43+
"""
44+
@impl GRPC.Client.Resolver
45+
@spec resolve(String.t()) ::
46+
{:ok, %{addresses: list(map()), service_config: GRPC.Client.ServiceConfig.t()}}
47+
| {:error, term()}
48+
def resolve(target) do
49+
uri = URI.parse(target)
50+
scheme = uri.scheme || "dns"
51+
52+
case scheme do
53+
"dns" ->
54+
GRPC.Client.Resolver.DNS.resolve(target)
55+
56+
"ipv4" ->
57+
GRPC.Client.Resolver.IPv4.resolve(target)
58+
59+
"ipv6" ->
60+
GRPC.Client.Resolver.IPv6.resolve(target)
61+
62+
"unix" ->
63+
GRPC.Client.Resolver.Unix.resolve(target)
64+
65+
"xds" ->
66+
GRPC.Client.Resolver.XDS.resolve(target)
67+
68+
_ ->
69+
{:error, {:unknown_scheme, scheme}}
70+
end
71+
end
72+
end

lib/grpc/client/resolver/dns.ex

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
defmodule GRPC.Client.Resolver.DNS do
2+
@behaviour GRPC.Client.Resolver
3+
4+
alias GRPC.Client.ServiceConfig
5+
6+
@impl GRPC.Client.Resolver
7+
def resolve(target) do
8+
uri = URI.parse(target)
9+
host = uri.host || target
10+
port = uri.port || 50051
11+
12+
# resolve A/AAAA
13+
{:ok, addresses} = :inet_res.lookup(String.to_charlist(host), :in, :a)
14+
15+
addrs =
16+
Enum.map(addresses, fn ip ->
17+
%{address: :inet.ntoa(ip) |> to_string(), port: port}
18+
end)
19+
20+
# tries to resolve TXT to service config
21+
service_config_json =
22+
case :inet_res.lookup(~c"_grpc_config." ++ String.to_charlist(host), :in, :txt) do
23+
[txt | _] ->
24+
str = List.to_string(txt)
25+
26+
# TXT may have prefix "grpc_config="
27+
case String.split(str, "grpc_config=") do
28+
[_, json] -> json
29+
_ -> nil
30+
end
31+
32+
_ ->
33+
nil
34+
end
35+
36+
{:ok, %{addresses: addrs, service_config: ServiceConfig.parse(service_config_json)}}
37+
end
38+
end

lib/grpc/client/resolver/ipv4.ex

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
defmodule GRPC.Client.Resolver.IPv4 do
2+
@behaviour GRPC.Client.Resolver
3+
4+
@impl GRPC.Client.Resolver
5+
def resolve(target) do
6+
# target exemplo: "ipv4:10.0.0.1:50051,10.0.0.2:50052"
7+
[_scheme, addrs_str] = String.split(target, ":", parts: 2)
8+
9+
addresses =
10+
String.split(addrs_str, ",")
11+
|> Enum.map(fn addr ->
12+
[ip, port] = String.split(addr, ":")
13+
%{address: ip, port: String.to_integer(port)}
14+
end)
15+
16+
{:ok, %{addresses: addresses, service_config: nil}}
17+
end
18+
end

lib/grpc/client/resolver/ipv6.ex

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
defmodule GRPC.Client.Resolver.IPv6 do
2+
@behaviour GRPC.Client.Resolver
3+
4+
@impl GRPC.Client.Resolver
5+
def resolve(target) do
6+
# target example: "ipv6:[2607:f8b0:400e:c00::ef]:443,[::1]:50051"
7+
[_scheme, addrs_str] = String.split(target, ":", parts: 2)
8+
9+
addresses =
10+
String.split(addrs_str, ",")
11+
|> Enum.map(fn entry ->
12+
[ip, port] =
13+
case Regex.run(~r/\[(.*?)\]:(\d+)/, entry) do
14+
[_, ip, port] -> [ip, port]
15+
_ -> [entry, "443"]
16+
end
17+
18+
%{address: ip, port: String.to_integer(port)}
19+
end)
20+
21+
{:ok, %{addresses: addresses, service_config: nil}}
22+
end
23+
end

lib/grpc/client/resolver/unix.ex

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
defmodule GRPC.Client.Resolver.Unix do
2+
@behaviour GRPC.Client.Resolver
3+
4+
@impl GRPC.Client.Resolver
5+
def resolve(target) do
6+
# E.g.: "unix:///var/run/my.sock"
7+
uri = URI.parse(target)
8+
path = uri.path
9+
10+
{:ok, %{addresses: [%{address: path, port: nil, socket: :unix}], service_config: nil}}
11+
end
12+
end

lib/grpc/client/resolver/xds.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
defmodule GRPC.Client.Resolver.XDS do
2+
@behaviour GRPC.Client.Resolver
3+
4+
@impl GRPC.Client.Resolver
5+
def resolve(_target) do
6+
# E.g.: "xds:///myservice"
7+
{:error, :not_implemented}
8+
end
9+
end

lib/grpc/client/service_config.ex

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
defmodule GRPC.Client.ServiceConfig do
2+
@moduledoc """
3+
Represents the gRPC ServiceConfig parsed from JSON (from DNS TXT or xDS).
4+
"""
5+
6+
defstruct load_balancing_policy: :pick_first,
7+
method_configs: [],
8+
raw: %{}
9+
10+
@type t :: %__MODULE__{
11+
load_balancing_policy: atom(),
12+
method_configs: list(),
13+
raw: map()
14+
}
15+
16+
def parse(nil), do: {:ok, %__MODULE__{}}
17+
18+
def parse(json) when is_binary(json) do
19+
case Jason.decode(json) do
20+
{:ok, map} -> {:ok, from_map(map)}
21+
error -> error
22+
end
23+
end
24+
25+
defp from_map(map) do
26+
lb =
27+
map
28+
|> Map.get("loadBalancingPolicy", "pick_first")
29+
|> String.downcase()
30+
|> String.to_existing_atom()
31+
32+
%__MODULE__{
33+
load_balancing_policy: lb,
34+
method_configs: Map.get(map, "methodConfig", []),
35+
raw: map
36+
}
37+
end
38+
end

lib/grpc/server/supervisor.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ defmodule GRPC.Server.Supervisor do
5555
end
5656

5757
def init(opts) when is_list(opts) do
58-
unless is_nil(Application.get_env(:grpc, :start_server)) do
58+
if not is_nil(Application.get_env(:grpc, :start_server)) do
5959
raise "the :start_server config key has been deprecated.\
6060
The currently supported way is to configure it\
6161
through the :start_server option for the GRPC.Server.Supervisor"

lib/grpc/stream.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ defmodule GRPC.Stream do
148148
"""
149149
@spec run(t()) :: any()
150150
def run(%__MODULE__{flow: flow, options: opts}) do
151-
unless Keyword.get(opts, :unary, false) do
151+
if !Keyword.get(opts, :unary, false) do
152152
raise ArgumentError, "run/2 is not supported for non-unary streams"
153153
end
154154

lib/grpc/stub.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ defmodule GRPC.Stub do
231231

232232
adapter_opts = opts[:adapter_opts]
233233

234-
unless is_list(adapter_opts) do
234+
if not is_list(adapter_opts) do
235235
raise ArgumentError, ":adapter_opts must be a keyword list if present"
236236
end
237237

0 commit comments

Comments
 (0)