9
9
#include < condition_variable>
10
10
#include < ixcobra/IXCobraConnection.h>
11
11
#include < ixsentry/IXSentryClient.h>
12
+ #include < map>
12
13
#include < mutex>
13
14
#include < queue>
14
15
#include < spdlog/spdlog.h>
15
16
#include < sstream>
16
17
#include < thread>
17
18
#include < vector>
18
- #include < map>
19
19
20
20
namespace ix
21
21
{
22
22
class QueueManager
23
23
{
24
24
public:
25
- QueueManager (size_t maxQueueSize,
26
- std::atomic<bool > &stop) :
27
- _maxQueueSize (maxQueueSize),
28
- _stop (stop) {}
25
+ QueueManager (size_t maxQueueSize, std::atomic<bool >& stop)
26
+ : _maxQueueSize(maxQueueSize)
27
+ , _stop(stop)
28
+ {
29
+ }
29
30
30
31
Json::Value pop ();
31
32
void add (Json::Value msg);
@@ -136,95 +137,90 @@ namespace ix
136
137
137
138
std::thread t1 (timer);
138
139
139
- auto sentrySender = [&queueManager,
140
- verbose,
141
- &errorSending,
142
- &sentCount,
143
- &stop,
144
- &throttled,
145
- &dsn] {
146
- SentryClient sentryClient (dsn);
147
-
148
- while (true )
149
- {
150
- Json::Value msg = queueManager.pop ();
151
-
152
- if (msg.isNull ()) continue ;
153
- if (stop) return ;
154
-
155
- auto ret = sentryClient.send (msg, verbose);
156
- HttpResponsePtr response = ret.first ;
140
+ auto sentrySender =
141
+ [&queueManager, verbose, &errorSending, &sentCount, &stop, &throttled, &dsn] {
142
+ SentryClient sentryClient (dsn);
157
143
158
- if (!response )
144
+ while ( true )
159
145
{
160
- spdlog::warn (" Null HTTP Response" );
161
- continue ;
162
- }
146
+ Json::Value msg = queueManager.pop ();
163
147
164
- if (verbose)
165
- {
166
- for (auto it : response->headers )
167
- {
168
- spdlog::info (" {}: {}" , it.first , it.second );
169
- }
148
+ if (msg.isNull ()) continue ;
149
+ if (stop) return ;
170
150
171
- spdlog::info ( " Upload size: {} " , response-> uploadSize );
172
- spdlog::info ( " Download size: {} " , response-> downloadSize ) ;
151
+ auto ret = sentryClient. send (msg, verbose );
152
+ HttpResponsePtr response = ret. first ;
173
153
174
- spdlog::info (" Status: {}" , response->statusCode );
175
- if (response->errorCode != HttpErrorCode::Ok)
154
+ if (!response)
176
155
{
177
- spdlog::info (" error message: {}" , response->errorMsg );
156
+ spdlog::warn (" Null HTTP Response" );
157
+ continue ;
178
158
}
179
159
180
- if (response-> headers [ " Content-Type " ] != " application/octet-stream " )
160
+ if (verbose )
181
161
{
182
- spdlog::info (" payload: {}" , response->payload );
183
- }
184
- }
162
+ for (auto it : response->headers )
163
+ {
164
+ spdlog::info (" {}: {}" , it.first , it.second );
165
+ }
185
166
186
- if (response->statusCode != 200 )
187
- {
188
- spdlog::error (" Error sending data to sentry: {}" , response->statusCode );
189
- spdlog::error (" Body: {}" , ret.second );
190
- spdlog::error (" Response: {}" , response->payload );
191
- errorSending = true ;
167
+ spdlog::info (" Upload size: {}" , response->uploadSize );
168
+ spdlog::info (" Download size: {}" , response->downloadSize );
192
169
193
- // Error 429 Too Many Requests
194
- if (response->statusCode == 429 )
195
- {
196
- auto retryAfter = response->headers [" Retry-After" ];
197
- std::stringstream ss;
198
- ss << retryAfter;
199
- int seconds;
200
- ss >> seconds;
170
+ spdlog::info (" Status: {}" , response->statusCode );
171
+ if (response->errorCode != HttpErrorCode::Ok)
172
+ {
173
+ spdlog::info (" error message: {}" , response->errorMsg );
174
+ }
201
175
202
- if (!ss. eof () || ss. fail () )
176
+ if (response-> headers [ " Content-Type " ] != " application/octet-stream " )
203
177
{
204
- seconds = 30 ;
205
- spdlog::warn (" Error parsing Retry-After header. "
206
- " Using {} for the sleep duration" ,
207
- seconds);
178
+ spdlog::info (" payload: {}" , response->payload );
208
179
}
180
+ }
209
181
210
- spdlog::warn (" Error 429 - Too Many Requests. ws will sleep "
211
- " and retry after {} seconds" ,
212
- retryAfter);
182
+ if (response->statusCode != 200 )
183
+ {
184
+ spdlog::error (" Error sending data to sentry: {}" , response->statusCode );
185
+ spdlog::error (" Body: {}" , ret.second );
186
+ spdlog::error (" Response: {}" , response->payload );
187
+ errorSending = true ;
213
188
214
- throttled = true ;
215
- auto duration = std::chrono::seconds (seconds);
216
- std::this_thread::sleep_for (duration);
217
- throttled = false ;
189
+ // Error 429 Too Many Requests
190
+ if (response->statusCode == 429 )
191
+ {
192
+ auto retryAfter = response->headers [" Retry-After" ];
193
+ std::stringstream ss;
194
+ ss << retryAfter;
195
+ int seconds;
196
+ ss >> seconds;
197
+
198
+ if (!ss.eof () || ss.fail ())
199
+ {
200
+ seconds = 30 ;
201
+ spdlog::warn (" Error parsing Retry-After header. "
202
+ " Using {} for the sleep duration" ,
203
+ seconds);
204
+ }
205
+
206
+ spdlog::warn (" Error 429 - Too Many Requests. ws will sleep "
207
+ " and retry after {} seconds" ,
208
+ retryAfter);
209
+
210
+ throttled = true ;
211
+ auto duration = std::chrono::seconds (seconds);
212
+ std::this_thread::sleep_for (duration);
213
+ throttled = false ;
214
+ }
215
+ }
216
+ else
217
+ {
218
+ ++sentCount;
218
219
}
219
- }
220
- else
221
- {
222
- ++sentCount;
223
- }
224
220
225
- if (stop) return ;
226
- }
227
- };
221
+ if (stop) return ;
222
+ }
223
+ };
228
224
229
225
// Create a thread pool
230
226
spdlog::info (" Starting {} sentry sender jobs" , jobs);
@@ -241,12 +237,11 @@ namespace ix
241
237
verbose,
242
238
&throttled,
243
239
&receivedCount,
244
- &queueManager](
245
- ix::CobraConnectionEventType eventType,
246
- const std::string& errMsg,
247
- const ix::WebSocketHttpHeaders& headers,
248
- const std::string& subscriptionId,
249
- CobraConnection::MsgId msgId) {
240
+ &queueManager](ix::CobraConnectionEventType eventType,
241
+ const std::string& errMsg,
242
+ const ix::WebSocketHttpHeaders& headers,
243
+ const std::string& subscriptionId,
244
+ CobraConnection::MsgId msgId) {
250
245
if (eventType == ix::CobraConnection_EventType_Open)
251
246
{
252
247
spdlog::info (" Subscriber connected" );
@@ -265,11 +260,8 @@ namespace ix
265
260
spdlog::info (" Subscriber authenticated" );
266
261
conn.subscribe (channel,
267
262
filter,
268
- [&jsonWriter,
269
- verbose,
270
- &throttled,
271
- &receivedCount,
272
- &queueManager](const Json::Value& msg) {
263
+ [&jsonWriter, verbose, &throttled, &receivedCount, &queueManager](
264
+ const Json::Value& msg) {
273
265
if (verbose)
274
266
{
275
267
spdlog::info (jsonWriter.write (msg));
0 commit comments