fix: an asynchronous data write error #187

Merged
merged 12 commits into from
Feb 17, 2025

@lqxhub (Collaborator) commented Jan 24, 2025

Fix a flag error that can occur when multiple threads write data to the network.

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced event handling mechanisms across network components.
    • Improved context management for writable events.
  • Bug Fixes

    • Streamlined error handling in socket and event processing.
    • Refined memory management in event processing.
  • Refactor

    • Updated method signatures to provide more detailed event context.
    • Simplified error handling logic in network event processing.
  • Chores

    • Added virtual destructor for timer task interface.
    • Updated header file include mechanism.
    • Expanded criteria for 64-bit architecture definition.

coderabbitai bot commented Jan 24, 2025

Walkthrough

The pull request introduces significant modifications to the network event handling system across multiple files. The primary changes involve updating the OnWritable method signatures in various classes to include additional context parameters like connection ID, file descriptor, and event pointer. These modifications aim to enhance event processing by providing more detailed information during writable and error handling scenarios. The changes span multiple network-related classes, including EpollEvent, KqueueEvent, ListenSocket, StreamSocket, and affect the overall event management strategy.

Changes

| File | Change Summary |
| --- | --- |
| src/net/epoll_event.cc | Modified DoRead and DoWrite methods, changing conditional logic and method call parameters. |
| src/net/kqueue_event.cc | Updated DoWrite method with enhanced context handling and error management. |
| src/net/listen_socket.cc, src/net/listen_socket.h | Updated OnWritable method signature to include id, fd, and event parameters; modified SendPacket method signature to include an additional callback parameter. |
| src/net/net_event.h | Added BaseEvent class declaration; modified OnWritable and SendPacket method signatures. |
| src/net/stream_socket.cc, src/net/stream_socket.h | Updated OnWritable method and SendPacket method signatures; simplified error handling in Read method. |
| src/net/thread_manager.h | Modified SendPacket method to include a callback for setting write events after packet sending. |
| src/net/timer_task.h | Changed preprocessor directive to #pragma once and added virtual destructor. |
| src/net/config.h | Updated HAVE_64BIT macro definition to include defined(__arm64__). |

Possibly related PRs

  • feat: support ipv6 #100: The changes in the main PR, which modify the EpollEvent and KqueueEvent classes to handle writable events with updated method signatures, are related to the changes in the retrieved PR that also update the KqueueEvent class's DoWrite method to include additional parameters for writable events. Both PRs involve modifications to the event handling logic in similar classes.

Suggested reviewers

  • AlexStocks
  • marsevilspirit

Poem

🐰 Hopping through network streams so bright,
Events now dance with context's might!
Parameters fly, connections gleam,
A rabbit's code, a programmer's dream!
Writable, readable, events take flight! 🚀

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/net/stream_socket.cc (1)

52-52: Returning the old Boolean value may be confusing.
writeReady_.exchange(true) returns the previous value, which might be used incorrectly by callers. If it’s never used, consider returning a more explicit result (e.g. void or always true) to reduce confusion.
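To make the return-value concern concrete: `std::atomic<bool>::exchange` returns the value held before the call, so only the caller that actually flips the flag observes `false`. The sketch below wraps that in a method whose result is easier to read at the call site. `WriteFlag`, `MarkReady`, and `Clear` are hypothetical names for illustration and are not taken from the PR.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the writeReady_ flag discussed above.
class WriteFlag {
 public:
  // Returns true only for the caller that changed the flag from false to
  // true. exchange(true) yields the previous value, so it is negated here
  // to make the meaning explicit for callers.
  bool MarkReady() { return !ready_.exchange(true); }

  // Clears the flag so that a later MarkReady() can report a transition again.
  void Clear() { ready_.store(false); }

 private:
  std::atomic<bool> ready_{false};
};
```

With this shape, a caller would register the write event only when `MarkReady()` returns true, which keeps the "previous value" detail out of the calling code.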

src/net/net_event.h (1)

47-47: Consider documenting the new parameters and their thread-safety implications.

The enhanced OnWritable signature provides better context for debugging and controlling asynchronous writes. However, since this is a virtual interface method used across multiple implementations, it would benefit from clear documentation.

Add documentation for the parameters and thread-safety guarantees:

  // Handle write event when the connection is writable and the data can be sent
+ /**
+  * @param id Connection identifier for tracking the specific data stream
+  * @param fd File descriptor associated with the connection
+  * @param event Pointer to the event context for additional write control
+  * @return Status code indicating the write operation result
+  * @thread_safety This method must be thread-safe as it can be called from different event loops
+  */
  virtual int OnWritable(unsigned long int id, int fd, BaseEvent *event) = 0;

Since this change affects the core network event interface:

  1. Ensure all implementations handle concurrent calls to OnWritable correctly
  2. Consider adding error codes to enum { NE_ERROR... } for specific write failure scenarios
  3. Document if BaseEvent ownership is transferred or if it's just borrowed
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4cf903c and 9f5524e.

📒 Files selected for processing (9)
  • src/net/epoll_event.cc (2 hunks)
  • src/net/kqueue_event.cc (1 hunks)
  • src/net/listen_socket.cc (1 hunks)
  • src/net/listen_socket.h (1 hunks)
  • src/net/net_event.h (2 hunks)
  • src/net/stream_socket.cc (4 hunks)
  • src/net/stream_socket.h (1 hunks)
  • src/net/thread_manager.h (1 hunks)
  • src/net/timer_task.h (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: build_on_ubuntu
  • GitHub Check: build_on_macos
🔇 Additional comments (14)
src/net/stream_socket.h (1)

30-30: Ensure proper usage of the newly added parameters in callers.
Having three parameters (id, fd, and event) may introduce additional complexity and potential for misuse. Double-check that all event-driven code paths properly manage these arguments (for example, ensuring id and fd remain consistent, and not calling DelWriteEvent prematurely).

src/net/listen_socket.h (1)

35-35: Confirm no unintended side effects from added parameters.
This method is declared but not used in typical scenarios for a listening socket. Ensure that any accidental invocation of OnWritable for this socket type is discarded early or handled safely.

src/net/timer_task.h (2)

8-8: Use of #pragma once is good practice.
No issues identified here; it helps avoid multiple-inclusion problems and is widely supported.


28-29: Addition of a virtual destructor is a solid improvement.
This ensures correct cleanup for derived classes.
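As a small illustration of why the virtual destructor matters, the sketch below destroys an object through a base-class pointer and counts destructor calls. `ITask`, `CountingTask`, and `RunAndDestroy` are hypothetical names, not the types in src/net/timer_task.h.

```cpp
#include <cassert>
#include <memory>

// Hypothetical interface; the virtual destructor lets a delete through
// ITask* run the derived class's cleanup.
struct ITask {
  virtual ~ITask() = default;
  virtual void Run() = 0;
};

// Hypothetical implementation that records when it is destroyed.
struct CountingTask : ITask {
  explicit CountingTask(int *destroyed) : destroyed_(destroyed) {}
  ~CountingTask() override { ++*destroyed_; }
  void Run() override {}
  int *destroyed_;
};

// Runs the task, then destroys it through the base-class pointer when the
// unique_ptr goes out of scope.
inline void RunAndDestroy(std::unique_ptr<ITask> task) {
  task->Run();
}
```

Without the virtual destructor in `ITask`, deleting a `CountingTask` through an `ITask` pointer would be undefined behavior.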

src/net/stream_socket.cc (4)

16-16: Clarify concurrency semantics around sendData_ and writeQueue_.
The updated OnWritable(...) uses a lock-free queue and atomic flags, but it’s not fully apparent if any data race can occur with concurrent writers from other threads. Confirm that Pop(...) and sendData_ are consistently protected or properly synchronized to avoid race conditions, especially if multiple threads can invoke OnWritable(...).


20-20: Ensure correct re-subscription to write events.
When you remove the write event here, you rely on writeReady_.store(false) to re-subscribe in the future. Confirm that any subsequent writes re-enable the event, especially in edge cases where more than one piece of data becomes available concurrently.


40-40: Potential risk of missed writes.
Again calling event->DelWriteEvent(id, fd) depends on writeReady_ being re-enabled later for new data. Carefully confirm there’s no gap where new data arrives but the write event is not re-registered.
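One way to reason about the gap described here is to make "queue is empty" and "clear the flag" a single step with respect to producers. The sketch below is a simplified model under stated assumptions: a `std::deque` guarded by one mutex stands in for the lock-free queue, a plain `bool` stands in for `writeReady_`, and `SendSide`, `Enqueue`, and `Dequeue` are hypothetical names. It is not the PR's implementation.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical, simplified model of a connection's send side.
class SendSide {
 public:
  // Producer: enqueue data and report whether the caller must (re)register
  // the write event, i.e. whether the flag went from off to on.
  bool Enqueue(std::string msg) {
    std::lock_guard<std::mutex> lock(mu_);
    queue_.push_back(std::move(msg));
    bool mustRegister = !writeEventOn_;
    writeEventOn_ = true;
    return mustRegister;
  }

  // Consumer: take the next message. When nothing is left, the emptiness
  // check and the flag reset happen under the same lock as Enqueue(), so a
  // message cannot slip in between the two.
  bool Dequeue(std::string *out) {
    assert(out != nullptr);
    std::lock_guard<std::mutex> lock(mu_);
    if (queue_.empty()) {
      writeEventOn_ = false;  // the caller would remove the write event here
      return false;
    }
    *out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

 private:
  std::mutex mu_;
  std::deque<std::string> queue_;
  bool writeEventOn_ = false;
};
```

In this model every message enqueued after the flag was cleared makes `Enqueue` return true again, so the write event is always re-registered for new data.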


64-67: Error vs. normal closure.
This block logs an error and returns NE_ERROR if ::read fails for any cause not in (EAGAIN, EWOULDBLOCK, ECONNRESET). Then you detect ret == 0 separately. This is correct, but confirm all required edge cases (e.g. partial reads, other OS-level errors) are handled as desired.

src/net/epoll_event.cc (2)

167-168: Confirm changed flow for NE_CLOSE handling.

Previously, checking for NE_CLOSE may have been part of an else if chain. Now it is evaluated separately. Ensure this un-nested if block behaves correctly in all cases, especially if there’s any chance ret could be both NE_ERROR and NE_CLOSE in different conditions.


179-179: Include additional context in writable event processing.

Calling OnWritable(event.data.u64, conn->fd_, this) provides richer context for debugging and asynchronous handling. This appears consistent with the updated signature in ListenSocket and other event classes. Good job aligning these methods.

src/net/kqueue_event.cc (2)

197-201: Validate pointer lifetime in 32-bit vs. 64-bit branches.

In the non-64-bit branch, event.udata is allocated and later deleted. In the 64-bit branch, it is cast directly to uint64_t. Confirm that this approach does not leak memory or cause dangling pointers when switching between architectures.
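The two branches can be pictured with a pair of helpers that either store a 64-bit id directly in a pointer-sized slot or box it on the heap. This is a sketch only: `PackId`, `UnpackId`, and `kPointerHolds64Bits` are hypothetical, and the branch is chosen with `sizeof(void *)` rather than the project's HAVE_64BIT macro.

```cpp
#include <cassert>
#include <cstdint>

// True when a pointer is wide enough to carry a 64-bit id by value.
constexpr bool kPointerHolds64Bits = sizeof(void *) >= sizeof(std::uint64_t);

// Stores the id in a user-data slot: by value when it fits, otherwise in a
// heap-allocated box that UnpackId() must free.
inline void *PackId(std::uint64_t id) {
  if (kPointerHolds64Bits) {
    return reinterpret_cast<void *>(static_cast<std::uintptr_t>(id));
  }
  return new std::uint64_t(id);
}

// Recovers the id. In the boxed case this also releases the allocation, so
// it must be called exactly once per PackId() result.
inline std::uint64_t UnpackId(void *udata) {
  if (kPointerHolds64Bits) {
    return static_cast<std::uint64_t>(reinterpret_cast<std::uintptr_t>(udata));
  }
  auto *boxed = static_cast<std::uint64_t *>(udata);
  std::uint64_t id = *boxed;
  delete boxed;
  return id;
}
```

Keeping allocation and release in one matched pair of functions is one way to make the lifetime question in this comment easy to audit.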


203-206: Check partial writes and retry logic.

If OnWritable returns NE_ERROR, the code immediately handles it as an error. Ensure partial writes (which are common in non-blocking I/O) are retried and that the NE_ERROR code path only executes when there is a genuine failure, not merely an incomplete send.
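The distinction between an incomplete send and a genuine failure can be sketched as follows. The writer is injected as a callback so the logic can be exercised without a socket. `WriteStatus`, `WriteFn`, and `Flush` are hypothetical names, and the convention that the writer returns 0 for "would block" is an assumption of this sketch, not the behavior of `::write`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

enum class WriteStatus { kDone, kAgain, kError };

// Hypothetical writer: returns bytes accepted (> 0), 0 when the socket would
// block, or -1 on a genuine failure.
using WriteFn = std::function<long(const char *data, std::size_t len)>;

// Sends as much of *pending as the writer accepts and erases the sent
// prefix. Unsent bytes stay in *pending for the next writable event.
inline WriteStatus Flush(std::string *pending, const WriteFn &write) {
  assert(pending != nullptr);
  while (!pending->empty()) {
    long n = write(pending->data(), pending->size());
    if (n < 0) return WriteStatus::kError;  // real failure
    if (n == 0) return WriteStatus::kAgain; // partial send, retry later
    pending->erase(0, static_cast<std::size_t>(n));
  }
  return WriteStatus::kDone;
}
```

Only `kError` would map to an error code such as NE_ERROR; `kAgain` simply means the write event should stay registered.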

src/net/thread_manager.h (1)

267-269: Confirm safe early return on successful send.

Short-circuiting when SendPacket returns true improves efficiency but could hide partial-send scenarios if the underlying socket buffers only some of the data. Verify that SendPacket reliably handles partial writes or maintain a separate mechanism to handle cases where only part of the message is sent.

src/net/net_event.h (1)

31-31: LGTM! Clean forward declaration.

The forward declaration of BaseEvent follows good C++ practices for header organization and compilation efficiency.
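A minimal sketch of the pattern: an interface can take a pointer to a class that is only forward-declared, and the full definition is needed only where members are used. `EventLoop`, `Handler`, and `IdleHandler` are hypothetical stand-ins, not the project's BaseEvent or NetEvent types.

```cpp
#include <cassert>
#include <cstdint>

class EventLoop;  // forward declaration: enough for pointer parameters

// Hypothetical handler interface; it needs the name EventLoop only.
struct Handler {
  virtual ~Handler() = default;
  virtual int OnWritable(std::uint64_t id, int fd, EventLoop *loop) = 0;
};

// The full definition could live in a different header.
class EventLoop {
 public:
  void DelWriteEvent(std::uint64_t, int) { ++removed_; }
  int removed() const { return removed_; }

 private:
  int removed_ = 0;
};

// Hypothetical handler with nothing to send: it drops the write event.
struct IdleHandler : Handler {
  int OnWritable(std::uint64_t id, int fd, EventLoop *loop) override {
    assert(loop != nullptr);
    loop->DelWriteEvent(id, fd);  // borrowed pointer: used, never deleted
    return 0;
  }
};
```

Here the pointer is treated as borrowed, which is one possible answer to the ownership question raised earlier in the review; the PR itself does not state the ownership rule.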

@@ -39,7 +39,7 @@ int ListenSocket::OnReadable(const std::shared_ptr<Connection> &conn, std::strin
   return newConnFd;
 }

-int ListenSocket::OnWritable() { return 1; }
+int ListenSocket::OnWritable(uint64_t id, int fd, BaseEvent *event) { return 1; }

🛠️ Refactor suggestion

Implement proper writable event handling logic.

Currently, the method simply returns 1, which looks like a placeholder. Consider returning a more descriptive constant (e.g., NE_OK) and leveraging the parameters (id, fd, event) to handle writable events in a meaningful way (e.g., initiating a non-blocking accept or other logic).

@lqxhub lqxhub marked this pull request as draft January 24, 2025 14:19
@lqxhub lqxhub marked this pull request as ready for review February 15, 2025 09:14

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🔭 Outside diff range comments (1)
src/net/stream_socket.cc (1)

68-91: 🛠️ Refactor suggestion

Enhance error handling and buffer safety.

The current implementation has the following concerns:

  1. The error handling has been simplified, potentially missing specific error cases that might need special handling.
  2. The fixed buffer size could lead to buffer overflow if not properly validated.

Consider these improvements:

 int StreamSocket::Read(std::string *readBuff) {
+  if (!readBuff) {
+    ERROR("StreamSocket fd: {} invalid read buffer", Fd());
+    return NE_ERROR;
+  }
+
+  static constexpr size_t MAX_READ_SIZE = 1024 * 1024;  // 1MB
+  if (readBuff->size() >= MAX_READ_SIZE) {
+    ERROR("StreamSocket fd: {} read buffer overflow", Fd());
+    return NE_ERROR;
+  }
+
   char readBuffer[readBuffSize_];
   while (true) {
     int ret = static_cast<int>(::read(Fd(), readBuffer, readBuffSize_));
     if (ret == -1) {
-      if (EAGAIN == errno || EWOULDBLOCK == errno || ECONNRESET == errno) {
+      if (EAGAIN == errno || EWOULDBLOCK == errno) {
         return NE_OK;
+      } else if (ECONNRESET == errno) {
+        INFO("StreamSocket fd: {} connection reset by peer", Fd());
+        return NE_CLOSE;
       }
       ERROR("StreamSocket fd: {} read error: {}", Fd(), errno);
       return NE_ERROR;
     }
     if (ret == 0) {
+      INFO("StreamSocket fd: {} connection closed by peer", Fd());
       return NE_CLOSE;
     }
     if (ret > 0) {
+      if (readBuff->size() + ret > MAX_READ_SIZE) {
+        ERROR("StreamSocket fd: {} read buffer overflow", Fd());
+        return NE_ERROR;
+      }
       readBuff->append(readBuffer, ret);
     }
     if (!NoBlock()) {
       break;
     }
   }
   return NE_OK;
 }
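The errno handling proposed in the diff above can be isolated into a small pure function, which makes the mapping easy to test without a socket. This is a sketch: `ReadOutcome` and `ClassifyRead` are hypothetical, the enumerator values are assumptions rather than the project's NE_* constants, and treating ECONNRESET as a close follows the reviewer's suggestion rather than the merged code.

```cpp
#include <cassert>
#include <cerrno>

// Hypothetical outcome codes loosely mirroring NE_OK, NE_ERROR, NE_CLOSE.
enum ReadOutcome { kReadOk = 0, kReadError = 1, kReadClose = 2 };

// Maps the return value of a non-blocking read and the errno captured right
// after it to an outcome.
inline ReadOutcome ClassifyRead(long ret, int err) {
  if (ret > 0) return kReadOk;      // data was read
  if (ret == 0) return kReadClose;  // orderly shutdown by the peer
  if (err == EAGAIN || err == EWOULDBLOCK) return kReadOk;  // nothing left
  if (err == ECONNRESET) return kReadClose;                 // peer reset
  return kReadError;                                        // anything else
}
```

The caller would pass `errno` immediately after the failed call, before any other library call can overwrite it.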
♻️ Duplicate comments (1)
src/net/listen_socket.cc (1)

43-43: ⚠️ Potential issue

Implement proper writable event handling logic.

Currently, the method simply returns 1, which looks like a placeholder. Consider returning a more descriptive constant (e.g., NE_OK) and leveraging the parameters (id, fd, event) to handle writable events in a meaningful way (e.g., initiating a non-blocking accept or other logic).

Apply this diff to implement proper writable event handling:

-int ListenSocket::OnWritable(uint64_t id, int fd, BaseEvent *event) { return 1; }
+int ListenSocket::OnWritable(uint64_t id, int fd, BaseEvent *event) {
+  // Listen sockets typically don't need writable events
+  event->DelWriteEvent(id, fd);
+  return NE_OK;
+}
🧹 Nitpick comments (3)
src/net/stream_socket.cc (2)

17-52: LGTM! Thread-safe implementation with proper synchronization.

The implementation uses a double-check pattern with a mutex to prevent race conditions. Event handling is synchronized, and appropriate status codes are returned.

Consider extracting the double-check pattern into a helper method to avoid code duplication:

+private:
+  bool IsWriteQueueEmptyWithLock() {
+    std::lock_guard lock(write_mutex_);
+    return writeQueue_.Empty();
+  }
+
+public:
 int StreamSocket::OnWritable(uint64_t id, int fd, BaseEvent *event) {
   if (sendData_.empty()) {
     if (!writeQueue_.Pop(sendData_)) {  // no data to send
-      std::lock_guard lock(write_mutex_);
-      if (writeQueue_.Empty()) {  // double check
+      if (IsWriteQueueEmptyWithLock()) {  // double check
         writeReady_ = false;
         event->DelWriteEvent(id, fd);
       }
     }
     return NE_OK;
   }
   // ... rest of the code ...
   if (writeQueue_.Empty()) {
-    std::lock_guard lock(write_mutex_);
-    if (writeQueue_.Empty()) {  // double check
+    if (IsWriteQueueEmptyWithLock()) {  // double check
       writeReady_ = false;
       event->DelWriteEvent(id, fd);
     }
     return NE_OK;
   }
   // ... rest of the code ...
 }
🧰 Tools
🪛 Cppcheck (2.10-2)

[error] 38-38: Syntax Error

(internalAstError)


54-65: LGTM! Thread-safe implementation with proper callback handling.

The implementation correctly uses atomic exchange and mutex to prevent race conditions. The callback is properly synchronized and only invoked when necessary.

Consider adding an emptiness check for the message to avoid pushing empty messages onto the queue:

 void StreamSocket::SendPacket(std::string &&msg, std::function<void()> addWriteFlag) {
+  if (msg.empty()) {
+    return;
+  }
   bool sendOver;
   do {
     sendOver = writeQueue_.Push(msg);
   } while (!sendOver);
   if (!writeReady_.exchange(true)) {
     std::lock_guard lock(write_mutex_);
     if (addWriteFlag) {
       addWriteFlag();
     }
   }
 }
src/net/listen_socket.cc (1)

45-45: LGTM! Empty implementation is appropriate for listen sockets.

Since listen sockets are used for accepting new connections and don't need to send packets, the empty implementation is appropriate.

Consider adding a comment to document the intentional empty implementation:

-void ListenSocket::SendPacket(std::string &&msg, std::function<void()> addWriteFlag) {}
+// Listen sockets don't need to send packets, so this is intentionally empty
+void ListenSocket::SendPacket(std::string &&msg, std::function<void()> addWriteFlag) {}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7002a80 and 432e686.

📒 Files selected for processing (10)
  • src/net/config.h (1 hunks)
  • src/net/epoll_event.cc (2 hunks)
  • src/net/kqueue_event.cc (2 hunks)
  • src/net/listen_socket.cc (1 hunks)
  • src/net/listen_socket.h (1 hunks)
  • src/net/net_event.h (2 hunks)
  • src/net/stream_socket.cc (3 hunks)
  • src/net/stream_socket.h (2 hunks)
  • src/net/thread_manager.h (1 hunks)
  • src/net/timer_task.h (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/net/timer_task.h
  • src/net/thread_manager.h
  • src/net/kqueue_event.cc
  • src/net/epoll_event.cc
🔇 Additional comments (5)
src/net/config.h (1)

24-25: LGTM! Platform detection enhanced for ARM64.

The addition of __arm64__ to the 64-bit architecture detection is appropriate and maintains consistency with existing platform checks.
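For readers unfamiliar with this kind of check, a compile-time architecture test typically looks like the sketch below. The macro name `SKETCH_HAVE_64BIT` is hypothetical, and the list of conditions is an assumption except for `__arm64__`, which the PR adds to its own HAVE_64BIT definition. Apple toolchains define `__arm64__` on Apple silicon.

```cpp
#include <cassert>

// Hypothetical macro; the real condition list lives in src/net/config.h and
// is not reproduced here.
#if defined(__x86_64__) || defined(__aarch64__) || defined(__arm64__) || defined(_WIN64)
#define SKETCH_HAVE_64BIT 1
#endif

// Exposes the macro as a constant so ordinary code can branch on it.
#if defined(SKETCH_HAVE_64BIT)
constexpr bool kHave64Bit = true;
#else
constexpr bool kHave64Bit = false;
#endif
```

If a target is missing from such a list, the build silently falls back to the 32-bit code path, which is why adding the extra spelling matters on macOS.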

src/net/stream_socket.h (2)

30-30: Improved thread safety and error handling in write operations.

The updated method signatures enhance the handling of asynchronous writes:

  1. OnWritable now receives context (id, fd, event) for proper event management
  2. SendPacket includes a callback for write flag management, improving coordination of write operations

Also applies to: 32-32


46-46: Enhanced thread safety with write mutex.

The addition of write_mutex_ provides necessary synchronization for concurrent write operations, directly addressing the multi-threaded data writing issue.

src/net/net_event.h (2)

31-32: LGTM! Added BaseEvent forward declaration.

The forward declaration enables type-safe event pointer passing while maintaining header independence.


47-47: Enhanced event handling interface.

The updated virtual method signatures provide:

  1. Comprehensive context for write operations through OnWritable
  2. Flexible write flag management through SendPacket callback

Also applies to: 52-52

Comment on lines 35 to +39
   // The function is cant be used
-  int OnWritable() override;
+  int OnWritable(uint64_t id, int fd, BaseEvent *event) override;

   // The function is cant be used
-  bool SendPacket(std::string &&msg) override;
+  void SendPacket(std::string &&msg, std::function<void()> addWriteFlag) override;

🛠️ Refactor suggestion

Consider using pure virtual methods or throwing exceptions for unusable operations.

The methods are marked as "cant be used" in comments, but they're still implemented. This could lead to confusion or runtime errors. Consider:

  1. Making these methods pure virtual and not implementing them in ListenSocket
  2. Or throwing an appropriate exception with a clear message

Example implementation:

-  // The function is cant be used
-  int OnWritable(uint64_t id, int fd, BaseEvent *event) override;
+  int OnWritable(uint64_t id, int fd, BaseEvent *event) override {
+    throw std::runtime_error("OnWritable operation not supported for ListenSocket");
+  }

-  // The function is cant be used
-  void SendPacket(std::string &&msg, std::function<void()> addWriteFlag) override;
+  void SendPacket(std::string &&msg, std::function<void()> addWriteFlag) override {
+    throw std::runtime_error("SendPacket operation not supported for ListenSocket");
+  }

@marsevilspirit (Collaborator) left a comment

LGTM

@AlexStocks AlexStocks merged commit 10fa247 into unstable Feb 17, 2025
5 of 6 checks passed