Skip to content

Commit 243cd93

Browse files
committed
support suite-based queueing
1 parent 9a1776f commit 243cd93

File tree

13 files changed

+1285
-24
lines changed

13 files changed

+1285
-24
lines changed

ruby/lib/ci/queue.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
require 'ci/queue/strategy/base'
1717
require 'ci/queue/strategy/random'
1818
require 'ci/queue/strategy/timing_based'
19+
require 'ci/queue/strategy/suite_bin_packing'
20+
require 'ci/queue/test_chunk'
1921

2022
module CI
2123
module Queue
@@ -62,6 +64,8 @@ def get_strategy(strategy_name)
6264
case strategy_name&.to_sym
6365
when :timing_based
6466
Strategy::TimingBased.new
67+
when :suite_bin_packing
68+
Strategy::SuiteBinPacking.new
6569
else
6670
Strategy::Random.new
6771
end

ruby/lib/ci/queue/configuration.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class Configuration
99
attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration
1010
attr_accessor :max_test_failed, :redis_ttl
1111
attr_accessor :strategy, :timing_file, :timing_fallback_duration, :export_timing_file
12+
attr_accessor :suite_max_duration, :suite_buffer_percent
1213
attr_reader :circuit_breakers
1314
attr_writer :seed, :build_id
1415
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
@@ -53,7 +54,8 @@ def initialize(
5354
max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil,
5455
queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
5556
export_flaky_tests_file: nil, known_flaky_tests: [],
56-
strategy: :random, timing_file: nil, timing_fallback_duration: 100.0, export_timing_file: nil
57+
strategy: :random, timing_file: nil, timing_fallback_duration: 100.0, export_timing_file: nil,
58+
suite_max_duration: 120_000, suite_buffer_percent: 10
5759
)
5860
@build_id = build_id
5961
@circuit_breakers = [CircuitBreaker::Disabled]
@@ -83,6 +85,8 @@ def initialize(
8385
@timing_file = timing_file
8486
@timing_fallback_duration = timing_fallback_duration
8587
@export_timing_file = export_timing_file
88+
@suite_max_duration = suite_max_duration
89+
@suite_buffer_percent = suite_buffer_percent
8690
end
8791

8892
def queue_init_timeout

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 133 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,19 @@ def distributed?
3030

3131
def populate(tests, random: Random.new)
3232
@index = tests.map { |t| [t.id, t] }.to_h
33-
tests = Queue.shuffle(tests, random, config: config)
34-
push(tests.map(&:id))
33+
executables = Queue.shuffle(tests, random, config: config)
34+
35+
# Separate chunks from individual tests
36+
chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) }
37+
individual_tests = executables.select { |e| !e.is_a?(CI::Queue::TestChunk) }
38+
39+
# Store chunk metadata in Redis (only master does this)
40+
store_chunk_metadata(chunks) if chunks.any?
41+
42+
# Push all IDs to queue (chunks + individual tests)
43+
all_ids = chunks.map(&:id) + individual_tests.map(&:id)
44+
push(all_ids)
45+
3546
self
3647
end
3748

@@ -60,9 +71,16 @@ def poll
6071
idle_since = nil
6172
idle_state_printed = false
6273
until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed?
63-
if test = reserve
74+
if id = reserve
6475
idle_since = nil
65-
yield index.fetch(test)
76+
executable = resolve_executable(id)
77+
78+
if executable
79+
yield executable
80+
else
81+
warn("Warning: Could not resolve executable for ID #{id.inspect}. Acknowledging to remove from queue.")
82+
acknowledge(id)
83+
end
6684
else
6785
idle_since ||= Time.now
6886
if Time.now - idle_since > 120 && !idle_state_printed
@@ -121,8 +139,9 @@ def build
121139
@build ||= CI::Queue::Redis::BuildRecord.new(self, redis, config)
122140
end
123141

124-
def acknowledge(test)
125-
test_key = test.id
142+
def acknowledge(test_or_id)
143+
# Accept either an object with .id or a string ID
144+
test_key = test_or_id.respond_to?(:id) ? test_or_id.id : test_or_id
126145
raise_on_mismatching_test(test_key)
127146
eval_script(
128147
:acknowledge,
@@ -253,6 +272,114 @@ def push(tests)
253272
def register
254273
redis.sadd(key('workers'), [worker_id])
255274
end
275+
276+
private
277+
278+
def store_chunk_metadata(chunks)
279+
redis.multi do |transaction|
280+
chunks.each do |chunk|
281+
# Store chunk metadata with TTL
282+
transaction.set(
283+
key('chunk', chunk.id),
284+
chunk.to_json
285+
)
286+
transaction.expire(key('chunk', chunk.id), config.redis_ttl)
287+
288+
# Track all chunks for cleanup
289+
transaction.sadd(key('chunks'), chunk.id)
290+
end
291+
transaction.expire(key('chunks'), config.redis_ttl)
292+
end
293+
end
294+
295+
def chunk_id?(id)
296+
id.include?(':full_suite') || id.include?(':chunk_')
297+
end
298+
299+
def resolve_executable(id)
300+
# Detect chunk by ID pattern
301+
if chunk_id?(id)
302+
resolve_chunk(id)
303+
else
304+
# Regular test - existing behavior
305+
index.fetch(id)
306+
end
307+
end
308+
309+
def resolve_chunk(chunk_id)
310+
# Fetch chunk metadata from Redis
311+
chunk_json = redis.get(key('chunk', chunk_id))
312+
unless chunk_json
313+
warn "Warning: Chunk metadata not found for #{chunk_id}"
314+
return nil
315+
end
316+
317+
chunk = CI::Queue::TestChunk.from_json(chunk_id, chunk_json)
318+
319+
# Resolve test objects based on chunk type
320+
test_objects = if chunk.full_suite?
321+
resolve_full_suite_tests(chunk.suite_name)
322+
else
323+
resolve_partial_suite_tests(chunk.test_ids)
324+
end
325+
326+
if test_objects.empty?
327+
warn "Warning: No tests found for chunk #{chunk_id}"
328+
return nil
329+
end
330+
331+
# Return enriched chunk with actual test objects
332+
ResolvedChunk.new(chunk, test_objects)
333+
rescue JSON::ParserError => e
334+
warn "Warning: Could not parse chunk metadata for #{chunk_id}: #{e.message}"
335+
nil
336+
rescue KeyError => e
337+
warn "Warning: Could not resolve test in chunk #{chunk_id}: #{e.message}"
338+
nil
339+
end
340+
341+
def resolve_full_suite_tests(suite_name)
342+
# Filter index for all tests from this suite
343+
# Tests are added to index during populate() with format "SuiteName#test_method"
344+
prefix = "#{suite_name}#"
345+
tests = index.select { |test_id, _| test_id.start_with?(prefix) }
346+
.values
347+
348+
# Sort to maintain consistent order (alphabetical by test name)
349+
tests.sort_by(&:id)
350+
end
351+
352+
def resolve_partial_suite_tests(test_ids)
353+
# Fetch specific tests from index
354+
test_ids.map { |test_id| index.fetch(test_id) }
355+
end
356+
357+
# Represents a chunk with resolved test objects
358+
class ResolvedChunk
359+
attr_reader :chunk_id, :suite_name, :tests
360+
361+
def initialize(chunk, tests)
362+
@chunk_id = chunk.id
363+
@suite_name = chunk.suite_name
364+
@tests = tests.freeze
365+
end
366+
367+
def id
368+
chunk_id
369+
end
370+
371+
def chunk?
372+
true
373+
end
374+
375+
def flaky?
376+
tests.any?(&:flaky?)
377+
end
378+
379+
def size
380+
tests.size
381+
end
382+
end
256383
end
257384
end
258385
end
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# frozen_string_literal: true
2+
require_relative 'base'
3+
require 'json'
4+
5+
module CI
6+
module Queue
7+
module Strategy
8+
class SuiteBinPacking < Base
9+
def order_tests(tests, random: Random.new, config: nil)
10+
timing_data = load_timing_data(config&.timing_file)
11+
max_duration = config&.suite_max_duration || 120_000
12+
fallback_duration = config&.timing_fallback_duration || 100.0
13+
buffer_percent = config&.suite_buffer_percent || 10
14+
15+
# Group tests by suite name
16+
suites = tests.group_by { |test| extract_suite_name(test.id) }
17+
18+
# Create chunks for each suite
19+
chunks = []
20+
suites.each do |suite_name, suite_tests|
21+
chunks.concat(
22+
create_chunks_for_suite(
23+
suite_name,
24+
suite_tests,
25+
max_duration,
26+
buffer_percent,
27+
timing_data,
28+
fallback_duration
29+
)
30+
)
31+
end
32+
33+
# Sort chunks by estimated duration (longest first)
34+
chunks.sort_by { |chunk| -chunk.estimated_duration }
35+
end
36+
37+
private
38+
39+
def extract_suite_name(test_id)
40+
test_id.split('#').first
41+
end
42+
43+
def load_timing_data(file_path)
44+
return {} unless file_path && ::File.exist?(file_path)
45+
46+
JSON.parse(::File.read(file_path))
47+
rescue JSON::ParserError => e
48+
warn "Warning: Could not parse timing file #{file_path}: #{e.message}"
49+
{}
50+
end
51+
52+
def get_test_duration(test_id, timing_data, fallback_duration)
53+
timing_data[test_id]&.to_f || fallback_duration
54+
end
55+
56+
def create_chunks_for_suite(suite_name, suite_tests, max_duration, buffer_percent, timing_data, fallback_duration)
57+
# Calculate total suite duration
58+
total_duration = suite_tests.sum do |test|
59+
get_test_duration(test.id, timing_data, fallback_duration)
60+
end
61+
62+
# If suite fits in max duration, create full_suite chunk
63+
if total_duration <= max_duration
64+
chunk_id = "#{suite_name}:full_suite"
65+
# Don't store test_ids - worker will resolve from index
66+
return [TestChunk.new(chunk_id, suite_name, :full_suite, [], total_duration)]
67+
end
68+
69+
# Suite too large - split into partial_suite chunks
70+
split_suite_into_chunks(
71+
suite_name,
72+
suite_tests,
73+
max_duration,
74+
buffer_percent,
75+
timing_data,
76+
fallback_duration
77+
)
78+
end
79+
80+
def split_suite_into_chunks(suite_name, suite_tests, max_duration, buffer_percent, timing_data, fallback_duration)
81+
# Apply buffer to max duration
82+
effective_max = max_duration * (1 - buffer_percent / 100.0)
83+
84+
# Sort tests by duration (longest first for better bin packing)
85+
sorted_tests = suite_tests.sort_by do |test|
86+
-get_test_duration(test.id, timing_data, fallback_duration)
87+
end
88+
89+
# First-fit decreasing bin packing
90+
chunks = []
91+
current_chunk_tests = []
92+
current_chunk_duration = 0.0
93+
chunk_index = 0
94+
95+
sorted_tests.each do |test|
96+
test_duration = get_test_duration(test.id, timing_data, fallback_duration)
97+
98+
if current_chunk_duration + test_duration > effective_max && current_chunk_tests.any?
99+
# Finalize current chunk and start new one
100+
chunk_id = "#{suite_name}:chunk_#{chunk_index}"
101+
test_ids = current_chunk_tests.map(&:id)
102+
chunks << TestChunk.new(
103+
chunk_id,
104+
suite_name,
105+
:partial_suite,
106+
test_ids,
107+
current_chunk_duration
108+
)
109+
110+
current_chunk_tests = [test]
111+
current_chunk_duration = test_duration
112+
chunk_index += 1
113+
else
114+
current_chunk_tests << test
115+
current_chunk_duration += test_duration
116+
end
117+
end
118+
119+
# Add final chunk
120+
if current_chunk_tests.any?
121+
chunk_id = "#{suite_name}:chunk_#{chunk_index}"
122+
test_ids = current_chunk_tests.map(&:id)
123+
chunks << TestChunk.new(
124+
chunk_id,
125+
suite_name,
126+
:partial_suite,
127+
test_ids,
128+
current_chunk_duration
129+
)
130+
end
131+
132+
chunks
133+
end
134+
end
135+
end
136+
end
137+
end

0 commit comments

Comments
 (0)