Skip to content

Commit

Permalink
fix: fix stream state transition with gRPC reactor
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui committed Mar 15, 2024
1 parent 99c505a commit 196b6e9
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 45 deletions.
104 changes: 69 additions & 35 deletions cpp/source/client/TelemetryBidiReactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ ROCKETMQ_NAMESPACE_BEGIN
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
rmq::MessagingService::Stub* stub,
std::string peer_address)
: client_(client), peer_address_(std::move(peer_address)), stream_state_(StreamState::Created) {
auto ptr = client.lock();
: client_(client), peer_address_(std::move(peer_address)), stream_state_(StreamState::Active) {
auto ptr = client_.lock();
auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
context_.set_deadline(deadline);
Metadata metadata;
Expand All @@ -45,6 +45,7 @@ TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
context_.AddMetadata(entry.first, entry.second);
}
stub->async()->Telemetry(&context_, this);
fireRead();
StartCall();
}

Expand All @@ -64,8 +65,6 @@ bool TelemetryBidiReactor::await() {
}

void TelemetryBidiReactor::OnWriteDone(bool ok) {
SPDLOG_DEBUG("OnWriteDone: {}", ok);

{
bool expect = true;
if (!command_inflight_.compare_exchange_strong(expect, false, std::memory_order_relaxed)) {
Expand All @@ -77,36 +76,28 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
SPDLOG_WARN("Failed to write telemetry command {} to {}", write_.DebugString(), peer_address_);
{
absl::MutexLock lk(&stream_state_mtx_);
stream_state_ = StreamState::WriteDone;
if (streamStateGood()) {
stream_state_ = StreamState::WriteFailure;
}
}

fireClose();
return;
}

{
absl::MutexLock lk(&stream_state_mtx_);
if (StreamState::Created == stream_state_) {
stream_state_ = StreamState::Active;
fireRead();
}
}

fireWrite();
}

void TelemetryBidiReactor::OnReadDone(bool ok) {
SPDLOG_DEBUG("OnReadDone: ok={}", ok);
if (!ok) {
if (client_.lock()) {
SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_);
}

{
absl::MutexLock lk(&stream_state_mtx_);
stream_state_ = StreamState::ReadDone;
if (!ok) {
if (streamStateGood()) {
stream_state_ = StreamState::ReadFailure;
SPDLOG_WARN("Faild to read from telemetry stream from {}", peer_address_);
}
return;
}
}
fireClose();
return;
}

Expand Down Expand Up @@ -283,6 +274,16 @@ void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings

/**
 * Issues the next StartRead on the stream, provided the stream is still usable.
 *
 * The stream state is inspected while holding stream_state_mtx_. When the
 * state is no longer good, a warning carrying the numeric state value is
 * logged and no read is started.
 */
void TelemetryBidiReactor::fireRead() {
  SPDLOG_DEBUG("{}#fireRead", peer_address_);

  bool readable = false;
  {
    absl::MutexLock lk(&stream_state_mtx_);
    readable = streamStateGood();
    if (!readable) {
      SPDLOG_WARN("Further read from {} is not allowded due to stream-state={}", peer_address_,
                  static_cast<std::uint8_t>(stream_state_));
    }
  }

  // The lock has been released by the time the read buffer is handed over.
  if (readable) {
    StartRead(&read_);
  }
}

Expand All @@ -295,13 +296,11 @@ void TelemetryBidiReactor::write(TelemetryCommand command) {
}

void TelemetryBidiReactor::fireWrite() {
SPDLOG_DEBUG("{}#fireWrite", peer_address_);

{
absl::MutexLock lk(&stream_state_mtx_);
if (stream_state_ != StreamState::Active && stream_state_ != StreamState::Created) {
SPDLOG_WARN("TelemetryBidiReactor to {} is closed or half-closed, ignoring fireWrite event. stream-state={}",
peer_address_, static_cast<std::uint8_t>(stream_state_));
if (!streamStateGood()) {
SPDLOG_WARN("Further write to {} is not allowded due to stream-state={}", peer_address_,
static_cast<std::uint8_t>(stream_state_));
return;
}
}
Expand All @@ -328,18 +327,33 @@ void TelemetryBidiReactor::fireWrite() {

void TelemetryBidiReactor::fireClose() {
SPDLOG_INFO("{}#fireClose", peer_address_);
if (StreamState::Active == stream_state_) {
StartWritesDone();
{
absl::MutexLock lk(&stream_state_mtx_);
if (StreamState::Active == stream_state_) {
stream_state_cv_.Wait(&stream_state_mtx_);
}

{
absl::MutexLock lk(&stream_state_mtx_);
if (!streamStateGood()) {
SPDLOG_WARN("No futher Read/Write call to {} is allowed due to stream-state={}", peer_address_,
static_cast<std::uint8_t>(stream_state_));
return;
}
}

StartWritesDone();
{
absl::MutexLock lk(&stream_state_mtx_);
stream_state_cv_.Wait(&stream_state_mtx_);
}
}

/**
 * Reaction invoked when an explicit StartWritesDone operation has completed.
 *
 * @param ok true when the operation succeeded. When false, the stream state is
 *           switched to WriteFailure (only if it is still good) under
 *           stream_state_mtx_, and a warning is logged.
 */
void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
  if (ok) {
    SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
    return;
  }

  // Failure path: record the failure unless another terminal state was already set.
  absl::MutexLock lk(&stream_state_mtx_);
  if (streamStateGood()) {
    stream_state_ = StreamState::WriteFailure;
  }
  SPDLOG_WARN("Previous telemetry write to {} failed", peer_address_);
}

Expand Down Expand Up @@ -368,7 +382,10 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) {
{
SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_);
absl::MutexLock lk(&stream_state_mtx_);
stream_state_ = StreamState::Closed;
if (streamStateGood()) {
stream_state_ = StreamState::Closed;
}

stream_state_cv_.SignalAll();
}

Expand All @@ -382,4 +399,21 @@ void TelemetryBidiReactor::OnDone(const grpc::Status& status) {
}
}

/**
 * Reaction invoked once the read of the server's initial metadata is done.
 *
 * @param ok true when the initial metadata was read successfully. When false,
 *           the stream state is switched to ReadInitialMetadataFailure (only if
 *           it is still good) under stream_state_mtx_, and a warning is logged.
 */
void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) {
  if (ok) {
    SPDLOG_DEBUG("Received initial metadata from {}", peer_address_);
    return;
  }

  // Failure path: record the failure unless another terminal state was already set.
  absl::MutexLock lk(&stream_state_mtx_);
  if (streamStateGood()) {
    stream_state_ = StreamState::ReadInitialMetadataFailure;
  }
  SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_);
}

/**
 * Tells whether the stream is still in the Active state.
 *
 * Reads stream_state_ without taking any lock itself; the declaration is
 * annotated to require stream_state_mtx_ to be held by the caller.
 *
 * @return true if stream_state_ is StreamState::Active, false for any other state.
 */
bool TelemetryBidiReactor::streamStateGood() {
  const bool active = (stream_state_ == StreamState::Active);
  return active;
}

ROCKETMQ_NAMESPACE_END
60 changes: 50 additions & 10 deletions cpp/source/client/include/TelemetryBidiReactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ ROCKETMQ_NAMESPACE_BEGIN

enum class StreamState : std::uint8_t
{
Created = 0,
Active = 1,
ReadDone = 2,
WriteDone = 3,
Closed = 4,
Active = 0,

// Once stream state reaches one of the following, Start* should not be called.
Closed = 1,
ReadInitialMetadataFailure = 2,
ReadFailure = 3,
WriteFailure = 4,
};

class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>,
Expand All @@ -47,13 +49,46 @@ class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, Te

~TelemetryBidiReactor();

void OnWriteDone(bool ok) override;

void OnWritesDoneDone(bool ok) override;
/// Notifies the application that all operations associated with this RPC
/// have completed and all Holds have been removed. OnDone provides the RPC
/// status outcome for both successful and failed RPCs and will be called in
/// all cases. If it is not called, it indicates an application-level problem
/// (like failure to remove a hold).
///
/// \param[in] status The status outcome of this RPC
void OnDone(const grpc::Status& status) override;

/// Notifies the application that a read of initial metadata from the
/// server is done. If the application chooses not to implement this method,
/// it can assume that the initial metadata has been read before the first
/// call of OnReadDone or OnDone.
///
/// \param[in] ok Was the initial metadata read successfully? If false, no
/// new read/write operation will succeed, and any further
/// Start* operations should not be called.
void OnReadInitialMetadataDone(bool /*ok*/) override;

/// Notifies the application that a StartRead operation completed.
///
/// \param[in] ok Was it successful? If false, no new read/write operation
/// will succeed, and any further Start* should not be called.
void OnReadDone(bool ok) override;

void OnDone(const grpc::Status& status) override;
/// Notifies the application that a StartWrite or StartWriteLast operation
/// completed.
///
/// \param[in] ok Was it successful? If false, no new read/write operation
/// will succeed, and any further Start* should not be called.
void OnWriteDone(bool ok) override;

/// Notifies the application that a StartWritesDone operation completed. Note
/// that this is only used on explicit StartWritesDone operations and not for
/// those that are implicitly invoked as part of a StartWriteLast.
///
/// \param[in] ok Was it successful? If false, the application will later see
/// the failure reflected as a bad status in OnDone and no
/// further Start* should be called.
void OnWritesDoneDone(bool ok) override;

void fireRead();

Expand Down Expand Up @@ -87,7 +122,7 @@ class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, Te
TelemetryCommand write_;

/**
* @brief Each TelemetryBidiReactor belongs to a specific client as its owner.
* @brief Each TelemetryBidiReactor belongs to a specific client as its owner.
*/
std::weak_ptr<Client> client_;

Expand Down Expand Up @@ -118,6 +153,11 @@ class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, Te
void applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr<Client> client);

void applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client);

/**
* Indicate if the underlying gRPC bidirectional stream is good enough to fire further Start* calls.
*/
bool streamStateGood() ABSL_EXCLUSIVE_LOCKS_REQUIRED(stream_state_mtx_);
};

ROCKETMQ_NAMESPACE_END

0 comments on commit 196b6e9

Please sign in to comment.