Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ruby/lib/ci/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
require 'ci/queue/strategy/base'
require 'ci/queue/strategy/random'
require 'ci/queue/strategy/timing_based'
require 'ci/queue/strategy/suite_bin_packing'
require 'ci/queue/test_chunk'

module CI
module Queue
Expand Down Expand Up @@ -62,6 +64,8 @@ def get_strategy(strategy_name)
case strategy_name&.to_sym
when :timing_based
Strategy::TimingBased.new
when :suite_bin_packing
Strategy::SuiteBinPacking.new
else
Strategy::Random.new
end
Expand Down
6 changes: 5 additions & 1 deletion ruby/lib/ci/queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class Configuration
attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration
attr_accessor :max_test_failed, :redis_ttl
attr_accessor :strategy, :timing_file, :timing_fallback_duration, :export_timing_file
attr_accessor :suite_max_duration, :suite_buffer_percent
attr_reader :circuit_breakers
attr_writer :seed, :build_id
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
Expand Down Expand Up @@ -53,7 +54,8 @@ def initialize(
max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil,
queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
export_flaky_tests_file: nil, known_flaky_tests: [],
strategy: :random, timing_file: nil, timing_fallback_duration: 100.0, export_timing_file: nil
strategy: :random, timing_file: nil, timing_fallback_duration: 100.0, export_timing_file: nil,
suite_max_duration: 120_000, suite_buffer_percent: 10
)
@build_id = build_id
@circuit_breakers = [CircuitBreaker::Disabled]
Expand Down Expand Up @@ -83,6 +85,8 @@ def initialize(
@timing_file = timing_file
@timing_fallback_duration = timing_fallback_duration
@export_timing_file = export_timing_file
@suite_max_duration = suite_max_duration
@suite_buffer_percent = suite_buffer_percent
end

def queue_init_timeout
Expand Down
145 changes: 139 additions & 6 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,19 @@ def distributed?

def populate(tests, random: Random.new)
@index = tests.map { |t| [t.id, t] }.to_h
tests = Queue.shuffle(tests, random, config: config)
push(tests.map(&:id))
executables = Queue.shuffle(tests, random, config: config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think "shuffle" (and the underlying "order_tests") now does more than reordering. Maybe we should rename this?
Or maybe separate to two function calls?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^^the above comment is irrelevant now given there will be a refactor following up.


# Separate chunks from individual tests
chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) }
individual_tests = executables.select { |e| !e.is_a?(CI::Queue::TestChunk) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually, wondering if we should just stop using individual tests as a first class element in the queue, and just always use TestChunk. (An individual test can also be wrapped into a TestChunk).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The benefit is whenever we interact with an item from the queue, we have a single interface to deal with.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree. I started down that path but there is a lot of places in the reporter code that expect a SingleExample

Once we validate that this is a good approach, I'll come back and clean this up a bit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. That makes sense!


# Store chunk metadata in Redis (only master does this)
store_chunk_metadata(chunks) if chunks.any?

# Push all IDs to queue (chunks + individual tests)
all_ids = chunks.map(&:id) + individual_tests.map(&:id)
push(all_ids)

self
end

Expand Down Expand Up @@ -60,9 +71,16 @@ def poll
idle_since = nil
idle_state_printed = false
until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed?
if test = reserve
if id = reserve
idle_since = nil
yield index.fetch(test)
executable = resolve_executable(id)

if executable
yield executable
else
warn("Warning: Could not resolve executable for ID #{id.inspect}. Acknowledging to remove from queue.")
acknowledge(id)
end
else
idle_since ||= Time.now
if Time.now - idle_since > 120 && !idle_state_printed
Expand Down Expand Up @@ -121,8 +139,9 @@ def build
@build ||= CI::Queue::Redis::BuildRecord.new(self, redis, config)
end

def acknowledge(test)
test_key = test.id
def acknowledge(test_or_id)
# Accept either an object with .id or a string ID
test_key = test_or_id.respond_to?(:id) ? test_or_id.id : test_or_id
raise_on_mismatching_test(test_key)
eval_script(
:acknowledge,
Expand Down Expand Up @@ -253,6 +272,120 @@ def push(tests)
def register
redis.sadd(key('workers'), [worker_id])
end

private

def store_chunk_metadata(chunks)
# Batch operations to avoid exceeding Redis multi operation limits
# Each chunk requires 3 commands (set, expire, sadd), so batch conservatively
batch_size = 7 # 7 chunks = 21 commands + 1 expire = 22 commands per batch

chunks.each_slice(batch_size) do |chunk_batch|
redis.multi do |transaction|
chunk_batch.each do |chunk|
# Store chunk metadata with TTL
transaction.set(
key('chunk', chunk.id),
chunk.to_json
)
transaction.expire(key('chunk', chunk.id), config.redis_ttl)

# Track all chunks for cleanup
transaction.sadd(key('chunks'), chunk.id)
end
transaction.expire(key('chunks'), config.redis_ttl)
end
end
end

def chunk_id?(id)
id.include?(':full_suite') || id.include?(':chunk_')
end

def resolve_executable(id)
# Detect chunk by ID pattern
if chunk_id?(id)
resolve_chunk(id)
else
# Regular test - existing behavior
index.fetch(id)
end
end

def resolve_chunk(chunk_id)
# Fetch chunk metadata from Redis
chunk_json = redis.get(key('chunk', chunk_id))
unless chunk_json
warn "Warning: Chunk metadata not found for #{chunk_id}"
return nil
end

chunk = CI::Queue::TestChunk.from_json(chunk_id, chunk_json)

# Resolve test objects based on chunk type
test_objects = if chunk.full_suite?
resolve_full_suite_tests(chunk.suite_name)
else
resolve_partial_suite_tests(chunk.test_ids)
end

if test_objects.empty?
warn "Warning: No tests found for chunk #{chunk_id}"
return nil
end

# Return enriched chunk with actual test objects
ResolvedChunk.new(chunk, test_objects)
rescue JSON::ParserError => e
warn "Warning: Could not parse chunk metadata for #{chunk_id}: #{e.message}"
nil
rescue KeyError => e
warn "Warning: Could not resolve test in chunk #{chunk_id}: #{e.message}"
nil
end

def resolve_full_suite_tests(suite_name)
# Filter index for all tests from this suite
# Tests are added to index during populate() with format "SuiteName#test_method"
prefix = "#{suite_name}#"
tests = index.select { |test_id, _| test_id.start_with?(prefix) }
.values

# Sort to maintain consistent order (alphabetical by test name)
tests.sort_by(&:id)
end

def resolve_partial_suite_tests(test_ids)
# Fetch specific tests from index
test_ids.map { |test_id| index.fetch(test_id) }
end

# Represents a chunk with resolved test objects
class ResolvedChunk
attr_reader :chunk_id, :suite_name, :tests

def initialize(chunk, tests)
@chunk_id = chunk.id
@suite_name = chunk.suite_name
@tests = tests.freeze
end

def id
chunk_id
end

def chunk?
true
end

def flaky?
tests.any?(&:flaky?)
end

def size
tests.size
end
end
end
end
end
Expand Down
137 changes: 137 additions & 0 deletions ruby/lib/ci/queue/strategy/suite_bin_packing.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# frozen_string_literal: true
require_relative 'base'
require 'json'

module CI
module Queue
module Strategy
class SuiteBinPacking < Base
def order_tests(tests, random: Random.new, config: nil)
timing_data = load_timing_data(config&.timing_file)
max_duration = config&.suite_max_duration || 120_000
fallback_duration = config&.timing_fallback_duration || 100.0
buffer_percent = config&.suite_buffer_percent || 10

# Group tests by suite name
suites = tests.group_by { |test| extract_suite_name(test.id) }

# Create chunks for each suite
chunks = []
suites.each do |suite_name, suite_tests|
chunks.concat(
create_chunks_for_suite(
suite_name,
suite_tests,
max_duration,
buffer_percent,
timing_data,
fallback_duration
)
)
end

# Sort chunks by estimated duration (longest first)
chunks.sort_by { |chunk| -chunk.estimated_duration }
end

private

def extract_suite_name(test_id)
test_id.split('#').first
end

def load_timing_data(file_path)
return {} unless file_path && ::File.exist?(file_path)

JSON.parse(::File.read(file_path))
rescue JSON::ParserError => e
warn "Warning: Could not parse timing file #{file_path}: #{e.message}"
{}
end

def get_test_duration(test_id, timing_data, fallback_duration)
timing_data[test_id]&.to_f || fallback_duration
end

def create_chunks_for_suite(suite_name, suite_tests, max_duration, buffer_percent, timing_data, fallback_duration)
# Calculate total suite duration
total_duration = suite_tests.sum do |test|
get_test_duration(test.id, timing_data, fallback_duration)
end

# If suite fits in max duration, create full_suite chunk
if total_duration <= max_duration
chunk_id = "#{suite_name}:full_suite"
# Don't store test_ids - worker will resolve from index
return [TestChunk.new(chunk_id, suite_name, :full_suite, [], total_duration)]
end

# Suite too large - split into partial_suite chunks
split_suite_into_chunks(
suite_name,
suite_tests,
max_duration,
buffer_percent,
timing_data,
fallback_duration
)
end

def split_suite_into_chunks(suite_name, suite_tests, max_duration, buffer_percent, timing_data, fallback_duration)
# Apply buffer to max duration
effective_max = max_duration * (1 - buffer_percent / 100.0)

# Sort tests by duration (longest first for better bin packing)
sorted_tests = suite_tests.sort_by do |test|
-get_test_duration(test.id, timing_data, fallback_duration)
end

# First-fit decreasing bin packing
chunks = []
current_chunk_tests = []
current_chunk_duration = 0.0
chunk_index = 0

sorted_tests.each do |test|
test_duration = get_test_duration(test.id, timing_data, fallback_duration)

if current_chunk_duration + test_duration > effective_max && current_chunk_tests.any?
# Finalize current chunk and start new one
chunk_id = "#{suite_name}:chunk_#{chunk_index}"
test_ids = current_chunk_tests.map(&:id)
chunks << TestChunk.new(
chunk_id,
suite_name,
:partial_suite,
test_ids,
current_chunk_duration
)

current_chunk_tests = [test]
current_chunk_duration = test_duration
chunk_index += 1
else
current_chunk_tests << test
current_chunk_duration += test_duration
end
end

# Add final chunk
if current_chunk_tests.any?
chunk_id = "#{suite_name}:chunk_#{chunk_index}"
test_ids = current_chunk_tests.map(&:id)
chunks << TestChunk.new(
chunk_id,
suite_name,
:partial_suite,
test_ids,
current_chunk_duration
)
end

chunks
end
end
end
end
end
Loading
Loading