Skip to content

Commit 4f17cd5

Browse files
author
Benjamin Sergeant
committed
(cobra bots) do not use a queue to store messages pending processing, let the bot handle queuing
1 parent b047644 commit 4f17cd5

19 files changed

+89
-286
lines changed

docs/CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
# Changelog
22
All changes to this project will be documented in this file.
33

4+
## [9.5.4] - 2020-05-04
5+
6+
(cobra bots) do not use a queue to store messages pending processing, let the bot handle queuing
7+
48
## [9.5.3] - 2020-04-29
59

610
(http client) better current request cancellation support when the HttpClient destructor is invoked (see #189)

ixbots/CMakeLists.txt

-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ set (IXBOTS_SOURCES
88
ixbots/IXCobraToSentryBot.cpp
99
ixbots/IXCobraToStatsdBot.cpp
1010
ixbots/IXCobraToStdoutBot.cpp
11-
ixbots/IXQueueManager.cpp
1211
ixbots/IXStatsdClient.cpp
1312
)
1413

@@ -17,7 +16,6 @@ set (IXBOTS_HEADERS
1716
ixbots/IXCobraToSentryBot.h
1817
ixbots/IXCobraToStatsdBot.h
1918
ixbots/IXCobraToStdoutBot.h
20-
ixbots/IXQueueManager.h
2119
ixbots/IXStatsdClient.h
2220
)
2321

ixbots/ixbots/IXCobraBot.cpp

+3-68
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
#include "IXCobraBot.h"
88

9-
#include "IXQueueManager.h"
109
#include <ixcobra/IXCobraConnection.h>
1110
#include <ixcore/utils/IXCoreLogger.h>
1211

@@ -23,8 +22,6 @@ namespace ix
2322
const std::string& filter,
2423
const std::string& position,
2524
bool verbose,
26-
size_t maxQueueSize,
27-
bool useQueue,
2825
bool enableHeartbeat,
2926
int runtime)
3027
{
@@ -43,8 +40,6 @@ namespace ix
4340
std::atomic<bool> throttled(false);
4441
std::atomic<bool> fatalCobraError(false);
4542

46-
QueueManager queueManager(maxQueueSize);
47-
4843
auto timer = [&sentCount,
4944
&receivedCount,
5045
&sentCountTotal,
@@ -114,40 +109,6 @@ namespace ix
114109

115110
std::thread t2(heartbeat);
116111

117-
auto sender =
118-
[this, &queueManager, verbose, &sentCount, &stop, &throttled, &fatalCobraError] {
119-
while (true)
120-
{
121-
auto data = queueManager.pop();
122-
Json::Value msg = data.first;
123-
std::string position = data.second;
124-
125-
if (stop) break;
126-
if (msg.isNull()) continue;
127-
128-
if (_onBotMessageCallback &&
129-
_onBotMessageCallback(msg, position, verbose, throttled, fatalCobraError))
130-
{
131-
// That might be too noisy
132-
if (verbose)
133-
{
134-
CoreLogger::info("cobra bot: sending succesfull");
135-
}
136-
++sentCount;
137-
}
138-
else
139-
{
140-
CoreLogger::error("cobra bot: error sending");
141-
}
142-
143-
if (stop) break;
144-
}
145-
146-
CoreLogger::info("sender thread done");
147-
};
148-
149-
std::thread t3(sender);
150-
151112
std::string subscriptionPosition(position);
152113

153114
conn.setEventCallback([this,
@@ -160,8 +121,6 @@ namespace ix
160121
&throttled,
161122
&receivedCount,
162123
&fatalCobraError,
163-
&useQueue,
164-
&queueManager,
165124
&sentCount](const CobraEventPtr& event) {
166125
if (event->type == ix::CobraEventType::Open)
167126
{
@@ -190,8 +149,6 @@ namespace ix
190149
verbose,
191150
&throttled,
192151
&receivedCount,
193-
&queueManager,
194-
&useQueue,
195152
&subscriptionPosition,
196153
&fatalCobraError,
197154
&sentCount](const Json::Value& msg, const std::string& position) {
@@ -211,28 +168,9 @@ namespace ix
211168

212169
++receivedCount;
213170

214-
if (useQueue)
215-
{
216-
queueManager.add(msg, position);
217-
}
218-
else
219-
{
220-
if (_onBotMessageCallback &&
221-
_onBotMessageCallback(
222-
msg, position, verbose, throttled, fatalCobraError))
223-
{
224-
// That might be too noisy
225-
if (verbose)
226-
{
227-
CoreLogger::info("cobra bot: sending succesfull");
228-
}
229-
++sentCount;
230-
}
231-
else
232-
{
233-
CoreLogger::error("cobra bot: error sending");
234-
}
235-
}
171+
_onBotMessageCallback(
172+
msg, position, verbose,
173+
throttled, fatalCobraError, sentCount);
236174
});
237175
}
238176
else if (event->type == ix::CobraEventType::Subscribed)
@@ -308,9 +246,6 @@ namespace ix
308246
// heartbeat thread
309247
if (t2.joinable()) t2.join();
310248

311-
// sentry sender thread
312-
t3.join();
313-
314249
return fatalCobraError ? -1 : (int64_t) sentCount;
315250
}
316251

ixbots/ixbots/IXCobraBot.h

+3-4
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@
1414

1515
namespace ix
1616
{
17-
using OnBotMessageCallback = std::function<bool(const Json::Value&,
17+
using OnBotMessageCallback = std::function<void(const Json::Value&,
1818
const std::string&,
1919
const bool verbose,
2020
std::atomic<bool>&,
21-
std::atomic<bool>&)>;
21+
std::atomic<bool>&,
22+
std::atomic<uint64_t>&)>;
2223

2324
class CobraBot
2425
{
@@ -30,8 +31,6 @@ namespace ix
3031
const std::string& filter,
3132
const std::string& position,
3233
bool verbose,
33-
size_t maxQueueSize,
34-
bool useQueue,
3534
bool enableHeartbeat,
3635
int runtime);
3736

ixbots/ixbots/IXCobraToSentryBot.cpp

+54-61
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
#include "IXCobraToSentryBot.h"
88

99
#include "IXCobraBot.h"
10-
#include "IXQueueManager.h"
1110
#include <ixcobra/IXCobraConnection.h>
1211
#include <ixcore/utils/IXCoreLogger.h>
1312

@@ -23,7 +22,6 @@ namespace ix
2322
const std::string& position,
2423
SentryClient& sentryClient,
2524
bool verbose,
26-
size_t maxQueueSize,
2725
bool enableHeartbeat,
2826
int runtime)
2927
{
@@ -32,85 +30,80 @@ namespace ix
3230
const std::string& /*position*/,
3331
const bool verbose,
3432
std::atomic<bool>& throttled,
35-
std::atomic<bool> &
36-
/*fatalCobraError*/) -> bool {
37-
auto ret = sentryClient.send(msg, verbose);
38-
HttpResponsePtr response = ret.first;
39-
40-
if (!response)
41-
{
42-
CoreLogger::warn("Null HTTP Response");
43-
return false;
44-
}
45-
46-
if (verbose)
47-
{
48-
for (auto it : response->headers)
33+
std::atomic<bool>& /*fatalCobraError*/,
34+
std::atomic<uint64_t>& sentCount) -> void {
35+
sentryClient.send(msg, verbose,
36+
[&sentCount, &throttled, &verbose](const HttpResponsePtr& response) {
37+
if (!response)
4938
{
50-
CoreLogger::info(it.first + ": " + it.second);
39+
CoreLogger::warn("Null HTTP Response");
40+
return;
5141
}
5242

53-
CoreLogger::info("Upload size: " + std::to_string(response->uploadSize));
54-
CoreLogger::info("Download size: " + std::to_string(response->downloadSize));
55-
56-
CoreLogger::info("Status: " + std::to_string(response->statusCode));
57-
if (response->errorCode != HttpErrorCode::Ok)
43+
if (verbose)
5844
{
59-
CoreLogger::info("error message: " + response->errorMsg);
60-
}
45+
for (auto it : response->headers)
46+
{
47+
CoreLogger::info(it.first + ": " + it.second);
48+
}
6149

62-
if (response->headers["Content-Type"] != "application/octet-stream")
63-
{
64-
CoreLogger::info("payload: " + response->payload);
65-
}
66-
}
50+
CoreLogger::info("Upload size: " + std::to_string(response->uploadSize));
51+
CoreLogger::info("Download size: " + std::to_string(response->downloadSize));
6752

68-
bool success = response->statusCode == 200;
53+
CoreLogger::info("Status: " + std::to_string(response->statusCode));
54+
if (response->errorCode != HttpErrorCode::Ok)
55+
{
56+
CoreLogger::info("error message: " + response->errorMsg);
57+
}
6958

70-
if (!success)
71-
{
72-
CoreLogger::error("Error sending data to sentry: " + std::to_string(response->statusCode));
73-
CoreLogger::error("Body: " + ret.second);
74-
CoreLogger::error("Response: " + response->payload);
59+
if (response->headers["Content-Type"] != "application/octet-stream")
60+
{
61+
CoreLogger::info("payload: " + response->payload);
62+
}
63+
}
7564

76-
// Error 429 Too Many Requests
77-
if (response->statusCode == 429)
65+
if (response->statusCode == 200)
66+
{
67+
sentCount++;
68+
}
69+
else
7870
{
79-
auto retryAfter = response->headers["Retry-After"];
80-
std::stringstream ss;
81-
ss << retryAfter;
82-
int seconds;
83-
ss >> seconds;
71+
CoreLogger::error("Error sending data to sentry: " + std::to_string(response->statusCode));
72+
CoreLogger::error("Response: " + response->payload);
8473

85-
if (!ss.eof() || ss.fail())
74+
// Error 429 Too Many Requests
75+
if (response->statusCode == 429)
8676
{
87-
seconds = 30;
88-
CoreLogger::warn("Error parsing Retry-After header. "
89-
"Using " + retryAfter + " for the sleep duration");
77+
auto retryAfter = response->headers["Retry-After"];
78+
std::stringstream ss;
79+
ss << retryAfter;
80+
int seconds;
81+
ss >> seconds;
82+
83+
if (!ss.eof() || ss.fail())
84+
{
85+
seconds = 30;
86+
CoreLogger::warn("Error parsing Retry-After header. "
87+
"Using " + retryAfter + " for the sleep duration");
88+
}
89+
90+
CoreLogger::warn("Error 429 - Too Many Requests. ws will sleep "
91+
"and retry after " + retryAfter + " seconds");
92+
93+
throttled = true;
94+
auto duration = std::chrono::seconds(seconds);
95+
std::this_thread::sleep_for(duration);
96+
throttled = false;
9097
}
91-
92-
CoreLogger::warn("Error 429 - Too Many Requests. ws will sleep "
93-
"and retry after " + retryAfter + " seconds");
94-
95-
throttled = true;
96-
auto duration = std::chrono::seconds(seconds);
97-
std::this_thread::sleep_for(duration);
98-
throttled = false;
9998
}
100-
}
101-
102-
return success;
99+
});
103100
});
104101

105-
bool useQueue = true;
106-
107102
return bot.run(config,
108103
channel,
109104
filter,
110105
position,
111106
verbose,
112-
maxQueueSize,
113-
useQueue,
114107
enableHeartbeat,
115108
runtime);
116109
}

ixbots/ixbots/IXCobraToSentryBot.h

-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ namespace ix
1818
const std::string& position,
1919
SentryClient& sentryClient,
2020
bool verbose,
21-
size_t maxQueueSize,
2221
bool enableHeartbeat,
2322
int runtime);
2423
} // namespace ix

ixbots/ixbots/IXCobraToStatsdBot.cpp

+4-9
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
#include "IXCobraToStatsdBot.h"
88

99
#include "IXCobraBot.h"
10-
#include "IXQueueManager.h"
1110
#include "IXStatsdClient.h"
1211
#include <chrono>
1312
#include <ixcobra/IXCobraConnection.h>
@@ -63,7 +62,6 @@ namespace ix
6362
const std::string& gauge,
6463
const std::string& timer,
6564
bool verbose,
66-
size_t maxQueueSize,
6765
bool enableHeartbeat,
6866
int runtime)
6967
{
@@ -79,7 +77,8 @@ namespace ix
7977
const std::string& /*position*/,
8078
const bool verbose,
8179
std::atomic<bool>& /*throttled*/,
82-
std::atomic<bool>& fatalCobraError) -> bool {
80+
std::atomic<bool>& fatalCobraError,
81+
std::atomic<uint64_t>& sentCount) -> void {
8382
std::string id;
8483
for (auto&& attr : tokens)
8584
{
@@ -122,7 +121,7 @@ namespace ix
122121
{
123122
CoreLogger::error("Gauge " + gauge + " is not a numeric type");
124123
fatalCobraError = true;
125-
return false;
124+
return;
126125
}
127126

128127
if (verbose)
@@ -140,18 +139,14 @@ namespace ix
140139
}
141140
}
142141

143-
return true;
142+
sentCount++;
144143
});
145144

146-
bool useQueue = true;
147-
148145
return bot.run(config,
149146
channel,
150147
filter,
151148
position,
152149
verbose,
153-
maxQueueSize,
154-
useQueue,
155150
enableHeartbeat,
156151
runtime);
157152
}

0 commit comments

Comments
 (0)