[BUG] split should not happen immediately after fallback on memory contention (OOM state machine bug) #12158

Open
binmahone opened this issue Feb 18, 2025 · 4 comments · May be fixed by NVIDIA/spark-rapids-jni#2976
Assignees
Labels
bug: Something isn't working; reliability: Features to improve reliability or bugs that severely impact the reliability of the plugin

Comments

@binmahone
Collaborator

binmahone commented Feb 18, 2025

Describe the bug

Suppose we have two threads T0 and T1, both running a withRetryNoSplit block.
From my observation of the logs, below is what seems to be happening:

  1. T0 and T1 are running simultaneously; let's say they are both greedy and each requires exactly 100% of the memory.
  2. T0 turns to the THREAD_BLOCKED state, since T1 is contending for memory with it.
  3. T1 turns to the THREAD_BLOCKED state, since T0 is contending for memory with it.
  4. T1 turns to the THREAD_BUFN state.
  5. T0 turns to the THREAD_BUFN state.
  6. T0 turns to the THREAD_SPLIT_THROW state and attempts a split and retry.
  7. Since it is withRetryNoSplit, attempting to split will fail the whole task.

However, the expected behavior should be: "If we are at a point where everything is rolled back except for a single thread and it cannot make progress, then we ask it to try and make the input smaller and try again." (quoted from the design doc). So at step 6, shouldn't T0 at least try to run the code block again without splitting? At this point T0 is running exclusively, so it should be able to make progress (T0 can succeed if T1 is not contending for memory with it).

Splitting should be the last resort, used only if T0 fails again in the solo run.
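
As a side note, here is a minimal conceptual sketch (not the plugin's actual implementation) of why step 7 fails the whole task. The two OOM exception classes are the real GPU variants from spark-rapids-jni (host allocations have matching Cpu* variants); the retry loop itself is a simplification:

  import com.nvidia.spark.rapids.jni.{GpuRetryOOM, GpuSplitAndRetryOOM}

  // Simplified control flow of a withRetryNoSplit-style block.
  def withRetryNoSplitSketch[T](body: => T): T = {
    while (true) {
      try {
        return body
      } catch {
        case _: GpuRetryOOM =>
          // The state machine rolled this thread back; loop and rerun the block.
        case e: GpuSplitAndRetryOOM =>
          // The NoSplit variant has no split handler, so the OOM escapes
          // and fails the whole task (step 7 above).
          throw e
      }
    }
    throw new IllegalStateException("unreachable")
  }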

Steps/Code to reproduce bug

The problem can be reproduced by adding a new test case to HostAllocSuite. It's worth mentioning that although this example uses HostAlloc, the same logic applies to GPU memory, as both share the same underlying OOM state machine.


  test("split should not happen immediately after fallback on memory contention") {

    // Allocates a small piece of memory (1024 bytes) as a warmup, sleeps 1s,
    // then allocates a large piece of memory
    class AllocOnAnotherThreadWithWarmup(override val thread: TaskThread,
        override val size: Long) extends AllocOnAnotherThread(thread, size) {

      override def doAlloc(): Void = {
        RmmRapidsRetryIterator.withRetryNoSplit {
          val warmup = HostAlloc.alloc(1024, preferPinned)
          Thread.sleep(1000)
          val tmp = HostAlloc.alloc(size, preferPinned)
          warmup.close()

          synchronized {
            closeOnExcept(tmp) { _ =>
              assert(b.isEmpty)
              b = Option(tmp)
            }
          }
        }
        null
      }
    }

    PinnedMemoryPool.initialize(0)
    HostAlloc.initialize(10 * 1024) // memory only big enough for one thread at a time to pass

    failAfter(Span(10, Seconds)) {
      val thread1 = new TaskThread("thread1", 1)
      thread1.initialize()
      val thread2 = new TaskThread("thread2", 2)
      thread2.initialize()

      try {
        // Actually it will require 10 * 1024 memory
        withResource(new AllocOnAnotherThreadWithWarmup(thread2, 9 * 1024)) { a =>

          // Actually it will require 10 * 1024 memory
          withResource(new AllocOnAnotherThreadWithWarmup(thread1, 9 * 1024)) { b =>

            // At this point, both threads are blocked because of memory contention,
            // we expect both thread1 and thread2 to fall back to BUFN state, and then thread1
            // should attempt another try before doing splitting

            b.waitForAlloc()
            b.assertAllocSize(9 * 1024)
            b.freeAndWait()
          }

          a.waitForAlloc()
          a.assertAllocSize(9 * 1024)
          a.freeAndWait()
        }


      } finally {
        thread1.done.get(1, TimeUnit.SECONDS)
        thread2.done.get(1, TimeUnit.SECONDS)
      }
    }
  }

Running this test will throw:

[screenshot: the test fails with a split-and-retry OOM exception]

Expected behavior

We expect both thread1 and thread2 to fall back to the BUFN state, and then thread1 should attempt another try before splitting and succeed, after which thread2 also makes an attempt and succeeds.

Internally, we might need to add a state called THREAD_EXLU_RUNNING or THREAD_SOLO_RUNNING to the OOM state machine.
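
To make the proposal concrete, here is a hypothetical sketch. The real state machine lives in spark-rapids-jni, THREAD_SOLO_RUNNING is only the suggested new state, and the helper below is made up for illustration:

  sealed trait ThreadState
  case object THREAD_BUFN extends ThreadState
  case object THREAD_SOLO_RUNNING extends ThreadState // proposed: exclusive retry
  case object THREAD_SPLIT_THROW extends ThreadState

  // Hypothetical decision point for the last rolled-back thread: retry the
  // block alone once before escalating to a split.
  def nextStateForLastThread(soloRetryAlreadyTried: Boolean): ThreadState =
    if (!soloRetryAlreadyTried) {
      THREAD_SOLO_RUNNING // run the block again while every other thread is rolled back
    } else {
      THREAD_SPLIT_THROW // the solo run also failed; split as a last resort
    }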

@binmahone binmahone added ? - Needs Triage Need team to review and classify bug Something isn't working reliability Features to improve reliability or bugs that severely impact the reliability of the plugin labels Feb 18, 2025
@binmahone binmahone self-assigned this Feb 18, 2025
@revans2
Collaborator

revans2 commented Feb 18, 2025

Okay, I reproduced the error with the transition log enabled and I understand the problem now. It looks like it is an issue with the test. When a rollback happens, all of the memory needs to be freed or made spillable. AllocOnAnotherThreadWithWarmup does not free warmup until after the larger alloc succeeds, so if a rollback happens that memory is leaked.

Here is a summary of the log transitions with some notes.

THREAD_1 -> RUNNING // INIT THREAD
THREAD_2 -> RUNNING // INIT THREAD
THREAD_2 -> ALLOC // ALLOC 1k
THREAD_1 -> ALLOC // ALLOC 1k
THREAD_1 -> RUNNING // ALLOC 1k COMPLETE 9k free
THREAD_2 -> RUNNING // ALLOC 1k COMPLETE 8k free
THREAD_1 -> ALLOC // ALLOC 9k
THREAD_2 -> ALLOC // ALLOC 9k
THREAD_2 -> BLOCKED // ALLOC FAILED FOR 9k
THREAD_1 -> BLOCKED // ALLOC FAILED FOR 9k
THREAD_2 -> BUFN_THROW // ROLL BACK LOWEST PRIORITY THREAD
THREAD_2 -> BUFN_WAIT // WAIT FOR ROLL BACK TO COMPLETE
// THREAD 2 did not release anything here.
// There is a DEALLOC logged when a release happens.
// If a free had happened, then thread 1 should have been woken up
THREAD_2 -> BUFN // LOWEST PRIORITY THREAD IS BLOCKED
THREAD_1 -> BUFN_THROW // THREAD 1 CANNOT COMPLETE AS THERE IS NOT ENOUGH MEMORY SO ROLL BACK
THREAD_1 -> BUFN_WAIT // WAITING FOR THE FREEs to COMPLETE, BUT NOTHING IS FREED HERE EITHER
THREAD_1 -> BUFN // ALL THREADS ARE BLOCKED AND ROLLED BACK
THREAD_1 -> SPLIT_THROW // NEED TO DO A SPLIT AND RETRY NOW...

If I update the test to free warmup as a part of the rollback code, then it no longer throws a SPLIT_N_RETRY. Instead there is a timeout, because thread 2 is in BUFN and is not woken up until another task completes.

  test("split should not happen immediately after fallback on memory contention") {

    // Allocates a small piece of memory (1024 bytes) as a warmup, sleeps 0.5s,
    // then allocates a large piece of memory
    class AllocOnAnotherThreadWithWarmup(override val thread: TaskThread,
                                         override val size: Long)
      extends AllocOnAnotherThread(thread, size) {

      override def doAlloc(): Void = {
        RmmRapidsRetryIterator.withRetryNoSplit {
          val warmup = HostAlloc.alloc(1024, preferPinned)
          val tmp = try {
            Thread.sleep(500)
            HostAlloc.alloc(size, preferPinned)
          } finally {
            warmup.close()
          }

          synchronized {
            closeOnExcept(tmp) { _ =>
              assert(b.isEmpty)
              b = Option(tmp)
            }
          }
        }
        null
      }
    }

    PinnedMemoryPool.initialize(0)
    HostAlloc.initialize(10 * 1024) // memory only big enough for one thread at a time to pass

    failAfter(Span(30, Seconds)) {
      val thread1 = new TaskThread("thread1", 1)
      thread1.initialize()
      val thread2 = new TaskThread("thread2", 2)
      thread2.initialize()

      try {
        // Actually it will require 10 * 1024 memory
        withResource(new AllocOnAnotherThreadWithWarmup(thread2, 9 * 1024)) { a =>

          // Actually it will require 10 * 1024 memory
          withResource(new AllocOnAnotherThreadWithWarmup(thread1, 9 * 1024)) { b =>

            // At this point, both threads are blocked because of memory contention,
            // we expect both thread1 and thread2 to fall back to BUFN state, and then thread1
            // should attempt another try before doing splitting


            System.err.println("WAIT FOR THREAD 1 ALLOC")
            b.waitForAlloc()
            b.assertAllocSize(9 * 1024)
            System.err.println("FREE THREAD 1")
            b.freeAndWait()
            // Thread 2 BUFN is not woken up until another task completes
            thread1.done.get(1, TimeUnit.SECONDS)
          }

          System.err.println("WAIT FOR THREAD 2 ALLOC")
          a.waitForAlloc()
          a.assertAllocSize(9 * 1024)
          System.err.println("FREE THREAD 2")
          a.freeAndWait()
        }


      } finally {
        //thread1.done.get(1, TimeUnit.SECONDS)
        thread2.done.get(1, TimeUnit.SECONDS)
      }
    }
  }

@revans2
Collaborator

revans2 commented Feb 18, 2025

The reason this works is that THREAD_BUFN_THROW and THREAD_BUFN_WAIT are not treated as "blocked" states, so a task will not be considered blocked while it is in the middle of rolling back. Only when it completes the rollback and is in the THREAD_BUFN state are more threads considered for rollback, with the assumption that memory was freed when the rollback happened.
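
In other words, a minimal sketch of the "blocked" check (state names as strings to keep the sketch self-contained; the real logic is in the spark-rapids-jni state machine):

  // Only threads whose rollback has completed count as blocked when deciding
  // whether to roll back yet another thread.
  def countsAsBlocked(state: String): Boolean = state match {
    case "THREAD_BLOCKED" | "THREAD_BUFN" => true
    case "THREAD_BUFN_THROW" | "THREAD_BUFN_WAIT" => false // still rolling back
    case _ => false
  }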

@mattahrens mattahrens removed the ? - Needs Triage Need team to review and classify label Feb 18, 2025
@binmahone
Collaborator Author

binmahone commented Feb 18, 2025

Hi Bobby, the test case is an abstraction of my real-world case. The try {...} finally { warmup.close() } pattern differs from my real case; the following modification might be a better showcase:

override def doAlloc(): Void = {
  RmmRapidsRetryIterator.withRetryNoSplit {

    // read shuffle data from remote and put it into the Spillable Framework
    val w = SpillableHostBuffer.apply(HostAlloc.alloc(1024, preferPinned),
      1024, SpillPriorities.ACTIVE_BATCHING_PRIORITY)

    Thread.sleep(1000)

    // let's say we do something with the shuffle data we read, e.g. concat it.
    // For simplicity, we just have one SpillableHostBuffer as input; in a real
    // case there would be multiple SpillableHostBuffers as input
    withResource(w.getHostBuffer()) { _ =>
      // create another buffer as destination for concat
      val tmp = HostAlloc.alloc(size, preferPinned)
      synchronized {
        closeOnExcept(tmp) { _ =>
          assert(b.isEmpty)
          b = Option(tmp)
        }
      }
    }
    // at some point close w
    w.close()
  }
  null
}


The above code throws the same exception. The reason is that when thread2 falls back, its content in the SpillFramework is not necessarily spilled, so thread1 will never see the DEALLOC event. One workaround would be making sure w is always closed when a fallback happens, but that won't work for shuffle cases, because the shuffle data can't be read again after the fallback.
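
For completeness, a sketch of that workaround using the same helpers as the code above: closing w on rollback produces exactly the DEALLOC the other thread needs to see, but as noted the data is then gone, which rules this out for shuffle input.

RmmRapidsRetryIterator.withRetryNoSplit {
  val w = SpillableHostBuffer.apply(HostAlloc.alloc(1024, preferPinned),
    1024, SpillPriorities.ACTIVE_BATCHING_PRIORITY)
  // closeOnExcept frees w if an OOM rolls this block back
  closeOnExcept(w) { w =>
    Thread.sleep(1000)
    withResource(w.getHostBuffer()) { _ =>
      val tmp = HostAlloc.alloc(size, preferPinned)
      synchronized {
        closeOnExcept(tmp) { _ =>
          assert(b.isEmpty)
          b = Option(tmp)
        }
      }
    }
    w.close()
  }
}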

@revans2 revans2 self-assigned this Feb 19, 2025
@revans2
Copy link
Collaborator

revans2 commented Feb 19, 2025

I spoke with @binmahone, and the code is complex enough that I will try to make some changes to the state machine to let us retry the allocation before we go to BUFN_THROW for the last thread. The alternative would be to update the state machine each time we make a buffer spillable.

@binmahone binmahone changed the title [BUG] split should not happen immediately after fallback on memory contention [BUG] split should not happen immediately after fallback on memory contention (OOM state machine bug) Feb 20, 2025