Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@ import scala.concurrent.{ExecutionContext, Future}
/** Holds information about a hard domain migration
  *
  * @param currentMigrationId The migration id of the current incarnation of the global domain.
  * @param migrationTimeInfo Information about the migration from the previous incarnation of
  *                          the global domain (ACS snapshot record time and whether the
  *                          synchronizer was paused during the migration).
  *                          None if this is the first incarnation of the global domain.
  */
final case class DomainMigrationInfo(
    currentMigrationId: Long,
    migrationTimeInfo: Option[MigrationTimeInfo],
)

/** Migration-time details of the previous incarnation of the global domain.
  *
  * @param acsRecordTime The record time at which the ACS snapshot was taken on the previous
  *                      incarnation of the global domain.
  * @param synchronizerWasPaused True when we ran through a proper migration, false for disaster recovery
  */
final case class MigrationTimeInfo(
    acsRecordTime: CantonTimestamp,
    synchronizerWasPaused: Boolean,
)

object DomainMigrationInfo {
Expand All @@ -34,9 +41,13 @@ object DomainMigrationInfo {
connection.ensureUserMetadataAnnotation(
user,
Map(
BaseLedgerConnection.DOMAIN_MIGRATION_ACS_RECORD_TIME_METADATA_KEY -> info.acsRecordTime
BaseLedgerConnection.DOMAIN_MIGRATION_ACS_RECORD_TIME_METADATA_KEY -> info.migrationTimeInfo
.map(_.acsRecordTime)
.fold("*")(_.toProtoPrimitive.toString),
BaseLedgerConnection.DOMAIN_MIGRATION_CURRENT_MIGRATION_ID_METADATA_KEY -> info.currentMigrationId.toString,
BaseLedgerConnection.DOMAIN_MIGRATION_DOMAIN_WAS_PAUSED_KEY -> info.migrationTimeInfo
.map(_.synchronizerWasPaused)
.fold("*")(_.toString),
),
RetryFor.WaitingOnInitDependency,
)
Expand Down Expand Up @@ -68,9 +79,22 @@ object DomainMigrationInfo {
BaseLedgerConnection.DOMAIN_MIGRATION_CURRENT_MIGRATION_ID_METADATA_KEY,
)
.map(_.toLong)
synchronizerWasPaused <- connection
.waitForUserMetadata(
user,
BaseLedgerConnection.DOMAIN_MIGRATION_DOMAIN_WAS_PAUSED_KEY,
)
.map {
case "*" => None
case s =>
Some(s.toBoolean)
}
} yield DomainMigrationInfo(
currentMigrationId = currentMigrationId,
acsRecordTime = acsRecordTime,
migrationTimeInfo = for {
recordTime <- acsRecordTime
wasPaused <- synchronizerWasPaused
} yield MigrationTimeInfo(recordTime, wasPaused),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -737,18 +737,82 @@ class UpdateHistory(
historyId: Long
)(implicit tc: TraceContext): Future[Unit] = {
val previousMigrationId = domainMigrationInfo.currentMigrationId - 1
domainMigrationInfo.acsRecordTime match {
case Some(acsRecordTime) =>
domainMigrationInfo.migrationTimeInfo match {
case Some(info) =>
for {
_ <- deleteAcsSnapshotsAfter(historyId, previousMigrationId, acsRecordTime)
_ <- deleteRolledBackUpdateHistory(historyId, previousMigrationId, acsRecordTime)
_ <-
if (info.synchronizerWasPaused) {
for {
_ <- verifyNoRolledBackAcsSnapshots(
historyId,
previousMigrationId,
info.acsRecordTime,
)
_ <- verifyNoRolledBackData(historyId, previousMigrationId, info.acsRecordTime)
} yield ()
} else {
for {
_ <- deleteAcsSnapshotsAfter(historyId, previousMigrationId, info.acsRecordTime)
_ <- deleteRolledBackUpdateHistory(
historyId,
previousMigrationId,
info.acsRecordTime,
)
} yield ()
}
} yield ()
case _ =>
logger.debug("No previous domain migration, not checking or deleting updates")
Future.unit
}
}

private[this] def verifyNoRolledBackData(
historyId: Long, // Not using the storeId from the state, as the state might not be updated yet
migrationId: Long,
recordTime: CantonTimestamp,
)(implicit tc: TraceContext): Future[Unit] = {
val action = DBIO
.sequence(
Seq(
sql"""
select count(*) from update_history_creates
Copy link
Contributor Author

@moritzkiefer-da moritzkiefer-da Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the queries are identical to the delete variant so essentially the same query plan as well

 Finalize Aggregate  (cost=959598.74..959598.75 rows=1 width=8)
   ->  Gather  (cost=959598.52..959598.73 rows=2 width=8)
         Workers Planned: 2
         ->  Partial Aggregate  (cost=958598.52..958598.53 rows=1 width=8)
               ->  Parallel Index Only Scan using updt_hist_crea_hi_mi_rt on update_history_creates  (cost=0.57..953136.30 rows=2184889 width=0)
                     Index Cond: ((history_id = 2) AND (migration_id = 6) AND (record_time > '1760054400000000'::bigint))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SELECT EXISTS(SELECT 1 FROM ... WHERE ...)

is probably even faster, but I don't think it's worth optimizing for maximum speed here since AFAICT we're only running these queries on actual migration/DR procedures.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah also the counts are useful in logs imho. So I'd lean towards keeping it as it is.

where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
""".as[Long].head,
sql"""
select count(*) from update_history_exercises
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
""".as[Long].head,
sql"""
select count(*) from update_history_transactions
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
""".as[Long].head,
sql"""
select count(*) from update_history_assignments
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
""".as[Long].head,
sql"""
select count(*) from update_history_unassignments
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
""".as[Long].head,
)
)
.map(rows =>
if (rows.sum > 0) {
throw new IllegalStateException(
s"Found $rows rows for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime, " +
"but the configuration says the domain was paused during the migration. " +
"Check the domain migration configuration and the content of the update history database."
)
} else {
logger.debug(
s"No updates found for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime"
)
}
)
storage.query(action, "verifyNoRolledBackData")
}

private[this] def deleteRolledBackUpdateHistory(
historyId: Long, // Not using the storeId from the state, as the state might not be updated yet
migrationId: Long,
Expand Down Expand Up @@ -843,6 +907,37 @@ class UpdateHistory(
)
}

def verifyNoRolledBackAcsSnapshots(
historyId: Long,
migrationId: Long,
recordTime: CantonTimestamp,
)(implicit tc: TraceContext): Future[Unit] = {
val action = sql"""
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm somehow I hit this in tests

2025-10-20T19:38:41.004+0200 [⋮] DEBUG - o.l.s.a.a.HttpRequestLogger:SvOffboardingIntegrationTest/config=426b87db/scan=sv2Scan (48a11ed8da4f8f67570214ff1f7918c4---39233d6980013913) - HTTP POST /api/scan/v0/state/acs/force from (127.0.0.1:41912): received request. 
2025-10-20T19:38:41.004+0200 [⋮] DEBUG - o.l.s.a.a.HttpRequestLogger:SvOffboardingIntegrationTest/config=426b87db/scan=sv2Scan (48a11ed8da4f8f67570214ff1f7918c4---39233d6980013913) - HTTP POST /api/scan/v0/state/acs/force from (127.0.0.1:41912): omitting logging of request entity data. 
2025-10-20T19:38:41.023+0200 [⋮] DEBUG - o.l.s.a.a.HttpRequestLogger:SvOffboardingIntegrationTest/config=426b87db/scan=sv2Scan (48a11ed8da4f8f67570214ff1f7918c4---39233d6980013913) - HTTP POST /api/scan/v0/state/acs/force from (127.0.0.1:41912): Responding with status code: 200 OK 
2025-10-20T19:38:41.023+0200 [⋮] DEBUG - o.l.s.a.a.HttpRequestLogger:SvOffboardingIntegrationTest/config=426b87db/scan=sv2Scan (48a11ed8da4f8f67570214ff1f7918c4---39233d6980013913) - HTTP POST /api/scan/v0/state/acs/force from (127.0.0.1:41912): Responding with entity data: {"record_time":"2025-10-20T17:38:33.082427Z","migration_id":0} 
2025-10-20T19:40:39.182+0200 [⋮] INFO - o.l.s.s.a.AcsSnapshotTrigger:DecentralizedSynchronizerMigrationIntegrationTest/config=1e0ce823/scan=sv2Scan (2d7c6cb4b923a9a8d12966e2663ce745-AcsSnapshotTrigger--1a3ab5fb07777d77) - Still not time to take a snapshot. Now: 2025-10-20T17:40:39.177592Z. Next snapshot time: 2025-10-20T18:00:00Z. 
2025-10-20T19:46:21.396+0200 [⋮] INFO - c.d.c.r.DbStorageSingle:DecentralizedSynchronizerMigrationIntegrationTest/config=1e0ce823/scan=sv2ScanLocal (a642760f98f4c3fa3c393238ead07771-ledger ingestion loop--70a3dfb0faf77da1) - Detected an error. java.lang.IllegalStateException: Found 1 acs snapshots for DSO-1e0ce823-1e0ce823::1220b88707c5... where migration_id = 0 and record_time > 2025-10-20T17:42:56.116892Z, but the configuration says the domain was paused during the migration. Check the domain migration configuration and the content of the update history database

But I don't understand why, I don't see a snapshot with a higher time 😕

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2025-10-21T08:14:42.806+0200 [⋮] INFO - o.l.s.s.a.AcsSnapshotTrigger:DecentralizedSynchronizerMigrationIntegrationTest/config=6c5a71a3/scan=sv2ScanLocal (7b749ed6846de9cc6cebb9d0b4dc4423-AcsSnapshotTrigger--234cf2d4cae90320) - Still not time to take a snapshot. Now: 2025-10-21T06:14:42.805240Z. Next snapshot time: 2025-10-21T09:00:00Z. 
2025-10-21T08:14:43.449+0200 [⋮] INFO - c.d.c.r.DbStorageSingle:DecentralizedSynchronizerMigrationIntegrationTest/config=6c5a71a3/scan=sv2ScanLocal (d23d0a2bdd9e7135da49e3b4a602eb8e-ledger ingestion loop--d2bdb56a49aefe7c) - Detected an error. java.lang.IllegalStateException: Found acs snapshots at Vector(2025-10-21T09:00:00Z) for DSO-6c5a71a3-6c5a71a3::1220264dccd0... where migration_id = 0 and record_time > 2025-10-21T06:11:11.069075Z, but the configuration says the domain was paused during the migration. Check the domain migration configuration and the content of the update history database

This makes no sense to me.

@OriolMunoz-da I think I need a 🦆, am I missing something obvious here? I can't find any log that inserts this snapshot. We also are nowhere close to 09:00:00 (and this is a wallclock test not simtime). There also are no weird force calls (and even if they were they would not be at 09:00:00). Is there some codepath I'm missing that creates this snapshot here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed with @OriolMunoz-da: this is a bug in the acs snapshot logic. He's looking into fixing it

select snapshot_record_time from acs_snapshot
where history_id = $historyId and migration_id = $migrationId and snapshot_record_time > $recordTime
"""
.as[CantonTimestamp]
.map(times =>
if (times.length > 0) {
throw new IllegalStateException(
s"Found acs snapshots at $times for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime, " +
"but the configuration says the domain was paused during the migration. " +
"Check the domain migration configuration and the content of the update history database"
)
} else {
logger.debug(
s"No updates found for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime"
)
}
)

storage
.query(
action,
"verifyNoRolledBackAcsSnapshots",
)
}

/** Deletes all updates on the given domain with a record time before the given time.
*/
def deleteUpdatesBefore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1875,16 +1875,48 @@ final class DbMultiDomainAcsStore[TXE](
)(implicit tc: TraceContext): Future[Unit] = {
txLogTableNameOpt.fold(Future.unit) { _ =>
val previousMigrationId = domainMigrationInfo.currentMigrationId - 1
domainMigrationInfo.acsRecordTime match {
case Some(acsRecordTime) =>
deleteRolledBackTxLogEntries(txLogStoreId, previousMigrationId, acsRecordTime)
domainMigrationInfo.migrationTimeInfo match {
case Some(info) =>
if (info.synchronizerWasPaused) {
verifyNoRolledBackData(txLogStoreId, previousMigrationId, info.acsRecordTime)
} else {
deleteRolledBackTxLogEntries(txLogStoreId, previousMigrationId, info.acsRecordTime)
}
case _ =>
logger.debug("No previous domain migration, not checking or deleting txlog entries")
Future.unit
}
}
}

/** Fails if any txlog entries exist for the given (previous) migration after the given
  * record time.
  *
  * Called when the configuration says the synchronizer was paused during the migration
  * (`synchronizerWasPaused = true`), in which case no entries past the ACS record time
  * should ever have been ingested; finding any indicates a misconfigured migration or a
  * corrupted txlog database.
  *
  * @param txLogStoreId the store to check; not using the storeId from the state, as the
  *                     state might not be updated yet
  * @param migrationId  the migration id of the previous incarnation
  * @param recordTime   the ACS snapshot record time of the previous incarnation
  * @return a future that fails with [[IllegalStateException]] if offending rows are found
  */
private[this] def verifyNoRolledBackData(
    txLogStoreId: TxLogStoreId, // Not using the storeId from the state, as the state might not be updated yet
    migrationId: Long,
    recordTime: CantonTimestamp,
)(implicit tc: TraceContext): Future[Unit] = {
  val action =
    sql"""
       select count(*) from #$txLogTableName
       where store_id = $txLogStoreId and migration_id = $migrationId and record_time > $recordTime
     """
      .as[Long]
      .head
      .map(rows =>
        // A non-zero count means data was rolled back even though the configuration
        // claims the domain was paused — fail loudly rather than silently deleting.
        if (rows > 0) {
          throw new IllegalStateException(
            s"Found $rows rows for $txLogStoreDescriptor where migration_id = $migrationId and record_time > $recordTime, " +
              "but the configuration says the domain was paused during the migration. " +
              "Check the domain migration configuration and the content of the txlog database."
          )
        } else {
          logger.debug(
            s"No txlog entries found for $txLogStoreDescriptor where migration_id = $migrationId and record_time > $recordTime"
          )
        }
      )
  storage.query(action, "verifyNoRolledBackData")
}

private[this] def deleteRolledBackTxLogEntries(
txLogStoreId: TxLogStoreId, // Not using the storeId from the state, as the state might not be updated yet
migrationId: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.{
}
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.test.dummyholding.DummyHolding
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.test.dummytwointerfaces.DummyTwoInterfaces
import org.lfdecentralizedtrust.splice.migration.MigrationTimeInfo

import java.time.Instant
import java.util.concurrent.atomic.AtomicReference
Expand Down Expand Up @@ -104,6 +105,7 @@ abstract class MultiDomainAcsStoreTest[
GenericAcsRowData,
GenericInterfaceRowData,
] = defaultContractFilter,
migrationTimeInfo: Option[MigrationTimeInfo] = None,
): Store

protected type Store = S
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.lfdecentralizedtrust.splice.environment.ledger.api.{
TransactionTreeUpdate,
TreeUpdateOrOffsetCheckpoint,
}
import org.lfdecentralizedtrust.splice.migration.MigrationTimeInfo
import org.lfdecentralizedtrust.splice.util.DomainRecordTimeRange

import java.time.Instant
Expand Down Expand Up @@ -617,6 +618,83 @@ class UpdateHistoryTest extends UpdateHistoryTestBase {
} yield succeed
}

"tx rollbacks after migrations are handled correctly" in {
val t0 = time(1)
val t1 = time(2)
val store1 = mkStore(party1, migration1, participant1)
val store2TimeTooEarly = mkStore(
party1,
migration2,
participant1,
migrationTimeInfo = Some(MigrationTimeInfo(t0, synchronizerWasPaused = true)),
)
val store2TimeCorrect = mkStore(
party1,
migration2,
participant1,
migrationTimeInfo = Some(MigrationTimeInfo(t1, synchronizerWasPaused = true)),
)
for {
_ <- initStore(store1)
_ <- create(domain1, cid1, offset1, party1, store1, t0)
_ <- create(domain1, cid2, offset2, party1, store1, t1)
updates1 <- updates(store1)
ex <- recoverToExceptionIf[IllegalStateException](initStore(store2TimeTooEarly))
_ = ex.getMessage should include("Found List(1, 0, 1, 0, 0) rows")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log is a bit confusing but it's the same log format we have for the delete and I can't be bothered to fix that right now.

_ <- initStore(store2TimeCorrect)
updates2 <- updates(store2TimeCorrect)
} yield {
checkUpdates(
updates1,
Seq(
ExpectedCreate(cid1, domain1),
ExpectedCreate(cid2, domain1),
),
)
checkUpdates(
updates2,
Seq(
ExpectedCreate(cid1, domain1),
ExpectedCreate(cid2, domain1),
),
)
}
}

"tx rollbacks after DR are handled correctly" in {
  val earlierTime = time(1)
  val laterTime = time(2)
  val oldStore = mkStore(party1, migration1, participant1)
  // DR case: synchronizerWasPaused = false, so data past the ACS record time is
  // expected to be deleted on init instead of causing a failure.
  val drStore = mkStore(
    party1,
    migration2,
    participant1,
    migrationTimeInfo = Some(MigrationTimeInfo(earlierTime, synchronizerWasPaused = false)),
  )
  for {
    _ <- initStore(oldStore)
    _ <- create(domain1, cid1, offset1, party1, oldStore, earlierTime)
    _ <- create(domain1, cid2, offset2, party1, oldStore, laterTime)
    updatesBeforeMigration <- updates(oldStore)
    // Initialization must succeed and silently roll back the second create,
    // which was recorded after the ACS record time.
    _ <- initStore(drStore)
    updatesAfterMigration <- updates(drStore)
  } yield {
    checkUpdates(
      updatesBeforeMigration,
      Seq(
        ExpectedCreate(cid1, domain1),
        ExpectedCreate(cid2, domain1),
      ),
    )
    checkUpdates(
      updatesAfterMigration,
      Seq(
        ExpectedCreate(cid1, domain1)
      ),
    )
  }
}

}

"getImportUpdates" should {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.lfdecentralizedtrust.splice.environment.ledger.api.{
ReassignmentUpdate,
TransactionTreeUpdate,
}
import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
import org.lfdecentralizedtrust.splice.migration.{DomainMigrationInfo, MigrationTimeInfo}
import org.lfdecentralizedtrust.splice.store.db.{AcsJdbcTypes, AcsTables, SplicePostgresTest}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
Expand Down Expand Up @@ -239,12 +239,13 @@ abstract class UpdateHistoryTestBase
participantId: ParticipantId = participant1,
storeName: String = storeName1,
backfillingRequired: BackfillingRequirement = BackfillingRequirement.NeedsBackfilling,
migrationTimeInfo: Option[MigrationTimeInfo] = None,
): UpdateHistory = {
new UpdateHistory(
storage,
DomainMigrationInfo(
domainMigrationId,
None,
migrationTimeInfo,
),
storeName,
participantId,
Expand Down
Loading
Loading