From eba8cfe34b055d931ed0fb7f86319b77eed4f736 Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Thu, 22 Sep 2022 13:43:23 -0400 Subject: [PATCH] Flow Metrics Enrichment (#464) --- golang/pkg/client/types.go | 9 +- src/handlers/flow/FlowStreamHandler.cpp | 139 ++++++++++++++++++++++-- src/handlers/flow/FlowStreamHandler.h | 55 +++++++++- src/handlers/flow/tests/test_flows.cpp | 35 ++++++ 4 files changed, 217 insertions(+), 21 deletions(-) diff --git a/golang/pkg/client/types.go b/golang/pkg/client/types.go index 35837f0bd..390519da4 100644 --- a/golang/pkg/client/types.go +++ b/golang/pkg/client/types.go @@ -188,7 +188,6 @@ type FlowPayload struct { Ipv4 int64 `mapstructure:"ipv4"` Ipv6 int64 `mapstructure:"ipv6"` OtherL4 int64 `mapstructure:"other_l4"` - PayloadSize Quantiles `mapstructure:"payload_size"` TCP int64 `mapstructure:"tcp"` UDP int64 `mapstructure:"udp"` TopGeoLocBytes []NameCount `mapstructure:"top_geoLoc_bytes"` @@ -201,10 +200,10 @@ type FlowPayload struct { TopDstIpsPackets []NameCount `mapstructure:"top_dst_ips_packets"` TopDstPortsBytes []NameCount `mapstructure:"top_dst_ports_bytes"` TopDstPortsPackets []NameCount `mapstructure:"top_dst_ports_packets"` - TopInIfIndexBytes []NameCount `mapstructure:"top_in_if_index_bytes"` - TopInIfIndexPackets []NameCount `mapstructure:"top_in_if_index_packets"` - TopOutIfIndexBytes []NameCount `mapstructure:"top_out_if_index_bytes"` - TopOutIfIndexPackets []NameCount `mapstructure:"top_out_if_index_packets"` + TopInInterfacesBytes []NameCount `mapstructure:"top_in_interfaces_bytes"` + TopInInterfacesPackets []NameCount `mapstructure:"top_in_interfaces_packets"` + TopOutInterfacesBytes []NameCount `mapstructure:"top_out_interfaces_bytes"` + TopOutInterfacesPackets []NameCount `mapstructure:"top_out_interfaces_packets"` TopSrcIpsAndPortBytes []NameCount `mapstructure:"top_src_ips_and_port_bytes"` TopSrcIpsAndPortPackets []NameCount `mapstructure:"top_src_ips_and_port_packets"` TopConversationsBytes []NameCount `mapstructure:"top_conversations_bytes"` diff --git a/src/handlers/flow/FlowStreamHandler.cpp b/src/handlers/flow/FlowStreamHandler.cpp index aa2b56d5f..805359d2f 100644 --- a/src/handlers/flow/FlowStreamHandler.cpp +++ b/src/handlers/flow/FlowStreamHandler.cpp @@ -19,6 +19,24 @@ namespace visor::handler::flow { +template +static void split(const std::string &s, char delim, Out result) +{ + std::stringstream ss; + ss.str(s); + std::string item; + while (std::getline(ss, item, delim)) { + *(result++) = item; + } +} + +static std::vector split(const std::string &s, char delim) +{ + std::vector elems; + split(s, delim, std::back_inserter(elems)); + return elems; +} + FlowStreamHandler::FlowStreamHandler(const std::string &name, InputEventProxy *proxy, const Configurable *window_config) : visor::StreamMetricsHandler(name, window_config) , _sample_rate_scaling(true) @@ -47,6 +65,54 @@ void FlowStreamHandler::start() process_groups(_group_defs); + // Setup Configs + if (config_exists("recorded_stream")) { + _metrics->set_recorded_stream(); + } + + EnrichMap enrich_data; + if (config_exists("device_map")) { + for (const auto &device_info : config_get("device_map")) { + std::vector data = split(device_info, ','); + if (data.size() < 2) { + // should at least contain device name and ip + continue; + } + DeviceEnrich *device{nullptr}; + if (auto it = enrich_data.find(data[1]); it != enrich_data.end()) { + device = &it->second; + } else { + enrich_data[data[1]] = DeviceEnrich{data[0], {}}; + device = &enrich_data[data[1]]; + } + if (data.size() < 4) { + // should have interface information + continue; + } + auto if_index = static_cast(std::stol(data[3])); + if (auto it = device->interfaces.find(if_index); it == device->interfaces.end()) { + if (data.size() > 4) { + device->interfaces[if_index] = InterfaceEnrich{data[2], data[4]}; + } else { + device->interfaces[if_index] = InterfaceEnrich{data[2], std::string()}; + } + } + } + } + + std::unordered_map concat_if; + if (config_exists("first_filter_if_as_label") && config_get("first_filter_if_as_label") && config_exists("only_interfaces")) { + concat_if["default"] = config_get("only_interfaces")[0]; + auto interface = static_cast(std::stoul(config_get("only_interfaces")[0])); + for (const auto &data : enrich_data) { + auto it = data.second.interfaces.find(interface); + if (it != data.second.interfaces.end()) { + concat_if[data.first] = it->second.name; + } + } + } + _metrics->set_enrich_data(std::move(concat_if), std::move(enrich_data)); + // Setup Filters if (config_exists("only_ips")) { _parse_host_specs(config_get("only_ips")); @@ -80,10 +146,6 @@ void FlowStreamHandler::start() _sample_rate_scaling = false; } - if (config_exists("recorded_stream")) { - _metrics->set_recorded_stream(); - } - if (_flow_proxy) { _sflow_connection = _flow_proxy->sflow_signal.connect(&FlowStreamHandler::process_sflow_cb, this); _netflow_connection = _flow_proxy->netflow_signal.connect(&FlowStreamHandler::process_netflow_cb, this); @@ -471,7 +533,6 @@ void FlowMetricsBucket::specialized_merge(const AbstractMetricsBucket &o, Metric for (const auto &device : other._devices_metrics) { const auto &deviceId = device.first; - const auto &device_data = device.second; if (group_enabled(group::FlowMetrics::Counters)) { _devices_metrics[deviceId]->counters.UDP += device.second->counters.UDP; @@ -542,7 +603,22 @@ void FlowMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap a for (const auto &device : _devices_metrics) { auto device_labels = add_labels; - device_labels["device"] = device.first; + auto deviceId = device.first; + DeviceEnrich *dev{nullptr}; + if (_enrich_data) { + if (auto it = _enrich_data->find(deviceId); it != _enrich_data->end()) { + dev = &it->second; + deviceId = it->second.name; + } + } + device_labels["device"] = deviceId; + if (_concat_if) { + if (auto it = _concat_if->find(device.first); (it != _concat_if->end()) && !it->second.empty()) { + device_labels["device_interface"] = deviceId + "|" + it->second; + } else { + device_labels["device_interface"] = deviceId + "|" + _concat_if->at("default"); + } + } if (group_enabled(group::FlowMetrics::Counters)) { device.second->counters.UDP.to_prometheus(out, device_labels); @@ -574,8 +650,22 @@ void FlowMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap a if (group_enabled(group::FlowMetrics::Conversations)) { device.second->topByBytes.topConversations.to_prometheus(out, device_labels); } - device.second->topByBytes.topInIfIndex.to_prometheus(out, device_labels, [](const uint32_t &val) { return std::to_string(val); }); - device.second->topByBytes.topOutIfIndex.to_prometheus(out, device_labels, [](const uint32_t &val) { return std::to_string(val); }); + device.second->topByBytes.topInIfIndex.to_prometheus(out, device_labels, [dev](const uint32_t &val) { + if (dev) { + if (auto it = dev->interfaces.find(val); it != dev->interfaces.end()) { + return it->second.name; + } + } + return std::to_string(val); + }); + device.second->topByBytes.topOutIfIndex.to_prometheus(out, device_labels, [dev](const uint32_t &val) { + if (dev) { + if (auto it = dev->interfaces.find(val); it != dev->interfaces.end()) { + return it->second.name; + } + } + return std::to_string(val); + }); if (group_enabled(group::FlowMetrics::TopGeo)) { device.second->topByBytes.topGeoLoc.to_prometheus(out, device_labels); device.second->topByBytes.topASN.to_prometheus(out, device_labels); @@ -613,6 +703,21 @@ void FlowMetricsBucket::to_json(json &j) const for (const auto &device : _devices_metrics) { auto deviceId = device.first; + DeviceEnrich *dev{nullptr}; + if (_enrich_data) { + auto it = _enrich_data->find(deviceId); + if (it != _enrich_data->end()) { + dev = &it->second; + deviceId = it->second.name; + } + } + if (_concat_if) { + if (auto it = _concat_if->find(device.first); (it != _concat_if->end()) && !it->second.empty()) { + deviceId += "|" + it->second; + } else { + deviceId += "|" + _concat_if->at("default"); + } + } if (group_enabled(group::FlowMetrics::Counters)) { device.second->counters.UDP.to_json(j["devices"][deviceId]); @@ -644,8 +749,22 @@ void FlowMetricsBucket::to_json(json &j) const if (group_enabled(group::FlowMetrics::Conversations)) { device.second->topByBytes.topConversations.to_json(j["devices"][deviceId]); } - device.second->topByBytes.topInIfIndex.to_json(j["devices"][deviceId], [](const uint32_t &val) { return std::to_string(val); }); - device.second->topByBytes.topOutIfIndex.to_json(j["devices"][deviceId], [](const uint32_t &val) { return std::to_string(val); }); + device.second->topByBytes.topInIfIndex.to_json(j["devices"][deviceId], [dev](const uint32_t &val) { + if (dev) { + if (auto it = dev->interfaces.find(val); it != dev->interfaces.end()) { + return it->second.name; + } + } + return std::to_string(val); + }); + device.second->topByBytes.topOutIfIndex.to_json(j["devices"][deviceId], [dev](const uint32_t &val) { + if (dev) { + if (auto it = dev->interfaces.find(val); it != dev->interfaces.end()) { + return it->second.name; + } + } + return std::to_string(val); + }); if (group_enabled(group::FlowMetrics::TopGeo)) { device.second->topByBytes.topGeoLoc.to_json(j["devices"][deviceId]); device.second->topByBytes.topASN.to_json(j["devices"][deviceId]); diff --git a/src/handlers/flow/FlowStreamHandler.h b/src/handlers/flow/FlowStreamHandler.h index bc70fdf02..467e11d8c 100644 --- a/src/handlers/flow/FlowStreamHandler.h +++ b/src/handlers/flow/FlowStreamHandler.h @@ -34,6 +34,18 @@ enum FlowMetrics : visor::MetricGroupIntType { }; } +struct InterfaceEnrich { + std::string name; + std::string descr; +}; + +struct DeviceEnrich { + std::string name; + std::unordered_map interfaces; +}; + +typedef std::unordered_map EnrichMap; + struct FlowData { bool is_ipv6; IP_PROTOCOL l4; @@ -83,8 +95,8 @@ struct FlowTopN { , topSrcIPandPort(FLOW_SCHEMA, "ip_port", {"top_src_ips_and_port_" + metric}, "Top source IP addresses and port by " + metric) , topDstIPandPort(FLOW_SCHEMA, "ip_port", {"top_dst_ips_and_port_" + metric}, "Top destination IP addresses and port by " + metric) , topConversations(FLOW_SCHEMA, "conversations", {"top_conversations_" + metric}, "Top source IP addresses and port by " + metric) - , topInIfIndex(FLOW_SCHEMA, "index", {"top_in_if_index_" + metric}, "Top input interface indexes by " + metric) - , topOutIfIndex(FLOW_SCHEMA, "index", {"top_out_if_index_" + metric}, "Top output interface indexes by " + metric) + , topInIfIndex(FLOW_SCHEMA, "interface", {"top_in_interfaces_" + metric}, "Top input interfaces by " + metric) + , topOutIfIndex(FLOW_SCHEMA, "interface", {"top_out_interfaces_" + metric}, "Top output interfaces by " + metric) , topGeoLoc(FLOW_SCHEMA, "geo_loc", {"top_geoLoc_" + metric}, "Top GeoIP locations by " + metric) , topASN(FLOW_SCHEMA, "asn", {"top_ASN_" + metric}, "Top ASNs by IP by " + metric) { @@ -157,10 +169,10 @@ struct FlowDevice { class FlowMetricsBucket final : public visor::AbstractMetricsBucket { - protected: mutable std::shared_mutex _mutex; - + EnrichMap *_enrich_data{nullptr}; + std::unordered_map *_concat_if{nullptr}; struct counters { Counter filtered; Counter total; @@ -173,8 +185,6 @@ class FlowMetricsBucket final : public visor::AbstractMetricsBucket counters _counters; size_t _topn_count{10}; uint64_t _topn_percentile_threshold{0}; - - using InterfacePair = std::pair; // std::map> _devices_metrics; @@ -203,6 +213,12 @@ class FlowMetricsBucket final : public visor::AbstractMetricsBucket _topn_percentile_threshold = percentile_threshold; } + inline void set_enrich_data(std::unordered_map *concat, EnrichMap *enrich_data) + { + _concat_if = concat; + _enrich_data = enrich_data; + } + inline void process_filtered(uint64_t filtered) { std::unique_lock lock(_mutex); @@ -213,12 +229,28 @@ class FlowMetricsBucket final : public visor::AbstractMetricsBucket class FlowMetricsManager final : public visor::AbstractMetricsManager { + EnrichMap _enrich_data; + std::unordered_map _concat_if; + public: FlowMetricsManager(const Configurable *window_config) : visor::AbstractMetricsManager(window_config) { } + inline void set_enrich_data(std::unordered_map concat, EnrichMap enrich_data) + { + _concat_if = concat; + _enrich_data = enrich_data; + if (!_concat_if.empty() && !_enrich_data.empty()) { + live_bucket()->set_enrich_data(&_concat_if, &_enrich_data); + } else if (!_concat_if.empty()) { + live_bucket()->set_enrich_data(&_concat_if, nullptr); + } else if (!_concat_if.empty()) { + live_bucket()->set_enrich_data(nullptr, &_enrich_data); + } + } + inline void process_filtered(timespec stamp, uint64_t filtered) { // base event, no sample @@ -226,6 +258,17 @@ class FlowMetricsManager final : public visor::AbstractMetricsManagerprocess_filtered(filtered); } void process_flow(const FlowPacket &payload); + + void on_period_shift([[maybe_unused]] timespec stamp, [[maybe_unused]] const FlowMetricsBucket *maybe_expiring_bucket) override + { + if (!_concat_if.empty() && !_enrich_data.empty()) { + live_bucket()->set_enrich_data(&_concat_if, &_enrich_data); + } else if (!_concat_if.empty()) { + live_bucket()->set_enrich_data(&_concat_if, nullptr); + } else if (!_concat_if.empty()) { + live_bucket()->set_enrich_data(nullptr, &_enrich_data); + } + } }; class FlowStreamHandler final : public visor::StreamMetricsHandler diff --git a/src/handlers/flow/tests/test_flows.cpp b/src/handlers/flow/tests/test_flows.cpp index 99d0e4c89..7f17651a2 100644 --- a/src/handlers/flow/tests/test_flows.cpp +++ b/src/handlers/flow/tests/test_flows.cpp @@ -49,6 +49,41 @@ TEST_CASE("Parse sflow stream", "[sflow][flow]") CHECK(j["devices"]["192.168.0.13"]["top_src_ips_and_port_bytes"][0]["name"] == "10.4.1.2:57420"); } +TEST_CASE("Parse sflow with enrichment", "[sflow][flow]") +{ + FlowInputStream stream{"sflow-test"}; + stream.config_set("flow_type", "sflow"); + stream.config_set("pcap_file", "tests/fixtures/ecmp.pcap"); + + visor::Config c; + auto stream_proxy = stream.add_event_proxy(c); + c.config_set("num_periods", 1); + FlowStreamHandler flow_handler{"flow-test", stream_proxy, &c}; + flow_handler.config_set("device_map", {"route1,192.168.0.11,eth0,37,provide Y", "route2,192.168.0.12,eth3,4"}); + flow_handler.config_set("only_interfaces", {"37", "4", "52"}); + flow_handler.config_set("first_filter_if_as_label", true); + + flow_handler.start(); + stream.start(); + stream.stop(); + flow_handler.stop(); + + auto counters = flow_handler.metrics()->bucket(0)->counters(); + auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked(); + + // confirmed with wireshark + CHECK(event_data.num_events->value() == 9279); + CHECK(event_data.num_samples->value() == 9279); + CHECK(counters.filtered.value() == 8573); + CHECK(counters.total.value() == 44212); + + nlohmann::json j; + flow_handler.metrics()->bucket(0)->to_json(j); + CHECK(j["devices"]["route1|eth0"]["top_in_interfaces_bytes"][0]["name"] == "eth0"); + CHECK(j["devices"]["route2|37"]["top_in_interfaces_bytes"][0]["name"] == "eth3"); + CHECK(j["devices"]["192.168.0.13|37"]["top_in_interfaces_bytes"][0]["name"] == "52"); +} + TEST_CASE("Parse sflow stream without sampling", "[sflow][flow]") {