Skip to content

Commit 1004c10

Browse files
committed
Job metrics and subscribers
1 parent 298c690 commit 1004c10

File tree

4 files changed

+95
-0
lines changed

4 files changed

+95
-0
lines changed

docs/README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,32 @@ Que.job_middleware.push(
798798
)
799799
```
800800

801+
#### Existing Middleware
802+
803+
Que ships with middleware to expose job metrics using ActiveSupport notifications to subscribe to it you can implelent the following
804+
805+
```ruby
806+
::ActiveSupport::Notifications.subscribe("que_job.worked") do |message, started, finished, labels|
807+
# do something with notification.
808+
end
809+
```
810+
811+
`started` and `finished` are numeric values representing a monotonic clock so can
812+
be used for timing calculations without concerning ourselves with the system clock.
813+
814+
`labels` is a hash containing the following keys
815+
816+
* `job_class` - the class of the job.
817+
* `queue` - the queue this job was queued into.
818+
* `priority` - the priority of this job.
819+
* `latency` - the amount of time this job was waiting in the queue for.
820+
821+
To use this middleware you will have to initialize it with Que
822+
823+
```ruby
824+
Que.job_middleware.push(Que::ActiveSupport::JobMiddleware)
825+
```
826+
801827
### Defining Middleware For SQL statements
802828

803829
SQL middleware wraps queries that Que executes, or which you might decide to execute via Que.execute(). You can use hook this into NewRelic or a similar service to instrument how long SQL queries take, for example.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# frozen_string_literal: true
2+
3+
module Que
4+
module ActiveSupport
5+
module JobMiddleware
6+
def self.call(job)
7+
labels = {
8+
job_class: job.que_attrs[:job_class],
9+
priority: job.que_attrs[:priority],
10+
queue: job.que_attrs[:queue],
11+
latency: job.que_attrs[:latency],
12+
}
13+
14+
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
15+
yield
16+
ensure
17+
::ActiveSupport::Notifications.publish(
18+
"que_job.worked",
19+
started,
20+
Process.clock_gettime(Process::CLOCK_MONOTONIC),
21+
labels.merge(error: !!job.que_error.present?),
22+
)
23+
end
24+
end
25+
end
26+
end
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# frozen_string_literal: true
2+
3+
require 'spec_helper'
4+
5+
if defined?(::ActiveSupport)
6+
require 'que/active_support/job_middleware'
7+
8+
describe Que::ActiveSupport::JobMiddleware do
9+
let(:job) do
10+
Que::Job.new(
11+
job_class: "Foo",
12+
priority: 100,
13+
queue: "foo_queue",
14+
latency: 100,
15+
)
16+
end
17+
18+
let(:labels) do
19+
{
20+
job_class: "Foo",
21+
priority: 100,
22+
queue: "foo_queue",
23+
latency: 100,
24+
}
25+
end
26+
27+
it "records metrics" do
28+
called = false
29+
::ActiveSupport::Notifications.subscribe("que_job.worked") do |message, started, finished, metric_labels|
30+
assert_equal "que_job.worked", message
31+
assert started != nil
32+
assert finished != nil
33+
assert_equal labels.merge(error: false), metric_labels
34+
called = true
35+
end
36+
37+
Que::ActiveSupport::JobMiddleware.call(job) { }
38+
39+
assert_equal true, called
40+
end
41+
end
42+
end

spec/spec_helper.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
# in some spec runs.
99
if ENV['USE_RAILS'] == 'true'
1010
require 'active_record'
11+
require 'active_support'
1112
require 'active_job'
1213

1314
ActiveJob::Base.queue_adapter = :que

0 commit comments

Comments
 (0)