Skip to content

Commit

Permalink
Flow Metrics Enrichment (#464)
Browse files Browse the repository at this point in the history
  • Loading branch information
Leonardo Parente authored Sep 22, 2022
1 parent 1244d92 commit eba8cfe
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 21 deletions.
9 changes: 4 additions & 5 deletions golang/pkg/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ type FlowPayload struct {
Ipv4 int64 `mapstructure:"ipv4"`
Ipv6 int64 `mapstructure:"ipv6"`
OtherL4 int64 `mapstructure:"other_l4"`
PayloadSize Quantiles `mapstructure:"payload_size"`
TCP int64 `mapstructure:"tcp"`
UDP int64 `mapstructure:"udp"`
TopGeoLocBytes []NameCount `mapstructure:"top_geoLoc_bytes"`
Expand All @@ -201,10 +200,10 @@ type FlowPayload struct {
TopDstIpsPackets []NameCount `mapstructure:"top_dst_ips_packets"`
TopDstPortsBytes []NameCount `mapstructure:"top_dst_ports_bytes"`
TopDstPortsPackets []NameCount `mapstructure:"top_dst_ports_packets"`
TopInIfIndexBytes []NameCount `mapstructure:"top_in_if_index_bytes"`
TopInIfIndexPackets []NameCount `mapstructure:"top_in_if_index_packets"`
TopOutIfIndexBytes []NameCount `mapstructure:"top_out_if_index_bytes"`
TopOutIfIndexPackets []NameCount `mapstructure:"top_out_if_index_packets"`
TopInInterfacesBytes []NameCount `mapstructure:"top_in_interfaces_bytes"`
TopInInterfacesPackets []NameCount `mapstructure:"top_in_interfaces_packets"`
TopOutInterfacesBytes []NameCount `mapstructure:"top_out_interfaces_bytes"`
TopOutInterfacesPackets []NameCount `mapstructure:"top_out_interfaces_packets"`
TopSrcIpsAndPortBytes []NameCount `mapstructure:"top_src_ips_and_port_bytes"`
TopSrcIpsAndPortPackets []NameCount `mapstructure:"top_src_ips_and_port_packets"`
TopConversationsBytes []NameCount `mapstructure:"top_conversations_bytes"`
Expand Down
139 changes: 129 additions & 10 deletions src/handlers/flow/FlowStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,24 @@

namespace visor::handler::flow {

template <typename Out>
static void split(const std::string &s, char delim, Out result)
{
std::stringstream ss;
ss.str(s);
std::string item;
while (std::getline(ss, item, delim)) {
*(result++) = item;
}
}

static std::vector<std::string> split(const std::string &s, char delim)
{
std::vector<std::string> elems;
split(s, delim, std::back_inserter(elems));
return elems;
}

FlowStreamHandler::FlowStreamHandler(const std::string &name, InputEventProxy *proxy, const Configurable *window_config)
: visor::StreamMetricsHandler<FlowMetricsManager>(name, window_config)
, _sample_rate_scaling(true)
Expand Down Expand Up @@ -47,6 +65,54 @@ void FlowStreamHandler::start()

process_groups(_group_defs);

// Setup Configs
if (config_exists("recorded_stream")) {
_metrics->set_recorded_stream();
}

EnrichMap enrich_data;
if (config_exists("device_map")) {
for (const auto &device_info : config_get<StringList>("device_map")) {
std::vector<std::string> data = split(device_info, ',');
if (data.size() < 2) {
// should at least contain device name and ip
continue;
}
DeviceEnrich *device{nullptr};
if (auto it = enrich_data.find(data[1]); it != enrich_data.end()) {
device = &it->second;
} else {
enrich_data[data[1]] = DeviceEnrich{data[0], {}};
device = &enrich_data[data[1]];
}
if (data.size() < 4) {
// should have interface information
continue;
}
auto if_index = static_cast<uint32_t>(std::stol(data[3]));
if (auto it = device->interfaces.find(if_index); it == device->interfaces.end()) {
if (data.size() > 4) {
device->interfaces[if_index] = InterfaceEnrich{data[2], data[4]};
} else {
device->interfaces[if_index] = InterfaceEnrich{data[2], std::string()};
}
}
}
}

std::unordered_map<std::string, std::string> concat_if;
if (config_exists("first_filter_if_as_label") && config_get<bool>("first_filter_if_as_label") && config_exists("only_interfaces")) {
concat_if["default"] = config_get<StringList>("only_interfaces")[0];
auto interface = static_cast<uint32_t>(std::stoul(config_get<StringList>("only_interfaces")[0]));
for (const auto &data : enrich_data) {
auto it = data.second.interfaces.find(interface);
if (it != data.second.interfaces.end()) {
concat_if[data.first] = it->second.name;
}
}
}
_metrics->set_enrich_data(std::move(concat_if), std::move(enrich_data));

// Setup Filters
if (config_exists("only_ips")) {
_parse_host_specs(config_get<StringList>("only_ips"));
Expand Down Expand Up @@ -80,10 +146,6 @@ void FlowStreamHandler::start()
_sample_rate_scaling = false;
}

if (config_exists("recorded_stream")) {
_metrics->set_recorded_stream();
}

if (_flow_proxy) {
_sflow_connection = _flow_proxy->sflow_signal.connect(&FlowStreamHandler::process_sflow_cb, this);
_netflow_connection = _flow_proxy->netflow_signal.connect(&FlowStreamHandler::process_netflow_cb, this);
Expand Down Expand Up @@ -471,7 +533,6 @@ void FlowMetricsBucket::specialized_merge(const AbstractMetricsBucket &o, Metric

for (const auto &device : other._devices_metrics) {
const auto &deviceId = device.first;
const auto &device_data = device.second;

if (group_enabled(group::FlowMetrics::Counters)) {
_devices_metrics[deviceId]->counters.UDP += device.second->counters.UDP;
Expand Down Expand Up @@ -542,7 +603,22 @@ void FlowMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap a

for (const auto &device : _devices_metrics) {
auto device_labels = add_labels;
device_labels["device"] = device.first;
auto deviceId = device.first;
DeviceEnrich *dev{nullptr};
if (_enrich_data) {
if (auto it = _enrich_data->find(deviceId); it != _enrich_data->end()) {
dev = &it->second;
deviceId = it->second.name;
}
}
device_labels["device"] = deviceId;
if (_concat_if) {
if (auto it = _concat_if->find(device.first); (it != _concat_if->end()) && !it->second.empty()) {
device_labels["device_interface"] = deviceId + "|" + it->second;
} else {
device_labels["device_interface"] = deviceId + "|" + _concat_if->at("default");
}
}

if (group_enabled(group::FlowMetrics::Counters)) {
device.second->counters.UDP.to_prometheus(out, device_labels);
Expand Down Expand Up @@ -574,8 +650,22 @@ void FlowMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap a
if (group_enabled(group::FlowMetrics::Conversations)) {
device.second->topByBytes.topConversations.to_prometheus(out, device_labels);
}
device.second->topByBytes.topInIfIndex.to_prometheus(out, device_labels, [](const uint32_t &val) { return std::to_string(val); });
device.second->topByBytes.topOutIfIndex.to_prometheus(out, device_labels, [](const uint32_t &val) { return std::to_string(val); });
device.second->topByBytes.topInIfIndex.to_prometheus(out, device_labels, [dev](const uint32_t &val) {
if (dev) {
if (auto it = dev->interfaces.find(val); it != dev->interfaces.end()) {
return it->second.name;
}
}
return std::to_string(val);
});
device.second->topByBytes.topOutIfIndex.to_prometheus(out, device_labels, [dev](const uint32_t &val) {
if (dev) {
if (auto it = dev->interfaces.find(val); it != dev->interfaces.end()) {
return it->second.name;
}
}
return std::to_string(val);
});
if (group_enabled(group::FlowMetrics::TopGeo)) {
device.second->topByBytes.topGeoLoc.to_prometheus(out, device_labels);
device.second->topByBytes.topASN.to_prometheus(out, device_labels);
Expand Down Expand Up @@ -613,6 +703,21 @@ void FlowMetricsBucket::to_json(json &j) const

for (const auto &device : _devices_metrics) {
auto deviceId = device.first;
DeviceEnrich *dev{nullptr};
if (_enrich_data) {
auto it = _enrich_data->find(deviceId);
if (it != _enrich_data->end()) {
dev = &it->second;
deviceId = it->second.name;
}
}
if (_concat_if) {
if (auto it = _concat_if->find(device.first); (it != _concat_if->end()) && !it->second.empty()) {
deviceId += "|" + it->second;
} else {
deviceId += "|" + _concat_if->at("default");
}
}

if (group_enabled(group::FlowMetrics::Counters)) {
device.second->counters.UDP.to_json(j["devices"][deviceId]);
Expand Down Expand Up @@ -644,8 +749,22 @@ void FlowMetricsBucket::to_json(json &j) const
if (group_enabled(group::FlowMetrics::Conversations)) {
device.second->topByBytes.topConversations.to_json(j["devices"][deviceId]);
}
device.second->topByBytes.topInIfIndex.to_json(j["devices"][deviceId], [](const uint32_t &val) { return std::to_string(val); });
device.second->topByBytes.topOutIfIndex.to_json(j["devices"][deviceId], [](const uint32_t &val) { return std::to_string(val); });
device.second->topByBytes.topInIfIndex.to_json(j["devices"][deviceId], [dev](const uint32_t &val) {
if (dev) {
if (auto it = dev->interfaces.find(val); it != dev->interfaces.end()) {
return it->second.name;
}
}
return std::to_string(val);
});
device.second->topByBytes.topOutIfIndex.to_json(j["devices"][deviceId], [dev](const uint32_t &val) {
if (dev) {
if (auto it = dev->interfaces.find(val); it != dev->interfaces.end()) {
return it->second.name;
}
}
return std::to_string(val);
});
if (group_enabled(group::FlowMetrics::TopGeo)) {
device.second->topByBytes.topGeoLoc.to_json(j["devices"][deviceId]);
device.second->topByBytes.topASN.to_json(j["devices"][deviceId]);
Expand Down
55 changes: 49 additions & 6 deletions src/handlers/flow/FlowStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ enum FlowMetrics : visor::MetricGroupIntType {
};
}

struct InterfaceEnrich {
std::string name;
std::string descr;
};

struct DeviceEnrich {
std::string name;
std::unordered_map<uint32_t, InterfaceEnrich> interfaces;
};

typedef std::unordered_map<std::string, DeviceEnrich> EnrichMap;

struct FlowData {
bool is_ipv6;
IP_PROTOCOL l4;
Expand Down Expand Up @@ -83,8 +95,8 @@ struct FlowTopN {
, topSrcIPandPort(FLOW_SCHEMA, "ip_port", {"top_src_ips_and_port_" + metric}, "Top source IP addresses and port by " + metric)
, topDstIPandPort(FLOW_SCHEMA, "ip_port", {"top_dst_ips_and_port_" + metric}, "Top destination IP addresses and port by " + metric)
, topConversations(FLOW_SCHEMA, "conversations", {"top_conversations_" + metric}, "Top source IP addresses and port by " + metric)
, topInIfIndex(FLOW_SCHEMA, "index", {"top_in_if_index_" + metric}, "Top input interface indexes by " + metric)
, topOutIfIndex(FLOW_SCHEMA, "index", {"top_out_if_index_" + metric}, "Top output interface indexes by " + metric)
, topInIfIndex(FLOW_SCHEMA, "interface", {"top_in_interfaces_" + metric}, "Top input interfaces by " + metric)
, topOutIfIndex(FLOW_SCHEMA, "interface", {"top_out_interfaces_" + metric}, "Top output interfaces by " + metric)
, topGeoLoc(FLOW_SCHEMA, "geo_loc", {"top_geoLoc_" + metric}, "Top GeoIP locations by " + metric)
, topASN(FLOW_SCHEMA, "asn", {"top_ASN_" + metric}, "Top ASNs by IP by " + metric)
{
Expand Down Expand Up @@ -157,10 +169,10 @@ struct FlowDevice {

class FlowMetricsBucket final : public visor::AbstractMetricsBucket
{

protected:
mutable std::shared_mutex _mutex;

EnrichMap *_enrich_data{nullptr};
std::unordered_map<std::string, std::string> *_concat_if{nullptr};
struct counters {
Counter filtered;
Counter total;
Expand All @@ -173,8 +185,6 @@ class FlowMetricsBucket final : public visor::AbstractMetricsBucket
counters _counters;
size_t _topn_count{10};
uint64_t _topn_percentile_threshold{0};

using InterfacePair = std::pair<uint32_t, uint32_t>;
// <DeviceId, FlowDevice>
std::map<std::string, std::unique_ptr<FlowDevice>> _devices_metrics;

Expand Down Expand Up @@ -203,6 +213,12 @@ class FlowMetricsBucket final : public visor::AbstractMetricsBucket
_topn_percentile_threshold = percentile_threshold;
}

inline void set_enrich_data(std::unordered_map<std::string, std::string> *concat, EnrichMap *enrich_data)
{
_concat_if = concat;
_enrich_data = enrich_data;
}

inline void process_filtered(uint64_t filtered)
{
std::unique_lock lock(_mutex);
Expand All @@ -213,19 +229,46 @@ class FlowMetricsBucket final : public visor::AbstractMetricsBucket

class FlowMetricsManager final : public visor::AbstractMetricsManager<FlowMetricsBucket>
{
EnrichMap _enrich_data;
std::unordered_map<std::string, std::string> _concat_if;

public:
FlowMetricsManager(const Configurable *window_config)
: visor::AbstractMetricsManager<FlowMetricsBucket>(window_config)
{
}

inline void set_enrich_data(std::unordered_map<std::string, std::string> concat, EnrichMap enrich_data)
{
_concat_if = concat;
_enrich_data = enrich_data;
if (!_concat_if.empty() && !_enrich_data.empty()) {
live_bucket()->set_enrich_data(&_concat_if, &_enrich_data);
} else if (!_concat_if.empty()) {
live_bucket()->set_enrich_data(&_concat_if, nullptr);
} else if (!_concat_if.empty()) {
live_bucket()->set_enrich_data(nullptr, &_enrich_data);
}
}

inline void process_filtered(timespec stamp, uint64_t filtered)
{
// base event, no sample
new_event(stamp, false);
live_bucket()->process_filtered(filtered);
}
void process_flow(const FlowPacket &payload);

void on_period_shift([[maybe_unused]] timespec stamp, [[maybe_unused]] const FlowMetricsBucket *maybe_expiring_bucket) override
{
if (!_concat_if.empty() && !_enrich_data.empty()) {
live_bucket()->set_enrich_data(&_concat_if, &_enrich_data);
} else if (!_concat_if.empty()) {
live_bucket()->set_enrich_data(&_concat_if, nullptr);
} else if (!_concat_if.empty()) {
live_bucket()->set_enrich_data(nullptr, &_enrich_data);
}
}
};

class FlowStreamHandler final : public visor::StreamMetricsHandler<FlowMetricsManager>
Expand Down
35 changes: 35 additions & 0 deletions src/handlers/flow/tests/test_flows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,41 @@ TEST_CASE("Parse sflow stream", "[sflow][flow]")
CHECK(j["devices"]["192.168.0.13"]["top_src_ips_and_port_bytes"][0]["name"] == "10.4.1.2:57420");
}

TEST_CASE("Parse sflow with enrichment", "[sflow][flow]")
{
FlowInputStream stream{"sflow-test"};
stream.config_set("flow_type", "sflow");
stream.config_set("pcap_file", "tests/fixtures/ecmp.pcap");

visor::Config c;
auto stream_proxy = stream.add_event_proxy(c);
c.config_set<uint64_t>("num_periods", 1);
FlowStreamHandler flow_handler{"flow-test", stream_proxy, &c};
flow_handler.config_set<visor::Configurable::StringList>("device_map", {"route1,192.168.0.11,eth0,37,provide Y", "route2,192.168.0.12,eth3,4"});
flow_handler.config_set<visor::Configurable::StringList>("only_interfaces", {"37", "4", "52"});
flow_handler.config_set<bool>("first_filter_if_as_label", true);

flow_handler.start();
stream.start();
stream.stop();
flow_handler.stop();

auto counters = flow_handler.metrics()->bucket(0)->counters();
auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked();

// confirmed with wireshark
CHECK(event_data.num_events->value() == 9279);
CHECK(event_data.num_samples->value() == 9279);
CHECK(counters.filtered.value() == 8573);
CHECK(counters.total.value() == 44212);

nlohmann::json j;
flow_handler.metrics()->bucket(0)->to_json(j);
CHECK(j["devices"]["route1|eth0"]["top_in_interfaces_bytes"][0]["name"] == "eth0");
CHECK(j["devices"]["route2|37"]["top_in_interfaces_bytes"][0]["name"] == "eth3");
CHECK(j["devices"]["192.168.0.13|37"]["top_in_interfaces_bytes"][0]["name"] == "52");
}

TEST_CASE("Parse sflow stream without sampling", "[sflow][flow]")
{

Expand Down

0 comments on commit eba8cfe

Please sign in to comment.