Class: Homebrew::DownloadQueue Private

Inherits:
Object
Includes:
Utils::Output::Mixin
Defined in:
download_queue.rb

Overview

This class is part of a private API. This class may only be used in the Homebrew/brew repository. Third parties should avoid using this class if possible, as it may be removed or changed without warning.

Manages a queue of concurrent downloads with cooperative cancellation support.
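As a rough illustration of what cooperative cancellation means here, the following sketch uses only Ruby's standard library. `SketchQueue` and its `Cancelled` error are hypothetical names invented for this example; the real class relies on `Concurrent::AtomicBoolean` and a thread pool, as its constructor shows. The point is that workers are never killed from outside: each one checks a shared flag between units of work and stops itself.

```ruby
# Hypothetical stand-in for a download queue; not Homebrew's implementation.
class SketchQueue
  class Cancelled < StandardError; end

  attr_reader :results

  def initialize
    @cancelled = false
    @lock = Mutex.new
    @threads = []
    @results = {}
  end

  # Ask running workers to stop at their next checkpoint.
  def cancel
    @lock.synchronize { @cancelled = true }
  end

  def cancelled?
    @lock.synchronize { @cancelled }
  end

  # Each job is split into steps; the worker checks the flag before every step.
  def enqueue(name, steps:)
    @threads << Thread.new do
      steps.times do
        raise Cancelled if cancelled?

        sleep 0.01 # stand-in for downloading one chunk
      end
      @lock.synchronize { @results[name] = :done }
    rescue Cancelled
      @lock.synchronize { @results[name] = :cancelled }
    end
  end

  # Block until every worker has finished or cancelled itself.
  def wait
    @threads.each(&:join)
  end
end

queue = SketchQueue.new
queue.enqueue("long-download", steps: 500)
queue.cancel
queue.wait
# queue.results["long-download"] is :cancelled because the worker noticed the flag.
```

Unlike this sketch, `#enqueue` in the real class resets the cancellation flag on every call, so a queue that was cancelled earlier can be reused.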

Defined Under Namespace

Classes: Spinner

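Instance Attribute Summary

  • #fetch_failed ⇒ Boolean (readonly)

Instance Method Summary

  • #enqueue(downloadable, check_attestation: false) ⇒ void
  • #fetch ⇒ void
  • #initialize(retries: 1, force: false, pour: false) ⇒ void (constructor)
  • #shutdown ⇒ void
  • #stdout_print_and_flush(message) ⇒ void
  • #stdout_print_and_flush_if_tty(message) ⇒ void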

Methods included from Utils::Output::Mixin

#issue_reporting_message, #odebug, #odeprecated, #odie, #odisabled, #ofail, #oh1, #oh1_title, #ohai, #ohai_title, #onoe, #opoo, #opoo_outside_github_actions, #pretty_deprecated, #pretty_disabled, #pretty_duration, #pretty_install_status, #pretty_installed, #pretty_outdated, #pretty_uninstalled, #pretty_upgradable

Constructor Details

#initialize(retries: 1, force: false, pour: false) ⇒ void

This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.

Parameters:

  • retries (Integer) (defaults to: 1)
  • force (Boolean) (defaults to: false)
  • pour (Boolean) (defaults to: false)


# File 'download_queue.rb', line 22

def initialize(retries: 1, force: false, pour: false)
  @concurrency = T.let(EnvConfig.download_concurrency, Integer)
  @quiet = T.let(@concurrency > 1, T::Boolean)
  @tries = T.let(retries + 1, Integer)
  @force = force
  @pour = pour
  @pool = T.let(Concurrent::FixedThreadPool.new(concurrency), Concurrent::FixedThreadPool)
  @tty = T.let($stdout.tty?, T::Boolean)
  @dumb_tty = T.let(ENV["TERM"] == "dumb", T::Boolean)
  @spinner = T.let(nil, T.nilable(Spinner))
  @symlink_targets = T.let({}, T::Hash[Pathname, T::Set[Downloadable]])
  @downloads_by_location = T.let({}, T::Hash[Pathname, Concurrent::Promises::Future])
  @cancelled = T.let(Concurrent::AtomicBoolean.new(false), Concurrent::AtomicBoolean)
  @download_threads = T.let(Concurrent::Set.new, Concurrent::Set)
  @fetch_failed = T.let(false, T::Boolean)
end
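
The constructor stores `retries + 1` as `@tries`, so the default `retries: 1` allows two attempts in total. How `RetryableDownload` consumes that count is not shown on this page. The sketch below is one plausible shape for such a loop; `with_tries` is a hypothetical helper written for this example, not a Homebrew method.

```ruby
# Runs the block up to (retries + 1) times, re-raising the last error.
# Hypothetical helper for illustration only.
def with_tries(retries:)
  tries = retries + 1
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    retry if attempts < tries
    raise
  end
end

# The first attempt fails, the second succeeds.
outcome = with_tries(retries: 1) do |attempt|
  raise "transient failure" if attempt == 1

  :downloaded
end
# outcome is :downloaded
```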

Instance Attribute Details

#fetch_failed ⇒ Boolean (readonly)

This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.

Returns:

  • (Boolean)


# File 'download_queue.rb', line 211

def fetch_failed
  @fetch_failed
end

Instance Method Details

#enqueue(downloadable, check_attestation: false) ⇒ void

This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.

This method returns an undefined value.

Parameters:

  • downloadable (Downloadable)
  • check_attestation (Boolean) (defaults to: false)


# File 'download_queue.rb', line 45

def enqueue(downloadable, check_attestation: false)
  @cancelled.make_false
  cached_location = downloadable.cached_download

  @symlink_targets[cached_location] ||= Set.new
  targets = @symlink_targets.fetch(cached_location)
  targets << downloadable

  @downloads_by_location[cached_location] ||= Concurrent::Promises.future_on(
    pool, RetryableDownload.new(downloadable, tries:, pour:),
    @cancelled, force, quiet, check_attestation
  ) do |download, cancelled, force, quiet, check_attestation|
    raise CancelledDownloadError if cancelled.true?

    @download_threads.add(Thread.current)
    begin
      download.clear_cache if force
      download.fetch(quiet:)
      raise CancelledDownloadError if cancelled.true?

      if check_attestation && downloadable.is_a?(Bottle)
        Utils::Attestation.check_attestation(downloadable, quiet: true)
      end
      create_symlinks_for_shared_download(cached_location)
    rescue Interrupt
      raise CancelledDownloadError
    ensure
      @download_threads.delete(Thread.current)
    end
  end

  downloads[downloadable] = @downloads_by_location.fetch(cached_location)
end
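
The `||=` assignments in `#enqueue` mean that two downloadables sharing a cached location also share one future, while each is still recorded as a target for that location. A minimal stdlib-only sketch of that bookkeeping follows. `SharedWorkRegistry` is a hypothetical class, plain strings stand in for `Pathname` and `Downloadable` objects, and a bare `Object` stands in for the future.

```ruby
require "set"

# Hypothetical illustration of "one unit of work per cache location".
class SharedWorkRegistry
  attr_reader :started, :targets_by_location

  def initialize
    @work_by_location = {}
    @work_by_item = {}
    @targets_by_location = {}
    @started = []
  end

  # Registers an item and returns the (possibly shared) work object.
  def enqueue(item, location)
    @targets_by_location[location] ||= Set.new
    @targets_by_location.fetch(location) << item

    # Work is only created the first time a location is seen.
    @work_by_location[location] ||= begin
      @started << location
      Object.new # stand-in for a scheduled future
    end

    @work_by_item[item] = @work_by_location.fetch(location)
  end
end

registry = SharedWorkRegistry.new
first = registry.enqueue("formula-a", "/cache/shared.tar.gz")
second = registry.enqueue("formula-b", "/cache/shared.tar.gz")
# first and second are the same object; work was started once for that location.
```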

#fetch ⇒ void

This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.

This method returns an undefined value.



# File 'download_queue.rb', line 80

def fetch
  @fetch_failed = false
  return if downloads.empty?

  context_before_fetch = Context.current

  if concurrency == 1
    downloads.each do |downloadable, promise|
      promise.wait!
    rescue CancelledDownloadError
      next
    rescue ChecksumMismatchError => e
      @fetch_failed = true
      ofail "#{downloadable.download_queue_type} reports different checksum: #{e.expected}"
    rescue => e
      raise e unless bottle_manifest_error?(downloadable, e)
    end
  else
    message_length_max = downloads.keys.map { |download| download.download_queue_message.length }.max || 0
    remaining_downloads = downloads.dup.to_a
    previous_pending_line_count = 0

    begin
      stdout_print_and_flush_if_tty Tty.hide_cursor

      output_message = lambda do |downloadable, future, last|
        status = status_from_future(future)
        exception = future.reason if future.rejected?
        next 1 if exception.is_a?(CancelledDownloadError)
        next 1 if bottle_manifest_error?(downloadable, exception)

        message = downloadable.download_queue_message
        if tty_with_cursor_move_support?
          message = message_with_progress(downloadable, future, message, message_length_max)
          stdout_print_and_flush "#{status} #{message}#{"\n" unless last}"
        elsif status
          $stderr.puts "#{status} #{message}"
        end

        if future.rejected?
          if exception.is_a?(ChecksumMismatchError)
            @fetch_failed = true
            actual = Digest::SHA256.file(downloadable.cached_download).hexdigest
            actual_message, expected_message = align_checksum_mismatch_message(downloadable.download_queue_type)

            ofail "#{actual_message} #{exception.expected}"
            puts "#{expected_message} #{actual}"
            next 2
          elsif exception.is_a?(CannotInstallFormulaError)
            cached_download = downloadable.cached_download
            cached_download.unlink if cached_download&.exist?
            raise exception
          else
            message = if exception.is_a?(DownloadError) && exception.cause.is_a?(ErrorDuringExecution)
              cause = T.cast(exception.cause, ErrorDuringExecution)
              if (stderr_output = cause.stderr.presence)
                "#{stderr_output}#{cause.message}"
              else
                cause.message
              end
            else
              future.reason.to_s
            end
            @fetch_failed = true
            ofail message
            next message.count("\n")
          end
        end

        1
      end

      until remaining_downloads.empty?
        begin
          finished_states = [:fulfilled, :rejected]

          finished_downloads, remaining_downloads = remaining_downloads.partition do |_, future|
            finished_states.include?(future.state)
          end

          finished_downloads.each do |downloadable, future|
            previous_pending_line_count -= 1
            output_message.call(downloadable, future, false)
            stdout_print_and_flush_if_tty Tty.clear_to_end
          end

          previous_pending_line_count = 0
          max_lines = [concurrency, Tty.height].min
          remaining_downloads.each_with_index do |(downloadable, future), i|
            break if previous_pending_line_count >= max_lines

            last = i == max_lines - 1 || i == remaining_downloads.count - 1
            previous_pending_line_count += output_message.call(downloadable, future, last)
            stdout_print_and_flush_if_tty Tty.clear_to_end
          end

          if previous_pending_line_count.positive?
            if (previous_pending_line_count - 1).zero?
              stdout_print_and_flush_if_tty Tty.move_cursor_beginning
            else
              stdout_print_and_flush_if_tty Tty.move_cursor_up_beginning(previous_pending_line_count - 1)
            end
          end

          sleep 0.05
        # We want to catch all exceptions to ensure we can cancel any
        # running downloads and flush the TTY.
        rescue Exception # rubocop:disable Lint/RescueException
          cancel

          if previous_pending_line_count.positive?
            stdout_print_and_flush_if_tty Tty.move_cursor_down(previous_pending_line_count - 1)
          end

          raise
        end
      end
    ensure
      stdout_print_and_flush_if_tty Tty.show_cursor
    end
  end

  # Restore the pre-parallel fetch context to avoid e.g. quiet state bleeding out from threads.
  Context.current = context_before_fetch

  downloads.clear
  @downloads_by_location.clear
  @symlink_targets.clear
end
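
Stripped of terminal handling, the concurrent branch of `#fetch` is a polling loop: on each pass it partitions the remaining downloads into finished and pending, reports the finished ones once, and sleeps briefly before checking again. The sketch below isolates that pattern. `SketchTask` and `drain` are hypothetical names, and the block passed to `drain` simulates background progress that real futures would make on their own.

```ruby
# Hypothetical task with a mutable state, loosely modelled on a future.
SketchTask = Struct.new(:name, :state)

FINISHED_STATES = [:fulfilled, :rejected].freeze

# Polls until every task is finished, reporting each finished task exactly once.
# The optional block receives the still-pending tasks after every pass.
def drain(tasks, poll_interval: 0.01)
  reported = []
  remaining = tasks.dup

  until remaining.empty?
    finished, remaining = remaining.partition do |task|
      FINISHED_STATES.include?(task.state)
    end

    finished.each { |task| reported << [task.name, task.state] }

    yield remaining if block_given?
    sleep poll_interval unless remaining.empty?
  end

  reported
end

tasks = [
  SketchTask.new("a", :fulfilled),
  SketchTask.new("b", :pending),
  SketchTask.new("c", :rejected),
]

# Simulate the pending task completing between polls.
report = drain(tasks) { |pending| pending.each { |task| task.state = :fulfilled } }
# report lists "a" and "c" from the first pass, then "b" from the second.
```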

#shutdown ⇒ void

This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.

This method returns an undefined value.



# File 'download_queue.rb', line 225

def shutdown
  pool.shutdown
  pool.wait_for_termination
end
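
`#shutdown` follows the usual two-step pool teardown: stop accepting new work, then block until the work already queued has drained. To show the same idea without the concurrent-ruby dependency, here is a small sketch built on the standard library's `Queue`. `SketchPool` is a hypothetical class and is far simpler than `Concurrent::FixedThreadPool`.

```ruby
# Hypothetical fixed-size worker pool for illustration only.
class SketchPool
  def initialize(size)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and empty.
        while (job = @jobs.pop)
          job.call
        end
      end
    end
  end

  # Raises ClosedQueueError if called after #shutdown.
  def post(&job)
    @jobs << job
  end

  # Stop accepting new jobs; jobs that are already queued still run.
  def shutdown
    @jobs.close
  end

  # Block until every worker thread has exited.
  def wait_for_termination
    @workers.each(&:join)
  end
end

completed = Queue.new
pool = SketchPool.new(2)
5.times { |i| pool.post { completed << (i * 2) } }
pool.shutdown
pool.wait_for_termination
# All five jobs have run by the time wait_for_termination returns.
```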

#stdout_print_and_flush(message) ⇒ void

This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.

This method returns an undefined value.

Parameters:

  • message

# File 'download_queue.rb', line 219

def stdout_print_and_flush(message)
  $stdout.print(message)
  $stdout.flush
end

#stdout_print_and_flush_if_tty(message) ⇒ void

This method is part of a private API. This method may only be used in the Homebrew/brew repository. Third parties should avoid using this method if possible, as it may be removed or changed without warning.

This method returns an undefined value.

Parameters:

  • message

# File 'download_queue.rb', line 214

def stdout_print_and_flush_if_tty(message)
  stdout_print_and_flush(message) if tty_with_cursor_move_support?
end
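
The definition of `tty_with_cursor_move_support?` does not appear on this page. Given the `@tty` and `@dumb_tty` flags set in the constructor, a reasonable guess is that it requires a TTY whose `TERM` is not `dumb`. The sketch below encodes that assumption with hypothetical helper names so the guard can be exercised against in-memory streams.

```ruby
require "stringio"

# Assumption: cursor movement is usable only on a TTY that is not "dumb".
def cursor_moves_supported?(io, term: ENV["TERM"])
  io.tty? && term != "dumb"
end

# Prints and flushes only when the stream can handle cursor control.
# Returns true if the message was written.
def print_and_flush_if_tty(io, message, term: ENV["TERM"])
  return false unless cursor_moves_supported?(io, term: term)

  io.print(message)
  io.flush
  true
end

# StringIO reports tty? as false, so this subclass simulates a terminal.
class FakeTerminal < StringIO
  def tty?
    true
  end
end

plain = StringIO.new
print_and_flush_if_tty(plain, "\e[?25l", term: "xterm")
# plain.string stays empty because a StringIO is not a TTY.

terminal = FakeTerminal.new
print_and_flush_if_tty(terminal, "\e[?25l", term: "xterm")
# terminal.string now holds the escape sequence.
```

Guarding escape sequences this way keeps piped or logged output free of control characters.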