Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

import FsHistoryProvider._

/**
 * Signals that the history snapshot at `snapshot` could not be used.
 *
 * @param snapshot path of the snapshot that was rejected
 * @param cause underlying failure; also passed on as the cause of this `IOException`
 */
private case class InvalidHistorySnapshotException(
    snapshot: Path,
    cause: Throwable)
  extends IOException(s"History snapshot $snapshot is invalid.", cause)

// Interval between safemode checks.
private val SAFEMODE_CHECK_INTERVAL_S = conf.get(History.SAFEMODE_CHECK_INTERVAL_S)

Expand Down Expand Up @@ -423,7 +426,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
loadDiskStore(sm, appId, attempt)

case _ =>
createInMemoryStore(attempt)
createInMemoryStore(appId, attempt)
}
} catch {
case _: FileNotFoundException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
Expand Down Expand Up @@ -1385,10 +1388,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

try {
val eventLogFiles = reader.listEventLogFiles
val startNs = System.nanoTime()
logInfo(log"Parsing ${MDC(PATH, reader.rootPath)} to re-build UI...")
parseAppEventLogs(eventLogFiles, replayBus, !reader.completed)
trackingStore.close(false)
logInfo(log"Finished parsing ${MDC(PATH, reader.rootPath)}")
val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
logInfo(s"Finished parsing ${reader.rootPath} in ${durationMs} ms")
} catch {
case e: Exception =>
Utils.tryLogNonFatalError {
Expand Down Expand Up @@ -1494,8 +1499,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

// At this point the disk data either does not exist or was deleted because it failed to
// load, so the event log needs to be replayed.
// At this point the disk data either does not exist or was deleted because it failed to load.
// Prefer a pre-materialized snapshot before falling back to event log replay.
HistorySnapshotStore.findSnapshot(conf, appId, attempt.info.attemptId).foreach { snapshot =>
try {
return createDiskStoreFromSnapshot(dm, appId, attempt, metadata, snapshot)
} catch {
case e: InvalidHistorySnapshotException =>
logInfo(s"Failed to import invalid snapshot for $appId/${attempt.info.attemptId}.", e)
HistorySnapshotStore.invalidateSnapshot(conf, appId, attempt.info.attemptId, snapshot)
case e: Exception =>
logInfo(s"Failed to import snapshot for $appId/${attempt.info.attemptId}.", e)
}
}

// No usable local store or snapshot exists, so the event log needs to be replayed.

// If the hybrid store is enabled, try it first and fall back to RocksDB store.
if (hybridStoreEnabled) {
Expand Down Expand Up @@ -1632,7 +1650,74 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
KVUtils.open(newStorePath, metadata, conf, live = false)
}

private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
/**
 * Builds a disk store for an application attempt by restoring a history snapshot into it.
 *
 * The snapshot size is measured first and that exact number of bytes is leased from the disk
 * manager. The snapshot is then restored into a store opened at the lease's temporary path,
 * the lease is committed for this application attempt, and the store is opened again from the
 * committed location.
 *
 * @param dm disk manager used to lease space for the new store
 * @param appId application id the store belongs to
 * @param attempt attempt whose `info.attemptId` identifies the store
 * @param metadata metadata used when opening the store
 * @param manifestPath path of the snapshot to restore
 * @return the store opened from the committed location
 * @throws InvalidHistorySnapshotException if measuring or restoring the snapshot fails with
 *                                         an `Exception`
 */
private def createDiskStoreFromSnapshot(
    dm: HistoryServerDiskManager,
    appId: String,
    attempt: AttemptInfoWrapper,
    metadata: AppStatusStoreMetadata,
    manifestPath: Path): KVStore = {
  // A snapshot whose size cannot be determined is reported as invalid.
  val snapshotSize = try {
    HistorySnapshotStore.snapshotSize(conf, manifestPath)
  } catch {
    case e: Exception =>
      throw InvalidHistorySnapshotException(manifestPath, e)
  }

  val lease = dm.leaseExact(snapshotSize)
  val startNs = System.nanoTime()

  // Restore and commit run exactly once: every path either yields the committed path or
  // throws, so no retry loop or null placeholder is needed.
  val newStorePath = try {
    Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf, live = false)) { store =>
      try {
        HistorySnapshotStore.restoreSnapshot(conf, store, manifestPath)
      } catch {
        case e: Exception =>
          throw InvalidHistorySnapshotException(manifestPath, e)
      }
    }
    lease.commit(appId, attempt.info.attemptId)
  } catch {
    case e: Exception =>
      // Only failures up to and including the commit roll the lease back.
      lease.rollback()
      throw e
  }

  // Logged outside the rollback scope so that a failure while reporting cannot trigger a
  // rollback of a lease that has already been committed.
  val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
  logInfo(
    s"Restored history snapshot $manifestPath for $appId/${attempt.info.attemptId} " +
      s"into disk store in ${durationMs} ms " +
      s"(${Utils.bytesToString(snapshotSize)})")

  KVUtils.open(newStorePath, metadata, conf, live = false)
}

private def createInMemoryStore(appId: String, attempt: AttemptInfoWrapper): KVStore = {
HistorySnapshotStore.findSnapshot(conf, appId, attempt.info.attemptId).foreach { snapshot =>
val store = new InMemoryStore()
store.setMetadata(new AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION))
val startNs = System.nanoTime()
try {
HistorySnapshotStore.restoreSnapshot(conf, store, snapshot)
val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
val sizeString = try {
Utils.bytesToString(HistorySnapshotStore.snapshotSize(conf, snapshot))
} catch {
case NonFatal(e) =>
logWarning(s"Failed to measure history snapshot size at $snapshot.", e)
"unknown size"
}
logInfo(
s"Restored history snapshot $snapshot for $appId/${attempt.info.attemptId} " +
s"into in-memory store in ${durationMs} ms ($sizeString)")
return store
} catch {
case e: Exception =>
logInfo(s"Failed to import snapshot for $appId/${attempt.info.attemptId}.", e)
store.close()
HistorySnapshotStore.invalidateSnapshot(conf, appId, attempt.info.attemptId, snapshot)
}
}

var retried = false
var store: KVStore = null
while (store == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,30 @@ private class HistoryServerDiskManager(
* will still return `None` for the application.
*/
def lease(eventLogSize: Long, isCompressed: Boolean = false): Lease = {
val needed = approximateSize(eventLogSize, isCompressed)
makeRoom(needed)
leaseExact(approximateSize(eventLogSize, isCompressed))
}

/**
* Lease an exact number of bytes from the store.
*
* This is meant for callers that already know the number of bytes they need to reserve, rather
* than having to infer it from event log size heuristics.
*/
def leaseExact(size: Long): Lease = {
makeRoom(size)

val tmp = Utils.createTempDir(tmpStoreDir.getPath(), "appstore")
Utils.chmod700(tmp)

updateUsage(needed)
updateUsage(size)
val current = currentUsage.get()
if (current > maxUsage) {
logInfo(log"Lease of ${MDC(NUM_BYTES, Utils.bytesToString(needed))} may cause" +
logInfo(log"Lease of ${MDC(NUM_BYTES, Utils.bytesToString(size))} may cause" +
log" usage to exceed max (${MDC(NUM_BYTES_CURRENT, Utils.bytesToString(current))}" +
log" > ${MDC(NUM_BYTES_MAX, Utils.bytesToString(maxUsage))})")
}

new Lease(tmp, needed)
new Lease(tmp, size)
}

/**
Expand Down
Loading