Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8c7ca45
Add executor support
jterapin Oct 7, 2025
c21969a
Add changelog entry
jterapin Oct 7, 2025
39ecf0a
Update TM with executor changes
jterapin Oct 7, 2025
a3f2b9f
Remove thread count support from MPU
jterapin Oct 7, 2025
3156f7c
Update Object usage of executor
jterapin Oct 7, 2025
84c9966
Add documentation/remove unused methods from DefaultExecutor
jterapin Oct 8, 2025
8e16a3b
Add Default Executor specs
jterapin Oct 8, 2025
db1cb62
Update TM docs and impl
jterapin Oct 8, 2025
f907c3b
Update streaming MPU to use executor
jterapin Oct 9, 2025
7cb940a
More MP Stream updates
jterapin Oct 9, 2025
4003536
Update specs
jterapin Oct 9, 2025
7dddda9
Update interfaces
jterapin Oct 9, 2025
481f198
Update specs
jterapin Oct 9, 2025
88bf44a
Update changelog
jterapin Oct 9, 2025
c1a25cd
Minor updates
jterapin Oct 9, 2025
7522a16
Fix failing specs
jterapin Oct 9, 2025
89cffe7
Merge branch 'version-3' into s3-executor-support
jterapin Oct 10, 2025
9eea233
Feedback - address sleep in specs
jterapin Oct 10, 2025
75b0d96
Feedback - update method name for cleanup_team_file
jterapin Oct 10, 2025
ad943ee
Feedback - wrap checksum callback
jterapin Oct 10, 2025
f1fc86a
Feedback - update method name in MPU
jterapin Oct 10, 2025
09eae68
Feedback - streamline handling of progress callbacks
jterapin Oct 10, 2025
e824de0
Feedback - streamline docs
jterapin Oct 10, 2025
c073349
Merge branch 'version-3' into s3-executor-support
jterapin Oct 13, 2025
cd91eb7
Feedback - streamline opts
jterapin Oct 13, 2025
abf78d6
Feedback - remove sleep from specs when possible
jterapin Oct 13, 2025
04a287f
Feedback - update to use 10 threads only
jterapin Oct 13, 2025
54b9add
Add directory features
jterapin Oct 13, 2025
ca6c2ae
Add temp changelog entry
jterapin Oct 13, 2025
c9bf8ed
Minor updates
jterapin Oct 13, 2025
5c6caa7
Improve directory uploader
jterapin Oct 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions gems/aws-sdk-s3/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
Unreleased Changes
------------------

* Feature - TODO

* Feature - Add lightweight thread pool executor for multipart `download_file`, `upload_file` and `upload_stream`.

1.199.1 (2025-09-25)
------------------

Expand Down
12 changes: 10 additions & 2 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,22 @@ module S3
autoload :BucketRegionCache, 'aws-sdk-s3/bucket_region_cache'
autoload :Encryption, 'aws-sdk-s3/encryption'
autoload :EncryptionV2, 'aws-sdk-s3/encryption_v2'
autoload :LegacySigner, 'aws-sdk-s3/legacy_signer'

# transfer manager + multipart upload/download utilities
autoload :DefaultExecutor, 'aws-sdk-s3/default_executor'
autoload :FilePart, 'aws-sdk-s3/file_part'
autoload :FileUploader, 'aws-sdk-s3/file_uploader'
autoload :FileDownloader, 'aws-sdk-s3/file_downloader'
autoload :LegacySigner, 'aws-sdk-s3/legacy_signer'
autoload :FileUploader, 'aws-sdk-s3/file_uploader'
autoload :MultipartDownloadError, 'aws-sdk-s3/multipart_download_error'
autoload :MultipartFileUploader, 'aws-sdk-s3/multipart_file_uploader'
autoload :MultipartStreamUploader, 'aws-sdk-s3/multipart_stream_uploader'
autoload :MultipartUploadError, 'aws-sdk-s3/multipart_upload_error'
autoload :DirectoryProgress, 'aws-sdk-s3/directory_progress'
autoload :DirectoryDownloadError, 'aws-sdk-s3/directory_download_error'
autoload :DirectoryDownloader, '.aws-sdk-s3/directory_downloader'
autoload :DirectoryUploadError, 'aws-sdk-s3/directory_upload_error'
autoload :DirectoryUploader, 'aws-sdk-s3/directory_uploader'
autoload :ObjectCopier, 'aws-sdk-s3/object_copier'
autoload :ObjectMultipartCopier, 'aws-sdk-s3/object_multipart_copier'
autoload :PresignedPost, 'aws-sdk-s3/presigned_post'
Expand Down
38 changes: 21 additions & 17 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -383,19 +383,18 @@ def public_url(options = {})
# @see Client#complete_multipart_upload
# @see Client#upload_part
def upload_stream(options = {}, &block)
uploading_options = options.dup
upload_opts = options.merge(bucket: bucket_name, key: key)
executor = DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count))
uploader = MultipartStreamUploader.new(
client: client,
thread_count: uploading_options.delete(:thread_count),
tempfile: uploading_options.delete(:tempfile),
part_size: uploading_options.delete(:part_size)
executor: executor,
tempfile: upload_opts.delete(:tempfile),
part_size: upload_opts.delete(:part_size)
)
Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do
uploader.upload(
uploading_options.merge(bucket: bucket_name, key: key),
&block
)
uploader.upload(upload_opts, &block)
end
executor.shutdown
true
end
deprecated(:upload_stream, use: 'Aws::S3::TransferManager#upload_stream', version: 'next major version')
Expand Down Expand Up @@ -458,12 +457,18 @@ def upload_stream(options = {}, &block)
# @see Client#complete_multipart_upload
# @see Client#upload_part
def upload_file(source, options = {})
uploading_options = options.dup
uploader = FileUploader.new(multipart_threshold: uploading_options.delete(:multipart_threshold), client: client)
upload_opts = options.merge(bucket: bucket_name, key: key)
executor = DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count))
uploader = FileUploader.new(
client: client,
executor: executor,
multipart_threshold: upload_opts.delete(:multipart_threshold)
)
response = Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do
uploader.upload(source, uploading_options.merge(bucket: bucket_name, key: key))
uploader.upload(source, upload_opts)
end
yield response if block_given?
executor.shutdown
true
end
deprecated(:upload_file, use: 'Aws::S3::TransferManager#upload_file', version: 'next major version')
Expand Down Expand Up @@ -512,10 +517,6 @@ def upload_file(source, options = {})
#
# @option options [Integer] :thread_count (10) Customize threads used in the multipart download.
#
# @option options [String] :version_id The object version id used to retrieve the object.
#
# @see https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html ObjectVersioning
#
# @option options [String] :checksum_mode ("ENABLED")
# When `"ENABLED"` and the object has a stored checksum, it will be used to validate the download and will
# raise an `Aws::Errors::ChecksumError` if checksum validation fails. You may provide a `on_checksum_validated`
Expand All @@ -539,10 +540,13 @@ def upload_file(source, options = {})
# @see Client#get_object
# @see Client#head_object
def download_file(destination, options = {})
downloader = FileDownloader.new(client: client)
download_opts = options.merge(bucket: bucket_name, key: key)
executor = DefaultExecutor.new(max_threads: download_opts.delete([:thread_count]))
downloader = FileDownloader.new(client: client, executor: executor)
Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do
downloader.download(destination, options.merge(bucket: bucket_name, key: key))
downloader.download(destination, download_opts)
end
executor.shutdown
true
end
deprecated(:download_file, use: 'Aws::S3::TransferManager#download_file', version: 'next major version')
Expand Down
101 changes: 101 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/default_executor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# frozen_string_literal: true

module Aws
module S3
# @api private
class DefaultExecutor
DEFAULT_MAX_THREADS = 10
RUNNING = :running
SHUTTING_DOWN = :shutting_down
SHUTDOWN = :shutdown

def initialize(options = {})
@max_threads = options[:max_threads] || DEFAULT_MAX_THREADS
@state = RUNNING
@queue = Queue.new
@pool = []
@mutex = Mutex.new
end

# Submits a task for execution.
# @param [Object] args Variable number of arguments to pass to the block
# @param [Proc] block The block to be executed
# @return [Boolean] Returns true if the task was submitted successfully
def post(*args, &block)
@mutex.synchronize do
raise 'Executor has been shutdown and is no longer accepting tasks' unless @state == RUNNING

@queue << [args, block]
ensure_worker_available
end
true
end

# Immediately terminates all worker threads and clears pending tasks.
# This is a forceful shutdown that doesn't wait for running tasks to complete.
#
# @return [Boolean] true when termination is complete
def kill
@mutex.synchronize do
@state = SHUTDOWN
@pool.each(&:kill)
@pool.clear
@queue.clear
end
true
end

# Gracefully shuts down the executor, optionally with a timeout.
# Stops accepting new tasks and waits for running tasks to complete.
#
# @param timeout [Numeric, nil] Maximum time in seconds to wait for shutdown.
# If nil, waits indefinitely. If timeout expires, remaining threads are killed.
# @return [Boolean] true when shutdown is complete
def shutdown(timeout = nil)
@mutex.synchronize do
return true if @state == SHUTDOWN

@state = SHUTTING_DOWN
@pool.size.times { @queue << :shutdown }
end

if timeout
deadline = Time.now + timeout
@pool.each do |thread|
remaining = deadline - Time.now
break if remaining <= 0

thread.join([remaining, 0].max)
end
@pool.select(&:alive?).each(&:kill)
else
@pool.each(&:join)
end

@pool.clear
@state = SHUTDOWN
true
end

private

def ensure_worker_available
return unless @state == RUNNING

@pool.select!(&:alive?)
@pool << spawn_worker if @pool.size < @max_threads
end

def spawn_worker
Thread.new do
while (job = @queue.shift)
break if job == :shutdown

args, block = job
block.call(*args)
end
end
end
end
end
end
16 changes: 16 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/directory_download_error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

module Aws
module S3
# Raised when DirectoryDownloader fails to download objects from S3 bucket
class DirectoryDownloadError < StandardError
def initialize(message, errors = [])
@errors = errors
super(message)
end

# @return [Array<StandardError>] The list of errors encountered when downloading objects
attr_reader :errors
end
end
end
162 changes: 162 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/directory_downloader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# frozen_string_literal: true

module Aws
module S3
# @api private
class DirectoryDownloader
def initialize(options = {})
@client = options[:client]
@executor = options[:executor]
@abort_requested = false
@mutex = Mutex.new
end

attr_reader :abort_requested

def download(destination, bucket:, **options)
if File.exist?(destination)
raise ArgumentError, 'invalid destination, expected a directory' unless File.directory?(destination)
else
FileUtils.mkdir_p(destination)
end

download_opts = build_download_opts(destination, bucket, options.dup)
downloader = FileDownloader.new(client: @client, executor: @executor)
producer = ObjectProducer.new(download_opts.merge(client: @client, directory_downloader: self))
downloads, errors = process_download_queue(producer, downloader, download_opts)
build_result(downloads, errors)
ensure
@abort_requested = false
end

private

def request_abort
@mutex.synchronize { @abort_requested = true }
end
def build_download_opts(destination, bucket, opts)
{
destination: destination,
bucket: bucket,
s3_prefix: opts.delete(:s3_prefix),
ignore_failure: opts.delete(:ignore_failure) || false,
filter_callback: opts.delete(:filter_callback),
progress_callback: opts.delete(:progress_callback)
}
end

def build_result(download_count, errors)
if @abort_requested
msg = "directory download failed: #{errors.map(&:message).join('; ')}"
raise DirectoryDownloadError.new(msg, errors)
else
{
completed_downloads: [download_count - errors.count, 0].max,
failed_downloads: errors.count,
errors: errors.any? ? errors : nil
}.compact
end
end

def handle_error(executor, opts)
return if opts[:ignore_failure]

request_abort
executor.kill
end

def process_download_queue(producer, downloader, opts)
# Separate executor for lightweight queuing tasks, avoiding interference with main @executor lifecycle
queue_executor = DefaultExecutor.new
progress = DirectoryProgress.new(opts[:progress_callback]) if opts[:progress_callback]
download_attempts = 0
errors = []
begin
producer.each do |object|
break if @abort_requested

download_attempts += 1
queue_executor.post(object) do |o|
dir_path = File.dirname(o[:path])
FileUtils.mkdir_p(dir_path) unless dir_path == opts[:destination] || Dir.exist?(dir_path)

downloader.download(o[:path], bucket: opts[:bucket], key: o[:key])
progress&.call(File.size(o[:path]))
rescue StandardError => e
errors << e
handle_error(queue_executor, opts)
end
end
rescue StandardError => e
errors << e
handle_error(queue_executor, opts)
end
queue_executor.shutdown
[download_attempts, errors]
end

# @api private
class ObjectProducer
include Enumerable

DEFAULT_QUEUE_SIZE = 100

def initialize(options = {})
@destination_dir = options[:destination]
@client = options[:client]
@bucket = options[:bucket]
@s3_prefix = options[:s3_prefix]
@filter_callback = options[:filter_callback]
@directory_downloader = options[:directory_downloader]
@object_queue = SizedQueue.new(DEFAULT_QUEUE_SIZE)
end

def each
producer_thread = Thread.new do
stream_objects
ensure
@object_queue << :done
end

# Yield objects from internal queue
while (object = @object_queue.shift) != :done
break if @directory_downloader.abort_requested

yield object
end
ensure
producer_thread.join
end

private

def build_object_entry(key)
{ path: File.join(@destination_dir, normalize_key(key)), key: key }
end

def stream_objects(continuation_token: nil)
resp = @client.list_objects_v2(bucket: @bucket, prefix: @s3_prefix, continuation_token: continuation_token)
resp.contents.each do |o|
break if @directory_downloader.abort_requested
next if o.key.end_with?('/') && o.size.zero?
next unless include_object?(o.key)

@object_queue << build_object_entry(o.key)
end
stream_objects(continuation_token: resp.next_continuation_token) if resp.next_continuation_token
end

def include_object?(key)
return true unless @filter_callback

@filter_callback.call(key)
end

def normalize_key(key)
key = key.delete_prefix(@s3_prefix) if @s3_prefix
File::SEPARATOR == '/' ? key : key.tr('/', File::SEPARATOR)
end
end
end
end
end
Loading
Loading