diff --git a/README.md b/README.md index 01d6b98d8..6b2f2bc74 100644 --- a/README.md +++ b/README.md @@ -19,15 +19,6 @@ - [Bidirectional Streaming](#bidirectional-streaming) - [Application Startup](#application-startup) - [Client Usage](#client-usage) - - [Basic Connection and RPC](#basic-connection-and-rpc) - - [Using Interceptors](#using-interceptors) - - [Target Schemes and Resolvers](#target-schemes-and-resolvers) - - [Supported formats](#supported-formats) - - [Example (DNS)](#example-dns) - - [Example (Unix socket)](#example-unix-socket) - - [Compression and Metadata](#compression-and-metadata) - - [Client Adapters](#client-adapters) - - [Using Mint Adapter](#using-mint-adapter) - [HTTP Transcoding](#http-transcoding) - [CORS](#cors) - [Features](#features) @@ -87,8 +78,6 @@ protoc --elixir_out=plugins=grpc:./lib -I./priv/protos helloworld.proto All RPC calls must be implemented using the stream-based API, even for unary requests. ->__NOTE__: The old API was deprecated based on `GRPC.Server.send_reply/2` and direct `struct` returns was deprecated as of version `0.10.x`. - ### Unary RPC using Stream API ```elixir @@ -144,10 +133,7 @@ def say_bid_stream_hello(request, materializer) do |> GRPC.Stream.run_with(materializer) end ``` - -The Stream API supports composable stream transformations via `ask`, `map`, `run` and other functions, enabling clean and declarative stream pipelines. For a complete list of available operators and detailed documentation, see [`GRPC.Stream`](lib/grpc/stream.ex). - ---- +__πŸ’‘__ The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. For a complete list of available operators see [here](lib/grpc/stream.ex). ## Application Startup @@ -180,106 +166,38 @@ end # Client Usage -This section demonstrates how to establish client connections and perform RPC calls using the Elixir gRPC client. - ---- - -## Basic Connection and RPC - ```elixir iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051") iex> request = Helloworld.HelloRequest.new(name: "grpc-elixir") iex> {:ok, reply} = channel |> Helloworld.GreetingServer.Stub.say_unary_hello(request) -``` - ---- - -## Using Interceptors - -Client interceptors allow you to add logic to the request/response lifecycle, such as logging, tracing, or authentication. - -```elixir -iex> {:ok, channel} = -...> GRPC.Stub.connect("localhost:50051", -...> interceptors: [GRPC.Client.Interceptors.Logger] -...> ) -iex> request = Helloworld.HelloRequest.new(name: "Alice") -iex> {:ok, reply} = channel |> Helloworld.GreetingServer.Stub.say_unary_hello(request) -``` - ---- - -## Target Schemes and Resolvers -The `connect/2` function supports URI-like targets that are resolved via the internal **gRPC** [Resolver](lib/grpc/client/resolver.ex). -You can connect using `DNS`, `Unix Domain sockets`, `IPv4/IPv6`, or even `xDS-based endpoints`. - -### Supported formats: - -| Scheme | Example | Description | -|:----------|:----------------------------|:---------------------------------------------| -| `dns://` | `"dns://example.com:50051"` | Resolves via DNS `A/AAAA` records | -| `ipv4:` | `"ipv4:10.0.0.5:50051"` | Connects directly to an IPv4 address | -| `unix:` | `"unix:/tmp/service.sock"` | Connects via a Unix domain socket | -| `xds:///` | `"xds:///my-service"` | Resolves via xDS control plane (Envoy/Istio) | -| none | `"127.0.0.1:50051"` | Implicit DNS (default port `50051`) | - -### Example (DNS): - -```elixir -iex> {:ok, channel} = GRPC.Stub.connect("dns://orders.prod.svc.cluster.local:50051") -iex> request = Orders.GetOrderRequest.new(id: "123") -iex> {:ok, reply} = channel |> Orders.OrderService.Stub.get_order(request) -``` - -### Example (Unix socket): - -```elixir -iex> {:ok, channel} = GRPC.Stub.connect("unix:/tmp/my.sock") -``` - ->__NOTE__: When using `DNS` or `xDS` targets, the connection layer periodically refreshes endpoints. ---- - -## Compression and Metadata - -You can specify message compression and attach default headers to all requests. - -```elixir -iex> {:ok, channel} = -...> GRPC.Stub.connect("localhost:50051", -...> compressor: GRPC.Compressor.Gzip, -...> headers: [{"authorization", "Bearer my-token"}] -...> ) +# With interceptors +iex> {:ok, channel} = GRPC.Stub.connect("localhost:50051", interceptors: [GRPC.Client.Interceptors.Logger]) +... ``` ---- - -## Client Adapters +Check the [examples](examples) and [interop](interop) directories in the project's source code for some examples. -By default, `GRPC.Stub.connect/2` uses the **Gun** adapter. -You can switch to **Mint** (pure Elixir HTTP/2) or other adapters as needed. +## Client Adapter and Configuration -### Using Mint Adapter +The default adapter used by `GRPC.Stub.connect/2` is `GRPC.Client.Adapter.Gun`. Another option is to use `GRPC.Client.Adapters.Mint` instead, like so: ```elixir -iex> GRPC.Stub.connect("localhost:50051", -...> adapter: GRPC.Client.Adapters.Mint -...> ) +GRPC.Stub.connect("localhost:50051", + # Use Mint adapter instead of default Gun + adapter: GRPC.Client.Adapters.Mint +) ``` -You can configure adapter options globally via your application’s config: +The `GRPC.Client.Adapters.Mint` adapter accepts custom configuration. To do so, you can configure it from your mix application via: ```elixir -# File: config/config.exs -config :grpc, GRPC.Client.Adapters.Mint, - timeout: 10_000, - transport_opts: [cacertfile: "/etc/ssl/certs/ca-certificates.crt"] +# File: your application's config file. +config :grpc, GRPC.Client.Adapters.Mint, custom_opts ``` -The accepted options are the same as [`Mint.HTTP.connect/4`](https://hexdocs.pm/mint/Mint.HTTP.html#connect/4-options). +The accepted options for configuration are the ones listed on [Mint.HTTP.connect/4](https://hexdocs.pm/mint/Mint.HTTP.html#connect/4-options) ---- ### **HTTP Transcoding** @@ -346,7 +264,7 @@ defmodule Helloworld.Greeter.Server do end ``` -See full application code in [helloworld_transcoding](https://github.com/elixir-grpc/tree/master/examples/helloworld_transcoding) example. +See full application code in [helloworld_transcoding](examples/helloworld_transcoding) example. ### **CORS** @@ -382,9 +300,9 @@ end ## Benchmark -1. [Simple benchmark](https://github.com/elixir-grpc/tree/master/examples/helloworld/README.md#Benchmark) by using [ghz](https://ghz.sh/) +1. [Simple benchmark](examples/helloworld/README.md#Benchmark) by using [ghz](https://ghz.sh/) -2. [Benchmark](https://github.com/elixir-grpc/tree/master/benchmark) followed by official spec +2. [Benchmark](benchmark) followed by official spec ## Contributing diff --git a/benchmark/lib/grpc/core/stats.pb.ex b/benchmark/lib/grpc/core/stats.pb.ex index 8fbed127d..d63472627 100644 --- a/benchmark/lib/grpc/core/stats.pb.ex +++ b/benchmark/lib/grpc/core/stats.pb.ex @@ -1,7 +1,7 @@ defmodule Grpc.Core.Bucket do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :start, 1, type: :double field :count, 2, type: :uint64 @@ -10,7 +10,7 @@ end defmodule Grpc.Core.Histogram do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :buckets, 1, repeated: true, type: Grpc.Core.Bucket end @@ -18,7 +18,7 @@ end defmodule Grpc.Core.Metric do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 oneof :value, 0 @@ -30,7 +30,7 @@ end defmodule Grpc.Core.Stats do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :metrics, 1, repeated: true, type: Grpc.Core.Metric end diff --git a/benchmark/lib/grpc/testing/benchmark_service.pb.ex b/benchmark/lib/grpc/testing/benchmark_service.pb.ex index 8d3622dc8..8b1378917 100644 --- a/benchmark/lib/grpc/testing/benchmark_service.pb.ex +++ b/benchmark/lib/grpc/testing/benchmark_service.pb.ex @@ -1,21 +1 @@ -defmodule Grpc.Testing.BenchmarkService.Service do - @moduledoc false - use GRPC.Service, name: "grpc.testing.BenchmarkService", protoc_gen_elixir_version: "0.14.0" - - rpc :UnaryCall, Grpc.Testing.SimpleRequest, Grpc.Testing.SimpleResponse - - rpc :StreamingCall, stream(Grpc.Testing.SimpleRequest), stream(Grpc.Testing.SimpleResponse) - - rpc :StreamingFromClient, stream(Grpc.Testing.SimpleRequest), Grpc.Testing.SimpleResponse - - rpc :StreamingFromServer, Grpc.Testing.SimpleRequest, stream(Grpc.Testing.SimpleResponse) - - rpc :StreamingBothWays, stream(Grpc.Testing.SimpleRequest), stream(Grpc.Testing.SimpleResponse) -end - -defmodule Grpc.Testing.BenchmarkService.Stub do - @moduledoc false - - use GRPC.Stub, service: Grpc.Testing.BenchmarkService.Service -end diff --git a/benchmark/lib/grpc/testing/control.pb.ex b/benchmark/lib/grpc/testing/control.pb.ex index 499842e3e..9687a0f78 100644 --- a/benchmark/lib/grpc/testing/control.pb.ex +++ b/benchmark/lib/grpc/testing/control.pb.ex @@ -1,7 +1,7 @@ defmodule Grpc.Testing.ClientType do @moduledoc false - use Protobuf, enum: true, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, enum: true, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :SYNC_CLIENT, 0 field :ASYNC_CLIENT, 1 @@ -11,7 +11,7 @@ end defmodule Grpc.Testing.ServerType do @moduledoc false - use Protobuf, enum: true, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, enum: true, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :SYNC_SERVER, 0 field :ASYNC_SERVER, 1 @@ -22,7 +22,7 @@ end defmodule Grpc.Testing.RpcType do @moduledoc false - use Protobuf, enum: true, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, enum: true, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :UNARY, 0 field :STREAMING, 1 @@ -34,7 +34,7 @@ end defmodule Grpc.Testing.PoissonParams do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :offered_load, 1, type: :double, json_name: "offeredLoad" end @@ -42,13 +42,13 @@ end defmodule Grpc.Testing.ClosedLoopParams do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 end defmodule Grpc.Testing.LoadParams do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 oneof :load, 0 @@ -59,7 +59,7 @@ end defmodule Grpc.Testing.SecurityParams do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :use_test_ca, 1, type: :bool, json_name: "useTestCa" field :server_host_override, 2, type: :string, json_name: "serverHostOverride" @@ -69,7 +69,7 @@ end defmodule Grpc.Testing.ChannelArg do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 oneof :value, 0 @@ -81,7 +81,7 @@ end defmodule Grpc.Testing.ClientConfig do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :server_targets, 1, repeated: true, type: :string, json_name: "serverTargets" field :client_type, 2, type: Grpc.Testing.ClientType, json_name: "clientType", enum: true @@ -105,7 +105,7 @@ end defmodule Grpc.Testing.ClientStatus do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :stats, 1, type: Grpc.Testing.ClientStats end @@ -113,7 +113,7 @@ end defmodule Grpc.Testing.Mark do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :reset, 1, type: :bool end @@ -121,7 +121,7 @@ end defmodule Grpc.Testing.ClientArgs do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 oneof :argtype, 0 @@ -132,7 +132,7 @@ end defmodule Grpc.Testing.ServerConfig do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :server_type, 1, type: Grpc.Testing.ServerType, json_name: "serverType", enum: true field :security_params, 2, type: Grpc.Testing.SecurityParams, json_name: "securityParams" @@ -154,7 +154,7 @@ end defmodule Grpc.Testing.ServerArgs do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 oneof :argtype, 0 @@ -165,7 +165,7 @@ end defmodule Grpc.Testing.ServerStatus do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :stats, 1, type: Grpc.Testing.ServerStats field :port, 2, type: :int32 @@ -175,13 +175,13 @@ end defmodule Grpc.Testing.CoreRequest do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 end defmodule Grpc.Testing.CoreResponse do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :cores, 1, type: :int32 end @@ -189,13 +189,13 @@ end defmodule Grpc.Testing.Void do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 end defmodule Grpc.Testing.Scenario do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :name, 1, type: :string field :client_config, 2, type: Grpc.Testing.ClientConfig, json_name: "clientConfig" @@ -210,7 +210,7 @@ end defmodule Grpc.Testing.Scenarios do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :scenarios, 1, repeated: true, type: Grpc.Testing.Scenario end @@ -218,7 +218,7 @@ end defmodule Grpc.Testing.ScenarioResultSummary do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :qps, 1, type: :double field :qps_per_server_core, 2, type: :double, json_name: "qpsPerServerCore" @@ -247,7 +247,7 @@ end defmodule Grpc.Testing.ScenarioResult do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :scenario, 1, type: Grpc.Testing.Scenario field :latencies, 2, type: Grpc.Testing.HistogramData diff --git a/benchmark/lib/grpc/testing/messages.pb.ex b/benchmark/lib/grpc/testing/messages.pb.ex index ad5a2c1e2..cb21c1aca 100644 --- a/benchmark/lib/grpc/testing/messages.pb.ex +++ b/benchmark/lib/grpc/testing/messages.pb.ex @@ -1,7 +1,7 @@ defmodule Grpc.Testing.PayloadType do @moduledoc false - use Protobuf, enum: true, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, enum: true, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :COMPRESSABLE, 0 end @@ -9,7 +9,7 @@ end defmodule Grpc.Testing.BoolValue do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :value, 1, type: :bool end @@ -17,7 +17,7 @@ end defmodule Grpc.Testing.Payload do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :type, 1, type: Grpc.Testing.PayloadType, enum: true field :body, 2, type: :bytes @@ -26,7 +26,7 @@ end defmodule Grpc.Testing.EchoStatus do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :code, 1, type: :int32 field :message, 2, type: :string @@ -35,7 +35,7 @@ end defmodule Grpc.Testing.SimpleRequest do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :response_type, 1, type: Grpc.Testing.PayloadType, json_name: "responseType", enum: true field :response_size, 2, type: :int32, json_name: "responseSize" @@ -50,7 +50,7 @@ end defmodule Grpc.Testing.SimpleResponse do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :payload, 1, type: Grpc.Testing.Payload field :username, 2, type: :string @@ -60,7 +60,7 @@ end defmodule Grpc.Testing.StreamingInputCallRequest do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :payload, 1, type: Grpc.Testing.Payload field :expect_compressed, 2, type: Grpc.Testing.BoolValue, json_name: "expectCompressed" @@ -69,7 +69,7 @@ end defmodule Grpc.Testing.StreamingInputCallResponse do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :aggregated_payload_size, 1, type: :int32, json_name: "aggregatedPayloadSize" end @@ -77,7 +77,7 @@ end defmodule Grpc.Testing.ResponseParameters do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :size, 1, type: :int32 field :interval_us, 2, type: :int32, json_name: "intervalUs" @@ -87,7 +87,7 @@ end defmodule Grpc.Testing.StreamingOutputCallRequest do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :response_type, 1, type: Grpc.Testing.PayloadType, json_name: "responseType", enum: true @@ -103,7 +103,7 @@ end defmodule Grpc.Testing.StreamingOutputCallResponse do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :payload, 1, type: Grpc.Testing.Payload end @@ -111,7 +111,7 @@ end defmodule Grpc.Testing.ReconnectParams do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :max_reconnect_backoff_ms, 1, type: :int32, json_name: "maxReconnectBackoffMs" end @@ -119,7 +119,7 @@ end defmodule Grpc.Testing.ReconnectInfo do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :passed, 1, type: :bool field :backoff_ms, 2, repeated: true, type: :int32, json_name: "backoffMs" diff --git a/benchmark/lib/grpc/testing/payloads.pb.ex b/benchmark/lib/grpc/testing/payloads.pb.ex index 6f0abbe6b..65a0abbc5 100644 --- a/benchmark/lib/grpc/testing/payloads.pb.ex +++ b/benchmark/lib/grpc/testing/payloads.pb.ex @@ -1,7 +1,7 @@ defmodule Grpc.Testing.ByteBufferParams do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :req_size, 1, type: :int32, json_name: "reqSize" field :resp_size, 2, type: :int32, json_name: "respSize" @@ -10,7 +10,7 @@ end defmodule Grpc.Testing.SimpleProtoParams do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :req_size, 1, type: :int32, json_name: "reqSize" field :resp_size, 2, type: :int32, json_name: "respSize" @@ -19,13 +19,13 @@ end defmodule Grpc.Testing.ComplexProtoParams do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 end defmodule Grpc.Testing.PayloadConfig do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 oneof :payload, 0 diff --git a/benchmark/lib/grpc/testing/stats.pb.ex b/benchmark/lib/grpc/testing/stats.pb.ex index 6eed7c447..9878eec0d 100644 --- a/benchmark/lib/grpc/testing/stats.pb.ex +++ b/benchmark/lib/grpc/testing/stats.pb.ex @@ -1,7 +1,7 @@ defmodule Grpc.Testing.ServerStats do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :time_elapsed, 1, type: :double, json_name: "timeElapsed" field :time_user, 2, type: :double, json_name: "timeUser" @@ -15,7 +15,7 @@ end defmodule Grpc.Testing.HistogramParams do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :resolution, 1, type: :double field :max_possible, 2, type: :double, json_name: "maxPossible" @@ -24,7 +24,7 @@ end defmodule Grpc.Testing.HistogramData do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :bucket, 1, repeated: true, type: :uint32 field :min_seen, 2, type: :double, json_name: "minSeen" @@ -37,7 +37,7 @@ end defmodule Grpc.Testing.RequestResultCount do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :status_code, 1, type: :int32, json_name: "statusCode" field :count, 2, type: :int64 @@ -46,7 +46,7 @@ end defmodule Grpc.Testing.ClientStats do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :latencies, 1, type: Grpc.Testing.HistogramData field :time_elapsed, 2, type: :double, json_name: "timeElapsed" diff --git a/benchmark/lib/grpc/testing/worker_service.pb.ex b/benchmark/lib/grpc/testing/worker_service.pb.ex index 2901521f6..8b1378917 100644 --- a/benchmark/lib/grpc/testing/worker_service.pb.ex +++ b/benchmark/lib/grpc/testing/worker_service.pb.ex @@ -1,19 +1 @@ -defmodule Grpc.Testing.WorkerService.Service do - @moduledoc false - use GRPC.Service, name: "grpc.testing.WorkerService", protoc_gen_elixir_version: "0.14.0" - - rpc :RunServer, stream(Grpc.Testing.ServerArgs), stream(Grpc.Testing.ServerStatus) - - rpc :RunClient, stream(Grpc.Testing.ClientArgs), stream(Grpc.Testing.ClientStatus) - - rpc :CoreCount, Grpc.Testing.CoreRequest, Grpc.Testing.CoreResponse - - rpc :QuitWorker, Grpc.Testing.Void, Grpc.Testing.Void -end - -defmodule Grpc.Testing.WorkerService.Stub do - @moduledoc false - - use GRPC.Stub, service: Grpc.Testing.WorkerService.Service -end diff --git a/benchmark/mix.lock b/benchmark/mix.lock index f251ba2f8..eeb146048 100644 --- a/benchmark/mix.lock +++ b/benchmark/mix.lock @@ -1,11 +1,8 @@ %{ "cowboy": {:hex, :cowboy, "2.12.0", "f276d521a1ff88b2b9b4c54d0e753da6c66dd7be6c9fca3d9418b561828a3731", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "8a7abe6d183372ceb21caa2709bec928ab2b72e18a3911aa1771639bef82651e"}, "cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"}, - "flow": {:hex, :flow, "1.2.4", "1dd58918287eb286656008777cb32714b5123d3855956f29aa141ebae456922d", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm", "874adde96368e71870f3510b91e35bc31652291858c86c0e75359cbdd35eb211"}, - "gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"}, "gun": {:hex, :gun, "2.1.0", "b4e4cbbf3026d21981c447e9e7ca856766046eff693720ba43114d7f5de36e87", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "52fc7fc246bfc3b00e01aea1c2854c70a366348574ab50c57dfe796d24a0101d"}, "hpax": {:hex, :hpax, "1.0.2", "762df951b0c399ff67cc57c3995ec3cf46d696e41f0bba17da0518d94acd4aac", [:mix], [], "hexpm", "2f09b4c1074e0abd846747329eaa26d535be0eb3d189fa69d812bfb8bfefd32f"}, - "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, "protobuf": {:hex, :protobuf, "0.14.1", "9ac0582170df27669ccb2ef6cb0a3d55020d58896edbba330f20d0748881530a", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "39a9d49d346e3ed597e5ae3168a43d9603870fc159419617f584cdf6071f0e25"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, diff --git a/config/test.exs b/config/test.exs index 20718e5b4..477c90797 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,5 +1,3 @@ import Config config :logger, level: :info - -config :grpc, :dns_adapter, GRPC.Client.Resolver.DNS.MockAdapter diff --git a/examples/route_guide/lib/route_guide.pb.ex b/examples/route_guide/lib/route_guide.pb.ex index 17cdb490f..f54465a45 100644 --- a/examples/route_guide/lib/route_guide.pb.ex +++ b/examples/route_guide/lib/route_guide.pb.ex @@ -1,7 +1,7 @@ defmodule Routeguide.Point do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :latitude, 1, type: :int32 field :longitude, 2, type: :int32 @@ -10,7 +10,7 @@ end defmodule Routeguide.Rectangle do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :lo, 1, type: Routeguide.Point field :hi, 2, type: Routeguide.Point @@ -19,7 +19,7 @@ end defmodule Routeguide.Feature do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :name, 1, type: :string field :location, 2, type: Routeguide.Point @@ -28,7 +28,7 @@ end defmodule Routeguide.RouteNote do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :location, 1, type: Routeguide.Point field :message, 2, type: :string @@ -37,30 +37,10 @@ end defmodule Routeguide.RouteSummary do @moduledoc false - use Protobuf, protoc_gen_elixir_version: "0.14.0", syntax: :proto3 + use Protobuf, protoc_gen_elixir_version: "0.14.1", syntax: :proto3 field :point_count, 1, type: :int32, json_name: "pointCount" field :feature_count, 2, type: :int32, json_name: "featureCount" field :distance, 3, type: :int32 field :elapsed_time, 4, type: :int32, json_name: "elapsedTime" end - -defmodule Routeguide.RouteGuide.Service do - @moduledoc false - - use GRPC.Service, name: "routeguide.RouteGuide", protoc_gen_elixir_version: "0.14.0" - - rpc :GetFeature, Routeguide.Point, Routeguide.Feature - - rpc :ListFeatures, Routeguide.Rectangle, stream(Routeguide.Feature) - - rpc :RecordRoute, stream(Routeguide.Point), Routeguide.RouteSummary - - rpc :RouteChat, stream(Routeguide.RouteNote), stream(Routeguide.RouteNote) -end - -defmodule Routeguide.RouteGuide.Stub do - @moduledoc false - - use GRPC.Stub, service: Routeguide.RouteGuide.Service -end diff --git a/examples/route_guide/lib/server.ex b/examples/route_guide/lib/server.ex index 2e2bbfa75..3e90b8bc8 100644 --- a/examples/route_guide/lib/server.ex +++ b/examples/route_guide/lib/server.ex @@ -1,20 +1,5 @@ -defmodule CustomErrorInterceptor do - @behaviour GRPC.Server.Interceptor - - def init(opts), do: opts - - def call(req, stream, next, opts) do - try do - next.(req, stream) - rescue - CustomError -> - Server.send_reply(stream, ...) - end - end -end - defmodule Routeguide.RouteGuide.Server do - use GRPC.Server, service: Routeguide.RouteGuide.Service, interceptors: [CustomErrorInterceptor] + use GRPC.Server, service: Routeguide.RouteGuide.Service alias GRPC.Server alias RouteGuide.Data diff --git a/examples/route_guide/mix.lock b/examples/route_guide/mix.lock index f251ba2f8..78e1d6e3d 100644 --- a/examples/route_guide/mix.lock +++ b/examples/route_guide/mix.lock @@ -1,8 +1,6 @@ %{ "cowboy": {:hex, :cowboy, "2.12.0", "f276d521a1ff88b2b9b4c54d0e753da6c66dd7be6c9fca3d9418b561828a3731", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "8a7abe6d183372ceb21caa2709bec928ab2b72e18a3911aa1771639bef82651e"}, "cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"}, - "flow": {:hex, :flow, "1.2.4", "1dd58918287eb286656008777cb32714b5123d3855956f29aa141ebae456922d", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm", "874adde96368e71870f3510b91e35bc31652291858c86c0e75359cbdd35eb211"}, - "gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"}, "gun": {:hex, :gun, "2.1.0", "b4e4cbbf3026d21981c447e9e7ca856766046eff693720ba43114d7f5de36e87", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "52fc7fc246bfc3b00e01aea1c2854c70a366348574ab50c57dfe796d24a0101d"}, "hpax": {:hex, :hpax, "1.0.2", "762df951b0c399ff67cc57c3995ec3cf46d696e41f0bba17da0518d94acd4aac", [:mix], [], "hexpm", "2f09b4c1074e0abd846747329eaa26d535be0eb3d189fa69d812bfb8bfefd32f"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, diff --git a/interop/lib/interop/client.ex b/interop/lib/interop/client.ex index 2f98ffe3d..760137366 100644 --- a/interop/lib/interop/client.ex +++ b/interop/lib/interop/client.ex @@ -8,14 +8,14 @@ defmodule Interop.Client do # we suggest you to check the documentation for `GRPC.Stub.recv/2` # there is some unusual behavior that can be observed. - def connect(host, opts \\ []) do - {:ok, ch} = GRPC.Stub.connect(host, opts) + def connect(host, port, opts \\ []) do + {:ok, ch} = GRPC.Stub.connect(host, port, opts) ch end def empty_unary!(ch) do Logger.info("Run empty_unary!") - empty = %Grpc.Testing.Empty{} + empty = Grpc.Testing.Empty.new() {:ok, ^empty} = Grpc.Testing.TestService.Stub.empty_call(ch, empty) end @@ -25,15 +25,15 @@ defmodule Interop.Client do def large_unary!(ch) do Logger.info("Run large_unary!") - req = %Grpc.Testing.SimpleRequest{response_size: 314_159, payload: payload(271_828)} - reply = %Grpc.Testing.SimpleResponse{payload: payload(314_159)} + req = Grpc.Testing.SimpleRequest.new(response_size: 314_159, payload: payload(271_828)) + reply = Grpc.Testing.SimpleResponse.new(payload: payload(314_159)) {:ok, ^reply} = Grpc.Testing.TestService.Stub.unary_call(ch, req) end def large_unary2!(ch) do Logger.info("Run large_unary2!") - req = %Grpc.Testing.SimpleRequest{response_size: 1024*1024*8, payload: payload(1024*1024*8)} - reply = %Grpc.Testing.SimpleResponse{payload: payload(1024*1024*8)} + req = Grpc.Testing.SimpleRequest.new(response_size: 1024*1024*8, payload: payload(1024*1024*8)) + reply = Grpc.Testing.SimpleResponse.new(payload: payload(1024*1024*8)) {:ok, ^reply} = Grpc.Testing.TestService.Stub.unary_call(ch, req) end @@ -41,24 +41,24 @@ defmodule Interop.Client do Logger.info("Run client_compressed_unary!") # "Client calls UnaryCall with the feature probe, an uncompressed message" is not supported - req = %Grpc.Testing.SimpleRequest{expect_compressed: %{value: true}, response_size: 314_159, payload: payload(271_828)} - reply = %Grpc.Testing.SimpleResponse{payload: payload(314_159)} + req = Grpc.Testing.SimpleRequest.new(expect_compressed: %{value: true}, response_size: 314_159, payload: payload(271_828)) + reply = Grpc.Testing.SimpleResponse.new(payload: payload(314_159)) {:ok, ^reply} = Grpc.Testing.TestService.Stub.unary_call(ch, req, compressor: GRPC.Compressor.Gzip) - req = %Grpc.Testing.SimpleRequest{expect_compressed: %{value: false}, response_size: 314_159, payload: payload(271_828)} - reply = %Grpc.Testing.SimpleResponse{payload: payload(314_159)} + req = Grpc.Testing.SimpleRequest.new(expect_compressed: %{value: false}, response_size: 314_159, payload: payload(271_828)) + reply = Grpc.Testing.SimpleResponse.new(payload: payload(314_159)) {:ok, ^reply} = Grpc.Testing.TestService.Stub.unary_call(ch, req) end def server_compressed_unary!(ch) do Logger.info("Run server_compressed_unary!") - req = %Grpc.Testing.SimpleRequest{response_compressed: %{value: true}, response_size: 314_159, payload: payload(271_828)} - reply = %Grpc.Testing.SimpleResponse{payload: payload(314_159)} + req = Grpc.Testing.SimpleRequest.new(response_compressed: %{value: true}, response_size: 314_159, payload: payload(271_828)) + reply = Grpc.Testing.SimpleResponse.new(payload: payload(314_159)) {:ok, ^reply, %{headers: %{"grpc-encoding" => "gzip"}}} = Grpc.Testing.TestService.Stub.unary_call(ch, req, compressor: GRPC.Compressor.Gzip, return_headers: true) - req = %Grpc.Testing.SimpleRequest{response_compressed: %{value: false}, response_size: 314_159, payload: payload(271_828)} - reply = %Grpc.Testing.SimpleResponse{payload: payload(314_159)} + req = Grpc.Testing.SimpleRequest.new(response_compressed: %{value: false}, response_size: 314_159, payload: payload(271_828)) + reply = Grpc.Testing.SimpleResponse.new(payload: payload(314_159)) {:ok, ^reply, headers} = Grpc.Testing.TestService.Stub.unary_call(ch, req, return_headers: true) refute headers[:headers]["grpc-encoding"] end @@ -70,18 +70,18 @@ defmodule Interop.Client do ch |> Grpc.Testing.TestService.Stub.streaming_input_call() |> GRPC.Stub.send_request( - %Grpc.Testing.StreamingInputCallRequest{payload: payload(27182)} + Grpc.Testing.StreamingInputCallRequest.new(payload: payload(27182)) ) - |> GRPC.Stub.send_request(%Grpc.Testing.StreamingInputCallRequest{payload: payload(8)}) + |> GRPC.Stub.send_request(Grpc.Testing.StreamingInputCallRequest.new(payload: payload(8))) |> GRPC.Stub.send_request( - %Grpc.Testing.StreamingInputCallRequest{payload: payload(1828)} + Grpc.Testing.StreamingInputCallRequest.new(payload: payload(1828)) ) |> GRPC.Stub.send_request( - %Grpc.Testing.StreamingInputCallRequest{payload: payload(45904)}, + Grpc.Testing.StreamingInputCallRequest.new(payload: payload(45904)), end_stream: true ) - reply = %Grpc.Testing.StreamingInputCallResponse{aggregated_payload_size: 74922} + reply = Grpc.Testing.StreamingInputCallResponse.new(aggregated_payload_size: 74922) {:ok, ^reply} = GRPC.Stub.recv(stream) end @@ -93,20 +93,20 @@ defmodule Interop.Client do stream = ch |> Grpc.Testing.TestService.Stub.streaming_input_call(compressor: GRPC.Compressor.Gzip) - |> GRPC.Stub.send_request(%Grpc.Testing.StreamingInputCallRequest{payload: payload(27182), expect_compressed: %{value: true}}) + |> GRPC.Stub.send_request(Grpc.Testing.StreamingInputCallRequest.new(payload: payload(27182), expect_compressed: %{value: true})) |> GRPC.Stub.send_request( - %Grpc.Testing.StreamingInputCallRequest{payload: payload(45904), expect_compressed: %{value: false}}, + Grpc.Testing.StreamingInputCallRequest.new(payload: payload(45904), expect_compressed: %{value: false}), end_stream: true, compress: false ) - reply = %Grpc.Testing.StreamingInputCallResponse{aggregated_payload_size: 73086} + reply = Grpc.Testing.StreamingInputCallResponse.new(aggregated_payload_size: 73086) {:ok, ^reply} = GRPC.Stub.recv(stream) end def server_streaming!(ch) do Logger.info("Run server_streaming!") params = Enum.map([31415, 9, 2653, 58979], &res_param(&1)) - req = %Grpc.Testing.StreamingOutputCallRequest{response_parameters: params} + req = Grpc.Testing.StreamingOutputCallRequest.new(response_parameters: params) {:ok, res_enum} = ch |> Grpc.Testing.TestService.Stub.streaming_output_call(req) result = Enum.map([9, 2653, 31415, 58979], &String.duplicate(<<0>>, &1)) @@ -115,12 +115,12 @@ defmodule Interop.Client do def server_compressed_streaming!(ch) do Logger.info("Run server_compressed_streaming!") - req = %Grpc.Testing.StreamingOutputCallRequest{response_parameters: [ + req = Grpc.Testing.StreamingOutputCallRequest.new(response_parameters: [ %{compressed: %{value: true}, size: 31415}, %{compressed: %{value: false}, size: 92653} - ]} + ]) {:ok, res_enum} = ch |> Grpc.Testing.TestService.Stub.streaming_output_call(req) result = Enum.map([31415, 92653], &String.duplicate(<<0>>, &1)) @@ -132,10 +132,10 @@ defmodule Interop.Client do stream = Grpc.Testing.TestService.Stub.full_duplex_call(ch) req = fn size1, size2 -> - %Grpc.Testing.StreamingOutputCallRequest{ + Grpc.Testing.StreamingOutputCallRequest.new( response_parameters: [res_param(size1)], payload: payload(size2) - } + ) end GRPC.Stub.send_request(stream, req.(31415, 27182)) @@ -169,8 +169,8 @@ defmodule Interop.Client do def custom_metadata!(ch) do Logger.info("Run custom_metadata!") # UnaryCall - req = %Grpc.Testing.SimpleRequest{response_size: 314_159, payload: payload(271_828)} - reply = %Grpc.Testing.SimpleResponse{payload: payload(314_159)} + req = Grpc.Testing.SimpleRequest.new(response_size: 314_159, payload: payload(271_828)) + reply = Grpc.Testing.SimpleResponse.new(payload: payload(314_159)) headers = %{"x-grpc-test-echo-initial" => "test_initial_metadata_value"} # 11250603 trailers = %{"x-grpc-test-echo-trailing-bin" => 0xABABAB} @@ -183,10 +183,10 @@ defmodule Interop.Client do # FullDuplexCall req = - %Grpc.Testing.StreamingOutputCallRequest{ + Grpc.Testing.StreamingOutputCallRequest.new( response_parameters: [res_param(314_159)], payload: payload(271_828) - } + ) {headers, data, trailers} = ch @@ -221,15 +221,15 @@ defmodule Interop.Client do code = 2 msg = "test status message" - status = %Grpc.Testing.EchoStatus{code: code, message: msg} + status = Grpc.Testing.EchoStatus.new(code: code, message: msg) error = GRPC.RPCError.exception(code, msg) # UnaryCall - req = %Grpc.Testing.SimpleRequest{response_status: status} + req = Grpc.Testing.SimpleRequest.new(response_status: status) {:error, ^error} = Grpc.Testing.TestService.Stub.unary_call(ch, req) # FullDuplexCall - req = %Grpc.Testing.StreamingOutputCallRequest{response_status: status} + req = Grpc.Testing.StreamingOutputCallRequest.new(response_status: status) {:error, ^error} = ch @@ -244,7 +244,7 @@ defmodule Interop.Client do def unimplemented_service!(ch) do Logger.info("Run unimplemented_service!") - req = %Grpc.Testing.Empty{} + req = Grpc.Testing.Empty.new() {:error, %GRPC.RPCError{status: 12}} = Grpc.Testing.TestService.Stub.unimplemented_call(ch, req) @@ -262,10 +262,10 @@ defmodule Interop.Client do Logger.info("Run cancel_after_first_response!") req = - %Grpc.Testing.StreamingOutputCallRequest{ + Grpc.Testing.StreamingOutputCallRequest.new( response_parameters: [res_param(31415)], payload: payload(27182) - } + ) stream = Grpc.Testing.TestService.Stub.full_duplex_call(ch) @@ -283,10 +283,10 @@ defmodule Interop.Client do Logger.info("Run timeout_on_sleeping_server!") req = - %Grpc.Testing.StreamingOutputCallRequest{ + Grpc.Testing.StreamingOutputCallRequest.new( payload: payload(27182), response_parameters: [res_param(31415)] - } + ) stream = Grpc.Testing.TestService.Stub.full_duplex_call(ch, timeout: 1) resp = stream |> GRPC.Stub.send_request(req) |> GRPC.Stub.recv() @@ -312,10 +312,10 @@ defmodule Interop.Client do end defp res_param(size) do - %Grpc.Testing.ResponseParameters{size: size} + Grpc.Testing.ResponseParameters.new(size: size) end defp payload(n) do - %Grpc.Testing.Payload{body: String.duplicate(<<0>>, n)} + Grpc.Testing.Payload.new(body: String.duplicate(<<0>>, n)) end end diff --git a/interop/lib/interop/server.ex b/interop/lib/interop/server.ex index 4d16f3733..6d95e527d 100644 --- a/interop/lib/interop/server.ex +++ b/interop/lib/interop/server.ex @@ -5,7 +5,7 @@ defmodule Interop.Server do import ExUnit.Assertions, only: [assert: 1, refute: 1] def empty_call(_, _stream) do - %Grpc.Testing.Empty{} + Grpc.Testing.Empty.new() end def unary_call(req, stream) do @@ -31,21 +31,21 @@ defmodule Interop.Server do raise GRPC.RPCError, status: status.code, message: status.message end - payload = %Grpc.Testing.Payload{body: String.duplicate(<<0>>, req.response_size)} - %Grpc.Testing.SimpleResponse{payload: payload} + payload = Grpc.Testing.Payload.new(body: String.duplicate(<<0>>, req.response_size)) + Grpc.Testing.SimpleResponse.new(payload: payload) end def streaming_input_call(req_enum, _stream) do size = Enum.reduce(req_enum, 0, fn req, acc -> acc + byte_size(req.payload.body) end) - %Grpc.Testing.StreamingInputCallResponse{aggregated_payload_size: size} + Grpc.Testing.StreamingInputCallResponse.new(aggregated_payload_size: size) end def streaming_output_call(req, stream) do GRPC.Server.set_compressor(stream, GRPC.Compressor.Gzip) Enum.map(req.response_parameters, fn params -> - resp = %Grpc.Testing.StreamingOutputCallResponse{payload: %{body: String.duplicate(<<0>>, params.size)}} + resp = Grpc.Testing.StreamingOutputCallResponse.new(payload: %{body: String.duplicate(<<0>>, params.size)}) opts = if params.compressed == false do [compress: false] else @@ -73,8 +73,8 @@ defmodule Interop.Server do if resp_param do size = resp_param.size - payload = %Grpc.Testing.Payload{body: String.duplicate(<<0>>, size)} - res = %Grpc.Testing.StreamingOutputCallResponse{payload: payload} + payload = Grpc.Testing.Payload.new(body: String.duplicate(<<0>>, size)) + res = Grpc.Testing.StreamingOutputCallResponse.new(payload: payload) GRPC.Server.send_reply(stream, res) end end) diff --git a/interop/mix.lock b/interop/mix.lock index 714f93f06..7e2e46ed5 100644 --- a/interop/mix.lock +++ b/interop/mix.lock @@ -2,13 +2,10 @@ "cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"}, "cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"}, "extrace": {:hex, :extrace, "0.5.0", "4ee5419fbc3820c4592daebe0f8527001aa623578d9a725d8ae521315fce0277", [:mix], [{:recon, "~> 2.5", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "2a3ab7fa0701949efee1034293fa0b0e65926ffe256ccd6d0e10dd8a9406cd02"}, - "flow": {:hex, :flow, "1.2.4", "1dd58918287eb286656008777cb32714b5123d3855956f29aa141ebae456922d", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm", "874adde96368e71870f3510b91e35bc31652291858c86c0e75359cbdd35eb211"}, - "gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"}, "grpc": {:git, "https://github.com/elixir-grpc/grpc.git", "21422839798e49bf6d29327fab0a7add51becedd", []}, "grpc_statsd": {:hex, :grpc_statsd, "0.1.0", "a95ae388188486043f92a3c5091c143f5a646d6af80c9da5ee616546c4d8f5ff", [:mix], [{:grpc, ">= 0.0.0", [hex: :grpc, repo: "hexpm", optional: true]}, {:statix, ">= 0.0.0", [hex: :statix, repo: "hexpm", optional: true]}], "hexpm", "de0c05db313c7b3ffeff345855d173fd82fec3de16591a126b673f7f698d9e74"}, "gun": {:hex, :gun, "2.0.1", "160a9a5394800fcba41bc7e6d421295cf9a7894c2252c0678244948e3336ad73", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "a10bc8d6096b9502205022334f719cc9a08d9adcfbfc0dbee9ef31b56274a20b"}, "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, - "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"}, "protobuf": {:hex, :protobuf, "0.14.1", "9ac0582170df27669ccb2ef6cb0a3d55020d58896edbba330f20d0748881530a", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "39a9d49d346e3ed597e5ae3168a43d9603870fc159419617f584cdf6071f0e25"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, diff --git a/interop/script/run.exs b/interop/script/run.exs index 324c74e10..554d641b5 100644 --- a/interop/script/run.exs +++ b/interop/script/run.exs @@ -21,10 +21,9 @@ alias Interop.Client defmodule InteropTestRunner do def run(_cli, adapter, port, rounds) do opts = [interceptors: [GRPC.Client.Interceptors.Logger], adapter: adapter] - ch = Client.connect("127.0.0.1:#{port}", opts) + ch = Client.connect("127.0.0.1", port, opts) - for round <- 1..rounds do - + for _ <- 1..rounds do Client.empty_unary!(ch) Client.cacheable_unary!(ch) Client.large_unary!(ch) @@ -43,19 +42,11 @@ defmodule InteropTestRunner do Client.cancel_after_begin!(ch) Client.cancel_after_first_response!(ch) Client.timeout_on_sleeping_server!(ch) - - IO.inspect(round, label: "Round #{round} --------------------------------") end :ok end end -{:ok, _pid} = - DynamicSupervisor.start_link( - strategy: :one_for_one, - name: GRPC.Client.Supervisor - ) - for adapter <- [Gun, Mint] do Logger.info("Starting run for adapter: #{adapter}") args = [adapter, port, rounds] diff --git a/lib/grpc/channel.ex b/lib/grpc/channel.ex index 8350b863c..d7818b07b 100644 --- a/lib/grpc/channel.ex +++ b/lib/grpc/channel.ex @@ -21,7 +21,6 @@ defmodule GRPC.Channel do port: non_neg_integer(), scheme: String.t(), cred: GRPC.Credential.t(), - ref: reference() | nil, adapter: atom(), adapter_payload: any(), codec: module(), @@ -34,7 +33,6 @@ defmodule GRPC.Channel do port: nil, scheme: nil, cred: nil, - ref: nil, adapter: nil, adapter_payload: nil, codec: GRPC.Codec.Proto, diff --git a/lib/grpc/client/connection.ex b/lib/grpc/client/connection.ex deleted file mode 100644 index c3e091ce6..000000000 --- a/lib/grpc/client/connection.ex +++ /dev/null @@ -1,511 +0,0 @@ -defmodule GRPC.Client.Connection do - @moduledoc """ - Connection manager for gRPC client channels, with optional **load balancing** - and **name resolution** support. - - A `Conn` process manages one or more underlying gRPC connections - (`GRPC.Channel` structs) and exposes a **virtual channel** to be used by - client stubs. The orchestration process runs as a `GenServer` registered - globally (via `:global`), so only one orchestrator exists **per connection** - in a BEAM node. - - ## Overview - - * `connect/2` – establishes a client connection (single or multi-channel). - * `pick/2` – chooses a channel according to the active load-balancing policy. - * `disconnect/1` – gracefully closes a connection and frees resources. - - Under the hood: - - * The target string is resolved using a [Resolver](GRPC.Client.Resolver). - * Depending on the target and service config, a load-balancing module is chosen - (e.g. `PickFirst`, `RoundRobin`). - * The orchestrator periodically refreshes the LB decision to adapt to changes. - - ## Target syntax - - The `target` argument to `connect/2` accepts URI-like strings that are resolved - via the configured `Resolver` (default `GRPC.Client.Resolver`). - - Examples of supported formats: - - * `"dns://example.com:50051"` - * `"ipv4:10.0.0.5:50051"` - * `"unix:/tmp/my.sock"` - * `"xds:///my-service"` - * `"127.0.0.1:50051"` (implicit DNS / fallback to IPv4) - - See [`GRPC.Client.Resolver`](GRPC.Client.Resolver) for the full specification. - - ## Examples - - ### Basic connect and RPC - - iex> opts = [adapter: GRPC.Client.Adapters.Gun] - iex> {:ok, ch} = GRPC.Client.Connection.connect("127.0.0.1:50051", opts) - iex> req = %Grpc.Testing.SimpleRequest{response_size: 42} - iex> {:ok, resp} = Grpc.Testing.TestService.Stub.unary_call(ch, req) - iex> resp.response_size - 42 - - ### Using interceptors and custom adapter - - iex> opts = [interceptors: [GRPC.Client.Interceptors.Logger], - ...> adapter: GRPC.Client.Adapters.Mint] - iex> {:ok, ch} = GRPC.Client.Connection.connect("dns://my-service.local:50051", opts) - iex> {:ok, channel} = GRPC.Client.Connection.pick(ch) - iex> channel.host - "127.0.0.1" - - ### Unix socket target - - iex> {:ok, ch} = GRPC.Client.Connection.connect("unix:/tmp/service.sock") - iex> Grpc.Testing.TestService.Stub.empty_call(ch, %{}) - - ### Disconnect - - iex> {:ok, ch} = GRPC.Client.Connection.connect("127.0.0.1:50051") - iex> GRPC.Client.Connection.disconnect(ch) - {:ok, %GRPC.Channel{...}} - - ## Notes - - * The orchestrator refreshes the LB pick every 15 seconds. - """ - use GenServer - alias GRPC.Channel - - require Logger - - @insecure_scheme "http" - @secure_scheme "https" - @refresh_interval 15_000 - - @type t :: %__MODULE__{ - virtual_channel: Channel.t(), - real_channels: %{String.t() => Channel.t()}, - lb_mod: module() | nil, - lb_state: term() | nil, - resolver: module() | nil, - adapter: module() - } - - defstruct virtual_channel: nil, - real_channels: %{}, - lb_mod: nil, - lb_state: nil, - resolver: nil, - adapter: GRPC.Client.Adapters.Gun - - def child_spec(initial_state) do - %{ - id: {__MODULE__, initial_state.virtual_channel.ref}, - start: - {GenServer, :start_link, - [__MODULE__, initial_state, [name: via(initial_state.virtual_channel.ref)]]}, - restart: :transient, - type: :worker, - shutdown: 5000 - } - end - - @impl GenServer - def init(%__MODULE__{} = state) do - Process.flag(:trap_exit, true) - - # only now persist the chosen channel (which should already have adapter_payload - # because build_initial_state connected real channels and set virtual_channel) - :persistent_term.put( - {__MODULE__, :lb_state, state.virtual_channel.ref}, - state.virtual_channel - ) - - Process.send_after(self(), :refresh, @refresh_interval) - {:ok, state} - end - - @doc """ - Establishes a new client connection to a gRPC server or set of servers. - - The `target` string determines how the endpoints are resolved - (see [Resolver](GRPC.Client.Resolver)). - - Options: - - * `:adapter` – transport adapter module (default: `GRPC.Client.Adapters.Gun`) - * `:adapter_opts` – options passed to the adapter - * `:resolver` – resolver module (default: `GRPC.Client.Resolver`) - * `:lb_policy` – load-balancing policy (`:pick_first`, `:round_robin`) - * `:interceptors` – list of client interceptors - * `:codec` – request/response codec (default: `GRPC.Codec.Proto`) - * `:compressor` / `:accepted_compressors` – message compression - * `:headers` – default metadata headers - - Returns: - - * `{:ok, channel}` – a `GRPC.Channel` usable with stubs - * `{:error, reason}` – if connection fails - - ## Examples - - iex> {:ok, ch} = GRPC.Client.Connection.connect("127.0.0.1:50051") - iex> Grpc.Testing.TestService.Stub.empty_call(ch, %{}) - """ - @spec connect(String.t(), keyword()) :: {:ok, Channel.t()} | {:error, any()} - def connect(target, opts \\ []) do - ref = make_ref() - initial_state = build_initial_state(target, Keyword.merge(opts, ref: ref)) - ch = initial_state.virtual_channel - - case DynamicSupervisor.start_child(GRPC.Client.Supervisor, child_spec(initial_state)) do - {:ok, _pid} -> - {:ok, ch} - - {:error, {:already_started, _pid}} -> - # race: someone else started it first, ask the running process for its current channel - case pick_channel(opts) do - {:ok, %Channel{} = channel} -> - {:ok, channel} - - _ -> - {:error, :no_connection} - end - - {:error, reason} -> - {:error, reason} - end - end - - @doc """ - Disconnects a channel previously returned by `connect/2`. - - This will close all underlying real connections for the orchestrator - and stop its process. - - Returns `{:ok, channel}` on success. - - ## Example - - iex> {:ok, ch} = GRPC.Client.Connection.connect("127.0.0.1:50051") - iex> GRPC.Client.Connection.disconnect(ch) - {:ok, %GRPC.Channel{}} - """ - @spec disconnect(Channel.t()) :: {:ok, Channel.t()} | {:error, any()} - def disconnect(%Channel{ref: ref} = channel) do - GenServer.call(via(ref), {:disconnect, channel}) - end - - @doc """ - Picks a channel from the orchestrator according to the active - load-balancing policy. - - Normally, you don’t need to call `pick/2` directly – client stubs do this - automatically – but it can be useful when debugging or testing. - - Returns: - - * `{:ok, channel}` – the chosen `GRPC.Channel` - * `{:error, :no_connection}` – if the orchestrator is not available - - ## Example - - iex> {:ok, ch} = GRPC.Client.Connection.connect("dns://my-service.local:50051") - iex> GRPC.Client.Connection.pick(ch) - {:ok, %GRPC.Channel{host: "192.168.1.1", port: 50051}} - """ - @spec pick_channel(Channel.t(), keyword()) :: {:ok, Channel.t()} | {:error, term()} - def pick_channel(%Channel{ref: ref} = _channel, _opts \\ []) do - case :persistent_term.get({__MODULE__, :lb_state, ref}, nil) do - nil -> - {:error, :no_connection} - - %Channel{} = channel -> - {:ok, channel} - end - end - - @impl GenServer - def handle_call({:disconnect, %Channel{adapter: adapter} = channel}, _from, state) do - resp = {:ok, %Channel{channel | adapter_payload: %{conn_pid: nil}}} - - if Map.has_key?(state, :real_channels) do - Enum.map(state.real_channels, fn {_key, ch} -> - adapter.disconnect(ch) - end) - - keys_to_delete = [:real_channels, :virtual_channel] - new_state = Map.drop(state, keys_to_delete) - - {:reply, resp, new_state, {:continue, :stop}} - else - {:reply, resp, state, {:continue, :stop}} - end - end - - @impl GenServer - def handle_info( - :refresh, - %{lb_mod: lb_mod, lb_state: lb_state, real_channels: channels, virtual_channel: vc} = - state - ) do - Logger.debug("refreshing LB pick, caller=#{inspect(self())}") - - {:ok, {prefer_host, prefer_port}, new_lb_state} = lb_mod.pick(lb_state) - - channel_key = "#{inspect(prefer_host)}:#{prefer_port}" - - case Map.get(channels, channel_key) do - nil -> - Logger.warning("LB picked #{channel_key}, but no channel found in pool") - - Process.send_after(self(), :refresh, @refresh_interval) - {:noreply, %{state | lb_state: new_lb_state}} - - %Channel{} = picked_channel -> - :persistent_term.put({__MODULE__, :lb_state, vc.ref}, picked_channel) - - Process.send_after(self(), :refresh, @refresh_interval) - - {:noreply, %{state | lb_state: new_lb_state, virtual_channel: picked_channel}} - end - end - - def handle_info({:DOWN, _ref, :process, pid, reason}, state) do - Logger.warning( - "#{inspect(__MODULE__)} received :DOWN from #{inspect(pid)} with reason: #{inspect(reason)}" - ) - - {:noreply, state} - end - - def handle_info(msg, state) do - Logger.warning("#{inspect(__MODULE__)} received unexpected message: #{inspect(msg)}") - - {:noreply, state} - end - - @impl GenServer - def handle_continue(:stop, state) do - Logger.info("#{inspect(__MODULE__)} stopping as requested") - {:stop, :normal, state} - end - - @impl GenServer - def terminate(_reason, _state), do: :ok - - defp via(ref) do - {:global, {__MODULE__, ref}} - end - - defp build_initial_state(target, opts) do - opts = - Keyword.validate!(opts, - cred: nil, - ref: nil, - adapter: GRPC.Client.Adapters.Gun, - adapter_opts: [], - interceptors: [], - codec: GRPC.Codec.Proto, - compressor: nil, - accepted_compressors: [], - headers: [] - ) - - resolver = Keyword.get(opts, :resolver, GRPC.Client.Resolver) - adapter = Keyword.get(opts, :adapter, GRPC.Client.Adapters.Gun) - lb_policy_opt = Keyword.get(opts, :lb_policy) - - {norm_target, norm_opts, scheme} = normalize_target_and_opts(target, opts) - - cred = - case norm_opts[:cred] do - nil when scheme == @secure_scheme -> - default_ssl_option() - - %GRPC.Credential{} = c -> - c - - nil -> - nil - - other -> - other - end - - intialized_interceptors = init_interceptors(norm_opts[:interceptors]) - codec = norm_opts[:codec] - compressor = norm_opts[:compressor] - headers = norm_opts[:headers] - - accepted_compressors = - [compressor | norm_opts[:accepted_compressors]] - |> Enum.reject(&is_nil/1) - |> Enum.uniq() - - adapter_opts = opts[:adapter_opts] - - unless is_list(adapter_opts), - do: raise(ArgumentError, ":adapter_opts must be a keyword list if present") - - virtual_channel = %Channel{ - scheme: scheme, - cred: cred, - ref: opts[:ref], - adapter: adapter, - interceptors: intialized_interceptors, - codec: codec, - compressor: compressor, - accepted_compressors: accepted_compressors, - headers: headers - } - - base_state = %__MODULE__{ - virtual_channel: virtual_channel, - resolver: resolver, - adapter: adapter - } - - case resolver.resolve(norm_target) do - {:ok, %{addresses: addresses, service_config: config}} -> - lb_policy = - cond do - is_map(config) and Map.has_key?(config, :load_balancing_policy) -> - config.load_balancing_policy - - lb_policy_opt -> - lb_policy_opt - - true -> - nil - end - - lb_mod = choose_lb(lb_policy) - {:ok, lb_state} = lb_mod.init(addresses: addresses) - - {:ok, {prefer_host, prefer_port}, new_lb_state} = lb_mod.pick(lb_state) - - real_channels = - Enum.into(addresses, %{}, fn %{port: port, address: host} -> - case connect_real_channel( - %Channel{virtual_channel | host: host, port: port}, - host, - port, - norm_opts, - adapter - ) do - {:ok, ch} -> - {"#{inspect(host)}:#{port}", ch} - - {:error, :timeout} -> - {"#{host}:#{port}", %Channel{virtual_channel | host: host, port: port}} - - {:error, reason} -> - raise "Failed to connect to #{inspect(host)}:#{port} - #{inspect(reason)}" - end - end) - - virtual_channel = - Map.get(real_channels, "#{inspect(prefer_host)}:#{prefer_port}") - - %__MODULE__{ - base_state - | lb_mod: lb_mod, - lb_state: new_lb_state, - virtual_channel: virtual_channel, - real_channels: real_channels - } - - {:error, _reason} -> - {host, port} = split_host_port(norm_target) - {:ok, ch} = connect_real_channel(virtual_channel, host, port, norm_opts, adapter) - - %__MODULE__{ - base_state - | virtual_channel: ch, - real_channels: %{"#{host}:#{port}" => ch} - } - end - end - - defp normalize_target_and_opts(target, opts) do - uri = URI.parse(target) - - cond do - uri.scheme == @secure_scheme and uri.host -> - opts = Keyword.put_new_lazy(opts, :cred, &default_ssl_option/0) - {"ipv4:#{uri.host}:#{uri.port}", opts, @secure_scheme} - - uri.scheme == @insecure_scheme and uri.host -> - if opts[:cred], - do: raise(ArgumentError, "invalid option for insecure (http) address: :cred") - - {"ipv4:#{uri.host}:#{uri.port}", opts, @insecure_scheme} - - # Compatibility mode: host:port or unix:path - uri.scheme in [nil, ""] -> - scheme = if opts[:cred], do: @secure_scheme, else: @insecure_scheme - - case String.split(target, ":") do - [host, port] -> - {"ipv4:#{host}:#{port}", opts, scheme} - - [path] -> - {"unix://#{path}", opts, "unix"} - end - - # Anything else (dns://, unix://, etc.) handled by resolver - true -> - {target, opts, if(opts[:cred], do: @secure_scheme, else: @insecure_scheme)} - end - end - - defp choose_lb(:round_robin), do: GRPC.Client.LoadBalancing.RoundRobin - defp choose_lb(_), do: GRPC.Client.LoadBalancing.PickFirst - - defp connect_real_channel(%Channel{scheme: "unix"} = vc, path, port, opts, adapter) do - %Channel{vc | host: path, port: port} - |> adapter.connect(opts[:adapter_opts]) - end - - defp connect_real_channel(vc, host, port, opts, adapter) do - %Channel{vc | host: host, port: port} - |> adapter.connect(opts[:adapter_opts]) - end - - defp split_host_port(target) do - case String.split(target, ":", trim: true) do - [h, p] -> {h, String.to_integer(p)} - [h] -> {h, default_port()} - end - end - - defp init_interceptors(interceptors) do - Enum.map(interceptors, fn - {interceptor, opts} -> {interceptor, interceptor.init(opts)} - interceptor -> {interceptor, interceptor.init([])} - end) - end - - if {:module, CAStore} == Code.ensure_loaded(CAStore) do - defp default_ssl_option do - %GRPC.Credential{ - ssl: [ - verify: :verify_peer, - depth: 99, - cacert_file: CAStore.file_path() - ] - } - end - else - defp default_ssl_option do - raise """ - no GRPC credentials provided. Please either: - - - Pass the `:cred` option to `GRPC.Stub.connect/2,3` - - Add `:castore` to your list of dependencies in `mix.exs` - """ - end - end - - defp default_port, do: 50051 -end diff --git a/lib/grpc/client/load_balacing.ex b/lib/grpc/client/load_balacing.ex deleted file mode 100644 index fb3a58d1c..000000000 --- a/lib/grpc/client/load_balacing.ex +++ /dev/null @@ -1,12 +0,0 @@ -defmodule GRPC.Client.LoadBalancing do - @moduledoc """ - Load balancing behaviour for gRPC clients. - - This module defines the behaviour that load balancing strategies must implement. - """ - @callback init(opts :: keyword()) :: {:ok, state :: any()} | {:error, reason :: any()} - - @callback pick(state :: any()) :: - {:ok, {host :: String.t(), port :: non_neg_integer()}, new_state :: any()} - | {:error, reason :: any()} -end diff --git a/lib/grpc/client/load_balacing/pick_first.ex b/lib/grpc/client/load_balacing/pick_first.ex deleted file mode 100644 index 14e17ca03..000000000 --- a/lib/grpc/client/load_balacing/pick_first.ex +++ /dev/null @@ -1,16 +0,0 @@ -defmodule GRPC.Client.LoadBalancing.PickFirst do - @behaviour GRPC.Client.LoadBalancing - - @impl true - def init(opts) do - case Keyword.get(opts, :addresses, []) do - [] -> {:error, :no_addresses} - addresses -> {:ok, %{addresses: addresses, current: hd(addresses)}} - end - end - - @impl true - def pick(%{current: %{address: host, port: port}} = state) do - {:ok, {host, port}, state} - end -end diff --git a/lib/grpc/client/load_balacing/round_robin.ex b/lib/grpc/client/load_balacing/round_robin.ex deleted file mode 100644 index af47bf5bb..000000000 --- a/lib/grpc/client/load_balacing/round_robin.ex +++ /dev/null @@ -1,22 +0,0 @@ -defmodule GRPC.Client.LoadBalancing.RoundRobin do - @behaviour GRPC.Client.LoadBalancing - - @impl true - def init(opts) do - addresses = Keyword.get(opts, :addresses, []) - - if addresses == [] do - {:error, :no_addresses} - else - {:ok, %{addresses: addresses, index: 0, n: length(addresses)}} - end - end - - @impl true - def pick(%{addresses: addresses, index: idx, n: n} = state) do - %{address: host, port: port} = Enum.fetch!(addresses, idx) - - new_state = %{state | index: rem(idx + 1, n)} - {:ok, {host, port}, new_state} - end -end diff --git a/lib/grpc/client/resolver.ex b/lib/grpc/client/resolver.ex deleted file mode 100644 index de605c468..000000000 --- a/lib/grpc/client/resolver.ex +++ /dev/null @@ -1,139 +0,0 @@ -defmodule GRPC.Client.Resolver do - @moduledoc """ - Behaviour for gRPC client resolvers. - - A gRPC resolver is responsible for translating a **target string** into - a list of connection endpoints (addresses) and an optional `ServiceConfig`. - - gRPC supports multiple naming schemes, allowing clients to connect - to servers via DNS, fixed IPs, Unix domain sockets, or through - service discovery/control planes like xDS. - - ## Target Syntax - - The gRPC target string uses URI-like syntax: - - :/// or : - - ### Supported schemes - - * `dns://[authority/]host[:port]` – resolves via DNS, including: - * A/AAAA records for IP addresses - * Optional TXT record `_grpc_config.` containing JSON ServiceConfig - * `ipv4:addr[:port][,addr[:port],...]` – fixed list of IPv4 addresses - * `ipv6:[addr][:port][,[addr][:port],...]` – fixed list of IPv6 addresses - * `unix:/absolute_path` – Unix domain socket - * `unix-abstract:name` – abstract Unix socket (Linux only) - * `vsock:cid:port` – VSOCK endpoint (Linux only) - * `xds:///name` – resolve via xDS control plane (Envoy/Istio/Traffic Director) - - If no scheme is specified, `dns` is assumed. - - ### Default ports - - * `dns`, `ipv4`, `ipv6` β†’ 50051 - * `xds` β†’ 443 - - ## Resolver Output - - Returns: - - * `{:ok, %{addresses: list(map()), service_config: GRPC.Client.ServiceConfig.t() | nil}}` - - `addresses` – list of endpoint maps containing the keys: - - `:address` – host, IP, or socket path - - `:port` – TCP port (if applicable) - - may include additional scheme-specific fields, e.g., `:cid` for vsock - - `service_config` – optional `ServiceConfig` parsed from DNS TXT or xDS - - * `{:error, reason}` on failure - - ## Purpose - - The resolver abstracts the underlying naming and service discovery mechanisms, - allowing the gRPC client to obtain endpoints and service configuration consistently, - regardless of whether the target is DNS, static IPs, a socket, or xDS. - - ## Reference - - For the official gRPC naming and resolver specification, see: - - [gRPC Naming Documentation](https://github.com/grpc/grpc/blob/master/doc/naming.md) - """ - - alias GRPC.Client.Resolver.DNS - alias GRPC.Client.Resolver.IPv4 - alias GRPC.Client.Resolver.IPv6 - alias GRPC.Client.Resolver.Unix - alias GRPC.Client.Resolver.XDS - - @type service_config :: GRPC.Client.ServiceConfig.t() | nil - - @callback resolve(String.t()) :: - {:ok, %{addresses: list(map()), service_config: service_config()}} - | {:error, term()} - - @doc """ - Resolves a gRPC target string into a list of connection endpoints and an optional ServiceConfig. - - The `target` string can use one of the supported URI schemes: - - * `dns://[authority/]host[:port]` – resolves via DNS; looks up both A/AAAA records and optional `_grpc_config.` TXT record. - * `ipv4:addr[:port][,addr[:port],...]` – uses a fixed list of IPv4 addresses. - * `ipv6:[addr][:port][,[addr][:port],...]` – uses a fixed list of IPv6 addresses. - * `unix:/absolute_path` – connects via Unix domain socket. - * `unix-abstract:name` – connects via abstract Unix socket (Linux only). - * `vsock:cid:port` – connects via VSOCK (Linux only). - * `xds:///name` – resolves via xDS control plane (Envoy/Istio/Traffic Director). - - If no scheme is specified, `dns` is assumed. Default ports: - - * `dns`, `ipv4`, `ipv6` β†’ 50051 - * `xds` β†’ 443 - - Returns: - - * `{:ok, %{addresses: list(map()), service_config: GRPC.Client.ServiceConfig.t() | nil}}` on success - * `{:error, reason}` on failure - - Each `address` map includes at least: - - * `:address` – host, IP, or socket path - * `:port` – TCP port (if applicable) - * additional fields may be present depending on the scheme (e.g., `:socket`, `:cid` for vsock). - - This function abstracts the resolution mechanism, allowing the gRPC client to obtain endpoints and service configuration regardless of the underlying target type. - """ - @spec resolve(String.t()) :: - {:ok, %{addresses: list(map()), service_config: GRPC.Client.ServiceConfig.t()}} - | {:error, term()} - def resolve(target) do - uri = URI.parse(target) - scheme = uri.scheme || "dns" - - case scheme do - "dns" -> - DNS.resolve(target) - - "ipv4" -> - IPv4.resolve(target) - - "ipv6" -> - IPv6.resolve(target) - - "unix" -> - Unix.resolve(target) - - "xds" -> - XDS.resolve(target) - - "localhost" -> - IPv4.resolve("ipv4:#{target}") - - nil -> - IPv4.resolve("ipv4:#{target}") - - _ -> - {:error, {:unknown_scheme, scheme}} - end - end -end diff --git a/lib/grpc/client/resolver/dns.ex b/lib/grpc/client/resolver/dns.ex deleted file mode 100644 index be033bbe9..000000000 --- a/lib/grpc/client/resolver/dns.ex +++ /dev/null @@ -1,81 +0,0 @@ -defmodule GRPC.Client.Resolver.DNS do - @moduledoc """ - DNS Resolver for gRPC targets, supporting dynamic updates via a GenServer. - - Resolves `dns://host[:port]` targets. Fetches A/AAAA records and optional - `_grpc_config.` TXT records for ServiceConfig. - - This implementation maintains an internal cache of addresses and service config, - and refreshes them periodically. - """ - @behaviour GRPC.Client.Resolver - - alias GRPC.Client.ServiceConfig - - @impl GRPC.Client.Resolver - def resolve(target) do - uri = URI.parse(target) - host = uri.host || target - port = uri.port || 50051 - - with {:ok, addresses} <- lookup_addresses(host) do - addrs = - Enum.map(addresses, fn ip -> - %{address: :inet.ntoa(ip) |> to_string(), port: port} - end) - - case lookup_service_config(host) do - {:ok, txt_records} -> - service_config_json = extract_service_config(txt_records) - - {:ok, - %{ - addresses: addrs, - service_config: ServiceConfig.parse(service_config_json) - }} - - :no_config -> - {:ok, %{addresses: addrs, service_config: nil}} - - {:error, reason} -> - {:error, {:dns_error, reason}} - end - else - {:error, reason} -> {:error, {:dns_error, reason}} - end - end - - defp lookup_addresses(host) do - case adapter().lookup(host, :a) do - {:ok, addrs} when is_list(addrs) -> {:ok, addrs} - addrs when is_list(addrs) -> {:ok, addrs} - other -> other - end - end - - defp lookup_service_config(host) do - name = "_grpc_config." <> host - - case adapter().lookup(name, :txt) do - {:ok, txt_records} -> {:ok, txt_records} - {:error, reason} -> {:error, reason} - _ -> :no_config - end - end - - defp extract_service_config(txt_records) do - Enum.find_value(txt_records, fn txt -> - txt - |> List.to_string() - |> String.split("grpc_config=", parts: 2) - |> case do - [_, json] -> json - _ -> nil - end - end) - end - - defp adapter() do - Application.get_env(:grpc, :dns_adapter, GRPC.Client.Resolver.DNS.Adapter) - end -end diff --git a/lib/grpc/client/resolver/dns/adapter.ex b/lib/grpc/client/resolver/dns/adapter.ex deleted file mode 100644 index 01e5204c1..000000000 --- a/lib/grpc/client/resolver/dns/adapter.ex +++ /dev/null @@ -1,12 +0,0 @@ -defmodule GRPC.Client.Resolver.DNS.Adapter do - @moduledoc """ - Adapter to resolve DNS (A and TXT). - """ - - @callback lookup(String.t(), :a | :txt) :: - {:ok, [tuple() | String.t()]} | {:error, term()} - - def lookup(name, type) do - :inet_res.lookup(String.to_charlist(name), :in, type) - end -end diff --git a/lib/grpc/client/resolver/ipv4.ex b/lib/grpc/client/resolver/ipv4.ex deleted file mode 100644 index 2aadbf7a8..000000000 --- a/lib/grpc/client/resolver/ipv4.ex +++ /dev/null @@ -1,55 +0,0 @@ -defmodule GRPC.Client.Resolver.IPv4 do - @moduledoc """ - Resolver for gRPC clients connecting to one or more IPv4 addresses. - - This resolver handles target strings using the `ipv4` URI scheme, which - allows specifying one or multiple IPv4 addresses with explicit ports. - - ## Target format - - ipv4:addr:port[,addr:port,...] - - - IPv4 addresses must include a port. - - Multiple addresses can be comma-separated. - - `service_config` is always `nil` as literal IPv4 addresses do not support DNS TXT or xDS. - - ## Examples - - # Single IPv4 - target = "ipv4:10.0.0.1:50051" - {:ok, %{addresses: addresses, service_config: nil}} = - GRPC.Client.Resolver.IPv4.resolve(target) - addresses - # => [%{address: "10.0.0.1", port: 50051}] - - # Multiple IPv4 addresses - target = "ipv4:10.0.0.1:50051,10.0.0.2:50052" - {:ok, %{addresses: addresses, service_config: nil}} = - GRPC.Client.Resolver.IPv4.resolve(target) - addresses - # => [ - # %{address: "10.0.0.1", port: 50051}, - # %{address: "10.0.0.2", port: 50052} - # ] - - See the gRPC naming documentation for more information: - https://github.com/grpc/grpc/blob/master/doc/naming.md - """ - - @behaviour GRPC.Client.Resolver - - @impl GRPC.Client.Resolver - def resolve(target) do - uri = URI.parse(target) - addrs_str = uri.path - - addresses = - String.split(addrs_str, ",", trim: true) - |> Enum.map(fn addr -> - [ip, port] = String.split(addr, ":", trim: true, parts: 2) - %{address: ip, port: String.to_integer(port)} - end) - - {:ok, %{addresses: addresses, service_config: nil}} - end -end diff --git a/lib/grpc/client/resolver/ipv6.ex b/lib/grpc/client/resolver/ipv6.ex deleted file mode 100644 index 14a5d7713..000000000 --- a/lib/grpc/client/resolver/ipv6.ex +++ /dev/null @@ -1,78 +0,0 @@ -defmodule GRPC.Client.Resolver.IPv6 do - @moduledoc """ - Resolver for gRPC clients connecting to one or more IPv6 addresses. - - This resolver handles target strings using the `ipv6` URI scheme, which - allows specifying one or multiple IPv6 addresses with optional ports. - - ## Target format - - ipv6:[addr][:port][,[addr][:port],...] - - - IPv6 addresses **must** be enclosed in square brackets (`[...]`). - - The port is optional; if not provided, the default port is `443`. - - Multiple addresses can be comma-separated. - - `service_config` is always `nil` as IPv6 literals do not support DNS TXT or xDS. - """ - - @behaviour GRPC.Client.Resolver - - @default_port 443 - - @impl GRPC.Client.Resolver - def resolve(target) do - uri = URI.parse(target) - addresses_str = uri.path || "" - - with {:ok, addresses} <- parse_entries(addresses_str) do - {:ok, %{addresses: addresses, service_config: nil}} - end - end - - ## Helpers - - defp parse_entries(entries_str) do - entries = - String.split(entries_str, ",", trim: true) - |> Enum.map(&parse_entry/1) - - case Enum.find(entries, &match?({:error, _}, &1)) do - {:error, reason} -> {:error, reason} - _ -> {:ok, entries} - end - end - - defp parse_entry("[" <> rest) do - case String.split(rest, "]", parts: 2) do - [addr, port_str] -> - case :inet.parse_address(String.to_charlist(addr)) do - {:ok, _tuple} -> - port = - port_str - |> String.trim_leading(":") - |> case do - "" -> - @default_port - - s -> - case Integer.parse(s) do - {int, ""} -> int - _ -> return_error(:invalid_port) - end - end - - %{address: addr, port: port} - - _ -> - return_error(:invalid_ipv6) - end - - _ -> - return_error(:invalid_format) - end - end - - defp parse_entry(_), do: return_error(:invalid_format) - - defp return_error(reason), do: {:error, reason} -end diff --git a/lib/grpc/client/resolver/unix.ex b/lib/grpc/client/resolver/unix.ex deleted file mode 100644 index 358e67a97..000000000 --- a/lib/grpc/client/resolver/unix.ex +++ /dev/null @@ -1,45 +0,0 @@ -defmodule GRPC.Client.Resolver.Unix do - @moduledoc """ - Resolver for gRPC clients connecting via Unix Domain Sockets (UDS). - - This resolver handles target strings using the `unix` URI scheme, which - allows a gRPC client to connect to a server via a Unix socket path. Unix - domain sockets are supported on Unix systems only. - - ## Target format - - unix:///absolute/path/to/socket - - - The scheme **must** be `unix`. - - The path must be absolute (`/var/run/my.sock`). - - The port is not used in Unix sockets; `:port` will be `nil`. - - The socket type is indicated via `:socket => :unix`. - - ## Example - - target = "unix:///var/run/my_grpc.sock" - - {:ok, %{addresses: addresses, service_config: nil}} = - GRPC.Client.Resolver.Unix.resolve(target) - - addresses - # => [%{address: "/var/run/my_grpc.sock", port: nil, socket: :unix}] - - This resolver always returns `nil` for the service config, as Unix - sockets do not provide DNS TXT records or xDS configuration. - - See the gRPC naming documentation for more information on URI-based - resolution: https://github.com/grpc/grpc/blob/master/doc/naming.md - """ - - @behaviour GRPC.Client.Resolver - - @impl GRPC.Client.Resolver - def resolve(target) do - # E.g.: "unix:///var/run/my.sock" - uri = URI.parse(target) - path = uri.path - - {:ok, %{addresses: [%{address: {:local, path}, port: 0, socket: :unix}], service_config: nil}} - end -end diff --git a/lib/grpc/client/resolver/xds.ex b/lib/grpc/client/resolver/xds.ex deleted file mode 100644 index b881b91ef..000000000 --- a/lib/grpc/client/resolver/xds.ex +++ /dev/null @@ -1,9 +0,0 @@ -defmodule GRPC.Client.Resolver.XDS do - @behaviour GRPC.Client.Resolver - - @impl GRPC.Client.Resolver - def resolve(_target) do - # E.g.: "xds:///myservice" - {:error, :not_implemented} - end -end diff --git a/lib/grpc/client/service_config.ex b/lib/grpc/client/service_config.ex deleted file mode 100644 index d81301af8..000000000 --- a/lib/grpc/client/service_config.ex +++ /dev/null @@ -1,103 +0,0 @@ -defmodule GRPC.Client.ServiceConfig do - @moduledoc """ - Represents the gRPC `ServiceConfig` parsed from JSON, which can come from DNS TXT records or xDS. - - The gRPC `ServiceConfig` allows a client to configure per-service and per-method - behaviors such as load balancing, timeouts, and retry policies. - - ## Spec - - According to the gRPC specification ([service_config.md](https://github.com/grpc/grpc/blob/master/doc/service_config.md)): - - - **loadBalancingConfig**: a list of load balancing policies. - The client should pick the first policy it supports. Common values are: - - `"pick_first"`: always pick the first server. - - `"round_robin"`: distribute calls across servers in round-robin. - - - **methodConfig**: a list of configurations applied to specific methods or services. - Each entry can include: - - `"name"`: a list of `{ "service": "", "method": "" }` - or `{ "service": "" }` to match all methods in the service. - - `"timeout"`: RPC timeout as a string (e.g., `"1.000000001s"`). - - `"retryPolicy"`: optional retry policy map. - - Other optional method-level settings. - - ## Example TXT record - - A DNS TXT record for a service `my-service.local` might look like this: - - _grpc_config.my-service.local 3600 TXT - "grpc_config={ - \"loadBalancingConfig\":[{\"round_robin\":{}}], - \"methodConfig\":[ - { - \"name\":[ - {\"service\":\"foo\",\"method\":\"bar\"}, - {\"service\":\"baz\"} - ], - \"timeout\":\"1.000000001s\" - } - ] - }" - - This JSON will be parsed into a `%GRPC.Client.ServiceConfig{}` struct with: - - %GRPC.Client.ServiceConfig{ - load_balancing_policy: :round_robin, - method_configs: [ - %{ - "name" => [ - %{"service" => "foo", "method" => "bar"}, - %{"service" => "baz"} - ], - "timeout" => "1.000000001s" - } - ], - raw: - } - - ## Usage - - ```elixir - {:ok, config} = GRPC.Client.ServiceConfig.parse(txt_json) - IO.inspect(config.load_balancing_policy) - IO.inspect(config.method_configs) - ``` - """ - defstruct load_balancing_policy: :pick_first, - method_configs: [], - raw: %{} - - @type t :: %__MODULE__{ - load_balancing_policy: atom(), - method_configs: list(), - raw: map() - } - - def parse(nil), do: {:ok, %__MODULE__{}} - - def parse(json) when is_binary(json) do - case Jason.decode(json) do - {:ok, map} -> from_map(map) - error -> error - end - end - - defp from_map(map) do - lb_policy = - map - |> Map.get("loadBalancingConfig", [%{"pick_first" => %{}}]) - |> List.first() - |> Map.keys() - |> case do - [key] -> String.to_existing_atom(key) - _ -> :pick_first - end - - %__MODULE__{ - load_balancing_policy: lb_policy, - method_configs: Map.get(map, "methodConfig", []), - raw: map - } - end -end diff --git a/lib/grpc/client/supervisor.ex b/lib/grpc/client/supervisor.ex deleted file mode 100644 index 07832e56d..000000000 --- a/lib/grpc/client/supervisor.ex +++ /dev/null @@ -1,54 +0,0 @@ -defmodule GRPC.Client.Supervisor do - @moduledoc """ - A DynamicSupervisor responsible for managing gRPC client connections (`GRPC.Client.Connection`). - - This supervisor allows you to dynamically start and stop gRPC client connections at runtime. - Each connection is run as a separate `GenServer` under this supervisor, which ensures proper - supervision and isolation between connections. - - ## Starting the Supervisor - - Typically, you start this supervisor as part of your application's supervision tree: - - children = [ - {GRPC.Client.Supervisor, []} - ] - - opts = [strategy: :one_for_one, name: MyApp.Supervisor] - Supervisor.start_link(children, opts) - - You can also start it manually in scripts or test environments: - - {:ok, _pid} = DynamicSupervisor.start_link(strategy: :one_for_one, name: GRPC.Client.Supervisor) - - ## Supervision Strategy - - This supervisor uses `:one_for_one` strategy: - - * If a connection process crashes, only that process is restarted. - * Other running connections remain unaffected. - - ## Establishing a gRPC Connection - - To create a new gRPC connection, you typically use the `GRPC.Stub.connect/1` function, - which internally starts a `GRPC.Client.Connection` process under this supervisor. For example: - - iex> {:ok, ch} = GRPC.Stub.connect("127.0.0.1:50051") - iex> Grpc.Testing.TestService.Stub.empty_call(ch, %{}) - - ## Notes - - * You can dynamically start multiple connections under the supervisor for different targets. - * Each connection runs in isolation as its own GenServer. - """ - use DynamicSupervisor - - def start_link(opts) do - DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__) - end - - @impl true - def init(_opts) do - DynamicSupervisor.init(strategy: :one_for_one) - end -end diff --git a/lib/grpc/rpc_error.ex b/lib/grpc/rpc_error.ex index aa85ebb82..f0766e7ab 100644 --- a/lib/grpc/rpc_error.ex +++ b/lib/grpc/rpc_error.ex @@ -58,7 +58,7 @@ defmodule GRPC.RPCError do @type t :: %__MODULE__{ status: GRPC.Status.t(), message: String.t(), - details: [%Google.Protobuf.Any{}] + details: [Google.Protobuf.Any.t()] } alias GRPC.Status diff --git a/lib/grpc/server/router.ex b/lib/grpc/server/router.ex index 30e0d22c7..38b003118 100644 --- a/lib/grpc/server/router.ex +++ b/lib/grpc/server/router.ex @@ -4,9 +4,7 @@ defmodule GRPC.Server.Router do alias __MODULE__.Template @type http_method :: :get | :put | :post | :patch | :delete - @type segment_match :: String.t() | {atom(), [segment_match]} - @type matchers :: [segment_match] - @type route :: {http_method(), String.t(), matchers()} + @type route :: {http_method(), String.t(), Template.matchers()} @wildcards [:_, :__] @@ -65,12 +63,12 @@ defmodule GRPC.Server.Router do false = GRPC.Server.Router.match("/v1/shelves/example-shelf/something-els/books/book", match) """ - @spec match(String.t() | [String.t()], matchers()) :: {true, map()} | false + @spec match(String.t() | [String.t()], Template.matchers()) :: {true, map()} | false def match(path, match) do match(path, match, %{}) end - @spec match(String.t() | [String.t()], matchers(), map()) :: {true, map()} | false + @spec match(String.t() | [String.t()], Template.matchers(), map()) :: {true, map()} | false def match(path, match, bindings) when is_binary(path) do path |> split_path() diff --git a/lib/grpc/server/supervisor.ex b/lib/grpc/server/supervisor.ex index ea486a1fd..392f92616 100644 --- a/lib/grpc/server/supervisor.ex +++ b/lib/grpc/server/supervisor.ex @@ -55,7 +55,7 @@ defmodule GRPC.Server.Supervisor do end def init(opts) when is_list(opts) do - if not is_nil(Application.get_env(:grpc, :start_server)) do + unless is_nil(Application.get_env(:grpc, :start_server)) do raise "the :start_server config key has been deprecated.\ The currently supported way is to configure it\ through the :start_server option for the GRPC.Server.Supervisor" diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex index 15ad53082..99b37abec 100644 --- a/lib/grpc/stream.ex +++ b/lib/grpc/stream.ex @@ -13,11 +13,7 @@ defmodule GRPC.Stream do - Supports joining with external producers (e.g., RabbitMQ, Kafka) for unbounded or fan-in stream sources. - Offers composable functional operators (`map/2`, `filter/2`, `flat_map/2`, etc.) on the stream. - See the [API section](#api) for more information. - - ## Examples - - ### Bidirectional Streaming + ## Example: Bidirectional Streaming defmodule MyGRPCService do use GRPC.Server, service: MyService.Service @@ -31,7 +27,7 @@ defmodule GRPC.Stream do defp process_note(note), do: %Response{message: "Received"} end - ### Joining with an External Producer + ## Example: Joining with an External Producer When integrating with external unbounded sources (e.g., message queues), you can pass a running `GenStage` producer using the `:join_with` option: @@ -75,7 +71,7 @@ defmodule GRPC.Stream do - `:dispatcher` β€” Specifies the `Flow` dispatcher (defaults to `GenStage.DemandDispatcher`). - `:propagate_context` - If `true`, the context from the `materializer` is propagated to the `Flow`. - `:materializer` - The `%GRPC.Server.Stream{}` struct representing the current gRPC stream context. - + And any other options supported by `Flow`. ## Returns @@ -152,11 +148,11 @@ defmodule GRPC.Stream do """ @spec run(t()) :: any() def run(%__MODULE__{flow: flow, options: opts}) do - if !Keyword.get(opts, :unary, false) do + unless Keyword.get(opts, :unary, false) do raise ArgumentError, "run/2 is not supported for non-unary streams" end - # We have to call `Enum.to_list` because we want to actually run and materialize the full stream. + # We have to call `Enum.to_list` because we want to actually run and materialize the full stream. # List.flatten and List.first are used so that we can obtain the first result of the materialized list. flow |> Enum.to_list() @@ -184,7 +180,7 @@ defmodule GRPC.Stream do GRPC.Stream.run_with(request, mat) """ - @spec run_with(t(), stream :: Materializer.t(), Keyword.t()) :: :ok + @spec run_with(t(), Stream.t(), Keyword.t()) :: :ok def run_with( %__MODULE__{flow: flow, options: flow_opts} = _stream, %Materializer{} = from, diff --git a/lib/grpc/stub.ex b/lib/grpc/stub.ex index a3533725c..75710f60b 100644 --- a/lib/grpc/stub.ex +++ b/lib/grpc/stub.ex @@ -37,14 +37,12 @@ defmodule GRPC.Stub do You can refer to `call/6` for doc of your RPC functions. """ - require Logger - alias GRPC.Channel - alias GRPC.Client.Connection - - @default_timeout 10_000 - + @insecure_scheme "http" + @secure_scheme "https" @canceled_error GRPC.RPCError.exception(GRPC.Status.cancelled(), "The operation was cancelled") + # 10 seconds + @default_timeout 10000 @type receive_data_return :: {:ok, struct()} @@ -57,6 +55,8 @@ defmodule GRPC.Stub do | {:error, GRPC.RPCError.t()} | receive_data_return + require Logger + defmacro __using__(opts) do opts = Keyword.validate!(opts, [:service]) @@ -105,29 +105,19 @@ defmodule GRPC.Stub do end @doc """ - Establishes a connection with a gRPC server and returns a `GRPC.Channel` required - for sending requests. - - Supports advanced connection resolution via the gRPC `Resolver` - and various target schemes (`dns`, `unix`, `xds`, `host:port`, etc). - - This function is part of the **connection orchestration layer**, which manages - connection setup, name resolution, and optional load balancing. + Establish a connection with gRPC server and return `GRPC.Channel` needed for + sending requests. - ## Target Syntax - - The `target` argument to `connect/2` accepts URI-like strings that are resolved - using the configured [Resolver](GRPC.Client.Resolver). + ## Examples - Supported formats: + iex> GRPC.Stub.connect("localhost:50051") + {:ok, channel} - * `"dns://example.com:50051"` β€” resolves via DNS (A/AAAA records and `_grpc_config` TXT) - * `"ipv4:10.0.0.5:50051"` β€” fixed IPv4 address - * `"unix:/tmp/my.sock"` β€” Unix domain socket - * `"xds:///my-service"` β€” resolves via xDS control plane (Envoy/Istio/Traffic Director) - * `"127.0.0.1:50051"` β€” implicit DNS (default port 50051) + iex> GRPC.Stub.connect("localhost:50051", accepted_compressors: [GRPC.Compressor.Gzip]) + {:ok, channel} - If no scheme is provided, the resolver assumes `dns` by default. + iex> GRPC.Stub.connect("/paht/to/unix.sock") + {:ok, channel} ## Options @@ -136,42 +126,64 @@ defmodule GRPC.Stub do * `:adapter` - custom client adapter * `:interceptors` - client interceptors * `:codec` - client will use this to encode and decode binary message - * `:compressor` - the client will use this to compress requests and decompress responses. - If this is set, accepted_compressors will be appended also, so this can be used safely - without `:accepted_compressors`. + * `:compressor` - the client will use this to compress requests and decompress responses. If this is set, accepted_compressors + will be appended also, so this can be used safely without `:accepted_compressors`. * `:accepted_compressors` - tell servers accepted compressors, this can be used without `:compressor` * `:headers` - headers to attach to each request + """ + @spec connect(String.t(), keyword()) :: {:ok, Channel.t()} | {:error, any()} + def connect(addr, opts \\ []) when is_binary(addr) and is_list(opts) do + # This works because we only accept `http` and `https` schemes (allowlisted below explicitly) + # addresses like "localhost:1234" parse as if `localhost` is the scheme for URI, and this falls through to + # the base case. Accepting only `http/https` is a trait of `connect/3`. + + case URI.parse(addr) do + %URI{scheme: @secure_scheme, host: host, port: port} -> + opts = Keyword.put_new_lazy(opts, :cred, &default_ssl_option/0) + connect(host, port, opts) + + %URI{scheme: @insecure_scheme, host: host, port: port} -> + if opts[:cred] do + raise ArgumentError, "invalid option for insecure (http) address: :cred" + end - ## Examples - - ### Basic Connection - - iex> GRPC.Stub.connect("localhost:50051") - {:ok, channel} - - iex> GRPC.Stub.connect("localhost:50051", accepted_compressors: [GRPC.Compressor.Gzip]) - {:ok, channel} - - ### DNS Target - - iex> {:ok, ch} = GRPC.Client.Connection.connect("dns://my-service.local:50051") - - ### Unix Socket - - iex> GRPC.Stub.connect("/path/to/unix.sock") - {:ok, channel} + connect(host, port, opts) + # For compatibility with previous versions, we accept URIs in + # the "#{address}:#{port}" format + _ -> + case String.split(addr, ":") do + [socket_path] -> + connect({:local, socket_path}, 0, opts) - ## Notes + [address, port] -> + port = String.to_integer(port) + connect(address, port, opts) + end + end + end - * When using DNS or xDS targets, the connection layer periodically refreshes endpoints. - """ - @spec connect(String.t(), keyword()) :: {:ok, Channel.t()} | {:error, any()} - def connect(addr, opts \\ []) when is_binary(addr) and is_list(opts) do - Connection.connect(addr, opts) + if {:module, CAStore} == Code.ensure_loaded(CAStore) do + defp default_ssl_option do + %GRPC.Credential{ + ssl: [ + verify: :verify_peer, + depth: 99, + cacert_file: CAStore.file_path() + ] + } + end + else + defp default_ssl_option do + raise """ + no GRPC credentials provided. Please either: + + - Pass the `:cred` option to `GRPC.Stub.connect/2,3` + - Add `:castore` to your list of dependencies in `mix.exs` + """ + end end - @deprecated "Use connect/2 instead" @spec connect( String.t() | {:local, String.t()}, binary() | non_neg_integer(), @@ -188,14 +200,54 @@ defmodule GRPC.Stub do through the :adapter option for GRPC.Stub.connect/3" end - ip_type = - case :inet.parse_address(to_charlist(host)) do - {:ok, {_, _, _, _}} -> "ipv4" - {:ok, {_, _, _, _, _, _, _, _}} -> "ipv6" - {:error, _} -> "ipv4" + opts = + Keyword.validate!(opts, + cred: nil, + adapter: GRPC.Client.Adapters.Gun, + adapter_opts: [], + interceptors: [], + codec: GRPC.Codec.Proto, + compressor: nil, + accepted_compressors: [], + headers: [] + ) + + adapter = opts[:adapter] + + cred = opts[:cred] + scheme = if cred, do: @secure_scheme, else: @insecure_scheme + interceptors = init_interceptors(opts[:interceptors]) + codec = opts[:codec] + compressor = opts[:compressor] + accepted_compressors = opts[:accepted_compressors] + headers = opts[:headers] + + accepted_compressors = + if compressor do + Enum.uniq([compressor | accepted_compressors]) + else + accepted_compressors end - connect("#{ip_type}:#{host}:#{port}", opts) + adapter_opts = opts[:adapter_opts] + + unless is_list(adapter_opts) do + raise ArgumentError, ":adapter_opts must be a keyword list if present" + end + + %Channel{ + host: host, + port: port, + scheme: scheme, + cred: cred, + adapter: adapter, + interceptors: interceptors, + codec: codec, + compressor: compressor, + accepted_compressors: accepted_compressors, + headers: headers + } + |> adapter.connect(adapter_opts) end def retry_timeout(curr) when curr < 11 do @@ -211,12 +263,19 @@ defmodule GRPC.Stub do round(timeout + jitter * timeout) end + defp init_interceptors(interceptors) do + Enum.map(interceptors, fn + {interceptor, opts} -> {interceptor, interceptor.init(opts)} + interceptor -> {interceptor, interceptor.init([])} + end) + end + @doc """ Disconnects the adapter and frees any resources the adapter is consuming """ @spec disconnect(Channel.t()) :: {:ok, Channel.t()} | {:error, any()} - def disconnect(%Channel{} = channel) do - Connection.disconnect(channel) + def disconnect(%Channel{adapter: adapter} = channel) do + adapter.disconnect(channel) end @doc false @@ -240,26 +299,7 @@ defmodule GRPC.Stub do def call(_service_mod, rpc, %{channel: channel} = stream, request, opts) do {_, {req_mod, req_stream}, {res_mod, response_stream}, _rpc_options} = rpc - ch = - case Connection.pick_channel(channel, opts) do - {:ok, ch} -> - if Process.alive?(ch.adapter_payload.conn_pid) do - ch - else - Logger.warning( - "The connection process #{inspect(ch.adapter_payload.conn_pid)} is not alive, " <> - "please create a new channel via GRPC.Stub.connect/2" - ) - - channel - end - - _ -> - # fallback to the channel in the stream - channel - end - - stream = %{stream | channel: ch, request_mod: req_mod, response_mod: res_mod} + stream = %{stream | request_mod: req_mod, response_mod: res_mod} opts = if req_stream || response_stream do @@ -268,8 +308,8 @@ defmodule GRPC.Stub do parse_req_opts([{:timeout, @default_timeout} | opts]) end - compressor = Keyword.get(opts, :compressor, ch.compressor) - accepted_compressors = Keyword.get(opts, :accepted_compressors, ch.accepted_compressors) + compressor = Keyword.get(opts, :compressor, channel.compressor) + accepted_compressors = Keyword.get(opts, :accepted_compressors, channel.accepted_compressors) if not is_list(accepted_compressors) do raise ArgumentError, "accepted_compressors is not a list" @@ -284,7 +324,7 @@ defmodule GRPC.Stub do stream = %{ stream - | codec: Keyword.get(opts, :codec, ch.codec), + | codec: Keyword.get(opts, :codec, channel.codec), compressor: compressor, accepted_compressors: accepted_compressors } diff --git a/mix.exs b/mix.exs index 0b401b32e..2da58c413 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule GRPC.Mixfile do use Mix.Project - @version "0.11.0" + @version "0.11.1" def project do [ @@ -39,7 +39,7 @@ defmodule GRPC.Mixfile do {:cowboy, "~> 2.10"}, {:flow, "~> 1.2"}, {:gun, "~> 2.0"}, - {:jason, ">= 0.0.0"}, + {:jason, ">= 0.0.0", optional: true}, {:cowlib, "~> 2.12"}, {:castore, "~> 0.1 or ~> 1.0", optional: true}, {:protobuf, "~> 0.14"}, @@ -47,7 +47,6 @@ defmodule GRPC.Mixfile do {:mint, "~> 1.5"}, {:ex_doc, "~> 0.29", only: :dev}, {:ex_parameterized, "~> 1.3.7", only: :test}, - {:mox, "~> 1.2", only: :test}, {:telemetry, "~> 1.0"} ] end diff --git a/mix.lock b/mix.lock index d56638390..eb3051284 100644 --- a/mix.lock +++ b/mix.lock @@ -14,10 +14,7 @@ "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.4", "29563475afa9b8a2add1b7a9c8fb68d06ca7737648f28398e04461f008b69521", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f4ed47ecda66de70dd817698a703f8816daa91272e7e45812469498614ae8b29"}, - "meck": {:hex, :meck, "1.0.0", "24676cb6ee6951530093a93edcd410cfe4cb59fe89444b875d35c9d3909a15d0", [:rebar3], [], "hexpm", "680a9bcfe52764350beb9fb0335fb75fee8e7329821416cee0a19fec35433882"}, "mint": {:hex, :mint, "1.5.2", "4805e059f96028948870d23d7783613b7e6b0e2fb4e98d720383852a760067fd", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "d77d9e9ce4eb35941907f1d3df38d8f750c357865353e21d335bdcdf6d892a02"}, - "mox": {:hex, :mox, "1.2.0", "a2cd96b4b80a3883e3100a221e8adc1b98e4c3a332a8fc434c39526babafd5b3", [:mix], [{:nimble_ownership, "~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}], "hexpm", "c7b92b3cc69ee24a7eeeaf944cd7be22013c52fcb580c1f33f50845ec821089a"}, - "nimble_ownership": {:hex, :nimble_ownership, "1.0.1", "f69fae0cdd451b1614364013544e66e4f5d25f36a2056a9698b793305c5aa3a6", [:mix], [], "hexpm", "3825e461025464f519f3f3e4a1f9b68c47dc151369611629ad08b636b73bb22d"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "protobuf": {:hex, :protobuf, "0.14.1", "9ac0582170df27669ccb2ef6cb0a3d55020d58896edbba330f20d0748881530a", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "39a9d49d346e3ed597e5ae3168a43d9603870fc159419617f584cdf6071f0e25"}, "protobuf_generate": {:hex, :protobuf_generate, "0.1.3", "57841bc60e2135e190748119d83f78669ee7820c0ad6555ada3cd3cd7df93143", [:mix], [{:protobuf, "~> 0.12", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "dae4139b00ba77a279251a0ceb5593b1bae745e333b4ce1ab7e81e8e4906016b"}, diff --git a/test/grpc/client/adapters/mint_test.exs b/test/grpc/client/adapters/mint_test.exs index 29c56ce2a..261235fd3 100644 --- a/test/grpc/client/adapters/mint_test.exs +++ b/test/grpc/client/adapters/mint_test.exs @@ -1,5 +1,5 @@ defmodule GRPC.Client.Adapters.MintTest do - use GRPC.DataCase, async: false + use GRPC.DataCase alias GRPC.Client.Adapters.Mint diff --git a/test/grpc/client/resolver/dns_test.exs b/test/grpc/client/resolver/dns_test.exs deleted file mode 100644 index a97982b79..000000000 --- a/test/grpc/client/resolver/dns_test.exs +++ /dev/null @@ -1,52 +0,0 @@ -defmodule GRPC.Client.Resolver.DNSTest do - use ExUnit.Case, async: true - import Mox - - alias GRPC.Client.Resolver.DNS - - setup :verify_on_exit! - - setup do - Mox.set_mox_global() - :ok - end - - test "resolves A record and parses service config from TXT via GenServer" do - host = "my-service.local" - config_name = "_grpc_config." <> host - - DNS.MockAdapter - |> expect(:lookup, fn ^host, :a -> - {:ok, [{127, 0, 0, 1}]} - end) - |> expect(:lookup, fn ^config_name, :txt -> - {:ok, - [ - ~c'grpc_config={ - "loadBalancingConfig":[{"round_robin":{}}], - "methodConfig":[ - { - "name":[ - {"service":"foo","method":"bar"}, - {"service":"baz"} - ], - "timeout":"1.000000001s" - } - ] - }' - ]} - end) - - assert {:ok, %{addresses: addrs, service_config: config}} = DNS.resolve(host) - assert [%{address: "127.0.0.1", port: 50051}] = addrs - assert config.load_balancing_policy == :round_robin - - method_names = - Enum.flat_map(config.method_configs, fn mc -> - Enum.map(mc["name"], fn n -> {n["service"], Map.get(n, "method")} end) - end) - - assert {"foo", "bar"} in method_names - assert {"baz", nil} in method_names - end -end diff --git a/test/grpc/client/resolver/ipv4_test.exs b/test/grpc/client/resolver/ipv4_test.exs deleted file mode 100644 index 4fd000da7..000000000 --- a/test/grpc/client/resolver/ipv4_test.exs +++ /dev/null @@ -1,26 +0,0 @@ -defmodule GRPC.Client.Resolver.IPv4Test do - use ExUnit.Case, async: true - - alias GRPC.Client.Resolver.IPv4 - - test "resolves multiple IPv4 addresses with ports" do - target = "ipv4:10.0.0.1:50051,10.0.0.2:50052" - - assert {:ok, %{addresses: addresses, service_config: nil}} = IPv4.resolve(target) - - assert addresses == [ - %{address: "10.0.0.1", port: 50051}, - %{address: "10.0.0.2", port: 50052} - ] - end - - test "resolves single IPv4 address" do - target = "ipv4:192.168.1.10:12345" - - assert {:ok, %{addresses: addresses, service_config: nil}} = IPv4.resolve(target) - - assert addresses == [ - %{address: "192.168.1.10", port: 12345} - ] - end -end diff --git a/test/grpc/client/resolver/ipv6_test.exs b/test/grpc/client/resolver/ipv6_test.exs deleted file mode 100644 index 6e555f453..000000000 --- a/test/grpc/client/resolver/ipv6_test.exs +++ /dev/null @@ -1,26 +0,0 @@ -defmodule GRPC.Client.Resolver.IPv6Test do - use ExUnit.Case, async: true - - alias GRPC.Client.Resolver.IPv6 - - test "resolves multiple IPv6 addresses with ports" do - target = "ipv6:[2607:f8b0:400e:c00::ef]:443,[::1]:50051" - - assert {:ok, %{addresses: addresses, service_config: nil}} = IPv6.resolve(target) - - assert addresses == [ - %{address: "2607:f8b0:400e:c00::ef", port: 443}, - %{address: "::1", port: 50051} - ] - end - - test "resolves single IPv6 address with default port" do - target = "ipv6:[::1]" - - assert {:ok, %{addresses: addresses, service_config: nil}} = IPv6.resolve(target) - - assert addresses == [ - %{address: "::1", port: 443} - ] - end -end diff --git a/test/grpc/client/resolver/unix_test.exs b/test/grpc/client/resolver/unix_test.exs deleted file mode 100644 index 7b2835aa3..000000000 --- a/test/grpc/client/resolver/unix_test.exs +++ /dev/null @@ -1,25 +0,0 @@ -defmodule GRPC.Client.Resolver.UnixTest do - use ExUnit.Case, async: true - - alias GRPC.Client.Resolver.Unix - - test "resolves unix socket path" do - target = "unix:///var/run/my.sock" - - assert {:ok, %{addresses: addresses, service_config: nil}} = Unix.resolve(target) - - assert addresses == [ - %{address: {:local, "/var/run/my.sock"}, port: 0, socket: :unix} - ] - end - - test "resolves unix socket with relative path" do - target = "unix:/tmp/test.sock" - - assert {:ok, %{addresses: addresses, service_config: nil}} = Unix.resolve(target) - - assert addresses == [ - %{address: {:local, "/tmp/test.sock"}, port: 0, socket: :unix} - ] - end -end diff --git a/test/grpc/integration/client_interceptor_test.exs b/test/grpc/integration/client_interceptor_test.exs index 98aa6e094..a699a0dc3 100644 --- a/test/grpc/integration/client_interceptor_test.exs +++ b/test/grpc/integration/client_interceptor_test.exs @@ -49,45 +49,45 @@ defmodule GRPC.Integration.ClientInterceptorTest do run(HelloServer) end - # test "client sends headers" do - # client_prefix = GRPC.Telemetry.client_rpc_prefix() - # stop_client_name = client_prefix ++ [:stop] - # service_name = Helloworld.Greeter.Service.__meta__(:name) - - # attach_events([ - # stop_client_name - # ]) - - # run_endpoint(HelloEndpoint, fn port -> - # {:ok, channel} = - # GRPC.Stub.connect("localhost:#{port}", - # interceptors: [ - # {AddHeadersClientInterceptor, "two"}, - # {AddHeadersClientInterceptor, "one"} - # ] - # ) - - # req = %Helloworld.HelloRequest{name: "Elixir"} - # {:ok, reply} = channel |> Helloworld.Greeter.Stub.say_hello(req) - # assert reply.message == "Hello, Elixir one two" - - # assert_received {^stop_client_name, _measurements, metadata} - # assert %{stream: stream, request: ^req} = metadata - - # assert %{ - # channel: ^channel, - # service_name: ^service_name, - # method_name: "SayHello" - # } = stream - # end) - # end + test "client sends headers" do + client_prefix = GRPC.Telemetry.client_rpc_prefix() + stop_client_name = client_prefix ++ [:stop] + service_name = Helloworld.Greeter.Service.__meta__(:name) + + attach_events([ + stop_client_name + ]) + + run_endpoint(HelloEndpoint, fn port -> + {:ok, channel} = + GRPC.Stub.connect("localhost:#{port}", + interceptors: [ + {AddHeadersClientInterceptor, "two"}, + {AddHeadersClientInterceptor, "one"} + ] + ) + + req = %Helloworld.HelloRequest{name: "Elixir"} + {:ok, reply} = channel |> Helloworld.Greeter.Stub.say_hello(req) + assert reply.message == "Hello, Elixir one two" + + assert_received {^stop_client_name, _measurements, metadata} + assert %{stream: stream, request: ^req} = metadata + + assert %{ + channel: ^channel, + service_name: ^service_name, + method_name: "SayHello" + } = stream + end) + end test "sends exception event upon client exception" do message = "exception-#{inspect(self())}" - for {function, _kind, _reason} <- [ + for {function, kind, reason} <- [ {&throw/1, :throw, message}, - {&:erlang.exit/1, :throw, message}, + {&:erlang.exit/1, :exit, message}, {&raise/1, :error, %RuntimeError{message: message}}, {&:erlang.error/1, :error, %ErlangError{original: message}} ] do @@ -126,9 +126,24 @@ defmodule GRPC.Integration.ClientInterceptorTest do assert_received {^exception_client_name, measurements, metadata} assert %{duration: duration} = measurements assert duration > delay - assert is_map(metadata) - assert is_list(Map.get(metadata, :stacktrace, [])) + + assert %{kind: ^kind, reason: ^reason, stacktrace: stacktrace} = metadata + + assert is_list(stacktrace) + + Enum.each(stacktrace, fn entry -> + # ensure stacktrace is a pure stacktrace + assert {mod, fun, arity, meta} = entry + assert is_atom(mod) + assert is_atom(fun) + assert is_integer(arity) + assert is_list(meta) + end) end) + + assert_receive {:gun_down, _, _, _, _} + + refute_receive _ end end end diff --git a/test/grpc/integration/stub_test.exs b/test/grpc/integration/stub_test.exs index bea07283f..8489a621f 100644 --- a/test/grpc/integration/stub_test.exs +++ b/test/grpc/integration/stub_test.exs @@ -33,7 +33,6 @@ defmodule GRPC.Integration.StubTest do test "you can disconnect stubs" do run_server(HelloServer, fn port -> {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") - Process.sleep(100) %{adapter_payload: %{conn_pid: gun_conn_pid}} = channel @@ -52,6 +51,7 @@ defmodule GRPC.Integration.StubTest do test "disconnecting a disconnected channel is a no-op" do run_server(HelloServer, fn port -> {:ok, channel} = GRPC.Stub.connect("localhost:#{port}") + {:ok, channel} = GRPC.Stub.disconnect(channel) {:ok, _channel} = GRPC.Stub.disconnect(channel) end) end diff --git a/test/test_helper.exs b/test/test_helper.exs index ffc78c755..805a2a642 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -7,15 +7,4 @@ codecs = [ ] Enum.each(codecs, &Code.ensure_loaded/1) - -Mox.defmock(GRPC.Client.Resolver.DNS.MockAdapter, - for: GRPC.Client.Resolver.DNS.Adapter -) - -{:ok, _pid} = - DynamicSupervisor.start_link( - strategy: :one_for_one, - name: GRPC.Client.Supervisor - ) - ExUnit.start(capture_log: true)