Skip to content

Commit 436b1bb

Browse files
amabluea-maurice
authored andcommitted
WebSocketListenProvider will now properly handle errors and gracefully fail if
an error occurs while attempting to listen on a location. PiperOrigin-RevId: 246583767
1 parent 25bbbe3 commit 436b1bb

File tree

6 files changed

+83
-11
lines changed

6 files changed

+83
-11
lines changed

database/src/desktop/core/listen_provider.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#define FIREBASE_DATABASE_CLIENT_CPP_SRC_DESKTOP_CORE_LISTEN_PROVIDER_H_
1717

1818
#include "database/src/common/query_spec.h"
19+
#include "database/src/desktop/view/view.h"
1920

2021
namespace firebase {
2122
namespace database {
@@ -30,7 +31,8 @@ class ListenProvider {
3031
// Begin listening on a location with a set of parameters given by the
3132
// QuerySpec. While listening, the server will send down updates which will be
3233
// parsed and passed along to the SyncTree to be cached locally.
33-
virtual void StartListening(const QuerySpec& query_spec) = 0;
34+
virtual void StartListening(const QuerySpec& query_spec,
35+
const View* view) = 0;
3436

3537
// Stop listening on a location given by the QuerySpec.
3638
virtual void StopListening(const QuerySpec& query_spec) = 0;

database/src/desktop/core/repo.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,11 +391,13 @@ void Repo::DeferredInitialization() {
391391
UniquePtr<PersistenceManager> persistence_manager =
392392
MakeUnique<PersistenceManager>(std::move(persistence_storage_engine),
393393
std::move(tracked_query_manager));
394-
UniquePtr<ListenProvider> listen_provider =
395-
MakeUnique<WebSocketListenProvider>(connection_.get());
394+
UniquePtr<WebSocketListenProvider> listen_provider =
395+
MakeUnique<WebSocketListenProvider>(this, connection_.get());
396+
WebSocketListenProvider* listen_provider_ptr = listen_provider.get();
396397
server_sync_tree_ = MakeUnique<SyncTree>(std::move(pending_write_tree),
397398
std::move(persistence_manager),
398399
std::move(listen_provider));
400+
listen_provider_ptr->set_sync_tree(server_sync_tree_.get());
399401
}
400402

401403
void Repo::PostEvents(const std::vector<Event>& events) {

database/src/desktop/core/repo.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ class Repo : public connection::PersistentConnectionEventHandler {
107107

108108
static scheduler::Scheduler& scheduler() { return s_scheduler_; }
109109

110+
ThisRef& this_ref() { return safe_this_; }
111+
110112
private:
111113
WriteId GetNextWriteId();
112114

database/src/desktop/core/sync_tree.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ static QuerySpec QuerySpecForListening(const QuerySpec& query_spec) {
454454

455455
void SyncTree::SetupListener(const QuerySpec& query_spec, const View* view) {
456456
const Path& path = query_spec.path;
457-
listen_provider_->StartListening(QuerySpecForListening(query_spec));
457+
listen_provider_->StartListening(QuerySpecForListening(query_spec), view);
458458

459459
Tree<SyncPoint>* subtree = sync_point_tree_.GetChild(path);
460460

@@ -557,7 +557,8 @@ std::vector<Event> SyncTree::RemoveEventRegistration(
557557
// Ok, we've collected all the listens we need. Set them up.
558558
for (const View* view : new_views) {
559559
QuerySpec new_query = view->query_spec();
560-
listen_provider_->StartListening(QuerySpecForListening(new_query));
560+
listen_provider_->StartListening(QuerySpecForListening(new_query),
561+
view);
561562
}
562563
} else {
563564
// There's nothing below us, so nothing we need to start listening on

database/src/desktop/core/web_socket_listen_provider.cc

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "database/src/common/query_spec.h"
1818
#include "database/src/desktop/connection/persistent_connection.h"
1919
#include "database/src/desktop/core/listen_provider.h"
20+
#include "database/src/desktop/view/view.h"
2021

2122
namespace firebase {
2223
namespace database {
@@ -25,8 +26,66 @@ namespace internal {
2526
using connection::PersistentConnection;
2627
using connection::ResponsePtr;
2728

28-
void WebSocketListenProvider::StartListening(const QuerySpec& query_spec) {
29-
connection_->Listen(query_spec, PersistentConnection::Tag(), ResponsePtr());
29+
class WebSocketListenResponse : public connection::Response {
30+
public:
31+
WebSocketListenResponse(const Response::ResponseCallback& callback,
32+
const Repo::ThisRef& repo_ref, SyncTree* sync_tree,
33+
const QuerySpec& query_spec, const View* view)
34+
: connection::Response(callback),
35+
repo_ref_(repo_ref),
36+
sync_tree_(sync_tree),
37+
query_spec_(query_spec),
38+
view_(view) {}
39+
40+
Repo::ThisRef& repo_ref() { return repo_ref_; }
41+
SyncTree* sync_tree() { return sync_tree_; }
42+
const QuerySpec& query_spec() { return query_spec_; }
43+
const View* view() { return view_; }
44+
45+
private:
46+
Repo::ThisRef repo_ref_;
47+
SyncTree* sync_tree_;
48+
QuerySpec query_spec_;
49+
const View* view_;
50+
};
51+
52+
void WebSocketListenProvider::StartListening(const QuerySpec& query_spec,
53+
const View* view) {
54+
connection_->Listen(
55+
query_spec, PersistentConnection::Tag(),
56+
MakeShared<WebSocketListenResponse>(
57+
[](const SharedPtr<connection::Response>& connection_response) {
58+
WebSocketListenResponse* response =
59+
static_cast<WebSocketListenResponse*>(
60+
connection_response.get());
61+
62+
Repo::ThisRefLock lock(&response->repo_ref());
63+
Repo* repo = lock.GetReference();
64+
if (repo == nullptr) {
65+
// Repo was deleted, do not proceed.
66+
return;
67+
}
68+
69+
std::vector<Event> events;
70+
if (!response->HasError()) {
71+
const QuerySpec& query_spec = response->view()->query_spec();
72+
events =
73+
response->sync_tree()->ApplyListenComplete(query_spec.path);
74+
} else {
75+
LogWarning("Listen at %s failed: %s",
76+
response->query_spec().path.c_str(),
77+
response->GetErrorMessage().c_str());
78+
79+
// If a listen failed, kill all of the listeners here, not just
80+
// the one that triggered the error. Note that this may need to be
81+
// scoped to just this listener if we change permissions on
82+
// filtered children
83+
events = response->sync_tree()->RemoveAllEventRegistrations(
84+
response->query_spec(), response->GetErrorCode());
85+
}
86+
repo->PostEvents(events);
87+
},
88+
repo_->this_ref(), sync_tree_, query_spec, view));
3089
}
3190

3291
void WebSocketListenProvider::StopListening(const QuerySpec& query_spec) {

database/src/desktop/core/web_socket_listen_provider.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,29 @@
1818
#include "database/src/common/query_spec.h"
1919
#include "database/src/desktop/connection/persistent_connection.h"
2020
#include "database/src/desktop/core/listen_provider.h"
21+
#include "database/src/desktop/core/repo.h"
2122

2223
namespace firebase {
2324
namespace database {
2425
namespace internal {
2526

2627
class WebSocketListenProvider : public ListenProvider {
2728
public:
28-
WebSocketListenProvider(connection::PersistentConnection* connection)
29-
: connection_(connection) {}
29+
WebSocketListenProvider(Repo* repo,
30+
connection::PersistentConnection* connection)
31+
: repo_(repo), sync_tree_(nullptr), connection_(connection) {}
3032

31-
virtual ~WebSocketListenProvider() {}
33+
~WebSocketListenProvider() override {}
3234

33-
void StartListening(const QuerySpec& query_spec) override;
35+
void set_sync_tree(SyncTree* sync_tree) { sync_tree_ = sync_tree; }
36+
37+
void StartListening(const QuerySpec& query_spec, const View* view) override;
3438

3539
void StopListening(const QuerySpec& query_spec) override;
3640

3741
private:
42+
Repo* repo_;
43+
SyncTree* sync_tree_;
3844
connection::PersistentConnection* connection_;
3945
};
4046

0 commit comments

Comments
 (0)