Skip to content

Commit 5896d37

Browse files
committed
(ws + cobra bots) add a cobra_to_cobra ws subcommand to subscribe to a channel and republish received events to a different channel
1 parent 73b9c0b commit 5896d37

File tree

7 files changed

+96
-2
lines changed

7 files changed

+96
-2
lines changed

docs/CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
All changes to this project will be documented in this file.
44

5+
## [10.3.2] - 2020-08-31
6+
7+
(ws + cobra bots) add a cobra_to_cobra ws subcommand to subscribe to a channel and republish received events to a different channel
8+
59
## [10.3.1] - 2020-08-28
610

711
(socket servers) merge the ConnectionInfo class with the ConnectionState one, which simplify all the server apis

ixbots/CMakeLists.txt

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
set (IXBOTS_SOURCES
77
ixbots/IXCobraBot.cpp
8+
ixbots/IXCobraToCobraBot.cpp
89
ixbots/IXCobraToSentryBot.cpp
910
ixbots/IXCobraToStatsdBot.cpp
1011
ixbots/IXCobraToStdoutBot.cpp
@@ -16,6 +17,7 @@ set (IXBOTS_SOURCES
1617
set (IXBOTS_HEADERS
1718
ixbots/IXCobraBot.h
1819
ixbots/IXCobraBotConfig.h
20+
ixbots/IXCobraToCobraBot.h
1921
ixbots/IXCobraToSentryBot.h
2022
ixbots/IXCobraToStatsdBot.h
2123
ixbots/IXCobraToStdoutBot.h

ixbots/ixbots/IXCobraToCobraBot.cpp

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* IXCobraToCobraBot.cpp
3+
* Author: Benjamin Sergeant
4+
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
5+
*/
6+
7+
#include "IXCobraToCobraBot.h"
8+
9+
#include "IXCobraBot.h"
10+
#include <ixcobra/IXCobraMetricsPublisher.h>
11+
#include <sstream>
12+
13+
namespace ix
14+
{
15+
int64_t cobra_to_cobra_bot(const ix::CobraBotConfig& cobraBotConfig,
16+
const std::string& republishChannel,
17+
const std::string& publisherRolename,
18+
const std::string& publisherRolesecret)
19+
{
20+
CobraBot bot;
21+
22+
CobraMetricsPublisher cobraMetricsPublisher;
23+
CobraConfig cobraPublisherConfig = cobraBotConfig.cobraConfig;
24+
cobraPublisherConfig.rolename = publisherRolename;
25+
cobraPublisherConfig.rolesecret = publisherRolesecret;
26+
cobraMetricsPublisher.configure(cobraPublisherConfig, republishChannel);
27+
28+
bot.setOnBotMessageCallback(
29+
[&republishChannel, &cobraMetricsPublisher](const Json::Value& msg,
30+
const std::string& /*position*/,
31+
std::atomic<bool>& /*throttled*/,
32+
std::atomic<bool>& /*fatalCobraError*/,
33+
std::atomic<uint64_t>& sentCount) -> void {
34+
Json::Value msgWithNoId(msg);
35+
msgWithNoId.removeMember("id");
36+
37+
cobraMetricsPublisher.push(republishChannel, msg);
38+
sentCount++;
39+
});
40+
41+
return bot.run(cobraBotConfig);
42+
}
43+
} // namespace ix

ixbots/ixbots/IXCobraToCobraBot.h

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* IXCobraToCobraBot.h
3+
* Author: Benjamin Sergeant
4+
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
5+
*/
6+
#pragma once
7+
8+
#include <cstdint>
9+
#include <ixbots/IXStatsdClient.h>
10+
#include "IXCobraBotConfig.h"
11+
#include <stddef.h>
12+
#include <string>
13+
14+
namespace ix
15+
{
16+
int64_t cobra_to_cobra_bot(const ix::CobraBotConfig& config,
17+
const std::string& republishChannel,
18+
const std::string& publisherRolename,
19+
const std::string& publisherRolesecret);
20+
} // namespace ix

ixcobra/ixcobra/IXCobraMetricsThreadedPublisher.cpp

+7-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ namespace ix
2424
{
2525
_cobra_connection.setEventCallback([](const CobraEventPtr& event) {
2626
std::stringstream ss;
27+
ix::LogLevel logLevel = LogLevel::Info;
2728

2829
if (event->type == ix::CobraEventType::Open)
2930
{
@@ -41,6 +42,7 @@ namespace ix
4142
else if (event->type == ix::CobraEventType::Error)
4243
{
4344
ss << "Error: " << event->errMsg;
45+
logLevel = ix::LogLevel::Error;
4446
}
4547
else if (event->type == ix::CobraEventType::Closed)
4648
{
@@ -57,6 +59,7 @@ namespace ix
5759
else if (event->type == ix::CobraEventType::Published)
5860
{
5961
ss << "Published message " << event->msgId << " acked";
62+
logLevel = ix::LogLevel::Debug;
6063
}
6164
else if (event->type == ix::CobraEventType::Pong)
6265
{
@@ -65,17 +68,20 @@ namespace ix
6568
else if (event->type == ix::CobraEventType::HandshakeError)
6669
{
6770
ss << "Handshake error: " << event->errMsg;
71+
logLevel = ix::LogLevel::Error;
6872
}
6973
else if (event->type == ix::CobraEventType::AuthenticationError)
7074
{
7175
ss << "Authentication error: " << event->errMsg;
76+
logLevel = ix::LogLevel::Error;
7277
}
7378
else if (event->type == ix::CobraEventType::SubscriptionError)
7479
{
7580
ss << "Subscription error: " << event->errMsg;
81+
logLevel = ix::LogLevel::Error;
7682
}
7783

78-
CoreLogger::log(ss.str().c_str());
84+
CoreLogger::log(ss.str().c_str(), logLevel);
7985
});
8086
}
8187

ixwebsocket/IXWebSocketVersion.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66

77
#pragma once
88

9-
#define IX_WEBSOCKET_VERSION "10.3.1"
9+
#define IX_WEBSOCKET_VERSION "10.3.2"

ws/ws.cpp

+19
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <fstream>
1919
#include <iostream>
2020
#include <ixbots/IXCobraMetricsToRedisBot.h>
21+
#include <ixbots/IXCobraToCobraBot.h>
2122
#include <ixbots/IXCobraToPythonBot.h>
2223
#include <ixbots/IXCobraToSentryBot.h>
2324
#include <ixbots/IXCobraToStatsdBot.h>
@@ -2813,6 +2814,8 @@ int main(int argc, char** argv)
28132814
std::string logfile;
28142815
std::string moduleName;
28152816
std::string republishChannel;
2817+
std::string publisherRolename;
2818+
std::string publisherRolesecret;
28162819
std::string sendMsg("hello world");
28172820
ix::SocketTLSOptions tlsOptions;
28182821
ix::CobraConfig cobraConfig;
@@ -3077,6 +3080,17 @@ int main(int argc, char** argv)
30773080
addTLSOptions(cobra2statsd);
30783081
addCobraBotConfig(cobra2statsd);
30793082

3083+
CLI::App* cobra2cobra = app.add_subcommand("cobra_to_cobra", "Cobra to Cobra");
3084+
cobra2cobra->fallthrough();
3085+
cobra2cobra->add_option("--republish", republishChannel, "Republish channel");
3086+
cobra2cobra->add_option("--publisher_rolename", publisherRolename, "Publisher Role name")
3087+
->required();
3088+
cobra2cobra->add_option("--publisher_rolesecret", publisherRolesecret, "Publisher Role secret")
3089+
->required();
3090+
cobra2cobra->add_flag("-q", quiet, "Quiet");
3091+
addTLSOptions(cobra2cobra);
3092+
addCobraBotConfig(cobra2cobra);
3093+
30803094
CLI::App* cobra2python = app.add_subcommand("cobra_to_python", "Cobra to python");
30813095
cobra2python->fallthrough();
30823096
cobra2python->add_option("--host", hostname, "Statsd host");
@@ -3408,6 +3422,11 @@ int main(int argc, char** argv)
34083422
ret = (int) ix::cobra_metrics_to_redis_bot(cobraBotConfig, redisClient, verbose);
34093423
}
34103424
}
3425+
else if (app.got_subcommand("cobra_to_cobra"))
3426+
{
3427+
ret = (int) ix::cobra_to_cobra_bot(
3428+
cobraBotConfig, republishChannel, publisherRolename, publisherRolesecret);
3429+
}
34113430
else if (app.got_subcommand("snake"))
34123431
{
34133432
ret = ix::ws_snake_main(port,

0 commit comments

Comments
 (0)