@@ -1357,42 +1357,67 @@ Future<Void> TagPartitionedLogSystem::onKnownCommittedVersionChange() {
1357
1357
return waitForAny (result);
1358
1358
}
1359
1359
1360
- void TagPartitionedLogSystem::popLogRouter (
1361
- Version upTo,
1362
- Tag tag,
1363
- Version durableKnownCommittedVersion,
1364
- int8_t popLocality) { // FIXME: do not need to pop all generations of old logs
1360
+ void TagPartitionedLogSystem::popLogRouter (Version upTo,
1361
+ Tag tag,
1362
+ Version durableKnownCommittedVersion,
1363
+ int8_t popLocality) {
1365
1364
if (!upTo)
1366
1365
return ;
1367
- for (auto & t : tLogs) {
1368
- if (t->locality == popLocality) {
1369
- for (auto & log : t->logRouters ) {
1370
- Version prev = outstandingPops[std::make_pair (log ->get ().id (), tag)].first ;
1371
- if (prev < upTo)
1372
- outstandingPops[std::make_pair (log ->get ().id (), tag)] =
1373
- std::make_pair (upTo, durableKnownCommittedVersion);
1374
- if (prev == 0 ) {
1375
- popActors.add (popFromLog (
1376
- this , log , tag, 0.0 )); // Fast pop time because log routers can only hold 5 seconds of data.
1377
- }
1378
- }
1379
- }
1380
- }
1381
1366
1382
- for (auto & old : oldLogData) {
1383
- for (auto & t : old.tLogs ) {
1367
+ Version lastGenerationStartVersion = TagPartitionedLogSystem::getMaxLocalStartVersion (tLogs);
1368
+ if (upTo >= lastGenerationStartVersion) {
1369
+ for (auto & t : tLogs) {
1384
1370
if (t->locality == popLocality) {
1385
1371
for (auto & log : t->logRouters ) {
1386
1372
Version prev = outstandingPops[std::make_pair (log ->get ().id (), tag)].first ;
1387
- if (prev < upTo)
1373
+ if (prev < upTo) {
1388
1374
outstandingPops[std::make_pair (log ->get ().id (), tag)] =
1389
1375
std::make_pair (upTo, durableKnownCommittedVersion);
1390
- if (prev == 0 )
1391
- popActors.add (popFromLog (this , log , tag, 0.0 ));
1376
+ }
1377
+ if (prev == 0 ) {
1378
+ popActors.add (popFromLog (this ,
1379
+ log ,
1380
+ tag,
1381
+ /* delayBeforePop=*/ 0.0 ,
1382
+ /* popLogRouter=*/ true )); // Fast pop time because log routers can only
1383
+ // hold 5 seconds of data.
1384
+ }
1392
1385
}
1393
1386
}
1394
1387
}
1395
1388
}
1389
+
1390
+ Version nextGenerationStartVersion = lastGenerationStartVersion;
1391
+ for (const auto & old : oldLogData) {
1392
+ Version generationStartVersion = TagPartitionedLogSystem::getMaxLocalStartVersion (old.tLogs );
1393
+ if (generationStartVersion <= upTo) {
1394
+ for (auto & t : old.tLogs ) {
1395
+ if (t->locality == popLocality) {
1396
+ for (auto & log : t->logRouters ) {
1397
+ auto logRouterIdTagPair = std::make_pair (log ->get ().id (), tag);
1398
+
1399
+ // We pop the log router only if the popped version is within this generation's version range.
1400
+ // That is between the current generation's start version and the next generation's start
1401
+ // version.
1402
+ if (logRouterLastPops.find (logRouterIdTagPair) == logRouterLastPops.end () ||
1403
+ logRouterLastPops[logRouterIdTagPair] < nextGenerationStartVersion) {
1404
+ Version prev = outstandingPops[logRouterIdTagPair].first ;
1405
+ if (prev < upTo) {
1406
+ outstandingPops[logRouterIdTagPair] =
1407
+ std::make_pair (upTo, durableKnownCommittedVersion);
1408
+ }
1409
+ if (prev == 0 ) {
1410
+ popActors.add (
1411
+ popFromLog (this , log , tag, /* delayBeforePop=*/ 0.0 , /* popLogRouter=*/ true ));
1412
+ }
1413
+ }
1414
+ }
1415
+ }
1416
+ }
1417
+ }
1418
+
1419
+ nextGenerationStartVersion = generationStartVersion;
1420
+ }
1396
1421
}
1397
1422
1398
1423
void TagPartitionedLogSystem::popTxs (Version upTo, int8_t popLocality) {
@@ -1424,7 +1449,8 @@ void TagPartitionedLogSystem::pop(Version upTo, Tag tag, Version durableKnownCom
1424
1449
}
1425
1450
if (prev == 0 ) {
1426
1451
// pop tag from log upto version defined in outstandingPops[].first
1427
- popActors.add (popFromLog (this , log , tag, 1.0 )); // < FIXME: knob
1452
+ popActors.add (
1453
+ popFromLog (this , log , tag, /* delayBeforePop*/ 1.0 , /* popLogRouter=*/ false )); // < FIXME: knob
1428
1454
}
1429
1455
}
1430
1456
}
@@ -1434,10 +1460,11 @@ void TagPartitionedLogSystem::pop(Version upTo, Tag tag, Version durableKnownCom
1434
1460
ACTOR Future<Void> TagPartitionedLogSystem::popFromLog (TagPartitionedLogSystem* self,
1435
1461
Reference<AsyncVar<OptionalInterface<TLogInterface>>> log,
1436
1462
Tag tag,
1437
- double time) {
1463
+ double delayBeforePop,
1464
+ bool popLogRouter) {
1438
1465
state Version last = 0 ;
1439
1466
loop {
1440
- wait (delay (time , TaskPriority::TLogPop));
1467
+ wait (delay (delayBeforePop , TaskPriority::TLogPop));
1441
1468
1442
1469
// to: first is upto version, second is durableKnownComittedVersion
1443
1470
state std::pair<Version, Version> to = self->outstandingPops [std::make_pair (log ->get ().id (), tag)];
@@ -1453,6 +1480,10 @@ ACTOR Future<Void> TagPartitionedLogSystem::popFromLog(TagPartitionedLogSystem*
1453
1480
wait (log ->get ().interf ().popMessages .getReply (TLogPopRequest (to.first , to.second , tag),
1454
1481
TaskPriority::TLogPop));
1455
1482
1483
+ if (popLogRouter) {
1484
+ self->logRouterLastPops [std::make_pair (log ->get ().id (), tag)] = to.first ;
1485
+ }
1486
+
1456
1487
last = to.first ;
1457
1488
} catch (Error& e) {
1458
1489
if (e.code () == error_code_actor_cancelled)
@@ -2843,6 +2874,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
2843
2874
.detail (" OldLogRouterTags" , oldLogSystem->logRouterTags );
2844
2875
if (oldLogSystem->logRouterTags > 0 ||
2845
2876
logSystem->tLogs [0 ]->startVersion < oldLogSystem->knownCommittedVersion + 1 ) {
2877
+ // Use log routers to recover [knownCommittedVersion, recoveryVersion] from the old generation.
2846
2878
oldRouterRecruitment = TagPartitionedLogSystem::recruitOldLogRouters (oldLogSystem.getPtr (),
2847
2879
recr.oldLogRouters ,
2848
2880
recoveryCount,
0 commit comments