Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected void runAfterCatalogReady() {
}
return updated;
});
LOG.info("Check failure on be={}, times={}", failureTimes, failureTimes);
LOG.info("Check failure on be={}, times={}", id, failureTimes);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public class PublishVersionTask extends AgentTask {
private List<Long> errorTablets;

// tabletId => version, current version = 0
private Map<Long, Long> succTablets;
// Initialized to an empty map (not null) so that getSuccTablets() never returns null
// even when the task is force-finished without a real BE response.
private Map<Long, Long> succTablets = Maps.newHashMap();

/**
* To collect loaded rows for each tablet from each BE
Expand All @@ -59,7 +61,6 @@ public PublishVersionTask(long backendId, long transactionId, long dbId,
super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, -1L, transactionId, createTime);
this.transactionId = transactionId;
this.partitionVersionInfos = partitionVersionInfos;
this.succTablets = null;
this.errorTablets = new ArrayList<>();
this.isFinished = false;
}
Expand All @@ -84,7 +85,7 @@ public Map<Long, Long> getSuccTablets() {
}

public void setSuccTablets(Map<Long, Long> succTablets) {
this.succTablets = succTablets;
this.succTablets = (succTablets == null) ? Maps.newHashMap() : succTablets;
}

public synchronized List<Long> getErrorTablets() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,11 @@ private void checkReplicaContinuousVersionSucc(List<Long> subTxnIds, long alterW
boolean success = true;
for (int i = 0; i < subTxnIds.size(); i++) {
PublishVersionTask task = replicaPublishTasks.get(i);
success = (task != null && task.isFinished() && task.getSuccTablets().containsKey(tabletId)) || (
// Defensive null guard: AgentTaskCleanupDaemon may force-finish a PublishVersionTask
// without populating succTablets; MasterImpl.finishPublishVersion may also call
// setSuccTablets(null) on a non-OK BE response.
Map<Long, Long> succ = (task == null) ? null : task.getSuccTablets();
success = (task != null && task.isFinished() && succ != null && succ.containsKey(tabletId)) || (
replica.getState() == Replica.ReplicaState.ALTER && (!Config.publish_version_check_alter_replica
|| subTxnIds.get(i) < alterWaterschedTxnId || alterWaterschedTxnId == -1));
if (!success) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.task;

import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.Map;

/**
 * Unit tests pinning the null-safety contract of {@link PublishVersionTask#getSuccTablets()}.
 *
 * <p>Every case verifies that the getter yields a usable, non-null map: right after
 * construction, after {@code setSuccTablets(null)}, and after the task has been marked finished
 * without a success map ever being supplied. The scenarios are modelled on two situations named
 * by the change under test: a task force-finished by AgentTaskCleanupDaemon, and a task finished
 * by MasterImpl.finishPublishVersion from a BE response that carries no succTablets. A null map
 * in either situation is what would make DatabaseTransactionMgr.checkReplicaContinuousVersionSucc
 * throw a NullPointerException.
 */
public class PublishVersionTaskTest {

    // Arbitrary fixture identifiers; the tests never depend on their concrete values.
    private static final long BACKEND_ID = 10001L;
    private static final long TRANSACTION_ID = 99L;
    private static final long DB_ID = 1L;

    /** Builds a fresh task with fixed ids, no partition version infos, and "now" as create time. */
    private PublishVersionTask newTask() {
        long createTime = System.currentTimeMillis();
        return new PublishVersionTask(BACKEND_ID, TRANSACTION_ID, DB_ID, null, createTime);
    }

    /** A freshly constructed task exposes an empty, non-null success map. */
    @Test
    public void testDefaultSuccTabletsIsNotNull() {
        PublishVersionTask fresh = newTask();

        Assert.assertNotNull("succTablets must be non-null right after construction",
                fresh.getSuccTablets());
        Assert.assertTrue("succTablets must start empty", fresh.getSuccTablets().isEmpty());

        // A lookup on the default map must simply report "absent" rather than blow up.
        boolean hasTabletOne = fresh.getSuccTablets().containsKey(1L);
        Assert.assertFalse(hasTabletOne);
    }

    /** Passing {@code null} to the setter must leave an empty map behind, not a null field. */
    @Test
    public void testSetSuccTabletsNullCoercesToEmptyMap() {
        PublishVersionTask subject = newTask();

        subject.setSuccTablets(null);

        Assert.assertNotNull(subject.getSuccTablets());
        Assert.assertTrue(subject.getSuccTablets().isEmpty());
        Assert.assertFalse(subject.getSuccTablets().containsKey(123L));
    }

    /** A non-null map handed to the setter is what the getter reports afterwards. */
    @Test
    public void testSetSuccTabletsKeepsValues() {
        Map<Long, Long> expected = ImmutableMap.of(1L, 100L, 2L, 200L);
        PublishVersionTask subject = newTask();

        subject.setSuccTablets(expected);

        Assert.assertEquals(expected, subject.getSuccTablets());
        Assert.assertTrue(subject.getSuccTablets().containsKey(1L));
    }

    /**
     * Mimics the AgentTaskCleanupDaemon scenario: the task is flagged as finished while
     * {@code setSuccTablets} is never invoked. The getter must still return a map, so callers
     * observe "no tablet succeeded" instead of hitting a NullPointerException.
     */
    @Test
    public void testForceFinishWithoutSetSuccTabletsDoesNotNpe() {
        PublishVersionTask forced = newTask();

        // Deliberately no setSuccTablets call before or after this line.
        forced.setFinished(true);

        Map<Long, Long> reported = forced.getSuccTablets();
        Assert.assertNotNull("getSuccTablets() must not return null even when force-finished",
                reported);
        Assert.assertTrue(forced.isFinished());
        Assert.assertFalse(reported.containsKey(42L));
    }

    /**
     * Mimics the MasterImpl.finishPublishVersion scenario for a BE response without succTablets:
     * the setter receives {@code null} and the task is then marked finished. The stored value
     * must be an empty map.
     */
    @Test
    public void testFinishPublishVersionPathWithNullSuccTablets() {
        PublishVersionTask subject = newTask();

        // Same order as the scenario being emulated: null success map first, finished flag second.
        subject.setSuccTablets(null);
        subject.setFinished(true);

        Map<Long, Long> reported = subject.getSuccTablets();
        Assert.assertNotNull(reported);
        Assert.assertEquals(Collections.emptyMap(), reported);
        // The same kind of containsKey lookup that DatabaseTransactionMgr performs on this map.
        Assert.assertFalse(reported.containsKey(7L));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.transaction;

import org.apache.doris.catalog.LocalReplica;
import org.apache.doris.catalog.Replica;
import org.apache.doris.task.PublishVersionTask;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Set;

/**
 * Call-site regression tests for {@code DatabaseTransactionMgr.checkReplicaContinuousVersionSucc}.
 *
 * <p>The private method is reached through reflection on a Mockito mock that delegates to the
 * real implementation. The cases cover a finished task whose {@code succTablets} field has been
 * forced to {@code null}, a task whose success map contains the tablet, and a {@code null} task
 * entry. In none of them may the method throw a NullPointerException; a missing or null success
 * map is expected to be handled as "publish did not succeed for this tablet".
 */
public class CheckReplicaContinuousVersionSuccTest {

    private static final long BACKEND_ID = 10001L;
    private static final long TXN_ID = 99L;
    private static final long DB_ID = 1L;
    private static final long TABLET_ID = 3001L;
    private static final long REPLICA_ID = 2001L;

    /** Name of the private method under test, resolved reflectively. */
    private static final String METHOD_UNDER_TEST = "checkReplicaContinuousVersionSucc";

    /**
     * Creates a finished task and then nulls out its {@code succTablets} field via reflection,
     * so the null guard at the call site is exercised independently of any initialization done
     * by {@code PublishVersionTask} itself.
     */
    private PublishVersionTask newFinishedTaskWithNullSuccTablets() throws Exception {
        long createTime = System.currentTimeMillis();
        PublishVersionTask finished = new PublishVersionTask(BACKEND_ID, TXN_ID, DB_ID,
                null, createTime);
        finished.setFinished(true);

        Field succTabletsField = PublishVersionTask.class.getDeclaredField("succTablets");
        succTabletsField.setAccessible(true);
        succTabletsField.set(finished, null);

        // Make sure the bad state really is in place before a test relies on it.
        Assert.assertNull("precondition: succTablets must be null for this test",
                finished.getSuccTablets());
        return finished;
    }

    /** Builds the NORMAL-state replica (version 100, no failed version) shared by all cases. */
    private Replica newNormalReplica() {
        return new LocalReplica(REPLICA_ID, BACKEND_ID, /*version*/100L, /*schemaHash*/0,
                /*dataSize*/0L, /*remoteDataSize*/0L, /*rowCount*/0L,
                Replica.ReplicaState.NORMAL, /*lastFailedVersion*/-1L, /*lastSuccessVersion*/100L);
    }

    /** Looks up the private method under test and makes it callable. */
    private Method lookupCheckMethod() throws NoSuchMethodException {
        Method check = DatabaseTransactionMgr.class.getDeclaredMethod(
                METHOD_UNDER_TEST,
                List.class, long.class, long.class,
                Replica.class, long.class, long.class,
                List.class, Set.class, List.class, List.class, List.class);
        check.setAccessible(true);
        return check;
    }

    /**
     * Invokes the method under test with a single sub-transaction ({@code TXN_ID}), an alter
     * watershed of {@code -1}, {@code TABLET_ID}, and a one-element task list holding
     * {@code task} (which may be {@code null}). A NullPointerException raised by the target is
     * turned into a test failure; any other invocation failure is rethrown unchanged.
     */
    private void invokeCheck(Set<Long> errorReplicaIds,
            List<Replica> tabletSuccReplicas,
            List<Replica> tabletWriteFailedReplicas,
            List<Replica> tabletVersionFailedReplicas,
            PublishVersionTask task,
            Replica replica,
            long minReplicaVersion, long maxReplicaVersion) throws Exception {
        DatabaseTransactionMgr target =
                Mockito.mock(DatabaseTransactionMgr.class, Mockito.CALLS_REAL_METHODS);
        Method check = lookupCheckMethod();

        List<Long> subTxnIds = Lists.newArrayList(TXN_ID);
        long alterWatershedTxnId = -1L;
        // Typed variable on purpose: a null task must become a one-element list containing null.
        List<PublishVersionTask> publishTasks = Lists.newArrayList(task);

        try {
            check.invoke(target,
                    subTxnIds, alterWatershedTxnId, TABLET_ID,
                    replica, minReplicaVersion, maxReplicaVersion,
                    publishTasks, errorReplicaIds,
                    tabletSuccReplicas, tabletWriteFailedReplicas, tabletVersionFailedReplicas);
        } catch (InvocationTargetException wrapped) {
            Throwable cause = wrapped.getCause();
            if (cause instanceof NullPointerException) {
                Assert.fail("checkReplicaContinuousVersionSucc threw NPE on null succTablets: "
                        + cause);
            }
            throw wrapped;
        }
    }

    /**
     * Finished task, {@code null} success map: the call must complete, and with the replica at
     * version 100 against a max replica version of 101 the replica is expected in the
     * write-failed list only.
     */
    @Test
    public void testNoNpeWhenSuccTabletsIsNull() throws Exception {
        PublishVersionTask brokenTask = newFinishedTaskWithNullSuccTablets();
        Replica replica = newNormalReplica();

        Set<Long> errorIds = Sets.newHashSet();
        List<Replica> succeeded = Lists.newArrayList();
        List<Replica> writeFailed = Lists.newArrayList();
        List<Replica> versionFailed = Lists.newArrayList();

        invokeCheck(errorIds, succeeded, writeFailed, versionFailed, brokenTask, replica,
                /*minReplicaVersion*/100L, /*maxReplicaVersion*/101L);

        Assert.assertTrue("replica should be classified as write-failed when succTablets is null",
                writeFailed.contains(replica));
        Assert.assertTrue(succeeded.isEmpty());
        Assert.assertTrue(versionFailed.isEmpty());
    }

    /**
     * Finished task whose success map contains {@code TABLET_ID}: the replica must be dropped
     * from the pre-seeded error set and added to the success list.
     */
    @Test
    public void testHappyPathWhenSuccTabletsContainsTabletId() throws Exception {
        PublishVersionTask okTask = new PublishVersionTask(BACKEND_ID, TXN_ID, DB_ID, null,
                System.currentTimeMillis());
        okTask.setFinished(true);
        java.util.Map<Long, Long> published = new java.util.HashMap<>();
        published.put(TABLET_ID, 100L);
        okTask.setSuccTablets(published);

        Replica replica = newNormalReplica();

        // Seed the error set so the test can observe the replica being cleared from it.
        Set<Long> errorIds = Sets.newHashSet(REPLICA_ID);
        List<Replica> succeeded = Lists.newArrayList();
        List<Replica> writeFailed = Lists.newArrayList();
        List<Replica> versionFailed = Lists.newArrayList();

        invokeCheck(errorIds, succeeded, writeFailed, versionFailed, okTask, replica,
                /*minReplicaVersion*/100L, /*maxReplicaVersion*/100L);

        Assert.assertFalse("happy path must clear the replica from errorReplicaIds",
                errorIds.contains(REPLICA_ID));
        Assert.assertTrue("happy path must add the replica to tabletSuccReplicas",
                succeeded.contains(replica));
    }

    /**
     * {@code null} entry in the task list: the call must complete without a
     * NullPointerException and the replica is expected in the write-failed list.
     */
    @Test
    public void testNoNpeWhenTaskIsNull() throws Exception {
        Replica replica = newNormalReplica();

        Set<Long> errorIds = Sets.newHashSet();
        List<Replica> succeeded = Lists.newArrayList();
        List<Replica> writeFailed = Lists.newArrayList();
        List<Replica> versionFailed = Lists.newArrayList();

        invokeCheck(errorIds, succeeded, writeFailed, versionFailed, /*task*/null, replica,
                100L, 101L);

        Assert.assertTrue(writeFailed.contains(replica));
    }
}
Loading
Loading