Skip to content

Commit 299dc04

Browse files
committed
(ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd
1 parent f4af84d commit 299dc04

File tree

4 files changed

+124
-31
lines changed

4 files changed

+124
-31
lines changed

docs/CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
# Changelog
22
All changes to this project will be documented in this file.
33

4+
## [7.8.6] - 2019-12-28
5+
6+
(ws cobra to sentry/statsd) fix for handling null events properly for empty queues + use queue to send data to statsd
7+
48
## [7.8.5] - 2019-12-28
59

610
(ws cobra to sentry) handle null events for empty queues

ixwebsocket/IXWebSocketVersion.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66

77
#pragma once
88

9-
#define IX_WEBSOCKET_VERSION "7.8.5"
9+
#define IX_WEBSOCKET_VERSION "7.8.6"

ws/ws_cobra_to_sentry.cpp

+5-6
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ namespace ix
2222
class QueueManager
2323
{
2424
public:
25-
QueueManager(size_t maxQueueSize, std::atomic<bool> &stop) : _maxQueueSize(maxQueueSize), _stop(stop) {}
25+
QueueManager(size_t maxQueueSize,
26+
std::atomic<bool> &stop) :
27+
_maxQueueSize(maxQueueSize),
28+
_stop(stop) {}
2629

2730
Json::Value pop();
2831
void add(Json::Value msg);
@@ -146,11 +149,7 @@ namespace ix
146149
{
147150
Json::Value msg = queueManager.pop();
148151

149-
while (msg.isNull())
150-
{
151-
msg = queueManager.pop();
152-
if (stop) return;
153-
}
152+
if (msg.isNull()) continue;
154153
if (stop) return;
155154

156155
auto ret = sentryClient.send(msg, verbose);

ws/ws_cobra_to_statsd.cpp

+114-24
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include <atomic>
88
#include <chrono>
9+
#include <condition_variable>
910
#include <ixcobra/IXCobraConnection.h>
1011
#include <spdlog/spdlog.h>
1112
#include <sstream>
@@ -16,6 +17,58 @@
1617
#include <statsd_client.h>
1718
#endif
1819

20+
namespace
21+
{
22+
class QueueManager
23+
{
24+
public:
25+
QueueManager(size_t maxQueueSize,
26+
std::atomic<bool> &stop) :
27+
_maxQueueSize(maxQueueSize),
28+
_stop(stop) {}
29+
30+
Json::Value pop();
31+
void add(Json::Value msg);
32+
33+
private:
34+
std::queue<Json::Value> _queue;
35+
std::mutex _mutex;
36+
std::condition_variable _condition;
37+
size_t _maxQueueSize;
38+
std::atomic<bool>& _stop;
39+
};
40+
41+
Json::Value QueueManager::pop()
42+
{
43+
std::unique_lock<std::mutex> lock(_mutex);
44+
45+
if (_queue.empty())
46+
{
47+
Json::Value val;
48+
return val;
49+
}
50+
51+
_condition.wait(lock, [this] { return !_stop; });
52+
53+
auto msg = _queue.front();
54+
_queue.pop();
55+
return msg;
56+
}
57+
58+
void QueueManager::add(Json::Value msg)
59+
{
60+
std::unique_lock<std::mutex> lock(_mutex);
61+
62+
// if the sending is not fast enough there is no point
63+
// in queuing too many events.
64+
if (_queue.size() < _maxQueueSize)
65+
{
66+
_queue.push(msg);
67+
_condition.notify_one();
68+
}
69+
}
70+
}
71+
1972
namespace ix
2073
{
2174
// fields are command line argument that can be specified multiple times
@@ -79,27 +132,72 @@ namespace ix
79132

80133
auto tokens = parseFields(fields);
81134

82-
// statsd client
83-
// test with netcat as a server: `nc -ul 8125`
84-
bool statsdBatch = true;
85-
#ifndef _WIN32
86-
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
135+
Json::FastWriter jsonWriter;
136+
std::atomic<uint64_t> sentCount(0);
137+
std::atomic<uint64_t> receivedCount(0);
138+
std::atomic<bool> stop(false);
139+
140+
size_t maxQueueSize = 1000;
141+
QueueManager queueManager(maxQueueSize, stop);
142+
143+
auto timer = [&sentCount, &receivedCount] {
144+
while (true)
145+
{
146+
spdlog::info("messages received {} sent {}", receivedCount, sentCount);
147+
148+
auto duration = std::chrono::seconds(1);
149+
std::this_thread::sleep_for(duration);
150+
}
151+
};
152+
153+
std::thread t1(timer);
154+
155+
auto statsdSender = [&queueManager,
156+
&host,
157+
&port,
158+
&sentCount,
159+
&tokens,
160+
&prefix,
161+
&stop] {
162+
// statsd client
163+
// test with netcat as a server: `nc -ul 8125`
164+
bool statsdBatch = true;
165+
#ifndef _WIN32
166+
statsd::StatsdClient statsdClient(host, port, prefix, statsdBatch);
87167
#else
88-
int statsdClient;
168+
int statsdClient;
89169
#endif
170+
while (true)
171+
{
172+
Json::Value msg = queueManager.pop();
90173

91-
Json::FastWriter jsonWriter;
92-
uint64_t msgCount = 0;
174+
if (msg.isNull()) continue;
175+
if (stop) return;
176+
177+
std::string id;
178+
for (auto&& attr : tokens)
179+
{
180+
id += ".";
181+
id += extractAttr(attr, msg);
182+
}
183+
184+
sentCount += 1;
185+
186+
#ifndef _WIN32
187+
statsdClient.count(id, 1);
188+
#endif
189+
}
190+
};
191+
192+
std::thread t2(statsdSender);
93193

94194
conn.setEventCallback([&conn,
95195
&channel,
96196
&filter,
97197
&jsonWriter,
98-
&statsdClient,
99198
verbose,
100-
&tokens,
101-
&prefix,
102-
&msgCount](ix::CobraConnectionEventType eventType,
199+
&queueManager,
200+
&receivedCount](ix::CobraConnectionEventType eventType,
103201
const std::string& errMsg,
104202
const ix::WebSocketHttpHeaders& headers,
105203
const std::string& subscriptionId,
@@ -122,25 +220,17 @@ namespace ix
122220
spdlog::info("Subscriber authenticated");
123221
conn.subscribe(channel,
124222
filter,
125-
[&jsonWriter, &statsdClient, verbose, &tokens, &prefix, &msgCount](
223+
[&jsonWriter, &queueManager, verbose, &receivedCount](
126224
const Json::Value& msg) {
127225
if (verbose)
128226
{
129227
spdlog::info(jsonWriter.write(msg));
130228
}
131229

132-
std::string id;
133-
for (auto&& attr : tokens)
134-
{
135-
id += ".";
136-
id += extractAttr(attr, msg);
137-
}
138-
139-
spdlog::info("{} {}{}", msgCount++, prefix, id);
230+
receivedCount++;
140231

141-
#ifndef _WIN32
142-
statsdClient.count(id, 1);
143-
#endif
232+
++receivedCount;
233+
queueManager.add(msg);
144234
});
145235
}
146236
else if (eventType == ix::CobraConnection_EventType_Subscribed)

0 commit comments

Comments
 (0)