Skip to content

Commit 64ab2ad

Browse files
committed
review comments
1 parent 1f569e6 commit 64ab2ad

File tree

4 files changed

+27
-17
lines changed

4 files changed

+27
-17
lines changed

ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3566,6 +3566,7 @@ public void testInsertSnapshotIsolation() throws Exception {
35663566
swapTxnManager(txnMgr);
35673567

35683568
driver.run();
3569+
txnHandler.performWriteSetGC();
35693570
txnHandler.cleanTxnToWriteIdTable();
35703571
swapTxnManager(txnMgr2);
35713572

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public void markCleaned(CompactionInfo info) throws MetaException {
212212

213213
/**
214214
* Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by
215-
* min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)).
215+
* min(max(TXNS.txn_id), min(WRITE_SET.WS_TXNID), min(Aborted TXNS.txn_id)).
216216
*/
217217
@Override
218218
@RetrySemantics.SafeToRetry

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,7 @@ Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedT
715715

716716
/**
717717
* Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by
718-
* min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)).
718+
* min(max(TXNS.txn_id), min(WRITE_SET.WS_TXNID), min(Aborted TXNS.txn_id)).
719719
*/
720720
@SqlRetry
721721
@Transactional(POOL_COMPACTOR)

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.hadoop.hive.metastore.txn.TxnStore;
4141
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
4242
import org.apache.hadoop.hive.metastore.txn.entities.TxnWriteDetails;
43+
import org.apache.hadoop.hive.metastore.txn.jdbc.InClauseBatchCommand;
4344
import org.apache.hadoop.hive.metastore.txn.jdbc.commands.DeleteReplTxnMapEntryCommand;
4445
import org.apache.hadoop.hive.metastore.txn.jdbc.commands.InsertCompletedTxnComponentsCommand;
4546
import org.apache.hadoop.hive.metastore.txn.jdbc.commands.RemoveTxnsFromMinHistoryLevelCommand;
@@ -70,6 +71,7 @@
7071
import java.util.Collections;
7172
import java.util.List;
7273
import java.util.Set;
74+
import java.util.function.Function;
7375
import java.util.stream.Collectors;
7476
import java.util.stream.IntStream;
7577

@@ -126,7 +128,7 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
126128
throw new RollbackException(null);
127129
}
128130
assert targetTxnIds.size() == 1;
129-
txnid = targetTxnIds.get(0);
131+
txnid = targetTxnIds.getFirst();
130132
}
131133

132134
/**
@@ -155,8 +157,8 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
155157
}
156158

157159
String conflictSQLSuffix = String.format("""
158-
FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = %d AND "TC_OPERATION_TYPE" IN (%s, %s)
159-
""", txnid, OperationType.UPDATE, OperationType.DELETE);
160+
FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND "TC_OPERATION_TYPE" IN (%s, %s)
161+
""", OperationType.UPDATE, OperationType.DELETE);
160162
long tempCommitId = TxnUtils.generateTemporaryId();
161163

162164
if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) {
@@ -176,7 +178,10 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
176178
""";
177179

178180
boolean isUpdateOrDelete = Boolean.TRUE.equals(jdbcResource.getJdbcTemplate().query(
179-
jdbcResource.getSqlGenerator().addLimitClause(1, "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix),
181+
jdbcResource.getSqlGenerator()
182+
.addLimitClause(1, "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix),
183+
new MapSqlParameterSource()
184+
.addValue("txnId", txnid),
180185
ResultSet::next));
181186

182187
if (isUpdateOrDelete) {
@@ -201,8 +206,8 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
201206
(txnType != TxnType.REBALANCE_COMPACTION) ? "" : " AND \"TC_OPERATION_TYPE\" <> :type")),
202207
new MapSqlParameterSource()
203208
.addValue("txnId", txnid)
204-
.addValue("commitId", tempCommitId)
205-
.addValue("type", OperationType.COMPACT.getSqlConst()));
209+
.addValue("type", OperationType.COMPACT.getSqlConst())
210+
.addValue("commitId", tempCommitId));
206211

207212
/**
208213
* This S4U will mutex with other commitTxn() and openTxns().
@@ -249,7 +254,7 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
249254
jdbcResource.getJdbcTemplate().update(writeSetInsertSql + "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = :txnId",
250255
new MapSqlParameterSource()
251256
.addValue("txnId", txnid)
252-
.addValue("commitId", txnid));
257+
.addValue("commitId", jdbcResource.execute(new GetHighWaterMarkHandler())));
253258
}
254259
} else {
255260
/*
@@ -274,8 +279,7 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
274279
jdbcResource.execute(new DeleteReplTxnMapEntryCommand(sourceTxnId, rqst.getReplPolicy()));
275280
}
276281
updateWSCommitIdAndCleanUpMetadata(jdbcResource, txnid, txnType, commitId, tempCommitId);
277-
jdbcResource.execute(new RemoveTxnsFromMinHistoryLevelCommand(ImmutableList.of(txnid)));
278-
jdbcResource.execute(new RemoveWriteIdsFromMinHistoryCommand(ImmutableList.of(txnid)));
282+
279283
if (rqst.isSetKeyValue()) {
280284
updateKeyValueAssociatedWithTxn(jdbcResource, rqst);
281285
}
@@ -562,11 +566,8 @@ private void updateKeyValueAssociatedWithTxn(MultiDataSourceJdbcResource jdbcRes
562566
}
563567
}
564568

565-
/**
566-
* See overridden method in CompactionTxnHandler also.
567-
*/
568-
private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbcResource, long txnid, TxnType txnType,
569-
Long commitId, long tempId) throws MetaException {
569+
private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbcResource,
570+
long txnid, TxnType txnType, Long commitId, long tempId) throws MetaException {
570571
List<String> queryBatch = new ArrayList<>(6);
571572
// update write_set with real commitId
572573
if (commitId != null) {
@@ -587,9 +588,17 @@ private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc
587588
queryBatch.add("UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " + commitId + ", \"CQ_COMMIT_TIME\" = " +
588589
getEpochFn(jdbcResource.getDatabaseProduct()) + " WHERE \"CQ_TXN_ID\" = " + txnid);
589590
}
590-
591+
591592
// execute all in one batch
592593
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(queryBatch.toArray(new String[0]));
594+
595+
List<Function<List<Long>, InClauseBatchCommand<Long>>> commands = List.of(
596+
RemoveTxnsFromMinHistoryLevelCommand::new,
597+
RemoveWriteIdsFromMinHistoryCommand::new
598+
);
599+
for (Function<List<Long>, InClauseBatchCommand<Long>> cmd : commands) {
600+
jdbcResource.execute(cmd.apply(ImmutableList.of(txnid)));
601+
}
593602
}
594603

595604
/**

0 commit comments

Comments
 (0)