[AIT-276] feat: introduce ACK-based local application of LiveObjects ops#1194
[AIT-276] feat: introduce ACK-based local application of LiveObjects ops#1194
Conversation
WalkthroughAdds connection siteCode propagation and implements a publish-and-apply ACK flow that synthesizes LOCAL inbound messages on server ACK; introduces source-aware operation handling (LOCAL vs CHANNEL), ACK buffering/deduplication, changes apply methods to return Boolean success, and updates managers and tests accordingly. Changes
sequenceDiagram
participant Client
participant DefaultRealtimeObjects
participant Channel
participant Server
participant ObjectsManager
Client->>DefaultRealtimeObjects: publishAndApply(objectMessages)
DefaultRealtimeObjects->>Channel: publish(message)
Channel->>Server: send
Server-->>DefaultRealtimeObjects: ACK (PublishResult with serial, siteCode)
DefaultRealtimeObjects->>DefaultRealtimeObjects: validate siteCode & serial
DefaultRealtimeObjects->>ObjectsManager: synthesize LOCAL ObjectMessage(s)
ObjectsManager->>ObjectsManager: apply messages (LOCAL), dedupe via appliedOnAckSerials
ObjectsManager-->>Client: complete publish/apply result
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (4)
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt (1)
102-105: Consider asserting the returned apply status as part of these scenarios.
SinceapplyObject()now returnsBoolean, asserting it here would improve contract-level coverage.Suggested test tightening
- liveMap.applyObject(message, ObjectsOperationSource.CHANNEL) + val applied = liveMap.applyObject(message, ObjectsOperationSource.CHANNEL) + assertEquals(false, applied) ... - liveMap.applyObject(message, ObjectsOperationSource.CHANNEL) + val applied = liveMap.applyObject(message, ObjectsOperationSource.CHANNEL) + assertEquals(true, applied)Also applies to: 132-135
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt` around lines 102 - 105, The test should assert the Boolean return value from liveMap.applyObject(...) to cover the contract; update DefaultLiveMapTest to capture the result of applyObject(message, ObjectsOperationSource.CHANNEL) (and the other applyObject call around lines 132-135) and add assertions like assertFalse or assertTrue as appropriate based on the scenario before verifying siteTimeserials, so the test checks both the returned apply status and the side-effect on liveMap.siteTimeserials.liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt (1)
92-95: Consider assertingapplyObject()boolean result in these tests.
These cases already validate side effects; also asserting the returnedBooleanwould protect the new apply contract from regressions.Suggested test tightening
- liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL) + val applied = liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL) + assertEquals(false, applied) ... - liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL) + val applied = liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL) + assertEquals(true, applied)Also applies to: 119-122
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt` around lines 92 - 95, The tests call DefaultLiveCounterTest::liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL) but don’t assert its boolean return; capture the result and assert it matches the expected outcome (e.g., assertFalse(result) when the operation is skipped and the siteTimeserials remains "serial2", assertTrue(result) when the operation should be applied). Update both occurrences (the call around the siteTimeserials "serial2" check and the similar block at lines 119-122) to assign the return value from liveCounter.applyObject(...) to a variable and assert the Boolean equals the expected applied/skipped state.liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt (2)
319-330: Address swallowed exception warning from static analysis.The
try-catchon line 323 catches and swallows exceptions when accessingadapter.getChannel(channelName).reason. While the exception is used as an optional cause for the createdAblyException, if an unexpected exception occurs (not just the channel not being available), it's silently ignored.Consider being more specific about what exceptions to catch, or log the swallowed exception:
🔧 Proposed fix to log swallowed exception
- val errorReason = try { adapter.getChannel(channelName).reason } catch (e: Exception) { null } + val errorReason = try { + adapter.getChannel(channelName).reason + } catch (e: Exception) { + Log.d(tag, "Could not retrieve channel reason during state change handling", e) + null + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt` around lines 319 - 330, The try-catch around adapter.getChannel(channelName).reason swallows any Exception, hiding unexpected errors; update the code in DefaultRealtimeObjects (around adapter.getChannel, channelName, errorReason, ablyException, objectsManager.failBufferedAcks) to avoid silent swallowing by either catching only the expected exception type(s) thrown when a channel is absent or by logging the caught exception before returning null; ensure the catch block records the exception (e.g., logger.warn or logger.error with the exception) so unexpected failures are visible while still allowing errorReason to be null for building the AblyException.
212-221: Early returns silently degrade to echo-based application.When
siteCodeis null orserialsis unavailable/wrong length, the function logs an error and returns early, causing operations to be applied only when echoed from the server. This is a reasonable fallback, but consider whether callers should be aware of this degraded behavior.If this degradation is intentional and acceptable (operations still eventually apply), the current approach is fine. If callers need to know about the failure to apply locally, consider returning a result indicating the outcome.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt` around lines 212 - 221, The current early-return behavior when adapter.connectionManager.siteCode is null or publishResult.serials is null/length-mismatch (logged via Log.e with tag) silently degrades to echo-based application; change the function to surface this outcome to callers instead of only logging and returning—either by returning a status/result (e.g., boolean or enum) indicating "appliedLocally", "degradedToEcho", or an error, or by invoking the existing callback/throwing a descriptive exception; update all references to siteCode, publishResult.serials, and objectMessages handling so callers can react to the degraded path rather than being unaware.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt`:
- Around line 319-330: The try-catch around
adapter.getChannel(channelName).reason swallows any Exception, hiding unexpected
errors; update the code in DefaultRealtimeObjects (around adapter.getChannel,
channelName, errorReason, ablyException, objectsManager.failBufferedAcks) to
avoid silent swallowing by either catching only the expected exception type(s)
thrown when a channel is absent or by logging the caught exception before
returning null; ensure the catch block records the exception (e.g., logger.warn
or logger.error with the exception) so unexpected failures are visible while
still allowing errorReason to be null for building the AblyException.
- Around line 212-221: The current early-return behavior when
adapter.connectionManager.siteCode is null or publishResult.serials is
null/length-mismatch (logged via Log.e with tag) silently degrades to echo-based
application; change the function to surface this outcome to callers instead of
only logging and returning—either by returning a status/result (e.g., boolean or
enum) indicating "appliedLocally", "degradedToEcho", or an error, or by invoking
the existing callback/throwing a descriptive exception; update all references to
siteCode, publishResult.serials, and objectMessages handling so callers can
react to the degraded path rather than being unaware.
In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt`:
- Around line 92-95: The tests call
DefaultLiveCounterTest::liveCounter.applyObject(message,
ObjectsOperationSource.CHANNEL) but don’t assert its boolean return; capture the
result and assert it matches the expected outcome (e.g., assertFalse(result)
when the operation is skipped and the siteTimeserials remains "serial2",
assertTrue(result) when the operation should be applied). Update both
occurrences (the call around the siteTimeserials "serial2" check and the similar
block at lines 119-122) to assign the return value from
liveCounter.applyObject(...) to a variable and assert the Boolean equals the
expected applied/skipped state.
In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt`:
- Around line 102-105: The test should assert the Boolean return value from
liveMap.applyObject(...) to cover the contract; update DefaultLiveMapTest to
capture the result of applyObject(message, ObjectsOperationSource.CHANNEL) (and
the other applyObject call around lines 132-135) and add assertions like
assertFalse or assertTrue as appropriate based on the scenario before verifying
siteTimeserials, so the test checks both the returned apply status and the
side-effect on liveMap.siteTimeserials.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (16)
lib/src/main/java/io/ably/lib/transport/ConnectionManager.javalib/src/main/java/io/ably/lib/types/ConnectionDetails.javaliveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.ktliveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.ktliveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.ktliveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.ktliveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.ktliveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt
…operations - Implemented synthetic ACK-based object message application logic (`publishAndApply`). - Added buffering and order-preserving application of ACK results during sync (`applyAckResult`). - Introduced `ObjectsOperationSource` enum to distinguish operation sources (ACK vs. channel). - Updated `applyObject` and related object-specific management functions to utilize the source enum. - Enhanced tests for ACK-based application and updated handling of unsupported operations (returns `false` instead of throwing).
- Improved handling of skipped operations on ACK (RTO9a3) with detailed comments on echo discard logic. - Added extensive unit tests for `ObjectsManager`, `LiveMapManager`, and `DefaultLiveCounter` covering edge cases like buffering, tombstoning, and operation deduplication. - Introduced helper properties for buffered ACKs and object operations in tests.
a53b95d to
f57632f
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt (1)
165-176: Assert the expectedObjectsOperationSourceexplicitly in these verifications.Using
any()here can miss regressions whereObjectsManagerforwards the wrong source.💡 Proposed test tightening
verify(exactly = 1) { - testObject1.applyObject(objectMessage1, any()) + testObject1.applyObject(objectMessage1, ObjectsOperationSource.CHANNEL) } @@ verify(exactly = 1) { - testObject2.applyObject(objectMessage2, any()) + testObject2.applyObject(objectMessage2, ObjectsOperationSource.CHANNEL) } @@ verify(exactly = 1) { - testObject3.applyObject(objectMessage3, any()) + testObject3.applyObject(objectMessage3, ObjectsOperationSource.CHANNEL) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt` around lines 165 - 176, The verifications for applyObject use any() for the source which can hide regressions; update each verify call on testObject1.applyObject, testObject2.applyObject, and testObject3.applyObject to assert the specific ObjectsOperationSource expected (e.g., OBJECTS_OPERATION_SOURCE_SYNC or the concrete enum/value used in your code) instead of any(), so the test explicitly checks that ObjectsManager forwards the correct source when syncing objects from the pool.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt`:
- Around line 213-221: The code in DefaultRealtimeObjects.kt currently silently
returns when siteCode or publishResult.serials are null (the block checking
siteCode and serials), which causes downstream createMapAsync/createCounterAsync
to fail later; instead, change the behavior to fail fast or wait for echo: when
siteCode == null or serials == null/size mismatch, throw a clear exception (e.g.
IllegalStateException) or return a failed Future/CompletableDeferred so the
caller (createMapAsync/createCounterAsync) receives an explicit error; include
context (siteCode/publishResult info and objectMessages size) in the error
message so callers can handle retry/blocking for echo rather than proceeding
silently.
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 231-236: The ACK-echo dedup block that checks objectMessage.serial
against realtimeObjects.appliedOnAckSerials must only run for channel echoes;
update the conditional to also require the message source be a channel (e.g.
objectMessage.source == MessageSource.CHANNEL or the appropriate enum/constant
used in this codebase) before logging, removing the serial, and continuing, so
LOCAL replays do not clear dedup markers; keep the existing serial check and
remove/continue behavior but nest or extend the if to include the source check
(referencing objectMessage.serial, objectMessage.source, and
realtimeObjects.appliedOnAckSerials).
---
Nitpick comments:
In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt`:
- Around line 165-176: The verifications for applyObject use any() for the
source which can hide regressions; update each verify call on
testObject1.applyObject, testObject2.applyObject, and testObject3.applyObject to
assert the specific ObjectsOperationSource expected (e.g.,
OBJECTS_OPERATION_SOURCE_SYNC or the concrete enum/value used in your code)
instead of any(), so the test explicitly checks that ObjectsManager forwards the
correct source when syncing objects from the pool.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (17)
lib/src/main/java/io/ably/lib/transport/ConnectionManager.javalib/src/main/java/io/ably/lib/types/ConnectionDetails.javaliveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.ktliveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.ktliveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.ktliveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.ktliveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.ktliveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt
🚧 Files skipped from review as they are similar to previous changes (4)
- lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
- liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt
- liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt
- lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Introduces ACK-based local application for LiveObjects operations so local state updates happen on publish ACKs (with correct deduplication against subsequent channel echoes) and preserves ordering across sync sequences.
Changes:
- Added
publishAndApplyflow that synthesizes inbound object messages on publish ACK and applies them locally. - Buffered ACK results during sync and applied them before buffered channel operations to ensure correct ordering/deduplication.
- Introduced
ObjectsOperationSourceand updated object-application pipeline to respect LOCAL vs CHANNEL semantics; expanded tests accordingly.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt | applyOperation now returns Boolean and treats unsupported ops as non-fatal (false). |
| liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt | Uses publishAndApply; propagates Boolean from operation application. |
| liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt | Mirrors LiveMap changes for counters; unsupported ops return false. |
| liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt | Uses publishAndApply; propagates Boolean from operation application. |
| liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt | Adds ObjectsOperationSource parameter and returns Boolean to support LOCAL-vs-CHANNEL behavior. |
| liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt | New enum distinguishing LOCAL (ACK) vs CHANNEL sources. |
| liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt | Buffers ACK results during sync, applies ACKs before buffered channel ops, and performs echo dedup via appliedOnAckSerials. |
| liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt | Adds PublishAndApplyFailedDueToChannelState error code. |
| liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt | Implements publishAndApply, tracks appliedOnAckSerials, and fails buffered ACKs on channel state changes. |
| lib/src/main/java/io/ably/lib/types/ConnectionDetails.java | Adds siteCode to connection details (decoded from msgpack). |
| lib/src/main/java/io/ably/lib/transport/ConnectionManager.java | Stores siteCode from ConnectionDetails for use by LiveObjects. |
| liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt | Updates tests for boolean return semantics and unsupported ops returning false. |
| liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt | Adds/updates tests for LOCAL vs CHANNEL application and return values. |
| liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt | Updates tests for boolean return semantics and unsupported ops returning false. |
| liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt | Adds/updates tests for LOCAL vs CHANNEL application and return values. |
| liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt | Adds comprehensive ACK buffering/dedup/order tests and teardown disposal for created instances. |
| liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt | Exposes bufferedAcks for tests. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return objectsPool.get(objectId) as? LiveMap | ||
| ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d | ||
| } | ||
|
|
There was a problem hiding this comment.
createMapAsync now assumes publishAndApply always applies the MAP_CREATE locally and throws if the object isn't in the pool. However publishAndApply explicitly returns early when siteCode or PublishResult.serials is unavailable (it logs and defers application until the channel echo), which will make createMapAsync throw even though the operation may still succeed later. Consider falling back to the previous behavior (create/return a zero-value object in the sequential scope, or otherwise ensure an object is returned when local apply isn't possible) instead of throwing here.
| return objectsPool.get(objectId) as? LiveMap | |
| ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d | |
| } | |
| val liveMap = objectsPool.get(objectId) as? LiveMap | |
| if (liveMap != null) { | |
| return liveMap | |
| } | |
| // In some cases (e.g., missing siteCode/serials) the apply may be deferred | |
| // until the channel echo is processed. Wait briefly for the object to appear | |
| // in the pool before giving up. | |
| val awaitedLiveMap = awaitLiveMapFromPool(objectId) | |
| return awaitedLiveMap | |
| ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d | |
| } | |
| /** | |
| * Wait for a LiveMap with the given objectId to appear in the objects pool. | |
| * Returns the LiveMap if it appears within [timeoutMillis], or null otherwise. | |
| */ | |
| private suspend fun awaitLiveMapFromPool( | |
| objectId: String, | |
| timeoutMillis: Long = 5000L, | |
| ): LiveMap? { | |
| return try { | |
| withTimeout(timeoutMillis) { | |
| while (true) { | |
| val map = objectsPool.get(objectId) as? LiveMap | |
| if (map != null) { | |
| return@withTimeout map | |
| } | |
| delay(10L) | |
| } | |
| } | |
| } catch (e: TimeoutCancellationException) { | |
| null | |
| } | |
| } |
There was a problem hiding this comment.
@coderabbitai can you check above suggestion thoroughly and let me know about details
There was a problem hiding this comment.
@sacOO7, here's a thorough breakdown:
Is the Copilot concern valid?
Yes. The core concern is real. publishAndApply has two explicit early-return paths that skip local application:
// RTO20c1 — siteCode missing (older server / not yet set)
if (siteCode == null) {
Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed")
return
}
// RTO20c2 — serials missing or wrong count
if (serials == null || serials.size != objectMessages.size) {
Log.e(tag, "RTO20c2: PublishResult.serials unavailable or wrong length; operations will be applied when echoed")
return
}In both cases publishAndApply returns without putting anything into objectsPool. The message is published and will eventually be echoed back on the channel, but createMapAsync immediately follows with:
return objectsPool.get(objectId) as? LiveMap
?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId")This will always throw in the fallback cases, even though the operation was published successfully and will eventually be applied. This is a regression — the old code handled this gracefully by creating a zero-value object.
Issues with the Copilot suggested fix
The polling approach has several problems:
| Issue | Detail |
|---|---|
| Busy-wait | delay(10L) in a while(true) loop polls the pool every 10 ms. A reactive approach (e.g., a CompletableDeferred<LiveMap> or a StateFlow on the pool) would be far more efficient. |
| Arbitrary timeout | The hardcoded 5000L ms is a guess — no spec basis. Under slow networks or high load it may still expire before the echo arrives. |
| Worst-case UX | In the fallback scenario the caller already waited for publish ACK; now they wait another 5 s and then get an error anyway. |
| No observable improvement over previous behaviour | The old code immediately returned a zero-value object that got updated on echo. The suggestion delays and then throws — strictly worse. |
| Still can't distinguish paths | publishAndApply returns Unit; the suggestion doesn't fix the root cause (caller can't know whether local apply happened). |
Recommended approach
The cleanest fix is to make publishAndApply return a Boolean indicating whether local application happened, and fall back to the previous behaviour in createMapAsync/createCounterAsync when it returns false:
// In publishAndApply — return Boolean
internal suspend fun publishAndApply(objectMessages: Array<ObjectMessage>): Boolean {
val publishResult = publish(objectMessages)
val siteCode = adapter.connectionManager.siteCode
if (siteCode == null) {
Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed")
return false // <-- caller can fall back
}
val serials = publishResult.serials
if (serials == null || serials.size != objectMessages.size) {
Log.e(tag, "RTO20c2: serials unavailable or wrong length; operations will be applied when echoed")
return false // <-- caller can fall back
}
// ... build synthetic messages and apply ...
withContext(sequentialScope.coroutineContext) {
objectsManager.applyAckResult(syntheticMessages)
}
return true
}Then in createMapAsync:
val applied = publishAndApply(arrayOf(msg))
if (applied) {
// RTO11h2 – fast path: object was applied locally
return objectsPool.get(objectId) as? LiveMap
?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId")
}
// Fallback (RTO20c1/RTO20c2): local apply deferred to channel echo;
// return a zero-value object that will be updated when the echo arrives
return withContext(sequentialScope.coroutineContext) {
objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveMap
}This approach:
- Preserves the throw for the normal (ACK-applied) path — so the spec contract holds.
- Falls back gracefully to the previous behaviour when the server doesn't supply the required info.
- Avoids polling, hardcoded timeouts, and busy-waits entirely.
- Is self-documenting about which path was taken.
The same pattern applies symmetrically to createCounterAsync.
| return objectsPool.get(objectId) as? LiveCounter | ||
| ?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d |
There was a problem hiding this comment.
createCounterAsync throws if the counter object isn't present in the pool immediately after publishAndApply, but publishAndApply can legitimately return without applying locally (e.g. missing siteCode / missing PublishResult.serials) and rely on the channel echo instead. This makes counter creation fail in scenarios where the system is intentionally designed to fall back to echo-based application; consider reinstating a fallback that returns/creates a zero-value object when local apply can't happen.
| return objectsPool.get(objectId) as? LiveCounter | |
| ?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d | |
| (objectsPool.get(objectId) as? LiveCounter)?.let { return it } | |
| // In some configurations, publishAndApply may not apply the change locally and instead | |
| // rely on the channel echo to populate the objectsPool (e.g. missing siteCode / | |
| // missing PublishResult.serials). In that case, wait briefly for the echo-based apply. | |
| val echoAppliedCounter = withTimeoutOrNull(5_000L) { | |
| while (true) { | |
| val counter = objectsPool.get(objectId) as? LiveCounter | |
| if (counter != null) { | |
| return@withTimeoutOrNull counter | |
| } | |
| delay(10L) | |
| } | |
| } | |
| if (echoAppliedCounter != null) { | |
| return echoAppliedCounter | |
| } | |
| throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d |
There was a problem hiding this comment.
I guess we should fix this too
There was a problem hiding this comment.
we don't do this based on spec, we throw exception if we wasn't ably to apply locally (it means something went wrong, and sdk user can decide for themselves what to do next)
| @Test | ||
| fun `(RTO5) ObjectsManager should handle object sync messages`() { | ||
| val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps() |
There was a problem hiding this comment.
This test class now disposes DefaultRealtimeObjects instances created via makeRealtimeObjects, but other tests in the same file still create instances via getDefaultRealtimeObjectsWithMockedDeps() which also starts background coroutines (incomingObjectsHandler) and are not disposed/tracked. To avoid leaked coroutines between tests, consider routing all instance creation through the tracking helper (or adding the mocked-deps instances to testInstances and disposing them in tearDown).
| @Test | |
| fun `(RTO5) ObjectsManager should handle object sync messages`() { | |
| val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps() | |
| private fun makeRealtimeObjectsWithMockedDeps(): DefaultRealtimeObjects { | |
| return getDefaultRealtimeObjectsWithMockedDeps().also { testInstances.add(it) } | |
| } | |
| @Test | |
| fun `(RTO5) ObjectsManager should handle object sync messages`() { | |
| val defaultRealtimeObjects = makeRealtimeObjectsWithMockedDeps() |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 17 out of 17 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Few comments
|
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (1)
77-85:⚠️ Potential issue | 🟠 MajorDo not replace an outstanding
syncCompletionWaiter.
applyAckResult()suspends on the current deferred. If a new sync starts before that one completes, Line 84 swaps in a different waiter and the suspended ACK coroutine is left waiting on the old one forever. Reuse the existing incomplete waiter, or fail it before replacing it.🔧 Suggested fix
- syncCompletionWaiter = CompletableDeferred() + if (syncCompletionWaiter == null || syncCompletionWaiter?.isCompleted == true) { + syncCompletionWaiter = CompletableDeferred() + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 77 - 85, startNewSync currently overwrites syncCompletionWaiter causing any coroutine in applyAckResult waiting on the old deferred to hang; instead, in startNewSync check the existing syncCompletionWaiter: if it's still active reuse it (do not replace), otherwise create a new CompletableDeferred; alternatively, if you must replace, completeExceptionally or cancel the old waiter before assigning a new CompletableDeferred to syncCompletionWaiter so applyAckResult won't remain suspended. Locate startNewSync and syncCompletionWaiter and implement one of these approaches to ensure applyAckResult's suspended awaits are resolved.
♻️ Duplicate comments (2)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (1)
209-219:⚠️ Potential issue | 🟠 MajorRestrict ACK-echo deduplication to channel messages.
This branch still runs for
ObjectsOperationSource.LOCAL. A local replay can remove the serial fromappliedOnAckSerials, so the later channel echo is no longer filtered and can be applied twice.🔧 Suggested fix
- if (objectMessage.serial != null && - realtimeObjects.appliedOnAckSerials.contains(objectMessage.serial)) { + if (source == ObjectsOperationSource.CHANNEL && + objectMessage.serial != null && + realtimeObjects.appliedOnAckSerials.remove(objectMessage.serial) + ) { Log.d(tag, "RTO9a3: serial ${objectMessage.serial} already applied on ACK; discarding echo") - realtimeObjects.appliedOnAckSerials.remove(objectMessage.serial) continue // discard without taking any further action }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 209 - 219, The ACK-echo deduplication block in ObjectsManager (checking realtimeObjects.appliedOnAckSerials against objectMessage.serial) must only run for channel-originated messages to avoid local replays removing serials and letting channel echoes apply twice; modify the conditional around that block to also require objectMessage.source == ObjectsOperationSource.CHANNEL (or equivalent enum value) so the serial removal and continue are only executed for channel messages while preserving existing behavior for LOCAL and other sources.liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt (1)
211-220:⚠️ Potential issue | 🟠 MajorFail
publishAndApply()explicitly when ACK metadata is missing.
createMapAsync()/createCounterAsync()assume this method has applied the operation locally and immediately read fromobjectsPoolafterward. Returning here turns a successful publish into a later"not applied as expected"failure, and a caller retry can duplicate the create on the wire. Surface the failure here instead of silently falling back.🔧 Suggested fix
val siteCode = adapter.connectionManager.siteCode if (siteCode == null) { - Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed") - return + throw serverError("RTO20c1: siteCode unavailable; cannot apply ACK locally") } val serials = publishResult.serials if (serials == null || serials.size != objectMessages.size) { - Log.e(tag, "RTO20c2: PublishResult.serials unavailable or wrong length; operations will be applied when echoed") - return + throw serverError( + "RTO20c2: PublishResult.serials unavailable or wrong length; " + + "expected=${objectMessages.size}, actual=${serials?.size ?: "null"}" + ) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt` around lines 211 - 220, The check in publishAndApply (in DefaultRealtimeObjects.kt) currently logs and returns when adapter.connectionManager.siteCode or publishResult.serials is missing, which hides publish failures from createMapAsync/createCounterAsync; instead fail the publish explicitly by returning/propagating an error (e.g., throw an exception or complete the returned future/Deferred exceptionally) when siteCode == null or when publishResult.serials is null or of the wrong length; update the code paths in publishAndApply so callers like createMapAsync and createCounterAsync see the failure rather than proceeding to read objectsPool, referencing publishAndApply, publishResult.serials, adapter.connectionManager.siteCode, and objectsPool to locate where to change the control flow.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt`:
- Around line 24-29: The test cleanup only disposes instances tracked in
testInstances, but DefaultRealtimeObjects created via
getDefaultRealtimeObjectsWithMockedDeps() are not added and leak background
coroutines; update the test setup so every DefaultRealtimeObjects created is
registered for disposal—either modify getDefaultRealtimeObjectsWithMockedDeps()
to append created DefaultRealtimeObjects to the shared testInstances list or
ensure callers always call makeRealtimeObjects() instead of directly
constructing objects—so that the teardown routine disposes/cleans up all
DefaultRealtimeObjects (references: DefaultRealtimeObjects, makeRealtimeObjects,
getDefaultRealtimeObjectsWithMockedDeps, testInstances).
---
Outside diff comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 77-85: startNewSync currently overwrites syncCompletionWaiter
causing any coroutine in applyAckResult waiting on the old deferred to hang;
instead, in startNewSync check the existing syncCompletionWaiter: if it's still
active reuse it (do not replace), otherwise create a new CompletableDeferred;
alternatively, if you must replace, completeExceptionally or cancel the old
waiter before assigning a new CompletableDeferred to syncCompletionWaiter so
applyAckResult won't remain suspended. Locate startNewSync and
syncCompletionWaiter and implement one of these approaches to ensure
applyAckResult's suspended awaits are resolved.
---
Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt`:
- Around line 211-220: The check in publishAndApply (in
DefaultRealtimeObjects.kt) currently logs and returns when
adapter.connectionManager.siteCode or publishResult.serials is missing, which
hides publish failures from createMapAsync/createCounterAsync; instead fail the
publish explicitly by returning/propagating an error (e.g., throw an exception
or complete the returned future/Deferred exceptionally) when siteCode == null or
when publishResult.serials is null or of the wrong length; update the code paths
in publishAndApply so callers like createMapAsync and createCounterAsync see the
failure rather than proceeding to read objectsPool, referencing publishAndApply,
publishResult.serials, adapter.connectionManager.siteCode, and objectsPool to
locate where to change the control flow.
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 209-219: The ACK-echo deduplication block in ObjectsManager
(checking realtimeObjects.appliedOnAckSerials against objectMessage.serial) must
only run for channel-originated messages to avoid local replays removing serials
and letting channel echoes apply twice; modify the conditional around that block
to also require objectMessage.source == ObjectsOperationSource.CHANNEL (or
equivalent enum value) so the serial removal and continue are only executed for
channel messages while preserving existing behavior for LOCAL and other sources.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: edba32a5-48a9-4429-a369-7d78f273741a
📒 Files selected for processing (4)
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
🚧 Files skipped from review as they are similar to previous changes (1)
- liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
Show resolved
Hide resolved
708fe84 to
d732e91
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 111-117: The applyAckResult method can hang because it creates a
new syncCompletionWaiter when state != Synced (e.g., Initialized) but that
waiter is never completed; update applyAckResult in ObjectsManager to not create
a new CompletableDeferred unconditionally—either (A) throw an explicit error
when realtimeObjects.state == ObjectsState.Initialized to fail fast, or (B) only
await an existing syncCompletionWaiter (i.e., if (syncCompletionWaiter != null)
await it) instead of assigning a new one; reference applyAckResult,
syncCompletionWaiter, startNewSync and endSync when making the change so the
waiter lifecycle remains consistent.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 518bbd87-2192-43ce-b7ef-0b0146efcec7
📒 Files selected for processing (5)
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
🚧 Files skipped from review as they are similar to previous changes (1)
- liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
| internal suspend fun applyAckResult(messages: List<ObjectMessage>) { | ||
| if (realtimeObjects.state != ObjectsState.Synced) { | ||
| if (syncCompletionWaiter == null) syncCompletionWaiter = CompletableDeferred() | ||
| syncCompletionWaiter?.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1) | ||
| } | ||
| applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f | ||
| } |
There was a problem hiding this comment.
Potential hang if applyAckResult called in Initialized state.
If state != Synced and syncCompletionWaiter == null (e.g., Initialized state before any sync), line 113 creates a new CompletableDeferred that will never be completed—startNewSync replaces it with a fresh instance and endSync completes only the waiter it knows about.
Consider either throwing if state is Initialized, or ensuring the waiter created here is eventually completed.
💡 Proposed defensive check
internal suspend fun applyAckResult(messages: List<ObjectMessage>) {
if (realtimeObjects.state != ObjectsState.Synced) {
- if (syncCompletionWaiter == null) syncCompletionWaiter = CompletableDeferred()
+ val waiter = syncCompletionWaiter
+ ?: throw clientError("applyAckResult called before sync started")
- syncCompletionWaiter?.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
+ waiter.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
}
applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around
lines 111 - 117, The applyAckResult method can hang because it creates a new
syncCompletionWaiter when state != Synced (e.g., Initialized) but that waiter is
never completed; update applyAckResult in ObjectsManager to not create a new
CompletableDeferred unconditionally—either (A) throw an explicit error when
realtimeObjects.state == ObjectsState.Initialized to fail fast, or (B) only
await an existing syncCompletionWaiter (i.e., if (syncCompletionWaiter != null)
await it) instead of assigning a new one; reference applyAckResult,
syncCompletionWaiter, startNewSync and endSync when making the change so the
waiter lifecycle remains consistent.
introduce ACK-based local application of LiveObjects ops
publishAndApply).applyAckResult).ObjectsOperationSourceenum to distinguish operation sources (ACK vs. channel).applyObjectand related object-specific management functions to utilize the source enum.falseinstead of throwing).Spec: ably/specification#419
JS: ably/ably-js#2155
Summary by CodeRabbit
New Features
Bug Fixes
Tests