Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,28 @@ private class HistoryServerDiskManager(
}

oldSizeOpt.foreach { oldSize =>
val path = appStorePath(appId, attemptId)
updateUsage(-oldSize, committed = true)
if (path.isDirectory()) {
if (delete) {
deleteStore(path)
} else {
val newSize = sizeOf(path)
val newInfo = listing.read(classOf[ApplicationStoreInfo], path.getAbsolutePath())
.copy(size = newSize)
listing.write(newInfo)
updateUsage(newSize, committed = true)
}

// Apply the store operation regardless of whether the app was in the active map, since
// the store directory may still exist on disk (e.g., when the app was never opened after
// a restart).
val path = appStorePath(appId, attemptId)
if (path.isDirectory()) {
if (delete) {
// If the app was not actively tracked, its size was not deducted above; do it now.
if (oldSizeOpt.isEmpty) {
val size = sizeOf(path)
updateUsage(-size, committed = true)
}
deleteStore(path)
} else if (oldSizeOpt.isDefined) {
// Re-measure the size since the store may have changed while it was open.
val newSize = sizeOf(path)
val newInfo = listing.read(classOf[ApplicationStoreInfo], path.getAbsolutePath())
.copy(size = newSize)
listing.write(newInfo)
updateUsage(newSize, committed = true)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,38 @@ abstract class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAn
assert(store.read(classOf[ApplicationStoreInfo], dstC.getAbsolutePath).size === 2)
}

test("SPARK-56044: release with delete cleans up store when app is not in active map") {
val manager = mockManager()
val leaseA = manager.lease(2)
doReturn(2L).when(manager).sizeOf(meq(leaseA.tmpPath))
val dstPathA = manager.appStorePath("app1", None)
doReturn(2L).when(manager).sizeOf(meq(dstPathA))
val dst = leaseA.commit("app1", None)
assert(dst.exists())
assert(manager.committed() === 2)

// Release without deleting; the store remains on disk and in the listing.
doReturn(2L).when(manager).sizeOf(meq(dst))
manager.release("app1", None)
assert(dst.exists())
assert(manager.committed() === 2)

// Simulate: service restarts, new disk manager is initialized with an empty active map.
val manager2 = mockManager()
doReturn(2L).when(manager2).sizeOf(meq(dst))
doReturn(2L).when(manager2).sizeOf(meq(new File(testDir, "apps")))
manager2.initialize()
assert(dst.exists())
assert(store.read(classOf[ApplicationStoreInfo], dst.getAbsolutePath).size === 2)

// Delete via release; the app was never opened in manager2 so it is not in the active map.
manager2.release("app1", None, delete = true)
assert(!dst.exists())
assert(!store.view(classOf[ApplicationStoreInfo]).iterator().hasNext)
assert(manager2.committed() === 0)
assert(manager2.free() === MAX_USAGE)
}

test("SPARK-38095: appStorePath should use backend extensions") {
val conf = new SparkConf().set(HYBRID_STORE_DISK_BACKEND, backend.toString)
val manager = new HistoryServerDiskManager(conf, testDir, store, new ManualClock())
Expand Down