Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion bundler/lib/bundler/installer/parallel_installer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ def enqueued?
state == :enqueued
end

def enqueue_with_priority?
state == :installable && spec.extensions.any?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since installed is defined above, I think we should use that here.

Suggested change
state == :installable && spec.extensions.any?
installed? && spec.extensions.any?

Copy link
Collaborator Author

@Edouard-chin Edouard-chin Mar 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

installed is a different state that installable.

installed means that the spec is fully installed and there is nothing else to do.
installable means that the spec is downloaded and can be installed immediately (either because its a pure ruby gem, or because it's a native extension gem and its dependencies are installed)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh whoops I read it wrong. 🤦🏼‍♀️

end

def failed?
state == :failed
end
Expand Down Expand Up @@ -194,7 +198,7 @@ def process_specs(installed_specs)
spec.state = :installable
end

worker_pool.enq(spec)
worker_pool.enq(spec, priority: spec.enqueue_with_priority?)
end

def finished_installing?
Expand Down
14 changes: 11 additions & 3 deletions bundler/lib/bundler/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def initialize(exn)
def initialize(size, name, func)
@name = name
@request_queue = Thread::Queue.new
@request_queue_with_priority = Thread::Queue.new
@response_queue = Thread::Queue.new
@func = func
@size = size
Expand All @@ -32,9 +33,10 @@ def initialize(size, name, func)
# Enqueue a request to be executed in the worker pool
#
# @param obj [String] mostly it is name of spec that should be downloaded
def enq(obj)
def enq(obj, priority: false)
queue = priority ? @request_queue_with_priority : @request_queue
create_threads unless @threads
@request_queue.enq obj
queue.enq obj
end

# Retrieves results of job function being executed in worker pool
Expand All @@ -52,7 +54,13 @@ def stop

def process_queue(i)
loop do
obj = @request_queue.deq
obj = begin
@request_queue_with_priority.deq(true)
rescue ThreadError
@request_queue.deq(false, timeout: 0.05)
end

next if obj.nil?
break if obj.equal? POISON
@response_queue.enq apply_func(obj, i)
end
Expand Down
79 changes: 79 additions & 0 deletions bundler/spec/bundler/installer/parallel_installer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# frozen_string_literal: true

require "bundler/installer/parallel_installer"
require "bundler/rubygems_gem_installer"
require "rubygems/remote_fetcher"
require "bundler"

RSpec.describe Bundler::ParallelInstaller do
describe "priority queue" do
before do
require "support/artifice/compact_index"

@previous_client = Gem::Request::ConnectionPools.client
Gem::Request::ConnectionPools.client = Gem::Net::HTTP
Gem::RemoteFetcher.fetcher.close_all

build_repo2 do
build_gem "gem_with_extension", &:add_c_extension
build_gem "gem_without_extension"
end

gemfile <<~G
source "https://gem.repo2"

gem "gem_with_extension"
gem "gem_without_extension"
G
lockfile <<~L
GEM
remote: https://gem.repo2/
specs:
gem_with_extension (1.0)
gem_without_extension (1.0)

DEPENDENCIES
gem_with_extension
gem_without_extension
L

@old_ui = Bundler.ui
Bundler.ui = Bundler::UI::Silent.new
end

after do
Bundler.ui = @old_ui
Gem::Request::ConnectionPools.client = @previous_client
Artifice.deactivate
end

let(:definition) do
allow(Bundler).to receive(:root) { bundled_app }

definition = Bundler::Definition.build(bundled_app.join("Gemfile"), bundled_app.join("Gemfile.lock"), false)
definition.tap(&:setup_domain!)
end
let(:installer) { Bundler::Installer.new(bundled_app, definition) }

it "queues native extensions in priority" do
parallel_installer = Bundler::ParallelInstaller.new(installer, definition.specs, 2, false, true)
worker_pool = parallel_installer.send(:worker_pool)
expected = 6 # Enqueue to download bundler and the 2 gems. Enqueue to install Bundler and the 2 gems.

expect(worker_pool).to receive(:enq).exactly(expected).times.and_wrap_original do |original_enq, spec, opts|
unless opts.nil? # Enqueued for download, no priority
if spec.name == "gem_with_extension"
expect(opts).to eq({ priority: true })
else
expect(opts).to eq({ priority: false })
end
end

opts ||= {}
original_enq.call(spec, **opts)
end

parallel_installer.call
end
end
end
20 changes: 20 additions & 0 deletions bundler/spec/bundler/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,26 @@
end
end

describe "priority queue" do
it "process elements from the priority queue first" do
processed_elements = []

function = proc do |element, _|
processed_elements << element
end

worker = described_class.new(1, "Spec Worker", function)
worker.instance_variable_set(:@threads, []) # Prevent the enqueueing from starting work.
worker.enq("Normal element")
worker.enq("Priority element", priority: true)
worker.send(:create_threads)

worker.stop

expect(processed_elements).to eq(["Priority element", "Normal element"])
end
end

describe "handling interrupts" do
let(:status) do
pid = Process.fork do
Expand Down
1 change: 1 addition & 0 deletions bundler/spec/support/windows_tag_group.rb
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ module WindowsTagGroup
"spec/bundler/build_metadata_spec.rb",
"spec/bundler/current_ruby_spec.rb",
"spec/bundler/installer/gem_installer_spec.rb",
"spec/bundler/installer/parallel_installer_spec.rb",
"spec/bundler/cli_common_spec.rb",
"spec/bundler/ci_detector_spec.rb",
],
Expand Down
Loading