Class: Homebrew::DownloadQueue Private
Includes: Utils::Output::Mixin
Defined in: download_queue.rb
This class is part of a private API. This class may only be used in the Homebrew/brew repository. Third parties should avoid using this class if possible, as it may be removed or changed without warning.
Defined Under Namespace
Classes: Spinner
Class Method Summary
- .new_if_concurrency_enabled(retries: 1, force: false, pour: false) ⇒ DownloadQueue? private
Instance Method Summary
- #enqueue(downloadable, check_attestation: false) ⇒ void private
- #fetch ⇒ void private
- #initialize(retries: 1, force: false, pour: false) ⇒ void constructor private
- #shutdown ⇒ void private
- #stdout_print_and_flush(message) ⇒ void private
- #stdout_print_and_flush_if_tty(message) ⇒ void private
Methods included from Utils::Output::Mixin
#odebug, #odeprecated, #odie, #odisabled, #ofail, #oh1, #oh1_title, #ohai, #ohai_title, #onoe, #opoo, #opoo_outside_github_actions, #pretty_duration, #pretty_installed, #pretty_outdated, #pretty_uninstalled
Constructor Details
#initialize(retries: 1, force: false, pour: false) ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
# File 'download_queue.rb', line 23

def initialize(retries: 1, force: false, pour: false)
  @concurrency = T.let(EnvConfig.download_concurrency, Integer)
  @quiet = T.let(@concurrency > 1, T::Boolean)
  @tries = T.let(retries + 1, Integer)
  @force = force
  @pour = pour
  @pool = T.let(Concurrent::FixedThreadPool.new(concurrency), Concurrent::FixedThreadPool)
  @tty = T.let($stdout.tty?, T::Boolean)
  @spinner = T.let(nil, T.nilable(Spinner))
end
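The constructor stores `retries + 1` as the total number of tries, so the default `retries: 1` allows two attempts per download. The sketch below illustrates only that accounting. `with_tries` is a hypothetical helper written for this example and is not part of Homebrew; the real retry behavior lives in `RetryableDownload` and may differ.

```ruby
# Hypothetical helper illustrating "tries = retries + 1"; not Homebrew code.
def with_tries(retries:)
  tries = retries + 1
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue StandardError
    # Re-run the block while attempts remain, otherwise propagate the error.
    retry if attempt < tries
    raise
  end
end

attempts = []
result = with_tries(retries: 1) do |attempt|
  attempts << attempt
  raise "transient failure" if attempt == 1 # fail only on the first attempt

  :ok
end
```

With `retries: 0` the block runs exactly once and any error propagates immediately.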
Class Method Details
.new_if_concurrency_enabled(retries: 1, force: false, pour: false) ⇒ DownloadQueue?
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
# File 'download_queue.rb', line 16

def self.new_if_concurrency_enabled(retries: 1, force: false, pour: false)
  return if Homebrew::EnvConfig.download_concurrency <= 1

  new(retries:, force:, pour:)
end
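Because this factory returns `nil` when the configured concurrency is 1 or lower, callers have to treat the queue as optional. A minimal sketch of that pattern follows. `SketchQueue` and the `DOWNLOAD_CONCURRENCY_SKETCH` variable are made up for this example and stand in for `DownloadQueue` and `Homebrew::EnvConfig.download_concurrency`.

```ruby
# Stand-in class; DOWNLOAD_CONCURRENCY_SKETCH is a made-up variable name.
class SketchQueue
  def self.concurrency
    Integer(ENV.fetch("DOWNLOAD_CONCURRENCY_SKETCH", "1"))
  end

  # Returns nil (no queue) unless more than one concurrent download is allowed.
  def self.new_if_concurrency_enabled(**options)
    return if concurrency <= 1

    new(**options)
  end

  attr_reader :options

  def initialize(**options)
    @options = options
  end
end

ENV["DOWNLOAD_CONCURRENCY_SKETCH"] = "1"
serial_queue = SketchQueue.new_if_concurrency_enabled(retries: 1)

ENV["DOWNLOAD_CONCURRENCY_SKETCH"] = "4"
parallel_queue = SketchQueue.new_if_concurrency_enabled(retries: 1)

# Safe navigation lets one call site handle both cases.
serial_options = serial_queue&.options
parallel_options = parallel_queue&.options
```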
Instance Method Details
#enqueue(downloadable, check_attestation: false) ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
This method returns an undefined value.
# File 'download_queue.rb', line 40

def enqueue(downloadable, check_attestation: false)
  downloads[downloadable] ||= Concurrent::Promises.future_on(
    pool, RetryableDownload.new(downloadable, tries:, pour:), force, quiet, check_attestation
  ) do |download, force, quiet, check_attestation|
    download.clear_cache if force
    download.fetch(quiet:)
    if check_attestation
      bottle = T.cast(downloadable, Bottle)
      Utils::Attestation.check_attestation(bottle, quiet: true)
    end
  end
end
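The `downloads[downloadable] ||= ...` assignment means that enqueuing the same downloadable twice reuses the existing future instead of scheduling a second download. The sketch below shows that deduplication using only the standard library: plain `Thread` objects stand in for the futures, and `SketchEnqueuer` is a hypothetical class written for this example.

```ruby
# Thread objects stand in for the futures the real queue creates on its pool.
class SketchEnqueuer
  attr_reader :downloads, :started

  def initialize
    @downloads = {}
    @started = Queue.new # thread-safe log of jobs that actually ran
  end

  def enqueue(name)
    # `||=` only evaluates the right-hand side when no entry exists yet.
    downloads[name] ||= Thread.new do
      started << name
      "fetched #{name}"
    end
  end
end

queue = SketchEnqueuer.new
first = queue.enqueue("wget")
second = queue.enqueue("wget") # reuses the job created above
queue.enqueue("curl")

# Thread#value waits for each job and returns its block's result.
results = queue.downloads.transform_values(&:value)
```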
#fetch ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
This method returns an undefined value.
# File 'download_queue.rb', line 55

def fetch
  return if downloads.empty?

  context_before_fetch = Context.current

  if concurrency == 1
    downloads.each do |downloadable, promise|
      promise.wait!
    rescue ChecksumMismatchError => e
      ofail "#{downloadable.download_queue_type} reports different checksum: #{e.expected}"
    rescue => e
      raise e unless bottle_manifest_error?(downloadable, e)
    end
  else
    remaining_downloads = downloads.dup.to_a
    previous_pending_line_count = 0

    begin
      stdout_print_and_flush_if_tty Tty.hide_cursor

      = lambda do |downloadable, future, last|
        status = status_from_future(future)
        exception = future.reason if future.rejected?
        next 1 if bottle_manifest_error?(downloadable, exception)

        = "#{downloadable.download_queue_type} #{downloadable.download_queue_name}"
        if tty
          = (downloadable, future, )
          stdout_print_and_flush "#{status} #{}#{"\n" unless last}"
        elsif status
          $stderr.puts "#{status} #{}"
        end

        if future.rejected?
          if exception.is_a?(ChecksumMismatchError)
            actual = Digest::SHA256.file(downloadable.cached_download).hexdigest
            , = (downloadable.download_queue_type)

            ofail "#{} #{exception.expected}"
            puts "#{} #{actual}"
            next 2
          elsif exception.is_a?(CannotInstallFormulaError)
            cached_download = downloadable.cached_download
            cached_download.unlink if cached_download&.exist?
            raise exception
          else
            = future.reason.to_s
            ofail
            next .count("\n")
          end
        end

        1
      end

      until remaining_downloads.empty?
        begin
          finished_states = [:fulfilled, :rejected]

          finished_downloads, remaining_downloads = remaining_downloads.partition do |_, future|
            finished_states.include?(future.state)
          end

          finished_downloads.each do |downloadable, future|
            previous_pending_line_count -= 1
            stdout_print_and_flush_if_tty Tty.clear_to_end
            .call(downloadable, future, false)
          end

          previous_pending_line_count = 0
          max_lines = [concurrency, Tty.height].min
          remaining_downloads.each_with_index do |(downloadable, future), i|
            break if previous_pending_line_count >= max_lines

            stdout_print_and_flush_if_tty Tty.clear_to_end
            last = i == max_lines - 1 || i == remaining_downloads.count - 1
            previous_pending_line_count += .call(downloadable, future, last)
          end

          if previous_pending_line_count.positive?
            if (previous_pending_line_count - 1).zero?
              stdout_print_and_flush_if_tty Tty.move_cursor_beginning
            else
              stdout_print_and_flush_if_tty Tty.move_cursor_up_beginning(previous_pending_line_count - 1)
            end
          end

          sleep 0.05
        # We want to catch all exceptions to ensure we can cancel any
        # running downloads and flush the TTY.
        rescue Exception # rubocop:disable Lint/RescueException
          remaining_downloads.each do |_, future|
            # FIXME: Implement cancellation of running downloads.
          end

          cancel

          if previous_pending_line_count.positive?
            stdout_print_and_flush_if_tty Tty.move_cursor_down(previous_pending_line_count - 1)
          end

          raise
        end
      end
    ensure
      stdout_print_and_flush_if_tty Tty.show_cursor
    end
  end

  # Restore the pre-parallel fetch context to avoid e.g. quiet state bleeding out from threads.
  Context.current = context_before_fetch

  downloads.clear
end
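In the concurrent branch, `fetch` polls instead of blocking on each future in turn: every pass partitions the remaining downloads into finished (`:fulfilled` or `:rejected`) and still pending, reports the finished ones, sleeps briefly, and repeats until nothing remains. The sketch below reproduces only that loop shape with the standard library. `SketchFuture` and `sketch_future` are hypothetical stand-ins for the futures created by `enqueue`, and the TTY handling and error reporting are omitted.

```ruby
# Stand-in for a future: only exposes the `state` the polling loop inspects.
SketchFuture = Struct.new(:thread) do
  def state
    return :pending if thread.alive?

    # Thread#status is nil when the thread died from an exception
    # and false when it finished normally.
    thread.status.nil? ? :rejected : :fulfilled
  end
end

def sketch_future(&block)
  thread = Thread.new do
    Thread.current.report_on_exception = false # keep stderr quiet in the sketch
    block.call
  end
  SketchFuture.new(thread)
end

remaining = {
  "a" => sketch_future { sleep 0.02 },
  "b" => sketch_future { raise "checksum mismatch" },
  "c" => sketch_future { sleep 0.05 },
}.to_a
finished_log = []

until remaining.empty?
  finished_states = [:fulfilled, :rejected]
  finished, remaining = remaining.partition do |_, future|
    finished_states.include?(future.state)
  end
  finished.each { |name, future| finished_log << [name, future.state] }
  sleep 0.01 # short pause between polls, as in the real loop
end
```

Polling keeps a single thread in charge of all terminal output, which is presumably why the real method redraws the pending lines on each pass rather than letting worker threads print directly.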
#shutdown ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
This method returns an undefined value.
# File 'download_queue.rb', line 182

def shutdown
  pool.shutdown
  pool.wait_for_termination
end
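`shutdown` is a two-step teardown: stop accepting new work, then block until the work already queued has finished. The sketch below shows the same two steps with a tiny standard-library pool. `SketchPool` is a hypothetical class for this example and is far simpler than `Concurrent::FixedThreadPool`.

```ruby
# Minimal worker pool built on Queue; a stand-in, not concurrent-ruby.
class SketchPool
  def initialize(size)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and drained.
        while (job = @jobs.pop)
          job.call
        end
      end
    end
  end

  def post(&job)
    @jobs << job # raises ClosedQueueError after shutdown
  end

  # Step 1: refuse new jobs; queued jobs still run.
  def shutdown
    @jobs.close
  end

  # Step 2: block until every worker has drained the queue and exited.
  def wait_for_termination
    @workers.each(&:join)
  end
end

done = Queue.new
pool = SketchPool.new(2)
5.times { |i| pool.post { done << i } }
pool.shutdown
pool.wait_for_termination
```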
#stdout_print_and_flush(message) ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
This method returns an undefined value.
# File 'download_queue.rb', line 176

def stdout_print_and_flush(message)
  $stdout.print(message)
  $stdout.flush
end
#stdout_print_and_flush_if_tty(message) ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
This method returns an undefined value.
# File 'download_queue.rb', line 171

def stdout_print_and_flush_if_tty(message)
  stdout_print_and_flush(message) if $stdout.tty?
end
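The two print helpers differ only in the TTY check, which keeps cursor-control sequences out of piped or redirected output. A small sketch of that difference follows, using hypothetical helpers that take the IO object as a parameter so the behavior can be observed with a `StringIO`; the real methods always write to `$stdout`.

```ruby
require "stringio"

# Hypothetical variants of the helpers that accept any IO-like object.
def print_and_flush(io, message)
  io.print(message)
  io.flush
end

def print_and_flush_if_tty(io, message)
  print_and_flush(io, message) if io.tty?
end

captured = StringIO.new # StringIO#tty? is false, like a pipe or a file
print_and_flush_if_tty(captured, "\e[?25l") # ANSI hide-cursor sequence, skipped here
print_and_flush(captured, "done\n")         # always written
```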