@@ -56,11 +56,11 @@ internal class TaskScheduler(RunnerEventsProcessor nodeEventProcessor, Microsoft
56
56
, ITaskScheduler
57
57
{
58
58
private static readonly TimeSpan blobRunInterval = TimeSpan . FromSeconds ( 15 ) ;
59
- private static readonly TimeSpan queuedRunInterval = TimeSpan . FromMilliseconds ( 100 ) ;
59
+ private static readonly TimeSpan queuedRunInterval = TimeSpan . FromSeconds ( 15 ) ;
60
60
private static readonly TimeSpan queuedRepositoryInterval = TimeSpan . FromMinutes ( 1 ) ;
61
61
internal static readonly TimeSpan BatchRunInterval = TimeSpan . FromSeconds ( 30 ) ; // The very fastest processes inside of Azure Batch accessing anything within pools or jobs appears to use a 30 second polling interval
62
- private static readonly TimeSpan shortBackgroundRunInterval = TimeSpan . FromSeconds ( 1 ) ;
63
- private static readonly TimeSpan longBackgroundRunInterval = TimeSpan . FromSeconds ( 2.5 ) ;
62
+ private static readonly TimeSpan shortBackgroundRunInterval = TimeSpan . FromMilliseconds ( 75 ) ;
63
+ private static readonly TimeSpan longBackgroundRunInterval = TimeSpan . FromSeconds ( 15 ) ;
64
64
private static readonly TimeSpan orphanedTaskInterval = TimeSpan . FromMinutes ( 10 ) ;
65
65
private readonly RunnerEventsProcessor nodeEventProcessor = nodeEventProcessor ;
66
66
@@ -209,9 +209,31 @@ private async ValueTask ProcessQueuedTesTasksAsync(CancellationToken cancellatio
209
209
. OrderByDescending ( t => t . CreationTime )
210
210
. ToAsyncEnumerable ( ) ) ;
211
211
212
- while ( ! cancellationToken . IsCancellationRequested && ( queuedTesTasks . TryDequeue ( out var tesTask ) || ( DateTimeOffset . UtcNow >= nextQueuedRepository && await RefillFromRepository ( ) && queuedTesTasks . TryDequeue ( out tesTask ) ) ) )
212
+ cancellationToken . ThrowIfCancellationRequested ( ) ;
213
+ var now = DateTimeOffset . UtcNow ;
214
+
215
+ HashSet < TesTask > tasks = new ( new TesTasByIdComparer ( ) ) ;
216
+
217
+ while ( queuedTesTasks . TryDequeue ( out var tesTask ) )
213
218
{
214
- await ProcessOrchestratedTesTaskAsync ( "Queued" , new ( BatchScheduler . ProcessQueuedTesTaskAsync ( tesTask , cancellationToken ) , tesTask ) , Requeue , cancellationToken ) ;
219
+ _ = tasks . Add ( tesTask ) ;
220
+ }
221
+
222
+ // Catch any tasks reset back to Queued
223
+ if ( nextQueuedRepository <= now )
224
+ {
225
+ nextQueuedRepository = now + queuedRepositoryInterval ;
226
+ await ( await query ( cancellationToken ) ) . ForEachAsync ( task => _ = tasks . Add ( task ) , cancellationToken ) ;
227
+ }
228
+
229
+ tasks . ForEach ( QueueTesTask ) ;
230
+
231
+ void QueueTesTask ( TesTask tesTask )
232
+ {
233
+ _ = BatchScheduler . ProcessQueuedTesTaskAsync ( tesTask , cancellationToken )
234
+ . ContinueWith ( task => ProcessOrchestratedTesTaskAsync ( "Queued" , new ( task , tesTask ) , Requeue , cancellationToken ) . AsTask ( ) )
235
+ . Unwrap ( )
236
+ . ContinueWith ( task => Logger . LogError ( task . Exception , "Failure to queue TesTask {TesTask}" , tesTask . Id ) , TaskContinuationOptions . OnlyOnFaulted ) ;
215
237
}
216
238
217
239
async ValueTask Requeue ( RepositoryCollisionException < TesTask > exception )
@@ -223,14 +245,6 @@ async ValueTask Requeue(RepositoryCollisionException<TesTask> exception)
223
245
queuedTesTasks . Enqueue ( tesTask ) ;
224
246
}
225
247
}
226
-
227
- // Catch any tasks reset back to Queued
228
- async ValueTask < bool > RefillFromRepository ( )
229
- {
230
- nextQueuedRepository = DateTimeOffset . UtcNow + queuedRepositoryInterval ;
231
- await ( await query ( cancellationToken ) ) . ForEachAsync ( queuedTesTasks . Enqueue , cancellationToken ) ;
232
- return ! queuedTesTasks . IsEmpty ;
233
- }
234
248
}
235
249
236
250
/// <summary>
@@ -538,6 +552,12 @@ await OrchestrateTesTasksOnBatchAsync(
538
552
cancellationToken ) ;
539
553
}
540
554
555
+ private sealed class TesTasByIdComparer : IEqualityComparer < TesTask >
556
+ {
557
+ bool IEqualityComparer < TesTask > . Equals ( TesTask x , TesTask y ) => x ? . Id . Equals ( y . Id ) ?? false ;
558
+ int IEqualityComparer < TesTask > . GetHashCode ( TesTask obj ) => obj . Id ? . GetHashCode ( ) ?? 0 ;
559
+ }
560
+
541
561
/// <inheritdoc/>
542
562
void ITaskScheduler . QueueTesTask ( TesTask tesTask )
543
563
{
0 commit comments