From f58348171a151672c6100e1e415a641e3560ea56 Mon Sep 17 00:00:00 2001 From: Peter Solnica Date: Fri, 4 Jul 2025 22:56:01 +0000 Subject: [PATCH] Introduce Telemetry operations extension --- lib/drops/operations/command.ex | 3 +- lib/drops/operations/extensions/telemetry.ex | 511 ++++++++ mix.exs | 1 + .../operations/extensions/telemetry_test.exs | 1030 +++++++++++++++++ 4 files changed, 1544 insertions(+), 1 deletion(-) create mode 100644 lib/drops/operations/extensions/telemetry.ex create mode 100644 test/drops/operations/extensions/telemetry_test.exs diff --git a/lib/drops/operations/command.ex b/lib/drops/operations/command.ex index 53102f0..173a41c 100644 --- a/lib/drops/operations/command.ex +++ b/lib/drops/operations/command.ex @@ -39,6 +39,7 @@ defmodule Drops.Operations.Command do extensions: [ Drops.Operations.Extensions.Command, Drops.Operations.Extensions.Params, - Drops.Operations.Extensions.Ecto + Drops.Operations.Extensions.Ecto, + Drops.Operations.Extensions.Telemetry ] end diff --git a/lib/drops/operations/extensions/telemetry.ex b/lib/drops/operations/extensions/telemetry.ex new file mode 100644 index 0000000..86193dd --- /dev/null +++ b/lib/drops/operations/extensions/telemetry.ex @@ -0,0 +1,511 @@ +defmodule Drops.Operations.Extensions.Telemetry do + @moduledoc """ + Telemetry extension for Operations framework. + + This extension provides telemetry instrumentation for Operations steps, + allowing you to monitor and observe the execution of your operations. + + ## Features + + - Automatic operation-level telemetry (instruments first and last steps) + - Configurable step-level instrumentation + - Integration with Elixir's :telemetry library + - Metadata includes operation module, step name, and execution context + + ## Usage + + ### Default Behavior + + Enable telemetry with default behavior (instruments operation start/stop using first/last steps): + + defmodule CreateUser do + use Drops.Operations.Command, telemetry: true + + steps do + @impl true + def execute(%{params: params}) do + {:ok, create_user(params)} + end + end + end + + ### Custom Step Configuration + + Instrument specific steps only: + + defmodule CreateUser do + use Drops.Operations.Command, telemetry: [steps: [:validate, :execute]] + + steps do + @impl true + def execute(%{params: params}) do + {:ok, create_user(params)} + end + end + end + + Instrument all steps: + + defmodule CreateUser do + use Drops.Operations.Command, telemetry: [steps: :all] + + steps do + @impl true + def execute(%{params: params}) do + {:ok, create_user(params)} + end + end + end + + ### Custom Event Identifier + + Configure a custom identifier for telemetry events (replaces `:drops` in event names): + + defmodule CreateUser do + use Drops.Operations.Command, telemetry: [identifier: :my_app] + + steps do + @impl true + def execute(%{params: params}) do + {:ok, create_user(params)} + end + end + end + + This will emit events like `[:my_app, :operations, :operation, :start]` instead of `[:drops, :operations, :operation, :start]`. + + ### Combined Configuration + + You can combine custom identifier with step configuration: + + defmodule CreateUser do + use Drops.Operations.Command, telemetry: [identifier: :my_app, steps: [:validate, :execute]] + + steps do + @impl true + def execute(%{params: params}) do + {:ok, create_user(params)} + end + end + end + + ### Step Error Instrumentation + + You can instrument step error events separately from regular step events using `telemetry_step_errors`: + + defmodule CreateUser do + use Drops.Operations.Command, + telemetry: true, + telemetry_step_errors: :all + + steps do + def validate(%{params: params}) do + # validation logic that might fail + {:error, "validation failed"} + end + + @impl true + def execute(%{params: params}) do + {:ok, create_user(params)} + end + end + end + + This will: + - Instrument operation boundaries (start/stop events) via `telemetry: true` + - Instrument error events for all steps when they return `{:error, reason}` via `telemetry_step_errors: :all` + + You can also specify specific steps for error instrumentation: + + defmodule CreateUser do + use Drops.Operations.Command, + telemetry: true, + telemetry_step_errors: [:validate, :execute] + end + + The `telemetry_step_errors` configuration uses the same identifier as the main `telemetry` configuration. + + ## Telemetry Events + + The extension emits the following telemetry events: + + ### Operation-Level Events (when `telemetry: true`) + + - `[, :operation, :start]` - Emitted before the first step executes + - `[, :operation, :stop]` - Emitted after the last step completes successfully + - `[, :operation, :exception]` - Emitted when the last step fails + + ### Step-Level Events (when `telemetry: [steps: [...]]`) + + - `[, :operation, :step, :start]` - Emitted before a specific step executes + - `[, :operation, :step, :stop]` - Emitted after a specific step completes successfully + - `[, :operation, :step, :exception]` - Emitted when a specific step fails + + ### Step Error Events (when `telemetry_step_errors: [...]`) + + - `[, :operation, :step, :exception]` - Emitted when a step returns `{:error, reason}` + + This is useful for capturing step failures without the overhead of instrumenting all step events. + The error events include the same metadata as regular step exception events but are only emitted + for error returns, not actual exceptions. + + Where `` defaults to `:drops` but can be customized using the `:identifier` option. + + ### Event Metadata + + All events include the following metadata: + + - `:operation` - The operation module name + - `:step` - The actual step name (atom) that was instrumented + - `:context` - The execution context (map) + + ### Event Measurements + + - `:start` events include `:system_time` (system time when step started) + - `:stop` events include `:duration` (step execution time in native units) + - `:exception` events include `:duration` and `:kind`, `:reason`, `:stacktrace` + + ## Example Usage with Telemetry Handlers + + # In your application startup (using default :drops identifier) + :telemetry.attach_many( + "operations-telemetry", + [ + [:drops, :operation, :start], + [:drops, :operation, :stop], + [:drops, :operation, :exception], + [:drops, :operation, :step, :start], + [:drops, :operation, :step, :stop], + [:drops, :operation, :step, :exception] + ], + &MyApp.TelemetryHandler.handle_event/4, + %{} + ) + + # Or with custom identifier + :telemetry.attach_many( + "my-app-operations-telemetry", + [ + [:my_app, :operation, :start], + [:my_app, :operation, :stop], + [:my_app, :operation, :exception], + [:my_app, :operation, :step, :start], + [:my_app, :operation, :step, :stop], + [:my_app, :operation, :step, :exception] + ], + &MyApp.TelemetryHandler.handle_event/4, + %{} + ) + + defmodule MyApp.TelemetryHandler do + require Logger + + def handle_event([_identifier, :operation, :start], measurements, metadata, _config) do + Logger.info("Starting operation \#{metadata.operation} with step \#{metadata.step}") + end + + def handle_event([_identifier, :operation, :stop], measurements, metadata, _config) do + duration_ms = System.convert_time_unit(measurements.duration, :native, :millisecond) + Logger.info("Completed operation \#{metadata.operation} in \#{duration_ms}ms") + end + + def handle_event([_identifier, :operation, :step, :start], measurements, metadata, _config) do + Logger.info("Starting step \#{metadata.step} in \#{metadata.operation}") + end + + def handle_event([_identifier, :operation, :step, :stop], measurements, metadata, _config) do + duration_ms = System.convert_time_unit(measurements.duration, :native, :millisecond) + Logger.info("Completed step \#{metadata.step} in \#{duration_ms}ms") + end + + def handle_event([_identifier, :operation, :exception], measurements, metadata, _config) do + Logger.error("Failed in \#{metadata.operation}: \#{inspect(metadata.reason)}") + end + + def handle_event([_identifier, :operation, :step, :exception], measurements, metadata, _config) do + Logger.error("Failed step \#{metadata.step} in \#{metadata.operation}: \#{inspect(metadata.reason)}") + end + end + """ + use Drops.Operations.Extension + + @depends_on [Drops.Operations.Extensions.Command, Drops.Operations.Extensions.Params] + + @impl true + @spec enable?(keyword()) :: boolean() + def enable?(opts) do + case opts[:telemetry] do + false -> false + nil -> opts[:debug] == true + _ -> true + end + end + + @impl true + @spec default_opts(keyword()) :: keyword() + def default_opts(_opts) do + [] + end + + @impl true + @spec unit_of_work(Drops.Operations.UnitOfWork.t(), keyword()) :: + Drops.Operations.UnitOfWork.t() + def unit_of_work(uow, opts) do + telemetry_config = Keyword.get(opts, :telemetry, false) + telemetry_step_errors = Keyword.get(opts, :telemetry_step_errors, false) + + uow = + case telemetry_config do + false -> + uow + + true -> + # Default behavior: instrument first and last steps for operation boundaries + identifier = :drops + instrument_operation_boundaries(uow, identifier) + + config when is_list(config) -> + # Custom configuration with specific steps and/or identifier + identifier = Keyword.get(config, :identifier, :drops) + steps_to_instrument = Keyword.get(config, :steps, []) + + cond do + steps_to_instrument == [] -> + # No specific steps configured, use operation boundaries + instrument_operation_boundaries(uow, identifier) + + steps_to_instrument == :all -> + # Instrument all available steps using step_order + all_steps = uow.step_order + instrument_specific_steps(uow, all_steps, identifier) + + true -> + # Instrument specific steps + instrument_specific_steps(uow, steps_to_instrument, identifier) + end + end + + # Handle step error instrumentation separately + case telemetry_step_errors do + false -> + uow + + :all -> + # Instrument error callbacks for all steps + # Use the same identifier as the main telemetry configuration + identifier = + case telemetry_config do + config when is_list(config) -> Keyword.get(config, :identifier, :drops) + _ -> :drops + end + + instrument_step_errors(uow, uow.step_order, identifier) + + steps when is_list(steps) -> + # Instrument error callbacks for specific steps + # Use the same identifier as the main telemetry configuration + identifier = + case telemetry_config do + config when is_list(config) -> Keyword.get(config, :identifier, :drops) + _ -> :drops + end + + instrument_step_errors(uow, steps, identifier) + end + end + + defp instrument_operation_boundaries(uow, identifier) do + case uow.step_order do + [] -> + uow + + [first_step | _] -> + uow + # Instrument first step for operation start (using actual step name) + |> register_before_callback( + first_step, + __MODULE__, + :emit_operation_start, + {first_step, identifier} + ) + # Instrument all steps for operation stop to capture failures at any step + |> instrument_operation_stop_for_all_steps(identifier) + end + end + + defp instrument_specific_steps(uow, step_events, identifier) do + Enum.reduce(step_events, uow, fn step, acc_uow -> + # Only instrument if the step exists in the pipeline + if Map.has_key?(acc_uow.steps, step) do + acc_uow + |> register_before_callback( + step, + __MODULE__, + :emit_step_start, + {step, identifier} + ) + |> register_after_callback(step, __MODULE__, :emit_step_stop, {step, identifier}) + else + acc_uow + end + end) + end + + defp instrument_operation_stop_for_all_steps(uow, identifier) do + Enum.reduce(uow.step_order, uow, fn step, acc_uow -> + register_after_callback( + acc_uow, + step, + __MODULE__, + :emit_operation_stop, + {step, identifier} + ) + end) + end + + defp instrument_step_errors(uow, step_events, identifier) do + Enum.reduce(step_events, uow, fn step, acc_uow -> + # Only instrument if the step exists in the pipeline + if Map.has_key?(acc_uow.steps, step) do + register_after_callback( + acc_uow, + step, + __MODULE__, + :emit_step_error, + {step, identifier} + ) + else + acc_uow + end + end) + end + + @doc false + def emit_operation_start(operation_module, _step, context, config) do + {actual_step, identifier} = config.original_config + start_time = config.trace.start_time + + :telemetry.execute( + [identifier, :operation, :start], + %{system_time: start_time}, + %{operation: operation_module, step: actual_step, context: context} + ) + + :ok + end + + @doc false + def emit_operation_stop(operation_module, step, context, result, config) do + {actual_step, identifier} = config.original_config + duration = Drops.Operations.Trace.total_duration(config.trace) || 0 + + case result do + {:ok, _} -> + # Only emit operation stop event if this is the last step + if is_last_step?(operation_module, step) do + :telemetry.execute( + [identifier, :operation, :stop], + %{duration: duration}, + %{operation: operation_module, step: actual_step, context: context} + ) + end + + {:error, reason} -> + # Always emit operation exception event when any step fails + :telemetry.execute( + [identifier, :operation, :exception], + %{duration: duration}, + %{ + operation: operation_module, + step: actual_step, + context: context, + kind: :error, + reason: reason, + stacktrace: [] + } + ) + end + + :ok + end + + defp is_last_step?(operation_module, step) do + List.last(operation_module.__unit_of_work__().step_order) == step + end + + @doc false + def emit_step_start(operation_module, _step, context, config) do + {actual_step, identifier} = config.original_config + + start_time = config.trace.step_timings[actual_step][:start_time] + + :telemetry.execute( + [identifier, :operation, :step, :start], + %{system_time: start_time}, + %{operation: operation_module, step: actual_step, context: context} + ) + + :ok + end + + @doc false + def emit_step_stop(operation_module, _step, context, result, config) do + {actual_step, identifier} = config.original_config + + duration = config.trace.step_timings[actual_step][:duration] + + case result do + {:ok, _} -> + :telemetry.execute( + [identifier, :operation, :step, :stop], + %{duration: duration}, + %{operation: operation_module, step: actual_step, context: context} + ) + + {:error, reason} -> + :telemetry.execute( + [identifier, :operation, :step, :exception], + %{duration: duration}, + %{ + operation: operation_module, + step: actual_step, + context: context, + kind: :error, + reason: reason, + stacktrace: [] + } + ) + end + + :ok + end + + @doc false + def emit_step_error(operation_module, _step, context, result, config) do + {actual_step, identifier} = config.original_config + + duration = config.trace.step_timings[actual_step][:duration] + + case result do + {:error, reason} -> + :telemetry.execute( + [identifier, :operation, :step, :exception], + %{duration: duration}, + %{ + operation: operation_module, + step: actual_step, + context: context, + kind: :error, + reason: reason, + stacktrace: [] + } + ) + + _ -> + # Not an error, do nothing + :ok + end + + :ok + end +end diff --git a/mix.exs b/mix.exs index e0b7a1a..dd63d7f 100644 --- a/mix.exs +++ b/mix.exs @@ -108,6 +108,7 @@ defmodule Drops.MixProject do defp deps do [ {:nimble_options, "~> 1.0"}, + {:telemetry, "~> 1.0"}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:credo, "~> 1.6", only: [:dev, :test], runtime: false}, {:doctor, "~> 0.21.0", only: :dev}, diff --git a/test/drops/operations/extensions/telemetry_test.exs b/test/drops/operations/extensions/telemetry_test.exs new file mode 100644 index 0000000..d405a55 --- /dev/null +++ b/test/drops/operations/extensions/telemetry_test.exs @@ -0,0 +1,1030 @@ +defmodule Drops.Operations.Extensions.TelemetryTest do + use Drops.OperationCase, async: false + + alias Drops.Operations.Extensions.Telemetry + + defmodule TelemetryTestHandler do + def handle_event(event, measurements, metadata, test_pid) do + send(test_pid, {:telemetry_event, event, measurements, metadata}) + end + end + + describe "enable?/1" do + test "returns false when telemetry is not configured" do + refute Telemetry.enable?([]) + refute Telemetry.enable?(telemetry: false) + end + + test "returns true when telemetry is enabled with boolean" do + assert Telemetry.enable?(telemetry: true) + end + + test "returns true when telemetry is configured with options" do + assert Telemetry.enable?(telemetry: [steps: [:validate, :execute]]) + end + + test "returns true when telemetry is configured with custom identifier" do + assert Telemetry.enable?(telemetry: [identifier: :my_app]) + end + + test "returns true when telemetry is configured with both identifier and steps" do + assert Telemetry.enable?( + telemetry: [identifier: :my_app, steps: [:validate, :execute]] + ) + end + end + + describe "default_opts/1" do + test "returns empty list" do + assert Telemetry.default_opts([]) == [] + end + end + + describe "telemetry events" do + setup do + # Capture telemetry events + test_pid = self() + + :telemetry.attach_many( + "test-telemetry", + [ + [:drops, :operation, :start], + [:drops, :operation, :stop], + [:drops, :operation, :exception], + [:drops, :operation, :step, :start], + [:drops, :operation, :step, :stop], + [:drops, :operation, :step, :exception] + ], + &TelemetryTestHandler.handle_event/4, + test_pid + ) + + on_exit(fn -> + :telemetry.detach("test-telemetry") + end) + + :ok + end + + test "emits start and stop events for operation with default telemetry" do + defmodule TestOperationDefault do + use Drops.Operations.Command, telemetry: true + + steps do + @impl true + def execute(%{params: params}) do + {:ok, Map.put(params, :executed, true)} + end + end + end + + context = %{params: %{name: "test"}} + {:ok, _result} = TestOperationDefault.call(context) + + # Should receive operation start event (using first step name: prepare) + assert_receive {:telemetry_event, [:drops, :operation, :start], measurements, + metadata} + + assert %{system_time: _} = measurements + + assert %{operation: TestOperationDefault, step: :prepare, context: ^context} = + metadata + + # Should receive operation stop event (using last step name: execute) + assert_receive {:telemetry_event, [:drops, :operation, :stop], measurements, + metadata} + + assert %{duration: _} = measurements + + assert %{operation: TestOperationDefault, step: :execute, context: ^context} = + metadata + end + + test "emits events for specific steps when configured" do + defmodule TestOperationSpecific do + use Drops.Operations.Command, telemetry: [steps: [:execute]] + + steps do + def prepare(context), do: {:ok, context} + def validate(context), do: {:ok, context} + + @impl true + def execute(%{params: params}) do + {:ok, Map.put(params, :executed, true)} + end + end + end + + context = %{params: %{name: "test"}} + {:ok, _result} = TestOperationSpecific.call(context) + + # Should only receive events for the execute step + assert_receive {:telemetry_event, [:drops, :operation, :step, :start], measurements, + metadata} + + assert %{system_time: _} = measurements + assert %{operation: TestOperationSpecific, step: :execute, context: _} = metadata + + assert_receive {:telemetry_event, [:drops, :operation, :step, :stop], measurements, + metadata} + + assert %{duration: _} = measurements + assert %{operation: TestOperationSpecific, step: :execute, context: _} = metadata + + # Should not receive events for prepare or validate steps + refute_receive {:telemetry_event, [:drops, :operation, :step, :start], _, + %{step: :prepare}} + + refute_receive {:telemetry_event, [:drops, :operation, :step, :start], _, + %{step: :validate}} + end + + test "emits exception events when operation fails in last step" do + defmodule TestOperationErrorLastStep do + use Drops.Operations.Command, telemetry: true + + steps do + @impl true + def execute(_context) do + {:error, "something went wrong"} + end + end + end + + context = %{params: %{name: "test"}} + {:error, _reason} = TestOperationErrorLastStep.call(context) + + # Should receive operation start event (first step: prepare) + assert_receive {:telemetry_event, [:drops, :operation, :start], _, + %{step: :prepare}} + + # Should receive exception event for operation stop (last step: execute) + assert_receive {:telemetry_event, [:drops, :operation, :exception], measurements, + metadata} + + assert %{duration: _} = measurements + + assert %{ + operation: TestOperationErrorLastStep, + step: :execute, + context: ^context, + kind: :error, + reason: "something went wrong" + } = metadata + end + + test "emits exception events when operation fails in middle step" do + defmodule TestOperationErrorMiddleStep do + use Drops.Operations.Command, telemetry: true + + steps do + @impl true + def validate(_context) do + {:error, "validation failed"} + end + + @impl true + def execute(_context) do + {:ok, "should not reach here"} + end + end + end + + context = %{params: %{name: "test"}} + {:error, _reason} = TestOperationErrorMiddleStep.call(context) + + # Should receive operation start event (first step: prepare) + assert_receive {:telemetry_event, [:drops, :operation, :start], _, + %{step: :prepare}} + + # Should receive exception event for operation failure (from validate step) + assert_receive {:telemetry_event, [:drops, :operation, :exception], measurements, + metadata} + + assert %{duration: _} = measurements + + assert %{ + operation: TestOperationErrorMiddleStep, + step: :validate, + context: ^context, + kind: :error, + reason: "validation failed" + } = metadata + + # Should NOT receive operation stop event since operation failed + refute_receive {:telemetry_event, [:drops, :operation, :stop], _, _} + end + + test "emits exception events when operation fails in first step" do + defmodule TestOperationErrorFirstStep do + use Drops.Operations.Command, telemetry: true + + steps do + @impl true + def prepare(_context) do + {:error, "preparation failed"} + end + + @impl true + def validate(_context) do + {:ok, "should not reach here"} + end + + @impl true + def execute(_context) do + {:ok, "should not reach here"} + end + end + end + + context = %{params: %{name: "test"}} + {:error, _reason} = TestOperationErrorFirstStep.call(context) + + # Should receive operation start event (first step: prepare) + assert_receive {:telemetry_event, [:drops, :operation, :start], _, + %{step: :prepare}} + + # Should receive exception event for operation failure (from prepare step) + assert_receive {:telemetry_event, [:drops, :operation, :exception], measurements, + metadata} + + assert %{duration: _} = measurements + + assert %{ + operation: TestOperationErrorFirstStep, + step: :prepare, + context: ^context, + kind: :error, + reason: "preparation failed" + } = metadata + + # Should NOT receive operation stop event since operation failed + refute_receive {:telemetry_event, [:drops, :operation, :stop], _, _} + end + + test "does not emit events when telemetry is disabled" do + defmodule TestOperationDisabled do + use Drops.Operations.Command + + steps do + @impl true + def execute(%{params: params}) do + {:ok, Map.put(params, :executed, true)} + end + end + end + + context = %{params: %{name: "test"}} + {:ok, _result} = TestOperationDisabled.call(context) + + # Should not receive any telemetry events + refute_receive {:telemetry_event, _, _, _}, 100 + end + + test "emits events for multiple configured steps" do + defmodule TestOperationMultiple do + use Drops.Operations.Command, telemetry: [steps: [:validate, :execute]] + + steps do + def prepare(context), do: {:ok, context} + + def validate(context) do + # Small delay to ensure measurable duration + Process.sleep(1) + {:ok, context} + end + + @impl true + def execute(%{params: params}) do + # Small delay to ensure measurable duration + Process.sleep(1) + {:ok, Map.put(params, :executed, true)} + end + end + end + + context = %{params: %{name: "test"}} + {:ok, _result} = TestOperationMultiple.call(context) + + # Should receive events for validate step + assert_receive {:telemetry_event, [:drops, :operation, :step, :start], _, + %{step: :validate}} + + assert_receive {:telemetry_event, [:drops, :operation, :step, :stop], measurements, + %{step: :validate}} + + assert measurements.duration > 0 + + # Should receive events for execute step + assert_receive {:telemetry_event, [:drops, :operation, :step, :start], _, + %{step: :execute}} + + assert_receive {:telemetry_event, [:drops, :operation, :step, :stop], measurements, + %{step: :execute}} + + assert measurements.duration > 0 + + # Should not receive events for prepare step + refute_receive {:telemetry_event, [:drops, :operation, :step, :start], _, + %{step: :prepare}} + end + + test "emits events for all steps when configured with :all" do + defmodule TestOperationAll do + use Drops.Operations.Command, telemetry: [steps: :all] + + schema do + %{ + required(:name) => string(:filled?) + } + end + + steps do + def prepare(context), do: {:ok, context} + def validate(context), do: {:ok, context} + + @impl true + def execute(%{params: params}) do + {:ok, Map.put(params, :executed, true)} + end + end + end + + context = %{params: %{name: "test"}} + {:ok, _result} = TestOperationAll.call(context) + + # Should receive events for all steps: conform, prepare, validate, execute + assert_receive {:telemetry_event, [:drops, :operation, :step, :start], _, + %{step: :conform}} + + assert_receive {:telemetry_event, [:drops, :operation, :step, :stop], _, + %{step: :conform}} + + assert_receive {:telemetry_event, [:drops, :operation, :step, :start], _, + %{step: :prepare}} + + assert_receive {:telemetry_event, [:drops, :operation, :step, :stop], _, + %{step: :prepare}} + + assert_receive {:telemetry_event, [:drops, :operation, :step, :start], _, + %{step: :validate}} + + assert_receive {:telemetry_event, [:drops, :operation, :step, :stop], _, + %{step: :validate}} + + assert_receive {:telemetry_event, [:drops, :operation, :step, :start], _, + %{step: :execute}} + + assert_receive {:telemetry_event, [:drops, :operation, :step, :stop], _, + %{step: :execute}} + end + + test "emits only one operation stop event for successful operations" do + defmodule TestOperationSingleStopEvent do + use Drops.Operations.Command, telemetry: true + + steps do + def prepare(context), do: {:ok, context} + def validate(context), do: {:ok, context} + + @impl true + def execute(%{params: params}) do + {:ok, Map.put(params, :executed, true)} + end + end + end + + context = %{params: %{name: "test"}} + {:ok, _result} = TestOperationSingleStopEvent.call(context) + + # Should receive operation start event + assert_receive {:telemetry_event, [:drops, :operation, :start], _, _} + + # Should receive exactly one operation stop event + assert_receive {:telemetry_event, [:drops, :operation, :stop], _, _} + + # Should NOT receive any additional operation stop events + refute_receive {:telemetry_event, [:drops, :operation, :stop], _, _} + + # Should NOT receive any operation exception events + refute_receive {:telemetry_event, [:drops, :operation, :exception], _, _} + end + end + + describe "integration with operation composition" do + setup do + # Capture telemetry events + test_pid = self() + + :telemetry.attach_many( + "test-composition-telemetry", + [ + [:drops, :operation, :start], + [:drops, :operation, :stop] + ], + &TelemetryTestHandler.handle_event/4, + test_pid + ) + + on_exit(fn -> + :telemetry.detach("test-composition-telemetry") + end) + + :ok + end + + test "emits events for composed operations" do + defmodule FirstOperation do + use Drops.Operations.Command, telemetry: true + + steps do + @impl true + def execute(%{params: params}) do + {:ok, Map.put(params, :first_done, true)} + end + end + end + + defmodule SecondOperation do + use Drops.Operations.Command, telemetry: true + + steps do + @impl true + def execute(%{execute_result: result, params: params}) do + {:ok, Map.merge(result, params)} + end + end + end + + context = %{params: %{name: "test"}} + + result = + FirstOperation.call(context) + |> SecondOperation.call(%{params: %{second: true}}) + + assert {:ok, %{name: "test", first_done: true, second: true}} = result + + # Should receive events for both operations + assert_receive {:telemetry_event, [:drops, :operation, :start], _, + %{operation: FirstOperation}} + + assert_receive {:telemetry_event, [:drops, :operation, :stop], _, + %{operation: FirstOperation}} + + assert_receive {:telemetry_event, [:drops, :operation, :start], _, + %{operation: SecondOperation}} + + assert_receive {:telemetry_event, [:drops, :operation, :stop], _, + %{operation: SecondOperation}} + end + + test "emits positive duration values for operation events" do + defmodule TestOperationDuration do + use Drops.Operations.Command, telemetry: true + + steps do + @impl true + def execute(%{params: params}) do + # Add a small delay to ensure measurable duration + Process.sleep(1) + {:ok, Map.put(params, :executed, true)} + end + end + end + + context = %{params: %{name: "test"}} + {:ok, _result} = TestOperationDuration.call(context) + + # Should receive operation stop event with positive duration (using last step name: execute) + assert_receive {:telemetry_event, [:drops, :operation, :stop], measurements, + metadata} + + assert %{duration: duration} = measurements + assert is_integer(duration) + assert duration > 0, "Duration should be positive, got: #{duration}" + + assert %{operation: TestOperationDuration, step: :execute, context: _} = metadata + end + + test "ensures telemetry relies on trace for duration calculations" do + # Test that telemetry always uses trace-calculated durations + # and never falls back to its own timing calculations + + defmodule TestTraceDuration do + use Drops.Operations.Command, telemetry: true + + steps do + @impl true + def execute(%{params: params}) do + # Add a small delay to ensure measurable duration + Process.sleep(1) + {:ok, Map.put(params, :executed, true)} + end + end + end + + context = %{params: %{name: "test"}} + {:ok, _result} = TestTraceDuration.call(context) + + # Collect telemetry events + events = collect_all_telemetry_events() + + # Check operation stop event + operation_events = + Enum.filter(events, fn {event, _measurements, _metadata} -> + match?([:drops, :operation, :stop], event) + end) + + assert length(operation_events) == 1 + {_event, measurements, _metadata} = hd(operation_events) + + # Duration should be positive and reasonable (> 0 since we have Process.sleep(1)) + assert measurements.duration > 0, + "Operation duration should be positive: #{measurements.duration}" + + # Duration should be reasonable (less than 1 second in native time units) + max_reasonable_duration = System.convert_time_unit(1, :second, :native) + + assert measurements.duration < max_reasonable_duration, + "Operation duration seems too large: #{measurements.duration}" + end + end + + # Helper function to collect all telemetry events from the mailbox + defp collect_all_telemetry_events(events \\ []) do + receive do + {:telemetry_event, event, measurements, metadata} -> + collect_all_telemetry_events([{event, measurements, metadata} | events]) + after + 100 -> Enum.reverse(events) + end + end + + describe "custom identifier configuration" do + setup do + # Capture telemetry events with custom identifier + test_pid = self() + + :telemetry.attach_many( + "test-custom-identifier-telemetry", + [ + [:my_app, :operation, :start], + [:my_app, :operation, :stop], + [:my_app, :operation, :exception], + [:my_app, :operation, :step, :start], + [:my_app, :operation, :step, :stop], + [:my_app, :operation, :step, :exception] + ], + &TelemetryTestHandler.handle_event/4, + test_pid + ) + + on_exit(fn -> + :telemetry.detach("test-custom-identifier-telemetry") + end) + + :ok + end + + test "emits events with custom identifier for operation boundaries" do + defmodule TestOperationCustomId do + use Drops.Operations.Command, telemetry: [identifier: :my_app] + + steps do + @impl true + def execute(%{params: params}) do + {:ok, Map.put(params, :executed, true)} + end + end + end + + context = %{params: %{name: "test"}} + {:ok, _result} = TestOperationCustomId.call(context) + + # Should receive operation start event with custom identifier + assert_receive {:telemetry_event, [:my_app, :operation, :start], measurements, + metadata} + + assert %{system_time: _} = measurements + + assert %{operation: TestOperationCustomId, step: :prepare, context: ^context} = + metadata + + # Should receive operation stop event with custom identifier + assert_receive {:telemetry_event, [:my_app, :operation, :stop], measurements, + metadata} + + assert %{duration: _} = measurements + + assert %{operation: TestOperationCustomId, step: :execute, context: ^context} = + metadata + end + + test "emits events with custom identifier for specific steps" do + defmodule TestOperationCustomIdSteps do + use Drops.Operations.Command, telemetry: [identifier: :my_app, steps: [:execute]] + + steps do + def prepare(context), do: {:ok, context} + def validate(context), do: {:ok, context} + + @impl true + def execute(%{params: params}) do + {:ok, Map.put(params, :executed, true)} + end + end + end + + context = %{params: %{name: "test"}} + {:ok, _result} = TestOperationCustomIdSteps.call(context) + + # Should only receive events for the execute step with custom identifier + assert_receive {:telemetry_event, [:my_app, :operation, :step, :start], + measurements, metadata} + + assert %{system_time: _} = measurements + + assert %{operation: TestOperationCustomIdSteps, step: :execute, context: _} = + metadata + + assert_receive {:telemetry_event, [:my_app, :operation, :step, :stop], measurements, + metadata} + + assert %{duration: _} = measurements + + assert %{operation: TestOperationCustomIdSteps, step: :execute, context: _} = + metadata + + # Should not receive events for prepare or validate steps + refute_receive {:telemetry_event, [:my_app, :operation, :step, :start], _, + %{step: :prepare}} + + refute_receive {:telemetry_event, [:my_app, :operation, :step, :start], _, + %{step: :validate}} + end + + test "emits exception events with custom identifier when failing in last step" do + defmodule TestOperationCustomIdErrorLastStep do + use Drops.Operations.Command, telemetry: [identifier: :my_app] + + steps do + @impl true + def execute(_context) do + {:error, "something went wrong"} + end + end + end + + context = %{params: %{name: "test"}} + {:error, _reason} = TestOperationCustomIdErrorLastStep.call(context) + + # Should receive operation start event with custom identifier + assert_receive {:telemetry_event, [:my_app, :operation, :start], _, + %{step: :prepare}} + + # Should receive exception event with custom identifier + assert_receive {:telemetry_event, [:my_app, :operation, :exception], measurements, + metadata} + + assert %{duration: _} = measurements + + assert %{ + operation: TestOperationCustomIdErrorLastStep, + step: :execute, + context: ^context, + kind: :error, + reason: "something went wrong" + } = metadata + end + + test "emits exception events with custom identifier when failing in middle step" do + defmodule TestOperationCustomIdErrorMiddleStep do + use Drops.Operations.Command, telemetry: [identifier: :my_app] + + steps do + @impl true + def validate(_context) do + {:error, "validation failed"} + end + + @impl true + def execute(_context) do + {:ok, "should not reach here"} + end + end + end + + context = %{params: %{name: "test"}} + {:error, _reason} = TestOperationCustomIdErrorMiddleStep.call(context) + + # Should receive operation start event with custom identifier + assert_receive {:telemetry_event, [:my_app, :operation, :start], _, + %{step: :prepare}} + + # Should receive exception event with custom identifier (from validate step) + assert_receive {:telemetry_event, [:my_app, :operation, :exception], measurements, + metadata} + + assert %{duration: _} = measurements + + assert %{ + operation: TestOperationCustomIdErrorMiddleStep, + step: :validate, + context: ^context, + kind: :error, + reason: "validation failed" + } = metadata + + # Should NOT receive operation stop event since operation failed + refute_receive {:telemetry_event, [:my_app, :operation, :stop], _, _} + end + + test "emits exception events with custom identifier when failing in first step" do + defmodule TestOperationCustomIdErrorFirstStep do + use Drops.Operations.Command, telemetry: [identifier: :my_app] + + steps do + @impl true + def prepare(_context) do + {:error, "preparation failed"} + end + + @impl true + def validate(_context) do + {:ok, "should not reach here"} + end + + @impl true + def execute(_context) do + {:ok, "should not reach here"} + end + end + end + + context = %{params: %{name: "test"}} + {:error, _reason} = TestOperationCustomIdErrorFirstStep.call(context) + + # Should receive operation start event with custom identifier + assert_receive {:telemetry_event, [:my_app, :operation, :start], _, + %{step: :prepare}} + + # Should receive exception event with custom identifier (from prepare step) + assert_receive {:telemetry_event, [:my_app, :operation, :exception], measurements, + metadata} + + assert %{duration: _} = measurements + + assert %{ + operation: TestOperationCustomIdErrorFirstStep, + step: :prepare, + context: ^context, + kind: :error, + reason: "preparation failed" + } = metadata + + # Should NOT receive operation stop event since operation failed + refute_receive {:telemetry_event, [:my_app, :operation, :stop], _, _} + end + + test "does not emit events on default identifier when using custom identifier" do + defmodule TestOperationNoDefaultEvents do + use Drops.Operations.Command, telemetry: [identifier: :my_app] + + steps do + @impl true + def execute(%{params: params}) do + {:ok, Map.put(params, :executed, true)} + end + end + end + + context = %{params: %{name: "test"}} + {:ok, _result} = TestOperationNoDefaultEvents.call(context) + + # Should not receive any events with default :drops identifier + refute_receive {:telemetry_event, [:drops, :operation, _], _, _}, 100 + refute_receive {:telemetry_event, [:drops, :operation, _, _], _, _}, 100 + end + end + + describe "telemetry_step_errors configuration" do + setup do + # Capture telemetry events + test_pid = self() + + :telemetry.attach_many( + "test-step-errors", + [ + [:drops, :operation, :start], + [:drops, :operation, :stop], + [:drops, :operation, :exception], + [:drops, :operation, :step, :start], + [:drops, :operation, :step, :stop], + [:drops, :operation, :step, :exception], + [:my_app, :operation, :start], + [:my_app, :operation, :stop], + [:my_app, :operation, :exception], + [:my_app, :operation, :step, :start], + [:my_app, :operation, :step, :stop], + [:my_app, :operation, :step, :exception] + ], + &TelemetryTestHandler.handle_event/4, + test_pid + ) + + on_exit(fn -> + :telemetry.detach("test-step-errors") + end) + end + + test "emits step exception events for all steps when telemetry_step_errors: :all" do + defmodule TestOperationStepErrors do + use Drops.Operations.Command, telemetry: true, telemetry_step_errors: :all + + steps do + def prepare(_context), do: {:ok, %{prepared: true}} + def validate(_context), do: {:error, "validation failed"} + + @impl true + def execute(_context), do: {:ok, %{executed: true}} + end + end + + context = %{params: %{name: "test"}} + {:error, _reason} = TestOperationStepErrors.call(context) + + # Should receive operation start event + assert_receive {:telemetry_event, [:drops, :operation, :start], _, + %{step: :prepare}} + + # Should receive step exception event for validate step + assert_receive {:telemetry_event, [:drops, :operation, :step, :exception], + measurements, metadata} + + assert metadata.operation == TestOperationStepErrors + assert metadata.step == :validate + assert metadata.kind == :error + assert metadata.reason == "validation failed" + assert is_integer(measurements.duration) + end + + test "emits step exception events for specific steps when telemetry_step_errors: [steps]" do + defmodule TestOperationSpecificStepErrors do + use Drops.Operations.Command, telemetry: true, telemetry_step_errors: [:validate] + + steps do + def prepare(_context), do: {:ok, %{prepared: true}} + def validate(_context), do: {:error, "validation failed"} + + @impl true + def execute(_context), do: {:ok, %{executed: true}} + end + end + + context = %{params: %{name: "test"}} + {:error, _reason} = TestOperationSpecificStepErrors.call(context) + + # Should receive operation start event + assert_receive {:telemetry_event, [:drops, :operation, :start], _, + %{step: :prepare}} + + # Should receive step exception event for validate step (in list) + assert_receive {:telemetry_event, [:drops, :operation, :step, :exception], + measurements, metadata} + + assert metadata.operation == TestOperationSpecificStepErrors + assert metadata.step == :validate + assert metadata.kind == :error + assert metadata.reason == "validation failed" + assert is_integer(measurements.duration) + end + + test "does not emit step exception events for steps not in telemetry_step_errors list" do + defmodule TestOperationSpecificStepErrorsExclusion do + use Drops.Operations.Command, telemetry: true, telemetry_step_errors: [:execute] + + steps do + def validate(_context), do: {:error, "validation failed"} + + @impl true + def execute(_context), do: {:ok, %{executed: true}} + end + end + + context = %{params: %{name: "test"}} + {:error, _reason} = TestOperationSpecificStepErrorsExclusion.call(context) + + # Should receive operation start event + assert_receive {:telemetry_event, [:drops, :operation, :start], _, + %{step: :prepare}} + + # Should NOT receive step exception event for validate step (not in list) + refute_receive {:telemetry_event, [:drops, :operation, :step, :exception], _, + %{step: :validate}} + end + + test "does not emit step exception events when telemetry_step_errors: false" do + defmodule TestOperationNoStepErrors do + use Drops.Operations.Command, telemetry: true, telemetry_step_errors: false + + steps do + def validate(_context), do: {:error, "validation failed"} + + @impl true + def execute(_context), do: {:ok, %{executed: true}} + end + end + + context = %{params: %{name: "test"}} + {:error, _reason} = TestOperationNoStepErrors.call(context) + + # Should receive operation start event + assert_receive {:telemetry_event, [:drops, :operation, :start], _, + %{step: :prepare}} + + # Should NOT receive step exception event for validate step + refute_receive {:telemetry_event, [:drops, :operation, :step, :exception], _, + %{step: :validate}} + end + + test "works with custom identifier" do + defmodule TestOperationCustomIdentifierStepErrors do + use Drops.Operations.Command, + telemetry: [identifier: :my_app], + telemetry_step_errors: :all + + steps do + def validate(_context), do: {:error, "validation failed"} + + @impl true + def execute(_context), do: {:ok, %{executed: true}} + end + end + + context = %{params: %{name: "test"}} + {:error, _reason} = TestOperationCustomIdentifierStepErrors.call(context) + + # Should receive operation start event with custom identifier + assert_receive {:telemetry_event, [:my_app, :operation, :start], _, + %{step: :prepare}} + + # Should receive step exception event with custom identifier + assert_receive {:telemetry_event, [:my_app, :operation, :step, :exception], + measurements, metadata} + + assert metadata.operation == TestOperationCustomIdentifierStepErrors + assert metadata.step == :validate + assert metadata.kind == :error + assert metadata.reason == "validation failed" + assert is_integer(measurements.duration) + end + + test "only emits step exception events for error returns, not success" do + defmodule TestOperationSuccessStepErrors do + use Drops.Operations.Command, telemetry: true, telemetry_step_errors: :all + + steps do + def validate(_context), do: {:ok, %{validated: true}} + + @impl true + def execute(_context), do: {:ok, %{executed: true}} + end + end + + context = %{params: %{name: "test"}} + {:ok, _result} = TestOperationSuccessStepErrors.call(context) + + # Should receive operation start event + assert_receive {:telemetry_event, [:drops, :operation, :start], _, + %{step: :prepare}} + + # Should receive operation stop event + assert_receive {:telemetry_event, [:drops, :operation, :stop], _, _} + + # Should NOT receive any step exception events for successful steps + refute_receive {:telemetry_event, [:drops, :operation, :step, :exception], _, _} + end + + test "does not emit any events when both telemetry and telemetry_step_errors are disabled" do + defmodule TestOperationNoTelemetry do + use Drops.Operations.Command, telemetry: false, telemetry_step_errors: false + + steps do + def validate(_context), do: {:error, "validation failed"} + + @impl true + def execute(_context), do: {:ok, %{executed: true}} + end + end + + context = %{params: %{name: "test"}} + {:error, _reason} = TestOperationNoTelemetry.call(context) + + # Should NOT receive any telemetry events + refute_receive {:telemetry_event, [:drops, :operation, :start], _, _} + refute_receive {:telemetry_event, [:drops, :operation, :stop], _, _} + refute_receive {:telemetry_event, [:drops, :operation, :exception], _, _} + refute_receive {:telemetry_event, [:drops, :operation, :step, :exception], _, _} + end + end +end