Skip to content

Keep topics data up-to-date by handling events #1508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions lib/model/channel.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import 'dart:async';
import 'dart:collection';

import 'package:collection/collection.dart';
import 'package:flutter/foundation.dart';

import '../api/model/events.dart';
import '../api/model/initial_snapshot.dart';
import '../api/model/model.dart';
import '../api/route/channels.dart';
import 'store.dart';
import 'user.dart';

final _apiGetStreamTopics = getStreamTopics; // similar to _apiSendMessage in lib/model/message.dart

/// The portion of [PerAccountStore] for channels, topics, and stuff about them.
///
/// This type is useful for expressing the needs of other parts of the
Expand Down Expand Up @@ -38,6 +43,26 @@ mixin ChannelStore on UserStore {
/// and [streamsByName].
Map<int, Subscription> get subscriptions;

/// Fetch topics in a stream from the server.
///
/// The results from the last successful fetch
/// can be retrieved with [getStreamTopics].
Future<void> fetchTopics(int streamId);

/// Pairs of the known topics and its latest message ID, in the given stream.
///
/// Returns null if the data has never been fetched yet.
/// To fetch it from the server, use [fetchTopics].
///
/// The result is guaranteed to be sorted by [GetStreamTopicsEntry.maxId], and the
/// topics are guaranteed to be distinct.
///
/// In some cases, the same maxId affected by message moves can be present in
/// multiple [GetStreamTopicsEntry] entries. For this reason, the caller
/// should not rely on [getStreamTopics] to determine which topic the message
/// is in. Instead, refer to [PerAccountStore.messages].
List<GetStreamTopicsEntry>? getStreamTopics(int streamId);

/// The visibility policy that the self-user has for the given topic.
///
/// This does not incorporate the user's channel-level policy,
Expand Down Expand Up @@ -199,6 +224,13 @@ mixin ProxyChannelStore on ChannelStore {
@override
Map<int, Subscription> get subscriptions => channelStore.subscriptions;

@override
Future<void> fetchTopics(int streamId) => channelStore.fetchTopics(streamId);

@override
List<GetStreamTopicsEntry>? getStreamTopics(int streamId) =>
channelStore.getStreamTopics(streamId);

@override
UserTopicVisibilityPolicy topicVisibilityPolicy(int streamId, TopicName topic) =>
channelStore.topicVisibilityPolicy(streamId, topic);
Expand Down Expand Up @@ -260,6 +292,34 @@ class ChannelStoreImpl extends HasUserStore with ChannelStore {
@override
final Map<int, Subscription> subscriptions;

/// Maps indexed by stream IDs, of the known latest message IDs in each topic.
///
/// For example: `_latestMessageIdsByStreamTopic[stream.id][topic] = maxId`
///
/// In some cases, the same message IDs, when affected by message moves, can
/// be present for mutliple stream-topic keys.
final Map<int, Map<TopicName, int>> _latestMessageIdsByStreamTopic = {};

@override
Future<void> fetchTopics(int streamId) async {
final result = await _apiGetStreamTopics(connection, streamId: streamId,
allowEmptyTopicName: true);
Comment on lines +304 to +306
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we've already successfully fetched the list for this channel? In that case I think we want to show the existing list (which, after all, this class will have been keeping up to date) rather than make a new fetch.

_latestMessageIdsByStreamTopic[streamId] = {
for (final GetStreamTopicsEntry(:name, :maxId) in result.topics)
name: maxId,
};
}

@override
List<GetStreamTopicsEntry>? getStreamTopics(int streamId) {
final latestMessageIdsInStream = _latestMessageIdsByStreamTopic[streamId];
if (latestMessageIdsInStream == null) return null;
return [
for (final MapEntry(:key, :value) in latestMessageIdsInStream.entries)
GetStreamTopicsEntry(maxId: value, name: key),
].sortedBy((value) => -value.maxId);
}

@override
Map<int, TopicKeyedMap<UserTopicVisibilityPolicy>> get debugTopicVisibility => topicVisibility;

Expand Down Expand Up @@ -425,6 +485,47 @@ class ChannelStoreImpl extends HasUserStore with ChannelStore {
forStream[event.topicName] = visibilityPolicy;
}
}

/// Handle a [MessageEvent], returning whether listeners should be notified.
bool handleMessageEvent(MessageEvent event) {
if (event.message is! StreamMessage) return false;
final StreamMessage(:streamId, :topic) = event.message as StreamMessage;

final latestMessageIdsByTopic = _latestMessageIdsByStreamTopic[streamId];
if (latestMessageIdsByTopic == null) return false;
assert(!latestMessageIdsByTopic.containsKey(topic)
|| latestMessageIdsByTopic[topic]! < event.message.id);
latestMessageIdsByTopic[topic] = event.message.id;
Comment on lines +490 to +498
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing we'll want to think through in how this part of the model should work is how to handle the fetch/event race which it introduces.

An example of that is that I think this assert isn't correct — we could do a fetch that learns about a topic with a new message, and only later get the MessageEvent about that message.

For an example of where we've reasoned through this sort of fetch/event race before, see the implementation of reconcileMessages in lib/model/message.dart .

return true;
}

/// Handle a [UpdateMessageEvent], returning whether listeners should be
/// notified.
bool handleUpdateMessageEvent(UpdateMessageEvent event) {
if (event.moveData == null) return false;
final UpdateMessageMoveData(
:origStreamId, :origTopic, :newStreamId, :newTopic, :propagateMode,
) = event.moveData!;
bool shouldNotify = false;

final origLatestMessageIdsByTopics = _latestMessageIdsByStreamTopic[origStreamId];
if (propagateMode == PropagateMode.changeAll
&& origLatestMessageIdsByTopics != null) {
shouldNotify = origLatestMessageIdsByTopics.remove(origTopic) != null;
}
Comment on lines +511 to +515
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might be the right logic to have, but there are some edge cases it doesn't handle. So this code should have comments pointing out those cases and explaining why we're choosing to not handle them.


final newLatestMessageIdsByTopics = _latestMessageIdsByStreamTopic[newStreamId];
if (newLatestMessageIdsByTopics != null) {
final movedMaxId = event.messageIds.max;
if (!newLatestMessageIdsByTopics.containsKey(newTopic)
|| newLatestMessageIdsByTopics[newTopic]! < movedMaxId) {
newLatestMessageIdsByTopics[newTopic] = movedMaxId;
shouldNotify = true;
}
}

return shouldNotify;
}
}

/// A [Map] with [TopicName] keys and [V] values.
Expand Down
2 changes: 2 additions & 0 deletions lib/model/store.dart
Original file line number Diff line number Diff line change
Expand Up @@ -781,13 +781,15 @@ class PerAccountStore extends PerAccountStoreBase with
unreads.handleMessageEvent(event);
recentDmConversationsView.handleMessageEvent(event);
recentSenders.handleMessage(event.message); // TODO(#824)
if (_channels.handleMessageEvent(event)) notifyListeners();
// When adding anything here (to handle [MessageEvent]),
// it probably belongs in [reconcileMessages] too.

case UpdateMessageEvent():
assert(debugLog("server event: update_message ${event.messageId}"));
_messages.handleUpdateMessageEvent(event);
unreads.handleUpdateMessageEvent(event);
if (_channels.handleUpdateMessageEvent(event)) notifyListeners();

case DeleteMessageEvent():
assert(debugLog("server event: delete_message ${event.messageIds}"));
Expand Down
34 changes: 18 additions & 16 deletions lib/widgets/topic_list.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import '../api/model/model.dart';
import '../api/route/channels.dart';
import '../generated/l10n/zulip_localizations.dart';
import '../model/narrow.dart';
import '../model/store.dart';
import '../model/unreads.dart';
import 'action_sheet.dart';
import 'app_bar.dart';
Expand Down Expand Up @@ -121,16 +122,13 @@ class _TopicList extends StatefulWidget {

class _TopicListState extends State<_TopicList> with PerAccountStoreAwareStateMixin {
Unreads? unreadsModel;
// TODO(#1499): store the results on [ChannelStore], and keep them
// up-to-date by handling events
List<GetStreamTopicsEntry>? lastFetchedTopics;

@override
void onNewStore() {
unreadsModel?.removeListener(_modelChanged);
final store = PerAccountStoreWidget.of(context);
unreadsModel = store.unreads..addListener(_modelChanged);
_fetchTopics();
_fetchTopics(store);
}

@override
Expand All @@ -145,31 +143,30 @@ class _TopicListState extends State<_TopicList> with PerAccountStoreAwareStateMi
});
}

void _fetchTopics() async {
void _fetchTopics(PerAccountStore store) async {
// Do nothing when the fetch fails; the topic-list will stay on
// the loading screen, until the user navigates away and back.
// TODO(design) show a nice error message on screen when this fails
final store = PerAccountStoreWidget.of(context);
final result = await getStreamTopics(store.connection,
streamId: widget.streamId,
allowEmptyTopicName: true);
await store.fetchTopics(widget.streamId);
if (!mounted) return;
setState(() {
lastFetchedTopics = result.topics;
// The actuall state lives in the PerAccountStore
});
}

@override
Widget build(BuildContext context) {
if (lastFetchedTopics == null) {
final store = PerAccountStoreWidget.of(context);
final streamTopics = store.getStreamTopics(widget.streamId);
if (streamTopics == null) {
return const Center(child: CircularProgressIndicator());
}

// TODO(design) handle the rare case when `lastFetchedTopics` is empty

// This is adapted from parts of the build method on [_InboxPageState].
final topicItems = <_TopicItemData>[];
for (final GetStreamTopicsEntry(:maxId, name: topic) in lastFetchedTopics!) {
for (final GetStreamTopicsEntry(:maxId, name: topic) in streamTopics) {
final unreadMessageIds =
unreadsModel!.streams[widget.streamId]?[topic] ?? <int>[];
final countInTopic = unreadMessageIds.length;
Expand All @@ -179,9 +176,6 @@ class _TopicListState extends State<_TopicList> with PerAccountStoreAwareStateMi
topic: topic,
unreadCount: countInTopic,
hasMention: hasMention,
// `lastFetchedTopics.maxId` can become outdated when a new message
// arrives or when there are message moves, until we re-fetch.
// TODO(#1499): track changes to this
maxId: maxId,
));
}
Expand Down Expand Up @@ -231,6 +225,14 @@ class _TopicItem extends StatelessWidget {

final store = PerAccountStoreWidget.of(context);
final designVariables = DesignVariables.of(context);
// The message with `maxId` might not remain in `topic` since we last fetch
// the list of topics. Make sure we check for that before passing `maxId`
// to the topic action sheet.
// See also: [ChannelStore.getStreamTopics]
final message = store.messages[maxId];
final isMaxIdInTopic = message is StreamMessage
&& message.streamId == streamId
&& message.topic.isSameAs(topic);
Comment on lines +228 to +235
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. This will mean that if we just don't happen to have any messages from that topic in MessageStore — for example, because it's not a topic that's included in the very last messages — then we won't give the user the option to resolve or unresolve the topic from this screen. That seems unfortunate.

What happens if we go ahead and use this message ID even though it might turn out to no longer be in the topic?


final visibilityPolicy = store.topicVisibilityPolicy(streamId, topic);
final double opacity;
Expand Down Expand Up @@ -259,7 +261,7 @@ class _TopicItem extends StatelessWidget {
onLongPress: () => showTopicActionSheet(context,
channelId: streamId,
topic: topic,
someMessageIdInTopic: maxId),
someMessageIdInTopic: isMaxIdInTopic ? maxId : null),
splashFactory: NoSplash.splashFactory,
child: Padding(padding: EdgeInsetsDirectional.fromSTEB(6, 8, 12, 8),
child: Row(
Expand Down
7 changes: 7 additions & 0 deletions test/api/route/route_checks.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import 'package:checks/checks.dart';
import 'package:zulip/api/model/model.dart';
import 'package:zulip/api/route/channels.dart';
import 'package:zulip/api/route/messages.dart';
import 'package:zulip/api/route/saved_snippets.dart';

Expand All @@ -9,4 +11,9 @@ extension CreateSavedSnippetResultChecks on Subject<CreateSavedSnippetResult> {
Subject<int> get savedSnippetId => has((e) => e.savedSnippetId, 'savedSnippetId');
}

extension GetStreamTopicEntryChecks on Subject<GetStreamTopicsEntry> {
Subject<int> get maxId => has((e) => e.maxId, 'maxId');
Subject<TopicName> get name => has((e) => e.name, 'name');
}

// TODO add similar extensions for other classes in api/route/*.dart
Loading
Loading