Class: Homebrew::DownloadQueue Private
- Includes: Utils::Output::Mixin
- Defined in: download_queue.rb
Overview
This class is part of a private API. This class may only be used in the Homebrew/brew repository. Third parties should avoid using this class if possible, as it may be removed or changed without warning.
Manages a queue of concurrent downloads with cooperative cancellation support.
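As an illustration of what cooperative cancellation means here, the sketch below runs a few simulated downloads on plain threads and stops them through a shared flag that each worker checks between steps. It is a minimal sketch using only the Ruby standard library, not Homebrew's implementation: `CancelFlag`, `CancelledError`, and `run_job` are hypothetical names, and `CancelFlag` merely stands in for the `Concurrent::AtomicBoolean` used by this class.

```ruby
# Hypothetical sketch, not Homebrew's code; standard library only.
class CancelledError < StandardError; end

# A small thread-safe boolean, standing in for Concurrent::AtomicBoolean.
class CancelFlag
  def initialize
    @mutex = Mutex.new
    @value = false
  end

  def set!
    @mutex.synchronize { @value = true }
  end

  def set?
    @mutex.synchronize { @value }
  end
end

# Simulates a download made of many steps. The worker is never killed from
# outside; it checks the flag itself before each step and stops voluntarily.
def run_job(name, steps, flag)
  steps.times do
    raise CancelledError, "#{name} cancelled" if flag.set?

    sleep 0.01 # stand-in for transferring one chunk
  end
  :done
rescue CancelledError
  :cancelled
end

flag = CancelFlag.new
results = {}
results_mutex = Mutex.new

threads = %w[a b c].map do |name|
  Thread.new do
    outcome = run_job(name, 1000, flag)
    results_mutex.synchronize { results[name] = outcome }
  end
end

sleep 0.05 # let the workers start
flag.set!  # request cancellation; each worker stops at its next check
threads.each(&:join)
```

The point of the pattern is that a worker only stops at a check it performs itself, so it can leave its own state consistent before exiting.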
Defined Under Namespace
Classes: Spinner
Class Method Summary
- .new_if_concurrency_enabled(retries: 1, force: false, pour: false) ⇒ DownloadQueue? private
Instance Method Summary
- #enqueue(downloadable, check_attestation: false) ⇒ void private
- #fetch ⇒ void private
- #initialize(retries: 1, force: false, pour: false) ⇒ void constructor private
- #shutdown ⇒ void private
- #stdout_print_and_flush(message) ⇒ void private
- #stdout_print_and_flush_if_tty(message) ⇒ void private
Methods included from Utils::Output::Mixin
#odebug, #odeprecated, #odie, #odisabled, #ofail, #oh1, #oh1_title, #ohai, #ohai_title, #onoe, #opoo, #opoo_outside_github_actions, #pretty_deprecated, #pretty_disabled, #pretty_duration, #pretty_installed, #pretty_outdated, #pretty_uninstalled
Constructor Details
#initialize(retries: 1, force: false, pour: false) ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
# File 'download_queue.rb', line 28

def initialize(retries: 1, force: false, pour: false)
  @concurrency = T.let(EnvConfig.download_concurrency, Integer)
  @quiet = T.let(@concurrency > 1, T::Boolean)
  @tries = T.let(retries + 1, Integer)
  @force = force
  @pour = pour
  @pool = T.let(Concurrent::FixedThreadPool.new(concurrency), Concurrent::FixedThreadPool)
  @tty = T.let($stdout.tty?, T::Boolean)
  @spinner = T.let(nil, T.nilable(Spinner))
  @symlink_targets = T.let({}, T::Hash[Pathname, T::Set[Downloadable]])
  @downloads_by_location = T.let({}, T::Hash[Pathname, Concurrent::Promises::Future])
  @cancelled = T.let(Concurrent::AtomicBoolean.new(false), Concurrent::AtomicBoolean)
end
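The constructor stores `retries + 1` as `@tries`, so the default `retries: 1` allows two attempts in total. The sketch below shows that arithmetic with a generic retry loop. It is an assumption-laden illustration only: `with_tries` is a hypothetical helper and says nothing about how `RetryableDownload` is actually implemented.

```ruby
# Hypothetical helper, not part of Homebrew.
def with_tries(retries:)
  tries = retries + 1 # the first attempt plus the allowed retries
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    retry if attempts < tries
    raise # out of attempts: re-raise the last error
  end
end

calls = 0
result = with_tries(retries: 1) do |attempt|
  calls += 1
  raise "transient failure" if attempt == 1

  "ok on attempt #{attempt}"
end
```

With `retries: 1` the block runs twice here: once for the failing first attempt and once for the retry.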
Class Method Details
.new_if_concurrency_enabled(retries: 1, force: false, pour: false) ⇒ DownloadQueue?
# File 'download_queue.rb', line 21

def self.new_if_concurrency_enabled(retries: 1, force: false, pour: false)
  return if Homebrew::EnvConfig.download_concurrency <= 1

  new(retries:, force:, pour:)
end
Instance Method Details
#enqueue(downloadable, check_attestation: false) ⇒ void
This method returns an undefined value.
# File 'download_queue.rb', line 48

def enqueue(downloadable, check_attestation: false)
  @cancelled.make_false

  cached_location = downloadable.cached_download

  @symlink_targets[cached_location] ||= Set.new
  targets = @symlink_targets.fetch(cached_location)
  targets << downloadable

  @downloads_by_location[cached_location] ||= Concurrent::Promises.future_on(
    pool, RetryableDownload.new(downloadable, tries:, pour:), @cancelled, force, quiet, check_attestation
  ) do |download, cancelled, force, quiet, check_attestation|
    raise CancelledDownloadError if cancelled.true?

    download.clear_cache if force
    download.fetch(quiet:)

    raise CancelledDownloadError if cancelled.true?

    if check_attestation && downloadable.is_a?(Bottle)
      Utils::Attestation.check_attestation(downloadable, quiet: true)
    end

    create_symlinks_for_shared_download(cached_location)
  end

  downloads[downloadable] = @downloads_by_location.fetch(cached_location)
end
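The listing above keys futures by cache location, so two downloadables that resolve to the same cached file share a single download. The sketch below reproduces that `||=` deduplication with plain threads. It is a simplified illustration, assuming a hypothetical `Item` struct and string paths in place of `Downloadable` and `Pathname`, and a `Thread` in place of a `Concurrent::Promises` future.

```ruby
require "set"

# Hypothetical stand-in for a downloadable; only the fields used below.
Item = Struct.new(:name, :cached_location)

jobs_by_location = {}    # one job per cache path
targets_by_location = {} # every item that wants that path
jobs_by_item = {}        # lookup from item to its (possibly shared) job
started = Queue.new      # thread-safe record of the downloads actually started

enqueue = lambda do |item|
  location = item.cached_location
  (targets_by_location[location] ||= Set.new) << item

  # `||=` evaluates the right-hand side only when no job exists for this
  # location yet, so a second item with the same path reuses the first job.
  jobs_by_location[location] ||= Thread.new do
    started << location # stand-in for the real download
    location
  end

  jobs_by_item[item] = jobs_by_location.fetch(location)
end

a = Item.new("foo", "/cache/shared.tar.gz")
b = Item.new("foo-alias", "/cache/shared.tar.gz")
c = Item.new("bar", "/cache/bar.tar.gz")
[a, b, c].each { |item| enqueue.call(item) }
jobs_by_item.each_value(&:join)
```

Three items are enqueued, but only two jobs start; `a` and `b` end up pointing at the same job object.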
#fetch ⇒ void
This method returns an undefined value.
# File 'download_queue.rb', line 76

def fetch
  return if downloads.empty?

  context_before_fetch = Context.current

  if concurrency == 1
    downloads.each do |downloadable, promise|
      promise.wait!
    rescue CancelledDownloadError
      next
    rescue ChecksumMismatchError => e
      ofail "#{downloadable.download_queue_type} reports different checksum: #{e.expected}"
    rescue => e
      raise e unless bottle_manifest_error?(downloadable, e)
    end
  else
     = downloads.keys.map { |download| download..length }.max || 0
    remaining_downloads = downloads.dup.to_a
    previous_pending_line_count = 0

    begin
      stdout_print_and_flush_if_tty Tty.hide_cursor

       = lambda do |downloadable, future, last|
        status = status_from_future(future)
        exception = future.reason if future.rejected?
        next 1 if exception.is_a?(CancelledDownloadError)
        next 1 if bottle_manifest_error?(downloadable, exception)

         = downloadable.
        if tty
           = (downloadable, future, , )
          stdout_print_and_flush "#{status} #{}#{"\n" unless last}"
        elsif status
          $stderr.puts "#{status} #{}"
        end

        if future.rejected?
          if exception.is_a?(ChecksumMismatchError)
            actual = Digest::SHA256.file(downloadable.cached_download).hexdigest
            ,  = (downloadable.download_queue_type)

            ofail "#{} #{exception.expected}"
            puts "#{} #{actual}"
            next 2
          elsif exception.is_a?(CannotInstallFormulaError)
            cached_download = downloadable.cached_download
            cached_download.unlink if cached_download&.exist?
            raise exception
          else
             = if exception.is_a?(DownloadError) && exception.cause.is_a?(ErrorDuringExecution)
              if (stderr_output = exception.cause.stderr.presence)
                "#{stderr_output}#{exception.cause.}"
              else
                exception.cause.
              end
            else
              future.reason.to_s
            end
            ofail
            next .count("\n")
          end
        end

        1
      end

      until remaining_downloads.empty?
        begin
          finished_states = [:fulfilled, :rejected]

          finished_downloads, remaining_downloads = remaining_downloads.partition do |_, future|
            finished_states.include?(future.state)
          end

          finished_downloads.each do |downloadable, future|
            previous_pending_line_count -= 1
            .call(downloadable, future, false)
            stdout_print_and_flush_if_tty Tty.clear_to_end
          end

          previous_pending_line_count = 0
          max_lines = [concurrency, Tty.height].min
          remaining_downloads.each_with_index do |(downloadable, future), i|
            break if previous_pending_line_count >= max_lines

            last = i == max_lines - 1 || i == remaining_downloads.count - 1
            previous_pending_line_count += .call(downloadable, future, last)
            stdout_print_and_flush_if_tty Tty.clear_to_end
          end

          if previous_pending_line_count.positive?
            if (previous_pending_line_count - 1).zero?
              stdout_print_and_flush_if_tty Tty.move_cursor_beginning
            else
              stdout_print_and_flush_if_tty Tty.move_cursor_up_beginning(previous_pending_line_count - 1)
            end
          end

          sleep 0.05
        # We want to catch all exceptions to ensure we can cancel any
        # running downloads and flush the TTY.
        rescue Exception # rubocop:disable Lint/RescueException
          cancel

          if previous_pending_line_count.positive?
            stdout_print_and_flush_if_tty Tty.move_cursor_down(previous_pending_line_count - 1)
          end

          raise
        end
      end
    ensure
      stdout_print_and_flush_if_tty Tty.show_cursor
    end
  end

  # Restore the pre-parallel fetch context to avoid e.g. quiet state bleeding out from threads.
  Context.current = context_before_fetch

  downloads.clear
  @downloads_by_location.clear
  @symlink_targets.clear
end
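The concurrent branch of `fetch` repeatedly splits the outstanding downloads into finished and still-pending sets, reports the finished ones, and sleeps briefly before polling again. The sketch below isolates that partition-and-poll pattern. It is a rough illustration under stated assumptions: plain `Thread` objects and `Thread#alive?` stand in for futures and their `:fulfilled`/`:rejected` states, and all terminal handling is left out.

```ruby
# Hypothetical stand-in for polling futures; not Homebrew's code.
jobs = { "a" => 0.02, "b" => 0.06, "c" => 0.04 }.map do |name, duration|
  [name, Thread.new { sleep duration; name.upcase }]
end

remaining = jobs.dup
finished_order = []

until remaining.empty?
  # Thread#alive? is false once a thread has finished (or raised), which
  # plays the role of a future reaching a finished state.
  finished, remaining = remaining.partition { |_, thread| !thread.alive? }

  # Report each job exactly once, at the poll where it is first seen finished.
  finished.each { |name, thread| finished_order << [name, thread.value] }

  sleep 0.005 # short pause between polls; the listing above uses 0.05
end
```

Because `partition` reassigns `remaining` on every pass, a finished job drops out of the loop as soon as it has been reported.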
#shutdown ⇒ void
This method returns an undefined value.
# File 'download_queue.rb', line 213

def shutdown
  pool.shutdown
  pool.wait_for_termination
end
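`shutdown` first stops the pool from accepting work and then blocks until the workers have drained what was already queued. The sketch below shows the same two-step shape on a tiny hand-rolled pool. `TinyPool` is a hypothetical class built on `Thread` and `Queue`; it only mirrors the method names and makes no claim about how `Concurrent::FixedThreadPool` works internally.

```ruby
# Hypothetical minimal pool; standard library only.
class TinyPool
  def initialize(size)
    @queue = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and empty,
        # which ends the worker loop.
        while (job = @queue.pop)
          job.call
        end
      end
    end
  end

  def post(&job)
    @queue << job
  end

  # Step 1: refuse new work. Jobs that are already queued still run.
  def shutdown
    @queue.close
  end

  # Step 2: block until every worker has exited.
  def wait_for_termination
    @workers.each(&:join)
  end
end

pool = TinyPool.new(2)
done = Queue.new
5.times { |i| pool.post { done << (i * i) } }
pool.shutdown
pool.wait_for_termination
```

Calling the two steps in this order means no job is dropped: everything posted before `shutdown` completes before `wait_for_termination` returns.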
#stdout_print_and_flush(message) ⇒ void
This method returns an undefined value.
# File 'download_queue.rb', line 207

def stdout_print_and_flush(message)
  $stdout.print(message)
  $stdout.flush
end
#stdout_print_and_flush_if_tty(message) ⇒ void
This method returns an undefined value.
# File 'download_queue.rb', line 202

def stdout_print_and_flush_if_tty(message)
  stdout_print_and_flush(message) if $stdout.tty?
end
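The TTY check keeps cursor-control sequences out of output that is piped or redirected. The sketch below demonstrates the effect with an explicit IO argument so it can be exercised without a terminal. The helper names and the `HIDE_CURSOR` constant are hypothetical; the escape sequence is the common ANSI "hide cursor" code and is assumed here purely for illustration, not taken from `Tty`.

```ruby
require "stringio"

# Hypothetical helpers that take the IO explicitly instead of using $stdout.
def print_and_flush(io, message)
  io.print(message)
  io.flush
end

def print_and_flush_if_tty(io, message)
  print_and_flush(io, message) if io.tty?
end

HIDE_CURSOR = "\e[?25l" # assumed ANSI sequence, for illustration only

buffer = StringIO.new # StringIO#tty? returns false, like a pipe or a file
print_and_flush_if_tty(buffer, HIDE_CURSOR)  # skipped: not a terminal
print_and_flush(buffer, "Downloaded foo\n")  # always written
```

Only the plain message reaches the buffer, so logs and pipes stay free of control characters.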