Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ private void init(OzoneConfiguration configuration, SnapshotChainManager chainMa
this.scheduler.scheduleWithFixedDelay(
() -> {
try {
checkOrphanSnapshotVersions(omMetadataManager, chainManager);
checkQueuedOrphanSnapshotVersions(omMetadataManager, chainManager);
} catch (Exception e) {
LOG.error("Exception while checking orphan snapshot versions", e);
}
Expand All @@ -392,25 +392,135 @@ private void init(OzoneConfiguration configuration, SnapshotChainManager chainMa

}

private void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChainManager chainManager)
/**
* Drains the snapshots currently queued in {@link #snapshotToBeCheckedForOrphans}.
* Each queued snapshot becomes a seed for a same-run ancestor cascade.
*/
private void checkQueuedOrphanSnapshotVersions(OMMetadataManager metadataManager,
SnapshotChainManager chainManager)
throws IOException {
for (Map.Entry<UUID, Integer> entry : snapshotToBeCheckedForOrphans.entrySet()) {
UUID snapshotId = entry.getKey();
int countBeforeCheck = entry.getValue();
checkOrphanSnapshotVersions(metadataManager, chainManager, snapshotId);
decrementOrphanCheckCount(snapshotId, countBeforeCheck);
Set<UUID> processedSnapshotIds = new HashSet<>();
List<UUID> queuedSnapshotIds = getQueuedSnapshotIdsByCreationTimeDesc(metadataManager, chainManager);
if (queuedSnapshotIds.isEmpty()) {
return;
}
LOG.info("Draining orphan snapshot cleanup queue with {} seed snapshots",
queuedSnapshotIds.size());
for (UUID snapshotId : queuedSnapshotIds) {
cascadeOrphanSnapshotChecksFrom(metadataManager, chainManager, snapshotId, processedSnapshotIds);
}
LOG.debug("Finished orphan snapshot cleanup drain; {} snapshots remain queued for future runs",
snapshotToBeCheckedForOrphans.size());
}

@VisibleForTesting
void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChainManager chainManager,
UUID snapshotId) throws IOException {
/**
* Sorts the currently queued snapshot ids from newest to oldest using snapshot creation
* time when that metadata is available. This is only a seed-order optimization for the
* outer batch drain; the inner cascade still handles ancestors discovered during the run.
*/
private List<UUID> getQueuedSnapshotIdsByCreationTimeDesc(OMMetadataManager metadataManager,
SnapshotChainManager chainManager) throws IOException {
List<UUID> snapshotIds = new ArrayList<>(snapshotToBeCheckedForOrphans.keySet());
if (chainManager == null) {
LOG.debug("Snapshot chain manager unavailable; using queued orphan snapshot iteration order for {} snapshots",
snapshotIds.size());
return snapshotIds;
}
Table<String, SnapshotInfo> snapshotInfoTable = metadataManager.getSnapshotInfoTable();
if (snapshotInfoTable == null) {
LOG.debug("Snapshot info table unavailable; using queued orphan snapshot iteration order for {} snapshots",
snapshotIds.size());
return snapshotIds;
}

Map<UUID, Long> creationTimeBySnapshotId = new HashMap<>();
for (UUID snapshotId : snapshotIds) {
String tableKey = chainManager.getTableKey(snapshotId);
if (tableKey == null) {
LOG.debug("Snapshot {} is queued for orphan cleanup but has no snapshot info table key; "
+ "leaving it in fallback iteration order", snapshotId);
continue;
}
SnapshotInfo snapshotInfo = snapshotInfoTable.get(tableKey);
if (snapshotInfo != null) {
creationTimeBySnapshotId.put(snapshotId, snapshotInfo.getCreationTime());
} else {
LOG.debug("Snapshot {} is queued for orphan cleanup but snapshot info is missing for table key {};"
+ " leaving it in fallback iteration order", snapshotId, tableKey);
}
}

// Prefer newer queued snapshots as seeds so descendants already waiting in the batch
// are more likely to run before their ancestors.
snapshotIds.sort(Comparator.comparingLong(
snapshotId -> creationTimeBySnapshotId.getOrDefault(snapshotId, Long.MIN_VALUE)).reversed());
LOG.debug("Ordered {} queued orphan snapshot seeds by creation time using metadata for {} snapshots",
snapshotIds.size(), creationTimeBySnapshotId.size());
return snapshotIds;
}

/**
* Runs orphan cleanup starting from {@code snapshotId} and cascades toward older snapshots
* in the same call whenever removing data from the current snapshot may have unblocked its
* {@code previousSnapshotId}.
*
* <p>Callers that want an isolated one-off check should pass a fresh empty
* {@code processedSnapshotIds} set. {@link #checkQueuedOrphanSnapshotVersions(
* OMMetadataManager, SnapshotChainManager)} passes a shared set for the whole scheduler drain
* so a snapshot re-queued for retry is deferred to the next run instead of being checked twice
* in the same batch.
*
* <p>For example, if the chain is {@code S3 -> S2 -> S1} and checking {@code S3} removes
* orphan versions that make {@code S2} eligible, a single call seeded with {@code S3}
* will check {@code S3}, then {@code S2}, then {@code S1} in that order.
*/
void cascadeOrphanSnapshotChecksFrom(OMMetadataManager metadataManager,
SnapshotChainManager chainManager, UUID snapshotId, Set<UUID> processedSnapshotIds)
throws IOException {
for (UUID currentSnapshotId = snapshotId; currentSnapshotId != null;) {
// A descendant processed earlier in the same drain may already have checked
// this snapshot as an ancestor. If it re-queued itself for retry, leave that
// retry for the next scheduler iteration instead of checking it twice now.
if (!processedSnapshotIds.add(currentSnapshotId)) {
LOG.debug("Skipping snapshot {} because it was already processed earlier in this orphan cleanup pass",
currentSnapshotId);
return;
}
// Some callers invoke this method directly without first adding the snapshot to the
// orphan-check map, so only consume a queued count when one was present for this pass.
Integer countBeforeCheck = snapshotToBeCheckedForOrphans.get(currentSnapshotId);
if (countBeforeCheck == null) {
LOG.debug("Processing snapshot {} outside the shared orphan queue drain", currentSnapshotId);
}
UUID previousSnapshotIdToCheck =
checkSnapshotForOrphanVersionsOnce(metadataManager, chainManager, currentSnapshotId);
if (countBeforeCheck != null) {
decrementOrphanCheckCount(currentSnapshotId, countBeforeCheck);
}
// Only walk to the ancestor when this pass actually removed data from the current
// snapshot; otherwise we leave the ancestor to a later run or a direct trigger.
if (previousSnapshotIdToCheck != null) {
LOG.debug("Queueing previous snapshot {} after orphan cleanup removed data from snapshot {}",
previousSnapshotIdToCheck, currentSnapshotId);
}
currentSnapshotId = previousSnapshotIdToCheck;
}
}

/**
* Runs orphan cleanup for a single snapshot and returns the previous snapshot id when
* this pass removed at least one version, signaling that the ancestor may now be removable.
*/
private UUID checkSnapshotForOrphanVersionsOnce(OMMetadataManager metadataManager,
SnapshotChainManager chainManager, UUID snapshotId) throws IOException {
LOG.info("Checking orphan snapshot versions for snapshot {}", snapshotId);
try (WritableOmSnapshotLocalDataProvider snapshotLocalDataProvider = new WritableOmSnapshotLocalDataProvider(
snapshotId)) {
OmSnapshotLocalData snapshotLocalData = snapshotLocalDataProvider.getSnapshotLocalData();
UUID previousSnapshotId = snapshotLocalData.getPreviousSnapshotId();
boolean isSnapshotPurged = OmSnapshotManager.isSnapshotPurged(chainManager, metadataManager, snapshotId,
snapshotLocalData.getTransactionInfo());
boolean removedVersion = false;
for (Map.Entry<Integer, LocalDataVersionNode> integerLocalDataVersionNodeEntry : getVersionNodeMap()
.get(snapshotId).getSnapshotVersions().entrySet()) {
LocalDataVersionNode versionEntry = integerLocalDataVersionNodeEntry.getValue();
Expand All @@ -427,6 +537,7 @@ void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChai
"snapshotPurged: {}, inDegree : {}", snapshotId, versionEntry.getVersion(),
snapshotLocalData.getVersion(), isSnapshotPurged, localDataGraph.inDegree(versionEntry));
snapshotLocalDataProvider.removeVersion(versionEntry.getVersion());
removedVersion = true;
}
} finally {
internalLock.readLock().unlock();
Expand All @@ -435,9 +546,12 @@ void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChai
// If Snapshot is purged but not flushed completely to disk then this needs to wait for the next iteration
// which can be done by incrementing the orphan check count for the snapshotId.
if (!snapshotLocalData.getVersionSstFileInfos().isEmpty() && snapshotLocalData.getTransactionInfo() != null) {
LOG.debug("Snapshot {} still has {} local versions after orphan cleanup; queueing it for a future retry",
snapshotId, snapshotLocalData.getVersionSstFileInfos().size());
incrementOrphanCheckCount(snapshotId);
}
snapshotLocalDataProvider.commit();
return removedVersion ? previousSnapshotId : null;
}
}

Expand Down
Loading