Skip to content

[FLINK-37008] [runtime-web] Flink UI should show the type of checkpoint (full vs incremental) #25899

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
3 changes: 3 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,9 @@
"additionalProperties" : {
"type" : "integer"
}
},
"pending-operators" : {
"type" : "integer"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export interface Checkpoint {
id: number;
status: string;
is_savepoint: boolean;
is_full: boolean;
trigger_timestamp: number;
latest_ack_timestamp: number;
state_size: number;
Expand All @@ -55,6 +56,7 @@ export interface Checkpoint {
id: number;
restore_timestamp: number;
is_savepoint: boolean;
is_full: boolean;
external_path: string;
};
history: CheckpointHistory;
Expand All @@ -65,6 +67,7 @@ export interface CheckpointHistory {
id: number;
status: string;
is_savepoint: boolean;
is_full: boolean;
trigger_timestamp: number;
latest_ack_timestamp: number;
state_size: number;
Expand All @@ -91,6 +94,7 @@ export interface CheckpointCompletedStatistics {
id: number;
status: string;
is_savepoint: boolean;
is_full: boolean;
trigger_timestamp: number;
latest_ack_timestamp: number;
state_size: number;
Expand Down Expand Up @@ -142,6 +146,7 @@ export interface CheckpointDetail {
id: number;
status: string;
is_savepoint: boolean;
is_full: boolean;
savepointFormat: string;
trigger_timestamp: number;
latest_ack_timestamp: number;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
{{ checkPointDetail?.discarded || '-' }}
<nz-divider nzType="vertical"></nz-divider>
<strong>Checkpoint Type:</strong>
{{ checkPointType }}
{{ checkPointType }} ({{ checkPointDetail?.is_full ? 'full' : 'incremental' }})
<ng-container *ngIf="checkPointDetail?.savepointFormat">
<nz-divider nzType="vertical"></nz-divider>
<strong>Savepoint Format:</strong>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1099,7 +1099,8 @@ private Optional<CheckpointTriggerRequest> chooseRequestToExecute(
// Returns true if the checkpoint is successfully completed, false otherwise.
private boolean maybeCompleteCheckpoint(PendingCheckpoint checkpoint) {
synchronized (lock) {
if (checkpoint.isFullyAcknowledged()) {
if (checkpoint.isFullyAcknowledged()
&& checkpoint.getCheckpointType().isTypeResolved()) {
try {
// we need to check inside the lock for being shutdown as well,
// otherwise we get races and invalid error log messages.
Expand Down Expand Up @@ -2449,4 +2450,20 @@ private void reportFinishedTasks(
private PendingCheckpointStats getStatsCallback(PendingCheckpoint pendingCheckpoint) {
return statsTracker.getPendingCheckpointStats(pendingCheckpoint.getCheckpointID());
}

/**
 * Records, for a pending checkpoint, whether a subtask determined it to be full.
 *
 * <p>The checkpoint is looked up by ID among the pending checkpoints. When no pending
 * checkpoint with that ID exists, the report is ignored.
 *
 * <p>NOTE(review): {@code executionAttemptID} is accepted but not used here — confirm whether
 * the report should be validated against the reporting task.
 *
 * <p>NOTE(review): {@code maybeCompleteCheckpoint} only completes a checkpoint whose type is
 * resolved, but this method does not re-attempt completion after resolving. Confirm that some
 * other path retries completion when the last acknowledgement arrives before this report.
 *
 * <p>NOTE(review): the lookup happens without holding {@code lock} — confirm that {@code
 * pendingCheckpoints} may be read safely from the calling thread.
 *
 * @param checkpointId ID of the checkpoint
 * @param executionAttemptID ID of the reporting task
 * @param isFull Whether the checkpoint was determined to be full
 */
public void reportCheckpointTypeResolution(
        long checkpointId, ExecutionAttemptID executionAttemptID, boolean isFull) {

    final PendingCheckpoint pending = pendingCheckpoints.get(checkpointId);
    if (pending == null) {
        // Nothing pending under this ID: drop the report.
        return;
    }
    pending.getCheckpointType().resolveType(isFull);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,16 @@ public boolean isSavepoint() {
return checkpointType.isSavepoint();
}

/**
 * Returns whether the checkpoint properties describe a full checkpoint.
 *
 * <p>The answer is delegated to the checkpoint type held by these properties.
 *
 * @return <code>true</code> if the properties describe a full checkpoint, <code>false</code>
 *     otherwise.
 */
public boolean isFull() {
return checkpointType.isFull();
}

/**
* Returns whether the checkpoint properties describe a synchronous savepoint/checkpoint.
*
Expand Down Expand Up @@ -283,6 +293,39 @@ public String toString() {
false, // Retain on suspension
false);

// NOTE(review): in the three constants below, the first, third and last boolean arguments
// carry no inline label — confirm their meaning against the CheckpointProperties constructor.

/**
 * Preset for a {@link CheckpointType#FULL_CHECKPOINT} that is deleted on success,
 * cancellation, failure and suspension. Returned by {@code forFullCheckpoint} for {@code
 * NEVER_RETAIN_AFTER_TERMINATION}.
 */
private static final CheckpointProperties FULL_CHECKPOINT_NEVER_RETAINED =
new CheckpointProperties(
false,
CheckpointType.FULL_CHECKPOINT,
true,
true, // Delete on success
true, // Delete on cancellation
true, // Delete on failure
true, // Delete on suspension
false);

/**
 * Preset for a {@link CheckpointType#FULL_CHECKPOINT} that is retained only on failure and
 * deleted on success, cancellation and suspension. Returned by {@code forFullCheckpoint} for
 * {@code RETAIN_ON_FAILURE}.
 */
private static final CheckpointProperties FULL_CHECKPOINT_RETAINED_ON_FAILURE =
new CheckpointProperties(
false,
CheckpointType.FULL_CHECKPOINT,
true,
true, // Delete on success
true, // Delete on cancellation
false, // Retain on failure
true, // Delete on suspension
false);

/**
 * Preset for a {@link CheckpointType#FULL_CHECKPOINT} that is deleted on success but retained
 * on cancellation, failure and suspension. Returned by {@code forFullCheckpoint} for {@code
 * RETAIN_ON_CANCELLATION}.
 */
private static final CheckpointProperties FULL_CHECKPOINT_RETAINED_ON_CANCELLATION =
new CheckpointProperties(
false,
CheckpointType.FULL_CHECKPOINT,
true,
true, // Delete on success
false, // Retain on cancellation
false, // Retain on failure
false, // Retain on suspension
false);

Comment on lines +296 to +328
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why those are introduced?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are only used for tests
We don't really have a test suite for testing this kind of thing.

I'm open to ideas for where to add tests
I've thought about
SnapshotUtilsTest
and
CheckpointPropertiesTest

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see.

I read through the CheckpointCoordinator and it seems the JM always propagates incremental checkpoints for the periodic ones (not the ones triggered via the REST API). However, the TM actually performs incremental or full checkpoints according to the configuration it reads (it creates RocksNativeFullSnapshotStrategy or RocksIncrementalSnapshotStrategy when the state backend is built). This means the UI will always show incremental checkpoints even if we disable them. Am I right? If so, we should make some changes in CheckpointCoordinator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree looking at the CheckpointCoordinator.

this.checkpointProperties =
CheckpointProperties.forCheckpoint(chkConfig.getCheckpointRetentionPolicy());

Specifically, the issue is that Checkpoint can be used for both full and incremental checkpoints.

/** A checkpoint, full or incremental. */
public static final CheckpointType CHECKPOINT =
new CheckpointType("Checkpoint", SharingFilesStrategy.FORWARD_BACKWARD);

We'd have to move the evaluation of execution.checkpointing.incremental or change how we are determining if a Checkpoint is Full 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a possible solution is to evaluate execution.checkpointing.incremental in CheckpointCoordinator and set checkpointProperties accordingly. WDYT?

/**
* Creates the checkpoint properties for a (manually triggered) savepoint.
*
Expand Down Expand Up @@ -362,4 +405,27 @@ public static CheckpointProperties forCheckpoint(CheckpointRetentionPolicy polic
throw new IllegalArgumentException("unknown policy: " + policy);
}
}

/**
 * Selects the preset checkpoint properties for a full checkpoint under the given retention
 * policy.
 *
 * <p>Each supported policy maps to one of the {@code FULL_CHECKPOINT_*} constants of this
 * class; those constants define what is deleted or retained on success, cancellation, failure
 * and suspension.
 *
 * @param policy retention policy to look up
 * @return the preset full-checkpoint properties matching {@code policy}
 * @throws IllegalArgumentException if {@code policy} is not one of the handled values
 */
public static CheckpointProperties forFullCheckpoint(CheckpointRetentionPolicy policy) {
    final CheckpointProperties preset;
    switch (policy) {
        case NEVER_RETAIN_AFTER_TERMINATION:
            preset = FULL_CHECKPOINT_NEVER_RETAINED;
            break;
        case RETAIN_ON_FAILURE:
            preset = FULL_CHECKPOINT_RETAINED_ON_FAILURE;
            break;
        case RETAIN_ON_CANCELLATION:
            preset = FULL_CHECKPOINT_RETAINED_ON_CANCELLATION;
            break;
        default:
            throw new IllegalArgumentException("unknown policy: " + policy);
    }
    return preset;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,32 @@
/** The type of checkpoint to perform. */
@Internal
public final class CheckpointType implements SnapshotType {
private volatile boolean typeResolved = false;
private volatile boolean isFull = false;

/** A checkpoint, full or incremental. */
public static final CheckpointType CHECKPOINT =
new CheckpointType("Checkpoint", SharingFilesStrategy.FORWARD_BACKWARD);

/** A checkpoint that is explicitly marked as full. */
public static final CheckpointType FULL_CHECKPOINT =
new CheckpointType("Full Checkpoint", SharingFilesStrategy.FORWARD);
new CheckpointType("Full Checkpoint", SharingFilesStrategy.FORWARD, true);

private final String name;

private final SharingFilesStrategy sharingFilesStrategy;

private CheckpointType(final String name, SharingFilesStrategy sharingFilesStrategy) {
private CheckpointType(String name, SharingFilesStrategy sharingFilesStrategy) {
this(name, sharingFilesStrategy, false);
}

/**
 * Creates a checkpoint type, optionally one whose full/incremental nature is already known.
 *
 * @param name display name of the type
 * @param sharingFilesStrategy strategy stored for this type
 * @param preResolvedAsFull when {@code true}, the instance starts out resolved as full;
 *     otherwise {@code isFull} and {@code typeResolved} keep their initial value {@code false}
 */
private CheckpointType(
String name, SharingFilesStrategy sharingFilesStrategy, boolean preResolvedAsFull) {
this.name = name;
this.sharingFilesStrategy = sharingFilesStrategy;
if (preResolvedAsFull) {
// Mark the type as resolved up front so no later resolveType call can change it.
this.isFull = true;
this.typeResolved = true;
}
}

public boolean isSavepoint() {
Expand All @@ -54,6 +65,21 @@ public SharingFilesStrategy getSharingFilesStrategy() {
return sharingFilesStrategy;
}

/**
 * Returns whether this checkpoint type has been resolved as full.
 *
 * @return {@code true} only if the type is resolved and was resolved as full; {@code false}
 *     while unresolved or when resolved as not full
 */
public boolean isFull() {
return typeResolved && isFull;
}

/**
 * Records whether this checkpoint type turned out to be full.
 *
 * <p>Only the first call has an effect; once the type is resolved, later calls are ignored.
 * The method is synchronized so that the "already resolved?" check and the two writes happen
 * atomically. Without this, two concurrent callers could both pass the check and the later one
 * would overwrite the value recorded by the first.
 *
 * <p>{@code isFull} is written before {@code typeResolved}; both fields are volatile, so a
 * reader that observes the type as resolved also observes the recorded value.
 *
 * <p>NOTE(review): this mutates the instance it is called on. If callers pass a shared static
 * instance such as {@code CHECKPOINT}, the resolution becomes visible to every user of that
 * instance and changes its {@code equals}/{@code hashCode} — confirm this is intended.
 *
 * @param isFull whether the checkpoint was determined to be full
 */
public synchronized void resolveType(boolean isFull) {
    if (typeResolved) {
        // First resolution wins; ignore any later report.
        return;
    }
    this.isFull = isFull;
    this.typeResolved = true;
}

/**
 * Returns whether it has been determined if this checkpoint type is full or not.
 *
 * @return {@code true} once the type was resolved, either at construction or through {@code
 *     resolveType}
 */
public boolean isTypeResolved() {
return typeResolved;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -63,12 +89,15 @@ public boolean equals(Object o) {
return false;
}
CheckpointType type = (CheckpointType) o;
return name.equals(type.name) && sharingFilesStrategy == type.sharingFilesStrategy;
return typeResolved == type.typeResolved
&& isFull == type.isFull
&& name.equals(type.name)
&& sharingFilesStrategy == type.sharingFilesStrategy;
}

@Override
public int hashCode() {
return Objects.hash(name, sharingFilesStrategy);
return Objects.hash(name, sharingFilesStrategy, typeResolved, isFull);
}

@Override
Expand All @@ -79,6 +108,10 @@ public String toString() {
+ '\''
+ ", sharingFilesStrategy="
+ sharingFilesStrategy
+ ", typeResolved="
+ typeResolved
+ ", isFull="
+ isFull
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public interface SnapshotType extends Serializable {

boolean isSavepoint();

/**
 * Returns whether this snapshot type is full.
 *
 * <p>The default implementation treats every sharing-files strategy other than {@code
 * FORWARD_BACKWARD} as full.
 *
 * @return {@code true} if the sharing-files strategy is not {@code FORWARD_BACKWARD}
 */
default boolean isFull() {
return getSharingFilesStrategy() != SharingFilesStrategy.FORWARD_BACKWARD;
}

String getName();

SharingFilesStrategy getSharingFilesStrategy();
Expand Down
Loading