Skip to content

Commit 5897de6

Browse files
committed
(server) attempt at fixing #131 by using blocking writes in server mode
1 parent cca304f commit 5897de6

File tree

4 files changed

+48
-18
lines changed

4 files changed

+48
-18
lines changed

docs/CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
# Changelog
22
All notable changes to this project will be documented in this file.
33

4+
## [7.5.3] - 2019-12-12
5+
6+
(server) attempt at fixing #131 by using blocking writes in server mode
7+
48
## [7.5.2] - 2019-12-11
59

610
(ws) cobra to sentry - created events with sentry tags based on tags present in the cobra messages

ixwebsocket/IXWebSocketTransport.cpp

+36-16
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ namespace ix
7777

7878
WebSocketTransport::WebSocketTransport()
7979
: _useMask(true)
80+
, _blockingSend(false)
8081
, _compressedMessage(false)
8182
, _readyState(ReadyState::CLOSED)
8283
, _closeCode(WebSocketCloseConstants::kInternalErrorCode)
@@ -178,6 +179,7 @@ namespace ix
178179

179180
// Server should not mask the data it sends to the client
180181
_useMask = false;
182+
_blockingSend = true;
181183

182184
_socket = socket;
183185

@@ -339,22 +341,9 @@ namespace ix
339341
// there can be a lot of it for large messages.
340342
if (pollResult == PollResultType::SendRequest)
341343
{
342-
while (!isSendBufferEmpty() && !_requestInitCancellation)
344+
if (!flushSendBuffer())
343345
{
344-
// Wait with a 10ms timeout until the socket is ready to write.
345-
// This way we are not busy looping
346-
PollResultType result = _socket->isReadyToWrite(10);
347-
348-
if (result == PollResultType::Error)
349-
{
350-
closeSocket();
351-
setReadyState(ReadyState::CLOSED);
352-
break;
353-
}
354-
else if (result == PollResultType::ReadyForWrite)
355-
{
356-
sendOnSocket();
357-
}
346+
return PollResult::CannotFlushSendBuffer;
358347
}
359348
}
360349
else if (pollResult == PollResultType::ReadyForRead)
@@ -924,13 +913,21 @@ namespace ix
924913
}
925914
}
926915

916+
bool success = true;
917+
927918
// Request to flush the send buffer on the background thread if it isn't empty
928919
if (!isSendBufferEmpty())
929920
{
930921
_socket->wakeUpFromPoll(Socket::kSendRequest);
922+
923+
// FIXME: we should have a timeout when sending large messages: see #131
924+
if (_blockingSend && !flushSendBuffer())
925+
{
926+
success = false;
927+
}
931928
}
932929

933-
return WebSocketSendInfo(true, compressionError, payloadSize, wireSize);
930+
return WebSocketSendInfo(success, compressionError, payloadSize, wireSize);
934931
}
935932

936933
void WebSocketTransport::sendFragment(wsheader_type::opcode_type type,
@@ -1168,4 +1165,27 @@ namespace ix
11681165
return _txbuf.size();
11691166
}
11701167

1168+
bool WebSocketTransport::flushSendBuffer()
1169+
{
1170+
while (!isSendBufferEmpty() && !_requestInitCancellation)
1171+
{
1172+
// Wait with a 10ms timeout until the socket is ready to write.
1173+
// This way we are not busy looping
1174+
PollResultType result = _socket->isReadyToWrite(10);
1175+
1176+
if (result == PollResultType::Error)
1177+
{
1178+
closeSocket();
1179+
setReadyState(ReadyState::CLOSED);
1180+
return false;
1181+
}
1182+
else if (result == PollResultType::ReadyForWrite)
1183+
{
1184+
sendOnSocket();
1185+
}
1186+
}
1187+
1188+
return true;
1189+
}
1190+
11711191
} // namespace ix

ixwebsocket/IXWebSocketTransport.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ namespace ix
6161
enum class PollResult
6262
{
6363
Succeeded,
64-
AbnormalClose
64+
AbnormalClose,
65+
CannotFlushSendBuffer
6566
};
6667

6768
using OnMessageCallback =
@@ -135,6 +136,10 @@ namespace ix
135136
// client should mask but server should not
136137
std::atomic<bool> _useMask;
137138

139+
// Tells whether we should flush the send buffer before
140+
// saying that a send is complete. This is the mode for server code.
141+
std::atomic<bool> _blockingSend;
142+
138143
// Buffer for reading from our socket. That buffer is never resized.
139144
std::vector<uint8_t> _readbuf;
140145

@@ -238,6 +243,7 @@ namespace ix
238243
size_t closeWireSize,
239244
bool remote);
240245

246+
bool flushSendBuffer();
241247
void sendOnSocket();
242248
WebSocketSendInfo sendData(wsheader_type::opcode_type type,
243249
const std::string& message,

ixwebsocket/IXWebSocketVersion.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66

77
#pragma once
88

9-
#define IX_WEBSOCKET_VERSION "7.5.2"
9+
#define IX_WEBSOCKET_VERSION "7.5.3"

0 commit comments

Comments
 (0)