Class: Homebrew::DownloadQueue Private
- Includes:
- Utils::Output::Mixin
- Defined in:
- download_queue.rb
This class is part of a private API. This class may only be used in the Homebrew/brew repository. Third parties should avoid using this class if possible, as it may be removed or changed without warning.
Defined Under Namespace
Classes: Spinner
Class Method Summary collapse
Instance Method Summary collapse
- #enqueue(downloadable, check_attestation: false) ⇒ void private
- #fetch ⇒ void private
- #initialize(retries: 1, force: false, pour: false) ⇒ void constructor private
- #shutdown ⇒ void private
- #stdout_print_and_flush(message) ⇒ void private
- #stdout_print_and_flush_if_tty(message) ⇒ void private
Methods included from Utils::Output::Mixin
#odebug, #odeprecated, #odie, #odisabled, #ofail, #oh1, #oh1_title, #ohai, #ohai_title, #onoe, #opoo, #opoo_outside_github_actions, #pretty_duration, #pretty_installed, #pretty_outdated, #pretty_uninstalled
Constructor Details
#initialize(retries: 1, force: false, pour: false) ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
23 24 25 26 27 28 29 30 31 32 33 |
# File 'download_queue.rb', line 23 def initialize(retries: 1, force: false, pour: false) @concurrency = T.let(EnvConfig.download_concurrency, Integer) @quiet = T.let(@concurrency > 1, T::Boolean) @tries = T.let(retries + 1, Integer) @force = force @pour = pour @pool = T.let(Concurrent::FixedThreadPool.new(concurrency), Concurrent::FixedThreadPool) @tty = T.let($stdout.tty?, T::Boolean) @spinner = T.let(nil, T.nilable(Spinner)) @symlink_targets = T.let({}, T::Hash[Pathname, T::Set[Downloadable]]) end |
Class Method Details
.new_if_concurrency_enabled(retries: 1, force: false, pour: false) ⇒ DownloadQueue?
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
16 17 18 19 20 |
# File 'download_queue.rb', line 16 def self.new_if_concurrency_enabled(retries: 1, force: false, pour: false) return if Homebrew::EnvConfig.download_concurrency <= 1 new(retries:, force:, pour:) end |
Instance Method Details
#enqueue(downloadable, check_attestation: false) ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
This method returns an undefined value.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'download_queue.rb', line 41 def enqueue(downloadable, check_attestation: false) cached_location = downloadable.cached_download @symlink_targets[cached_location] ||= Set.new targets = @symlink_targets.fetch(cached_location) targets << downloadable downloads[downloadable] ||= Concurrent::Promises.future_on( pool, RetryableDownload.new(downloadable, tries:, pour:), force, quiet, check_attestation ) do |download, force, quiet, check_attestation| download.clear_cache if force download.fetch(quiet:) if check_attestation && downloadable.is_a?(Bottle) Utils::Attestation.check_attestation(downloadable, quiet: true) end create_symlinks_for_shared_download(cached_location) end end |
#fetch ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
This method returns an undefined value.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'download_queue.rb', line 62 def fetch return if downloads.empty? context_before_fetch = Context.current if concurrency == 1 downloads.each do |downloadable, promise| promise.wait! rescue ChecksumMismatchError => e ofail "#{downloadable.download_queue_type} reports different checksum: #{e.expected}" rescue => e raise e unless bottle_manifest_error?(downloadable, e) end else = downloads.keys.map { |download| download..length }.max || 0 remaining_downloads = downloads.dup.to_a previous_pending_line_count = 0 begin stdout_print_and_flush_if_tty Tty.hide_cursor = lambda do |downloadable, future, last| status = status_from_future(future) exception = future.reason if future.rejected? next 1 if bottle_manifest_error?(downloadable, exception) = downloadable. if tty = (downloadable, future, , ) stdout_print_and_flush "#{status} #{}#{"\n" unless last}" elsif status $stderr.puts "#{status} #{}" end if future.rejected? if exception.is_a?(ChecksumMismatchError) actual = Digest::SHA256.file(downloadable.cached_download).hexdigest , = (downloadable.download_queue_type) ofail "#{} #{exception.expected}" puts "#{} #{actual}" next 2 elsif exception.is_a?(CannotInstallFormulaError) cached_download = downloadable.cached_download cached_download.unlink if cached_download&.exist? raise exception else = future.reason.to_s ofail next .count("\n") end end 1 end until remaining_downloads.empty? begin finished_states = [:fulfilled, :rejected] finished_downloads, remaining_downloads = remaining_downloads.partition do |_, future| finished_states.include?(future.state) end finished_downloads.each do |downloadable, future| previous_pending_line_count -= 1 stdout_print_and_flush_if_tty Tty.clear_to_end .call(downloadable, future, false) end previous_pending_line_count = 0 max_lines = [concurrency, Tty.height].min remaining_downloads.each_with_index do |(downloadable, future), i| break if previous_pending_line_count >= max_lines stdout_print_and_flush_if_tty Tty.clear_to_end last = i == max_lines - 1 || i == remaining_downloads.count - 1 previous_pending_line_count += .call(downloadable, future, last) end if previous_pending_line_count.positive? if (previous_pending_line_count - 1).zero? stdout_print_and_flush_if_tty Tty.move_cursor_beginning else stdout_print_and_flush_if_tty Tty.move_cursor_up_beginning(previous_pending_line_count - 1) end end sleep 0.05 # We want to catch all exceptions to ensure we can cancel any # running downloads and flush the TTY. rescue Exception # rubocop:disable Lint/RescueException remaining_downloads.each do |_, future| # FIXME: Implement cancellation of running downloads. end cancel if previous_pending_line_count.positive? stdout_print_and_flush_if_tty Tty.move_cursor_down(previous_pending_line_count - 1) end raise end end ensure stdout_print_and_flush_if_tty Tty.show_cursor end end # Restore the pre-parallel fetch context to avoid e.g. quiet state bleeding out from threads. Context.current = context_before_fetch downloads.clear end |
#shutdown ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
This method returns an undefined value.
190 191 192 193 |
# File 'download_queue.rb', line 190 def shutdown pool.shutdown pool.wait_for_termination end |
#stdout_print_and_flush(message) ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
This method returns an undefined value.
184 185 186 187 |
# File 'download_queue.rb', line 184 def stdout_print_and_flush() $stdout.print() $stdout.flush end |
#stdout_print_and_flush_if_tty(message) ⇒ void
This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.
This method returns an undefined value.
179 180 181 |
# File 'download_queue.rb', line 179 def stdout_print_and_flush_if_tty() stdout_print_and_flush() if $stdout.tty? end |