Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 46 additions & 19 deletions Sources/AblyLiveObjects/DefaultRealtimeObjects.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
}
}

/// If this returns false, it means that there is currently no stored sync sequence ID or SyncObjectsPool
/// If this returns false, it means that there is currently no stored sync sequence ID, SyncObjectsPool, or BufferedObjectOperations.
internal var testsOnly_hasSyncSequence: Bool {
mutex.withLock {
mutableState.syncSequence != nil
Expand All @@ -45,6 +45,9 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD

/// The `ObjectMessage`s gathered during this sync sequence.
internal var syncObjectsPool: [ObjectState]

/// `OBJECT` ProtocolMessages that were received during this sync sequence, to be applied once the sync sequence is complete, per RTO7a.
internal var bufferedObjectOperations: [InboundObjectMessage]
}

/// Tracks whether an object sync sequence has happened yet. This allows us to wait for a sync before returning from `getRoot()`, per RTO1c.
Expand Down Expand Up @@ -230,6 +233,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD

private struct MutableState {
internal var objectsPool: ObjectsPool
/// Note that we only ever populate this during a multi-`ProtocolMessage` sync sequence. It is not used in the RTO4b or RTO5a5 cases where the sync data is entirely contained within a single ProtocolMessage, because an individual ProtocolMessage is processed atomically and so no other operations that might wish to query this property can occur concurrently with the handling of these cases.
internal var syncSequence: SyncSequence?
internal var syncStatus = SyncStatus()
internal var onChannelAttachedHasObjects: Bool?
Expand All @@ -255,7 +259,7 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD

// I have, for now, not directly implemented the "perform the actions for object sync completion" of RTO4b4 since my implementation doesn't quite match the model given there; here you only have a SyncObjectsPool if you have an OBJECT_SYNC in progress, which you might not have upon receiving an ATTACHED. Instead I've just implemented what seem like the relevant side effects. Can revisit this if "the actions for object sync completion" get more complex.

// RTO4b3, RTO4b4, RTO5c3, RTO5c4
// RTO4b3, RTO4b4, RTO4b5, RTO5c3, RTO5c4, RTO5c5
syncSequence = nil
syncStatus.signalSyncComplete()
}
Expand All @@ -275,6 +279,8 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD

// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
let completedSyncObjectsPool: [ObjectState]?
// If populated, this contains a set of buffered inbound OBJECT messages that should be applied.
let completedSyncBufferedObjectOperations: [InboundObjectMessage]?

if let protocolMessageChannelSerial {
let syncCursor: SyncCursor
Expand All @@ -292,27 +298,28 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
// RTO5a3: Continue existing sync sequence
syncSequence
} else {
// RTO5a2: new sequence started, discard previous
.init(id: syncCursor.sequenceID, syncObjectsPool: [])
// RTO5a2a, RTO5a2b: new sequence started, discard previous
.init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: [])
}
} else {
// There's no current sync sequence; start one
.init(id: syncCursor.sequenceID, syncObjectsPool: [])
.init(id: syncCursor.sequenceID, syncObjectsPool: [], bufferedObjectOperations: [])
}

// RTO5b
updatedSyncSequence.syncObjectsPool.append(contentsOf: objectMessages.compactMap(\.object))

syncSequence = updatedSyncSequence

completedSyncObjectsPool = if syncCursor.isEndOfSequence {
updatedSyncSequence.syncObjectsPool
(completedSyncObjectsPool, completedSyncBufferedObjectOperations) = if syncCursor.isEndOfSequence {
(updatedSyncSequence.syncObjectsPool, updatedSyncSequence.bufferedObjectOperations)
} else {
nil
(nil, nil)
}
} else {
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
completedSyncObjectsPool = objectMessages.compactMap(\.object)
completedSyncBufferedObjectOperations = nil
}

if let completedSyncObjectsPool {
Expand All @@ -323,7 +330,21 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD
coreSDK: coreSDK,
logger: logger,
)
// RTO5c3, RTO5c4

// RTO5c6
if let completedSyncBufferedObjectOperations, !completedSyncBufferedObjectOperations.isEmpty {
logger.log("Applying \(completedSyncBufferedObjectOperations.count) buffered OBJECT ObjectMessages", level: .debug)
for objectMessage in completedSyncBufferedObjectOperations {
applyObjectProtocolMessageObjectMessage(
objectMessage,
logger: logger,
mapDelegate: mapDelegate,
coreSDK: coreSDK,
)
}
}

// RTO5c3, RTO5c4, RTO5c5
syncSequence = nil

syncStatus.signalSyncComplete()
Expand All @@ -342,16 +363,22 @@ internal final class DefaultRealtimeObjects: RealtimeObjects, LiveMapObjectPoolD

logger.log("handleObjectProtocolMessage(objectMessages: \(objectMessages))", level: .debug)

// TODO: RTO8a's buffering

// RTO8b
for objectMessage in objectMessages {
applyObjectProtocolMessageObjectMessage(
objectMessage,
logger: logger,
mapDelegate: mapDelegate,
coreSDK: coreSDK,
)
if let existingSyncSequence = syncSequence {
// RTO8a: Buffer the OBJECT message, to be handled once the sync completes
logger.log("Buffering OBJECT message due to in-progress sync", level: .debug)
var newSyncSequence = existingSyncSequence
newSyncSequence.bufferedObjectOperations.append(contentsOf: objectMessages)
syncSequence = newSyncSequence
} else {
// RTO8b: Handle the OBJECT message immediately
for objectMessage in objectMessages {
applyObjectProtocolMessageObjectMessage(
objectMessage,
logger: logger,
mapDelegate: mapDelegate,
coreSDK: coreSDK,
)
}
}
}

Expand Down
100 changes: 98 additions & 2 deletions Tests/AblyLiveObjectsTests/DefaultRealtimeObjectsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct DefaultRealtimeObjectsTests {
// @spec RTO5b
// @spec RTO5c3
// @spec RTO5c4
// @spec RTO5c5
@Test
func handlesMultiProtocolMessageSync() async throws {
let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects()
Expand Down Expand Up @@ -94,7 +95,7 @@ struct DefaultRealtimeObjectsTests {
protocolMessageChannelSerial: "\(sequenceId):", // Empty cursor indicates end
)

// Verify sync sequence is cleared and there is no SyncObjectsPool (RTO5c3, RTO5c4)
// Verify sync sequence is cleared and there is no SyncObjectsPool or BufferedObjectOperations (RTO5c3, RTO5c4, RTO5c5)
#expect(!realtimeObjects.testsOnly_hasSyncSequence)

// Verify all objects were applied to pool (side effect of applySyncObjectsPool per RTO5c1b1b)
Expand All @@ -108,6 +109,7 @@ struct DefaultRealtimeObjectsTests {

// @spec RTO5a2
// @spec RTO5a2a
// @spec RTO5a2b
@Test
func newSequenceIdDiscardsInFlightSync() async throws {
let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects()
Expand All @@ -123,6 +125,11 @@ struct DefaultRealtimeObjectsTests {

#expect(realtimeObjects.testsOnly_hasSyncSequence)

// Inject an OBJECT; it will get buffered per RTO8a and subsequently discarded per RTO5a2b
realtimeObjects.handleObjectProtocolMessage(objectMessages: [
TestFactories.mapCreateOperationMessage(objectId: "map:3@789"),
])

// Start new sequence with different ID (RTO5a2)
let secondMessages = [TestFactories.simpleMapMessage(objectId: "map:2@456")]
realtimeObjects.handleObjectSyncProtocolMessage(
Expand All @@ -142,6 +149,7 @@ struct DefaultRealtimeObjectsTests {
// Verify only the second sequence's objects were applied (RTO5a2a - previous cleared)
let pool = realtimeObjects.testsOnly_objectsPool
#expect(pool.entries["map:1@123"] == nil) // From discarded first sequence
#expect(pool.entries["map:3@789"] == nil) // Check we discarded the OBJECT that was buffered during discarded first sequence (RTO5a2b)
#expect(pool.entries["map:2@456"] != nil) // From completed second sequence
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
}
Expand Down Expand Up @@ -336,6 +344,7 @@ struct DefaultRealtimeObjectsTests {
// @spec RTO4b2
// @spec RTO4b3
// @spec RTO4b4
// @spec RTO4b5
@Test
func handlesHasObjectsFalse() {
let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects()
Expand Down Expand Up @@ -381,7 +390,7 @@ struct DefaultRealtimeObjectsTests {
#expect(newRoot as AnyObject !== originalPool.root as AnyObject) // Should be a new instance
#expect(newRoot.testsOnly_data.isEmpty) // Should be zero-valued (empty)

// RTO4b3, RTO4b4: SyncObjectsPool must be cleared, sync sequence cleared
// RTO4b3, RTO4b4, RTO4b5: SyncObjectsPool must be cleared, sync sequence cleared, BufferedObjectOperations cleared
#expect(!realtimeObjects.testsOnly_hasSyncSequence)
}

Expand Down Expand Up @@ -922,5 +931,92 @@ struct DefaultRealtimeObjectsTests {
#expect(counter.testsOnly_siteTimeserials["site1"] == "ts2")
}
}

// Tests that when an OBJECT ProtocolMessage is received during a sync sequence, its operations are buffered per RTO8a and applied after sync completion per RTO5c6.
struct BufferOperationTests {
// @spec RTO8a
// @spec RTO5c6
@Test
func buffersObjectOperationsDuringSyncAndAppliesAfterCompletion() async throws {
let realtimeObjects = DefaultRealtimeObjectsTests.createDefaultRealtimeObjects()
let sequenceId = "seq123"

// Start sync sequence with first OBJECT_SYNC message
let (entryKey, entry) = TestFactories.stringMapEntry(key: "existingKey", value: "existingValue")
let firstSyncMessages = [
TestFactories.mapObjectMessage(
objectId: "map:1@123",
siteTimeserials: ["site1": "ts1"], // Explicit sync data siteCode and serial
entries: [entryKey: entry],
),
]
realtimeObjects.handleObjectSyncProtocolMessage(
objectMessages: firstSyncMessages,
protocolMessageChannelSerial: "\(sequenceId):cursor1",
)

// Verify sync sequence is active
#expect(realtimeObjects.testsOnly_hasSyncSequence)

// Inject first OBJECT ProtocolMessage during sync (RTO8a)
let firstObjectMessage = TestFactories.mapSetOperationMessage(
objectId: "map:1@123",
key: "key1",
value: "value1",
serial: "ts3", // Higher than sync data "ts1"
siteCode: "site1",
)
realtimeObjects.handleObjectProtocolMessage(objectMessages: [firstObjectMessage])

// Verify the operation was buffered and not applied yet
let poolAfterFirstObject = realtimeObjects.testsOnly_objectsPool
#expect(poolAfterFirstObject.entries["map:1@123"] == nil) // Object not yet created from sync

// Inject second OBJECT ProtocolMessage during sync (RTO8a)
let secondObjectMessage = TestFactories.counterIncOperationMessage(
objectId: "counter:1@456",
amount: 10,
serial: "ts4", // Higher than sync data "ts2"
siteCode: "site1",
)
realtimeObjects.handleObjectProtocolMessage(objectMessages: [secondObjectMessage])

// Verify the second operation was also buffered and not applied yet
let poolAfterSecondObject = realtimeObjects.testsOnly_objectsPool
#expect(poolAfterSecondObject.entries["counter:1@456"] == nil) // Object not yet created from sync

// Complete sync sequence with final OBJECT_SYNC message
let finalSyncMessages = [
TestFactories.counterObjectMessage(
objectId: "counter:1@456",
siteTimeserials: ["site1": "ts2"],
count: 5,
),
]
realtimeObjects.handleObjectSyncProtocolMessage(
objectMessages: finalSyncMessages,
protocolMessageChannelSerial: "\(sequenceId):", // Empty cursor indicates end
)

// Verify sync sequence is cleared
#expect(!realtimeObjects.testsOnly_hasSyncSequence)

// Verify all objects were applied to pool from sync
let finalPool = realtimeObjects.testsOnly_objectsPool
let map = try #require(finalPool.entries["map:1@123"]?.mapValue)
let counter = try #require(finalPool.entries["counter:1@456"]?.counterValue)

// Verify the buffered operations were applied after sync completion (RTO5c6)
// Check that MAP_SET operation was applied to the map
let mapValue = try #require(map.get(key: "key1")?.stringValue)
#expect(mapValue == "value1")
#expect(map.testsOnly_siteTimeserials["site1"] == "ts3")

// Check that COUNTER_INC operation was applied to the counter
let counterValue = try counter.value
#expect(counterValue == 15) // 5 (from sync) + 10 (from buffered operation)
#expect(counter.testsOnly_siteTimeserials["site1"] == "ts4")
}
}
}
}