1
1
use std:: fmt:: Debug ;
2
2
use std:: hash:: Hash ;
3
3
use std:: io:: Write ;
4
- use std:: iter;
5
4
use std:: num:: NonZero ;
6
5
use std:: sync:: Arc ;
7
6
8
7
use parking_lot:: { Condvar , Mutex } ;
9
- use rustc_data_structures:: fx:: { FxHashMap , FxHashSet } ;
8
+ use rustc_data_structures:: fx:: FxHashMap ;
10
9
use rustc_errors:: { Diag , DiagCtxtHandle } ;
11
10
use rustc_hir:: def:: DefKind ;
12
11
use rustc_session:: Session ;
13
- use rustc_span:: { DUMMY_SP , Span } ;
12
+ use rustc_span:: Span ;
14
13
15
14
use super :: QueryStackFrameExtra ;
16
15
use crate :: dep_graph:: DepContext ;
@@ -45,18 +44,6 @@ impl QueryJobId {
45
44
fn query < I : Clone > ( self , map : & QueryMap < I > ) -> QueryStackFrame < I > {
46
45
map. get ( & self ) . unwrap ( ) . query . clone ( )
47
46
}
48
-
49
- fn span < I > ( self , map : & QueryMap < I > ) -> Span {
50
- map. get ( & self ) . unwrap ( ) . job . span
51
- }
52
-
53
- fn parent < I > ( self , map : & QueryMap < I > ) -> Option < QueryJobId > {
54
- map. get ( & self ) . unwrap ( ) . job . parent
55
- }
56
-
57
- fn latch < I > ( self , map : & QueryMap < I > ) -> Option < & QueryLatch < I > > {
58
- map. get ( & self ) . unwrap ( ) . job . latch . as_ref ( )
59
- }
60
47
}
61
48
62
49
#[ derive( Clone , Debug ) ]
@@ -173,9 +160,7 @@ impl QueryJobId {
173
160
174
161
#[ derive( Debug ) ]
175
162
struct QueryWaiter < I > {
176
- query : Option < QueryJobId > ,
177
163
condvar : Condvar ,
178
- span : Span ,
179
164
cycle : Mutex < Option < CycleError < I > > > ,
180
165
}
181
166
@@ -204,14 +189,8 @@ impl<I> QueryLatch<I> {
204
189
}
205
190
206
191
/// Awaits for the query job to complete.
207
- pub ( super ) fn wait_on (
208
- & self ,
209
- qcx : impl QueryContext ,
210
- query : Option < QueryJobId > ,
211
- span : Span ,
212
- ) -> Result < ( ) , CycleError < I > > {
213
- let waiter =
214
- Arc :: new ( QueryWaiter { query, span, cycle : Mutex :: new ( None ) , condvar : Condvar :: new ( ) } ) ;
192
+ pub ( super ) fn wait_on ( & self , qcx : impl QueryContext ) -> Result < ( ) , CycleError < I > > {
193
+ let waiter = Arc :: new ( QueryWaiter { cycle : Mutex :: new ( None ) , condvar : Condvar :: new ( ) } ) ;
215
194
self . wait_on_inner ( qcx, & waiter) ;
216
195
// FIXME: Get rid of this lock. We have ownership of the QueryWaiter
217
196
// although another thread may still have a Arc reference so we cannot
@@ -233,10 +212,6 @@ impl<I> QueryLatch<I> {
233
212
// this thread.
234
213
info. waiters . push ( Arc :: clone ( waiter) ) ;
235
214
236
- // If this detects a deadlock and the deadlock handler wants to resume this thread
237
- // we have to be in the `wait` call. This is ensured by the deadlock handler
238
- // getting the self.info lock.
239
- rayon_core:: mark_blocked ( ) ;
240
215
let proxy = qcx. jobserver_proxy ( ) ;
241
216
proxy. release_thread ( ) ;
242
217
waiter. condvar . wait ( & mut info) ;
@@ -251,304 +226,10 @@ impl<I> QueryLatch<I> {
251
226
let mut info = self . info . lock ( ) ;
252
227
debug_assert ! ( !info. complete) ;
253
228
info. complete = true ;
254
- let registry = rayon_core:: Registry :: current ( ) ;
255
229
for waiter in info. waiters . drain ( ..) {
256
- rayon_core:: mark_unblocked ( & registry) ;
257
230
waiter. condvar . notify_one ( ) ;
258
231
}
259
232
}
260
-
261
- /// Removes a single waiter from the list of waiters.
262
- /// This is used to break query cycles.
263
- fn extract_waiter ( & self , waiter : usize ) -> Arc < QueryWaiter < I > > {
264
- let mut info = self . info . lock ( ) ;
265
- debug_assert ! ( !info. complete) ;
266
- // Remove the waiter from the list of waiters
267
- info. waiters . remove ( waiter)
268
- }
269
- }
270
-
271
- /// A resumable waiter of a query. The usize is the index into waiters in the query's latch
272
- type Waiter = ( QueryJobId , usize ) ;
273
-
274
- /// Visits all the non-resumable and resumable waiters of a query.
275
- /// Only waiters in a query are visited.
276
- /// `visit` is called for every waiter and is passed a query waiting on `query_ref`
277
- /// and a span indicating the reason the query waited on `query_ref`.
278
- /// If `visit` returns Some, this function returns.
279
- /// For visits of non-resumable waiters it returns the return value of `visit`.
280
- /// For visits of resumable waiters it returns Some(Some(Waiter)) which has the
281
- /// required information to resume the waiter.
282
- /// If all `visit` calls returns None, this function also returns None.
283
- fn visit_waiters < I , F > (
284
- query_map : & QueryMap < I > ,
285
- query : QueryJobId ,
286
- mut visit : F ,
287
- ) -> Option < Option < Waiter > >
288
- where
289
- F : FnMut ( Span , QueryJobId ) -> Option < Option < Waiter > > ,
290
- {
291
- // Visit the parent query which is a non-resumable waiter since it's on the same stack
292
- if let Some ( parent) = query. parent ( query_map) {
293
- if let Some ( cycle) = visit ( query. span ( query_map) , parent) {
294
- return Some ( cycle) ;
295
- }
296
- }
297
-
298
- // Visit the explicit waiters which use condvars and are resumable
299
- if let Some ( latch) = query. latch ( query_map) {
300
- for ( i, waiter) in latch. info . lock ( ) . waiters . iter ( ) . enumerate ( ) {
301
- if let Some ( waiter_query) = waiter. query {
302
- if visit ( waiter. span , waiter_query) . is_some ( ) {
303
- // Return a value which indicates that this waiter can be resumed
304
- return Some ( Some ( ( query, i) ) ) ;
305
- }
306
- }
307
- }
308
- }
309
-
310
- None
311
- }
312
-
313
- /// Look for query cycles by doing a depth first search starting at `query`.
314
- /// `span` is the reason for the `query` to execute. This is initially DUMMY_SP.
315
- /// If a cycle is detected, this initial value is replaced with the span causing
316
- /// the cycle.
317
- fn cycle_check < I > (
318
- query_map : & QueryMap < I > ,
319
- query : QueryJobId ,
320
- span : Span ,
321
- stack : & mut Vec < ( Span , QueryJobId ) > ,
322
- visited : & mut FxHashSet < QueryJobId > ,
323
- ) -> Option < Option < Waiter > > {
324
- if !visited. insert ( query) {
325
- return if let Some ( p) = stack. iter ( ) . position ( |q| q. 1 == query) {
326
- // We detected a query cycle, fix up the initial span and return Some
327
-
328
- // Remove previous stack entries
329
- stack. drain ( 0 ..p) ;
330
- // Replace the span for the first query with the cycle cause
331
- stack[ 0 ] . 0 = span;
332
- Some ( None )
333
- } else {
334
- None
335
- } ;
336
- }
337
-
338
- // Query marked as visited is added it to the stack
339
- stack. push ( ( span, query) ) ;
340
-
341
- // Visit all the waiters
342
- let r = visit_waiters ( query_map, query, |span, successor| {
343
- cycle_check ( query_map, successor, span, stack, visited)
344
- } ) ;
345
-
346
- // Remove the entry in our stack if we didn't find a cycle
347
- if r. is_none ( ) {
348
- stack. pop ( ) ;
349
- }
350
-
351
- r
352
- }
353
-
354
- /// Finds out if there's a path to the compiler root (aka. code which isn't in a query)
355
- /// from `query` without going through any of the queries in `visited`.
356
- /// This is achieved with a depth first search.
357
- fn connected_to_root < I > (
358
- query_map : & QueryMap < I > ,
359
- query : QueryJobId ,
360
- visited : & mut FxHashSet < QueryJobId > ,
361
- ) -> bool {
362
- // We already visited this or we're deliberately ignoring it
363
- if !visited. insert ( query) {
364
- return false ;
365
- }
366
-
367
- // This query is connected to the root (it has no query parent), return true
368
- if query. parent ( query_map) . is_none ( ) {
369
- return true ;
370
- }
371
-
372
- visit_waiters ( query_map, query, |_, successor| {
373
- connected_to_root ( query_map, successor, visited) . then_some ( None )
374
- } )
375
- . is_some ( )
376
- }
377
-
378
- // Deterministically pick an query from a list
379
- fn pick_query < ' a , I : Clone , T , F > ( query_map : & QueryMap < I > , queries : & ' a [ T ] , f : F ) -> & ' a T
380
- where
381
- F : Fn ( & T ) -> ( Span , QueryJobId ) ,
382
- {
383
- // Deterministically pick an entry point
384
- // FIXME: Sort this instead
385
- queries
386
- . iter ( )
387
- . min_by_key ( |v| {
388
- let ( span, query) = f ( v) ;
389
- let hash = query. query ( query_map) . hash ;
390
- // Prefer entry points which have valid spans for nicer error messages
391
- // We add an integer to the tuple ensuring that entry points
392
- // with valid spans are picked first
393
- let span_cmp = if span == DUMMY_SP { 1 } else { 0 } ;
394
- ( span_cmp, hash)
395
- } )
396
- . unwrap ( )
397
- }
398
-
399
- /// Looks for query cycles starting from the last query in `jobs`.
400
- /// If a cycle is found, all queries in the cycle is removed from `jobs` and
401
- /// the function return true.
402
- /// If a cycle was not found, the starting query is removed from `jobs` and
403
- /// the function returns false.
404
- fn remove_cycle < I : Clone > (
405
- query_map : & QueryMap < I > ,
406
- jobs : & mut Vec < QueryJobId > ,
407
- wakelist : & mut Vec < Arc < QueryWaiter < I > > > ,
408
- ) -> bool {
409
- let mut visited = FxHashSet :: default ( ) ;
410
- let mut stack = Vec :: new ( ) ;
411
- // Look for a cycle starting with the last query in `jobs`
412
- if let Some ( waiter) =
413
- cycle_check ( query_map, jobs. pop ( ) . unwrap ( ) , DUMMY_SP , & mut stack, & mut visited)
414
- {
415
- // The stack is a vector of pairs of spans and queries; reverse it so that
416
- // the earlier entries require later entries
417
- let ( mut spans, queries) : ( Vec < _ > , Vec < _ > ) = stack. into_iter ( ) . rev ( ) . unzip ( ) ;
418
-
419
- // Shift the spans so that queries are matched with the span for their waitee
420
- spans. rotate_right ( 1 ) ;
421
-
422
- // Zip them back together
423
- let mut stack: Vec < _ > = iter:: zip ( spans, queries) . collect ( ) ;
424
-
425
- // Remove the queries in our cycle from the list of jobs to look at
426
- for r in & stack {
427
- if let Some ( pos) = jobs. iter ( ) . position ( |j| j == & r. 1 ) {
428
- jobs. remove ( pos) ;
429
- }
430
- }
431
-
432
- // Find the queries in the cycle which are
433
- // connected to queries outside the cycle
434
- let entry_points = stack
435
- . iter ( )
436
- . filter_map ( |& ( span, query) | {
437
- if query. parent ( query_map) . is_none ( ) {
438
- // This query is connected to the root (it has no query parent)
439
- Some ( ( span, query, None ) )
440
- } else {
441
- let mut waiters = Vec :: new ( ) ;
442
- // Find all the direct waiters who lead to the root
443
- visit_waiters ( query_map, query, |span, waiter| {
444
- // Mark all the other queries in the cycle as already visited
445
- let mut visited = FxHashSet :: from_iter ( stack. iter ( ) . map ( |q| q. 1 ) ) ;
446
-
447
- if connected_to_root ( query_map, waiter, & mut visited) {
448
- waiters. push ( ( span, waiter) ) ;
449
- }
450
-
451
- None
452
- } ) ;
453
- if waiters. is_empty ( ) {
454
- None
455
- } else {
456
- // Deterministically pick one of the waiters to show to the user
457
- let waiter = * pick_query ( query_map, & waiters, |s| * s) ;
458
- Some ( ( span, query, Some ( waiter) ) )
459
- }
460
- }
461
- } )
462
- . collect :: < Vec < ( Span , QueryJobId , Option < ( Span , QueryJobId ) > ) > > ( ) ;
463
-
464
- // Deterministically pick an entry point
465
- let ( _, entry_point, usage) = pick_query ( query_map, & entry_points, |e| ( e. 0 , e. 1 ) ) ;
466
-
467
- // Shift the stack so that our entry point is first
468
- let entry_point_pos = stack. iter ( ) . position ( |( _, query) | query == entry_point) ;
469
- if let Some ( pos) = entry_point_pos {
470
- stack. rotate_left ( pos) ;
471
- }
472
-
473
- let usage = usage. as_ref ( ) . map ( |( span, query) | ( * span, query. query ( query_map) ) ) ;
474
-
475
- // Create the cycle error
476
- let error = CycleError {
477
- usage,
478
- cycle : stack
479
- . iter ( )
480
- . map ( |& ( s, ref q) | QueryInfo { span : s, query : q. query ( query_map) } )
481
- . collect ( ) ,
482
- } ;
483
-
484
- // We unwrap `waiter` here since there must always be one
485
- // edge which is resumable / waited using a query latch
486
- let ( waitee_query, waiter_idx) = waiter. unwrap ( ) ;
487
-
488
- // Extract the waiter we want to resume
489
- let waiter = waitee_query. latch ( query_map) . unwrap ( ) . extract_waiter ( waiter_idx) ;
490
-
491
- // Set the cycle error so it will be picked up when resumed
492
- * waiter. cycle . lock ( ) = Some ( error) ;
493
-
494
- // Put the waiter on the list of things to resume
495
- wakelist. push ( waiter) ;
496
-
497
- true
498
- } else {
499
- false
500
- }
501
- }
502
-
503
- /// Detects query cycles by using depth first search over all active query jobs.
504
- /// If a query cycle is found it will break the cycle by finding an edge which
505
- /// uses a query latch and then resuming that waiter.
506
- /// There may be multiple cycles involved in a deadlock, so this searches
507
- /// all active queries for cycles before finally resuming all the waiters at once.
508
- pub fn break_query_cycles < I : Clone + Debug > (
509
- query_map : QueryMap < I > ,
510
- registry : & rayon_core:: Registry ,
511
- ) {
512
- let mut wakelist = Vec :: new ( ) ;
513
- // It is OK per the comments:
514
- // - https://github.com/rust-lang/rust/pull/131200#issuecomment-2798854932
515
- // - https://github.com/rust-lang/rust/pull/131200#issuecomment-2798866392
516
- #[ allow( rustc:: potential_query_instability) ]
517
- let mut jobs: Vec < QueryJobId > = query_map. keys ( ) . cloned ( ) . collect ( ) ;
518
-
519
- let mut found_cycle = false ;
520
-
521
- while jobs. len ( ) > 0 {
522
- if remove_cycle ( & query_map, & mut jobs, & mut wakelist) {
523
- found_cycle = true ;
524
- }
525
- }
526
-
527
- // Check that a cycle was found. It is possible for a deadlock to occur without
528
- // a query cycle if a query which can be waited on uses Rayon to do multithreading
529
- // internally. Such a query (X) may be executing on 2 threads (A and B) and A may
530
- // wait using Rayon on B. Rayon may then switch to executing another query (Y)
531
- // which in turn will wait on X causing a deadlock. We have a false dependency from
532
- // X to Y due to Rayon waiting and a true dependency from Y to X. The algorithm here
533
- // only considers the true dependency and won't detect a cycle.
534
- if !found_cycle {
535
- panic ! (
536
- "deadlock detected as we're unable to find a query cycle to break\n \
537
- current query map:\n {:#?}",
538
- query_map
539
- ) ;
540
- }
541
-
542
- // Mark all the thread we're about to wake up as unblocked. This needs to be done before
543
- // we wake the threads up as otherwise Rayon could detect a deadlock if a thread we
544
- // resumed fell asleep and this thread had yet to mark the remaining threads as unblocked.
545
- for _ in 0 ..wakelist. len ( ) {
546
- rayon_core:: mark_unblocked ( registry) ;
547
- }
548
-
549
- for waiter in wakelist. into_iter ( ) {
550
- waiter. condvar . notify_one ( ) ;
551
- }
552
233
}
553
234
554
235
#[ inline( never) ]
0 commit comments