18
18
#include " cpprest/details/x509_cert_utilities.h"
19
19
#include " pplx/threadpool.h"
20
20
21
+ #include " ws_client_impl.h"
22
+
21
23
// Force websocketpp to use C++ std::error_code instead of Boost.
22
24
#define _WEBSOCKETPP_CPP11_SYSTEM_ERROR_
23
25
#if defined(_MSC_VER)
@@ -117,8 +119,7 @@ class wspp_callback_client : public websocket_client_callback_impl, public std::
117
119
public:
118
120
wspp_callback_client (websocket_client_config config) :
119
121
websocket_client_callback_impl (std::move(config)),
120
- m_state (CREATED),
121
- m_num_sends (0 )
122
+ m_state (CREATED)
122
123
#if defined(__APPLE__) || (defined(ANDROID) || defined(__ANDROID__)) || defined(_WIN32)
123
124
, m_openssl_failed(false )
124
125
#endif
@@ -402,10 +403,10 @@ class wspp_callback_client : public websocket_client_callback_impl, public std::
402
403
{
403
404
case websocket_message_type::text_message:
404
405
case websocket_message_type::binary_message:
405
- case websocket_message_type::pong:
406
+ case websocket_message_type::pong:
406
407
break ;
407
408
default :
408
- return pplx::task_from_exception<void >(websocket_exception (" Invalid message type " ));
409
+ return pplx::task_from_exception<void >(websocket_exception (" Message Type not supported. " ));
409
410
}
410
411
411
412
const auto length = msg.m_length ;
@@ -418,18 +419,13 @@ class wspp_callback_client : public websocket_client_callback_impl, public std::
418
419
return pplx::task_from_exception<void >(websocket_exception (" Message size too large. Ensure message length is less than UINT_MAX." ));
419
420
}
420
421
422
+ auto msg_pending = m_out_queue.push (msg);
423
+
424
+ // No sends in progress
425
+ if (msg_pending == outgoing_msg_queue::state::was_empty)
421
426
{
422
- if (++m_num_sends == 1 ) // No sends in progress
423
- {
424
- // Start sending the message
425
- send_msg (msg);
426
- }
427
- else
428
- {
429
- // Only actually have to take the lock if touching the queue.
430
- std::lock_guard<std::mutex> lock (m_send_lock);
431
- m_outgoing_msg_queue.push (msg);
432
- }
427
+ // Start sending the message
428
+ send_msg (msg);
433
429
}
434
430
435
431
return pplx::create_task (msg.body_sent ());
@@ -565,16 +561,12 @@ class wspp_callback_client : public websocket_client_callback_impl, public std::
565
561
msg.signal_body_sent ();
566
562
}
567
563
568
- if (--this_client->m_num_sends > 0 )
564
+ websocket_outgoing_message next_msg;
565
+ bool msg_pending = this_client->m_out_queue .pop_and_peek (next_msg);
566
+
567
+ if (msg_pending)
569
568
{
570
- // Only hold the lock when actually touching the queue.
571
- websocket_outgoing_message next_msg;
572
- {
573
- std::lock_guard<std::mutex> lock (this_client->m_send_lock );
574
- next_msg = this_client->m_outgoing_msg_queue .front ();
575
- this_client->m_outgoing_msg_queue .pop ();
576
- }
577
- this_client->send_msg (next_msg);
569
+ this_client->send_msg (next_msg);
578
570
}
579
571
});
580
572
}
@@ -669,19 +661,19 @@ class wspp_callback_client : public websocket_client_callback_impl, public std::
669
661
ec);
670
662
break ;
671
663
case websocket_message_type::binary_message:
672
- client.send (
664
+ client.send (
673
665
this_client->m_con ,
674
666
sp_allocated.get (),
675
667
length,
676
668
websocketpp::frame::opcode::binary,
677
669
ec);
678
670
break ;
679
- case websocket_message_type::pong:
680
- client.pong (
681
- this_client->m_con ,
682
- " " ,
683
- ec);
684
- break ;
671
+ case websocket_message_type::pong:
672
+ client.pong (
673
+ this_client->m_con ,
674
+ " " ,
675
+ ec);
676
+ break ;
685
677
default :
686
678
// This case should have already been filtered above.
687
679
std::abort ();
@@ -763,14 +755,8 @@ class wspp_callback_client : public websocket_client_callback_impl, public std::
763
755
State m_state;
764
756
std::unique_ptr<websocketpp_client_base> m_client;
765
757
766
- // Guards access to m_outgoing_msg_queue
767
- std::mutex m_send_lock;
768
-
769
- // Queue to order the sends
770
- std::queue<websocket_outgoing_message> m_outgoing_msg_queue;
771
-
772
- // Number of sends in progress and queued up.
773
- std::atomic<int > m_num_sends;
758
+ // Queue to track pending sends
759
+ outgoing_msg_queue m_out_queue;
774
760
775
761
// External callback for handling received and close event
776
762
std::function<void (websocket_incoming_message)> m_external_message_handler;
0 commit comments