From 52e3470064f598402e4b16f3fee5dd5102f11353 Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Tue, 25 Jun 2024 13:39:11 +0200 Subject: [PATCH] RDART-973: Add support for the new progress notifications (#1546) * Add support for the new progress notifications * Add a sanity check * .. * wip * Clean up tests * Clean up * Changelog, another test * await the timeout future * Use realm-core 14.8.0 * downgrade core * disable failing test * Rework test * Fix core version * Tweak test so it passes --- .github/workflows/dart-desktop-tests.yml | 6 +- CHANGELOG.md | 2 +- .../src/handles/native/session_handle.dart | 2 +- packages/realm_dart/lib/src/realm_class.dart | 4 +- packages/realm_dart/lib/src/session.dart | 21 ++--- packages/realm_dart/test/realm_test.dart | 25 ++--- packages/realm_dart/test/session_test.dart | 93 ++++++++++++++++--- packages/realm_dart/test/test.dart | 15 ++- 8 files changed, 120 insertions(+), 48 deletions(-) diff --git a/.github/workflows/dart-desktop-tests.yml b/.github/workflows/dart-desktop-tests.yml index 0a22a26c2..af3238cfb 100644 --- a/.github/workflows/dart-desktop-tests.yml +++ b/.github/workflows/dart-desktop-tests.yml @@ -39,7 +39,7 @@ jobs: shell: bash - id: runner_os_lowercase - # there is no such thing as ${{ tolower(runner.os) }}, hence this abomination ¯\_(ツ)_/¯ + # there is no such thing as ${{ tolower(runner.os) }}, hence this abomination ¯\_(ツ)_/¯ # use with steps.runner_os_lowercase.outputs.os run: echo ${{ runner.os }} | awk '{print "os=" tolower($0)}' >> $GITHUB_OUTPUT shell: bash @@ -59,7 +59,7 @@ jobs: ulimit -n 10240 if: ${{ contains(runner.os, 'macos') }} - - name: Run tests ${{ runner }} ${{ runner.arch }} + - name: Run tests ${{ runner.os }} ${{ runner.arch }} run: melos test:unit # TODO: Publish all reports @@ -67,7 +67,7 @@ jobs: uses: dorny/test-reporter@v1.8.0 if: success() || failure() with: - name: Test Results Dart ${{ runner }} ${{ runner.arch }} + name: Test Results Dart ${{ runner.os }} ${{ runner.arch }} path: test-results.json reporter: dart-json only-summary: true diff --git a/CHANGELOG.md b/CHANGELOG.md index dd9ed42bd..6d13eb4bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ ## vNext (TBD) ### Enhancements -* None +* The download progress estimate reported by `Session.getProgressStream` will now return meaningful estimated values, while previously it always returned 1. (Issue [#1564](https://github.com/realm/realm-dart/issues/1564)) ### Fixed * [sane_uuid](https://pub.dev/packages/sane_uuid) 1.0.0 was released, which has a few minor breaking change as compared to to 1.0.0-rc.5 that impact realm: diff --git a/packages/realm_dart/lib/src/handles/native/session_handle.dart b/packages/realm_dart/lib/src/handles/native/session_handle.dart index dcc3415ca..9ce60d04b 100644 --- a/packages/realm_dart/lib/src/handles/native/session_handle.dart +++ b/packages/realm_dart/lib/src/handles/native/session_handle.dart @@ -173,5 +173,5 @@ void _onConnectionStateChange(Object userdata, int oldState, int newState) { void syncProgressCallback(Object userdata, int transferred, int transferable, double estimate) { final controller = userdata as ProgressNotificationsController; - controller.onProgress(transferred, transferable); + controller.onProgress(estimate); } diff --git a/packages/realm_dart/lib/src/realm_class.dart b/packages/realm_dart/lib/src/realm_class.dart index 15a91bb95..846f9e43b 100644 --- a/packages/realm_dart/lib/src/realm_class.dart +++ b/packages/realm_dart/lib/src/realm_class.dart @@ -1016,8 +1016,8 @@ class RealmAsyncOpenProgressNotificationsController implements ProgressNotificat } @override - void onProgress(int transferredBytes, int transferableBytes) { - _streamController.add(SessionInternal.createSyncProgress(transferredBytes, transferableBytes)); + void onProgress(double progressEstimate) { + _streamController.add(SessionInternal.createSyncProgress(progressEstimate)); } void _start() { diff --git a/packages/realm_dart/lib/src/session.dart b/packages/realm_dart/lib/src/session.dart index 6ba07de14..58cba843f 100644 --- a/packages/realm_dart/lib/src/session.dart +++ b/packages/realm_dart/lib/src/session.dart @@ -73,15 +73,7 @@ class SyncProgress { /// value may either increase or decrease as new data needs to be transferred. final double progressEstimate; - const SyncProgress._({required this.progressEstimate}); - - static double _calculateProgress({required int transferred, required int transferable}) { - if (transferable == 0 || transferred > transferable) { - return 1; - } - - return transferred / transferable; - } + const SyncProgress({required this.progressEstimate}); } /// A type containing information about the transition of a connection state from one value to another. @@ -108,12 +100,11 @@ extension SessionInternal on Session { void raiseError(int errorCode, bool isFatal) => handle.raiseError(errorCode, isFatal); - static SyncProgress createSyncProgress(int transferredBytes, int transferableBytes) => - SyncProgress._(progressEstimate: SyncProgress._calculateProgress(transferred: transferredBytes, transferable: transferableBytes)); + static SyncProgress createSyncProgress(double progressEstimate) => SyncProgress(progressEstimate: progressEstimate); } abstract interface class ProgressNotificationsController { - void onProgress(int transferredBytes, int transferableBytes); + void onProgress(double progressEstimate); } /// @nodoc @@ -133,10 +124,10 @@ class SessionProgressNotificationsController implements ProgressNotificationsCon } @override - void onProgress(int transferredBytes, int transferableBytes) { - _streamController.add(SyncProgress._(progressEstimate: SyncProgress._calculateProgress(transferred: transferredBytes, transferable: transferableBytes))); + void onProgress(double progressEstimate) { + _streamController.add(SyncProgress(progressEstimate: progressEstimate)); - if (transferredBytes >= transferableBytes && _mode == ProgressMode.forCurrentlyOutstandingWork) { + if (progressEstimate >= 1.0 && _mode == ProgressMode.forCurrentlyOutstandingWork) { _streamController.close(); } } diff --git a/packages/realm_dart/test/realm_test.dart b/packages/realm_dart/test/realm_test.dart index 08f577c15..c4de0d802 100644 --- a/packages/realm_dart/test/realm_test.dart +++ b/packages/realm_dart/test/realm_test.dart @@ -1337,18 +1337,21 @@ void main() { final user = await app.logIn(credentials); final configuration = Configuration.flexibleSync(user, getSyncSchema()); - int count = 0; - double progress = -1; - + double progressEstimate = -1; + bool progressReported = false; var syncedRealm = await getRealmAsync(configuration, onProgressCallback: (syncProgress) { - count++; - progress = syncProgress.progressEstimate; + progressEstimate = syncProgress.progressEstimate; + progressReported = true; }); + await Future.delayed(Duration(milliseconds: 500)); + expect(syncedRealm.isClosed, false); - // Semantics of onProgressCallback changed with https://github.com/realm/realm-core/issues/7452 - expect(count, 0); - expect(progress, -1); + + // For FLX realms with no subscriptions, the server won't report any progress before it resolves the + // Realm.open future. + expect(progressEstimate, -1); + expect(progressReported, false); }); baasTest('Realm.open (flexibleSync) - download a populated realm', (appConfiguration) async { @@ -1367,16 +1370,16 @@ void main() { final config = await _subscribeForAtlasAddedData(app); int printCount = 0; - double progress = 0; + double progressEstimate = 0; final syncedRealm = await getRealmAsync(config, onProgressCallback: (syncProgress) { printCount++; - progress = syncProgress.progressEstimate; + progressEstimate = syncProgress.progressEstimate; }); expect(syncedRealm.isClosed, false); expect(printCount, isNot(0)); - expect(progress, 1.0); + expect(progressEstimate, 1.0); }); baasTest('Realm.open (flexibleSync) - listen and cancel download progress of a populated realm', (appConfiguration) async { diff --git a/packages/realm_dart/test/session_test.dart b/packages/realm_dart/test/session_test.dart index 040f15429..f0076879f 100644 --- a/packages/realm_dart/test/session_test.dart +++ b/packages/realm_dart/test/session_test.dart @@ -159,6 +159,7 @@ void main() { StreamProgressData subscribeToProgress(Realm realm, ProgressDirection direction, ProgressMode mode) { final data = StreamProgressData(); final stream = realm.syncSession.getProgressStream(direction, mode); + data.subscription = stream.listen((event) { if (mode == ProgressMode.forCurrentlyOutstandingWork) { expect(event.progressEstimate, greaterThanOrEqualTo(data.progressEstimate)); @@ -191,26 +192,97 @@ void main() { baasTest('SyncSession.getProgressStream forCurrentlyOutstandingWork', (configuration) async { final differentiator = ObjectId(); - final realmA = await getIntegrationRealm(differentiator: differentiator); - final realmB = await getIntegrationRealm(differentiator: differentiator); + final uploadRealm = await getIntegrationRealm(differentiator: differentiator); for (var i = 0; i < 10; i++) { - realmA.write(() { - realmA.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50))); + uploadRealm.write(() { + uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50))); }); } - final uploadData = subscribeToProgress(realmA, ProgressDirection.upload, ProgressMode.forCurrentlyOutstandingWork); - final downloadData = subscribeToProgress(realmB, ProgressDirection.download, ProgressMode.forCurrentlyOutstandingWork); + final uploadData = subscribeToProgress(uploadRealm, ProgressDirection.upload, ProgressMode.forCurrentlyOutstandingWork); + await uploadRealm.syncSession.waitForUpload(); + await validateData(uploadData, expectDone: true); - await realmA.syncSession.waitForUpload(); + // Subscribe immediately after the upload to ensure we get the entire upload message as progress notifications + final downloadRealm = await getIntegrationRealm(differentiator: differentiator, waitForSync: false); + final downloadData = subscribeToProgress(downloadRealm, ProgressDirection.download, ProgressMode.forCurrentlyOutstandingWork); - await validateData(uploadData, expectDone: true); + await downloadRealm.subscriptions.waitForSynchronization(); - await realmB.syncSession.waitForDownload(); + await downloadRealm.syncSession.waitForDownload(); await validateData(downloadData, expectDone: true); + // We should not see more updates in either direction + final uploadCallbacks = uploadData.callbacksInvoked; + final downloadCallbacks = downloadData.callbacksInvoked; + + uploadRealm.write(() { + uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50))); + }); + + await uploadRealm.syncSession.waitForUpload(); + await downloadRealm.syncSession.waitForDownload(); + + expect(uploadRealm.all().length, downloadRealm.all().length); + expect(uploadData.callbacksInvoked, uploadCallbacks); + expect(downloadData.callbacksInvoked, downloadCallbacks); + + await uploadData.subscription.cancel(); + await downloadData.subscription.cancel(); + }); + + baasTest('SyncSession.getProgressStream after reconnecting', (configuration) async { + final differentiator = ObjectId(); + final uploadRealm = await getIntegrationRealm(differentiator: differentiator); + + // Make sure we've caught up, then close the Realm. We'll reopen it later and verify that progress notifications + // are delivered. This is different from "SyncSession.getProgressStream forCurrentlyOutstandingWork" where we're + // testing notifications after change of query. + final user = await getIntegrationUser(appConfig: configuration); + final config = getIntegrationConfig(user); + var downloadRealm = getRealm(config); + downloadRealm.subscriptions.update((mutableSubscriptions) { + mutableSubscriptions.add(downloadRealm.query(r'differentiator = $0', [differentiator])); + }); + + await downloadRealm.subscriptions.waitForSynchronization(); + downloadRealm.close(); + + for (var i = 0; i < 10; i++) { + uploadRealm.write(() { + uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50))); + }); + } + + final uploadData = subscribeToProgress(uploadRealm, ProgressDirection.upload, ProgressMode.forCurrentlyOutstandingWork); + await uploadRealm.syncSession.waitForUpload(); + await validateData(uploadData, expectDone: true); + + // Reopen the download realm and subscribe for notifications - those should still be delivered as normal. + downloadRealm = getRealm(getIntegrationConfig(user)); + final downloadData = subscribeToProgress(downloadRealm, ProgressDirection.download, ProgressMode.reportIndefinitely); + + await downloadRealm.syncSession.waitForDownload(); + + await validateData(downloadData, expectDone: false); + + // We should not see more updates in upload direction, but should see a callback invoked for download + final uploadCallbacks = uploadData.callbacksInvoked; + final downloadCallbacks = downloadData.callbacksInvoked; + + uploadRealm.write(() { + uploadRealm.add(NullableTypes(ObjectId(), differentiator, stringProp: generateRandomString(50))); + }); + + await uploadRealm.syncSession.waitForUpload(); + await downloadRealm.syncSession.waitForDownload(); + + expect(uploadRealm.all().length, downloadRealm.all().length); + expect(uploadData.callbacksInvoked, uploadCallbacks); + expect(downloadData.callbacksInvoked, greaterThan(downloadCallbacks)); + await uploadData.subscription.cancel(); await downloadData.subscription.cancel(); }); @@ -254,7 +326,6 @@ void main() { expect(downloadData.progressEstimate, 1.0); expect(uploadData.callbacksInvoked, greaterThan(uploadSnapshot.callbacksInvoked)); - expect(downloadData.callbacksInvoked, greaterThan(downloadSnapshot.callbacksInvoked)); await uploadData.subscription.cancel(); @@ -319,7 +390,7 @@ class StreamProgressData { bool doneInvoked; late StreamSubscription subscription; - StreamProgressData({this.progressEstimate = 0, this.callbacksInvoked = 0, this.doneInvoked = false}); + StreamProgressData({this.progressEstimate = -1, this.callbacksInvoked = 0, this.doneInvoked = false}); StreamProgressData.snapshot(StreamProgressData other) : this(callbacksInvoked: other.callbacksInvoked, doneInvoked: other.doneInvoked, progressEstimate: other.progressEstimate); diff --git a/packages/realm_dart/test/test.dart b/packages/realm_dart/test/test.dart index ff0432f26..c012823fc 100644 --- a/packages/realm_dart/test/test.dart +++ b/packages/realm_dart/test/test.dart @@ -429,7 +429,7 @@ void setupTests() { Realm.logger.setLogLevel(LogLevel.detail); Realm.logger.onRecord.listen((record) { - printOnFailure('${record.category} ${record.level.name}: ${record.message}'); + printOnFailure('${DateTime.now().toUtc()} ${record.category} ${record.level.name}: ${record.message}'); }); if (Platform.isIOS) { @@ -618,21 +618,28 @@ Future getAnonymousUser(App app) { return app.logIn(Credentials.anonymous(reuseCredentials: false)); } +FlexibleSyncConfiguration getIntegrationConfig(User user) { + return Configuration.flexibleSync(user, getSyncSchema())..sessionStopPolicy = SessionStopPolicy.immediately; +} + /// Returns a synced realm after logging in a user. /// /// A subscription for querying all [NullableTypes] objects containing /// the `differentiator` will be added if a `differentiator` is provided. -Future getIntegrationRealm({App? app, ObjectId? differentiator, AppConfiguration? appConfig}) async { +Future getIntegrationRealm({App? app, ObjectId? differentiator, AppConfiguration? appConfig, bool waitForSync = true}) async { + app ??= App(appConfig ?? await baasHelper!.getAppConfig()); final user = await getIntegrationUser(app: app, appConfig: appConfig); - final config = Configuration.flexibleSync(user, getSyncSchema())..sessionStopPolicy = SessionStopPolicy.immediately; + final config = getIntegrationConfig(user); final realm = getRealm(config); if (differentiator != null) { realm.subscriptions.update((mutableSubscriptions) { mutableSubscriptions.add(realm.query(r'differentiator = $0', [differentiator])); }); - await realm.subscriptions.waitForSynchronization(); + if (waitForSync) { + await realm.subscriptions.waitForSynchronization(); + } } return realm;