Class: Homebrew::Bundle::ParallelInstaller Private
- Defined in:
- bundle/parallel_installer.rb
This class is part of a private API. This class may only be used in the Homebrew/brew repository. Third parties should avoid using this class if possible, as it may be removed or changed without warning.
Instance Method Summary
- #initialize(entries, jobs:, no_upgrade:, verbose:, force:, quiet:) ⇒ void constructor private
- #run! ⇒ Array<(Integer, Integer)> private
Constructor Details
#initialize(entries, jobs:, no_upgrade:, verbose:, force:, quiet:) ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
# File 'bundle/parallel_installer.rb', line 22

def initialize(entries, jobs:, no_upgrade:, verbose:, force:, quiet:)
  @entries = entries
  @jobs = jobs
  @no_upgrade = no_upgrade
  @verbose = verbose
  @force = force
  @quiet = quiet
  @pool = T.let(Concurrent::FixedThreadPool.new(jobs), Concurrent::FixedThreadPool)
  @output_mutex = T.let(Monitor.new, Monitor)
  # Cask installs may trigger interactive sudo prompts that write
  # directly to the terminal. Serialize them so Password: prompts
  # don't interleave with status output from other workers.
  @cask_install_mutex = T.let(Mutex.new, Mutex)
end
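The comment in the constructor describes serializing cask installs so that interactive prompts do not interleave with other workers' output. The following is a minimal sketch of that idea using only the Ruby standard library; it is not Homebrew's code. The job list, `work`, `state_lock`, and the counters are hypothetical names introduced for illustration, and `sleep` stands in for a real install.

```ruby
# Sketch only: one mutex serializes "cask" jobs while "formula" jobs
# run freely on their own threads. All names here are hypothetical.
cask_mutex = Mutex.new    # plays the role of @cask_install_mutex
state_lock = Mutex.new    # guards the bookkeeping below
active_casks = 0
max_active_casks = 0
finished = []

jobs = [[:formula, "a"], [:cask, "b"], [:cask, "c"], [:formula, "d"], [:cask, "e"]]

work = lambda do |type, name|
  if type == :cask
    state_lock.synchronize do
      active_casks += 1
      max_active_casks = [max_active_casks, active_casks].max
    end
    sleep 0.01 # stand-in for an install that may prompt for a password
    state_lock.synchronize { active_casks -= 1 }
  else
    sleep 0.01 # stand-in for a non-interactive install
  end
  state_lock.synchronize { finished << name }
end

threads = jobs.map do |type, name|
  Thread.new do
    if type == :cask
      # Only one cask job may be inside this block at a time.
      cask_mutex.synchronize { work.call(type, name) }
    else
      work.call(type, name)
    end
  end
end
threads.each(&:join)
```

Because every cask job takes `cask_mutex` before doing any work, `max_active_casks` never exceeds one, while formula jobs are free to overlap with each other and with the single active cask job.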
Instance Method Details
#run! ⇒ Array<(Integer, Integer)>
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
# File 'bundle/parallel_installer.rb', line 38

def run!
  dependency_map = build_dependency_map
  success = 0
  failure = 0
  pending_entries = T.let(@entries.dup, T::Array[Installer::InstallableEntry])
  completed = T.let(Set.new, T::Set[String])

  until pending_entries.empty?
    ready_entries = pending_entries.select do |entry|
      dependency_map.fetch(entry.name, Set.new).all? { |dependency| completed.include?(dependency) }
    end

    if ready_entries.empty?
      pending_entries.each do |entry|
        installed = install_entry!(entry)
        completed << entry.name
        if installed
          success += 1
        else
          failure += 1
        end
      end
      break
    end

    batch = ready_entries.take(@jobs)
    futures = batch.to_h do |entry|
      [entry, Concurrent::Promises.future_on(@pool, entry) do |install_entry|
        install_entry!(install_entry)
      end]
    end

    batch.each do |entry|
      installed = begin
        !futures.fetch(entry).value!.nil?
      rescue => e
        write_output(Formatter.error("Installing #{entry.name} has failed!"), stream: $stderr)
        write_output("[#{entry.name}] #{e.message}", stream: $stderr) if @verbose
        false
      end

      pending_entries.delete(entry)
      completed << entry.name
      if installed
        success += 1
      else
        failure += 1
      end
    end
  end

  [success, failure]
ensure
  @pool.shutdown
  @pool.wait_for_termination
end
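The scheduling loop in `#run!` can be illustrated in isolation. The sketch below is an assumption-laden simplification, not Homebrew's implementation: it swaps the `Concurrent::FixedThreadPool` and promises for plain `Thread` objects from the standard library, works on entry names rather than `InstallableEntry` objects, and uses a hypothetical `run_in_batches` method with a caller-supplied block in place of `install_entry!`. It keeps the same shape: pick entries whose dependencies are all completed, run at most `jobs` of them at once, fall back to sequential installs when nothing is ready, and return a `[success, failure]` pair.

```ruby
require "set"

# Hypothetical helper: names is an Array<String>, dependency_map maps a name
# to the Set of names it depends on, and the block performs the "install".
def run_in_batches(names, dependency_map, jobs:, &installer)
  pending = names.dup
  completed = Set.new
  success = 0
  failure = 0

  until pending.empty?
    ready = pending.select do |name|
      dependency_map.fetch(name, Set.new).all? { |dep| completed.include?(dep) }
    end

    # Nothing is ready (for example, a dependency cycle): install the rest
    # one at a time, as the original method does, then stop.
    if ready.empty?
      pending.each do |name|
        if installer.call(name)
          success += 1
        else
          failure += 1
        end
      end
      break
    end

    batch = ready.take(jobs)
    threads = batch.to_h do |name|
      thread = Thread.new do
        Thread.current.report_on_exception = false
        installer.call(name)
      end
      [name, thread]
    end

    batch.each do |name|
      installed = begin
        threads.fetch(name).value ? true : false # value re-raises worker errors
      rescue StandardError
        false
      end

      pending.delete(name)
      completed << name # marked completed even on failure, as in the original
      if installed
        success += 1
      else
        failure += 1
      end
    end
  end

  [success, failure]
end

deps = { "app" => Set["lib"], "lib" => Set["base"] }
log = Queue.new
result = run_in_batches(%w[app lib base tool broken], deps, jobs: 2) do |name|
  raise "boom" if name == "broken"

  log << name
  true
end
order = []
order << log.pop until log.empty?
```

With `jobs: 2`, the first batch is `base` and `tool`, the second is `lib` and `broken`, and the third is `app`, so `result` is `[4, 1]` and `base` always appears before `lib`, which appears before `app`, in `order`.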