From 468893d3122815813de9807d8fd5ee0d0d31dcd5 Mon Sep 17 00:00:00 2001 From: max197616 Date: Fri, 15 Jun 2018 12:01:34 +0300 Subject: [PATCH] Version 0.97 --- configure.ac | 2 +- include/dtypes.h | 6 ++ include/flow.h | 5 ++ include/http.h | 2 + include/main.h | 4 +- include/params.h | 4 ++ include/sender.h | 8 ++- include/stats.h | 11 +++- include/worker.h | 35 ++++++++-- src/cmdlinetask.cpp | 12 +++- src/http.cpp | 27 +++----- src/main.cpp | 149 ++++++++++++++++++++++-------------------- src/sender.cpp | 97 ++++++++++++++------------- src/statistictask.cpp | 8 ++- src/tries.cpp | 35 ++++------ src/worker.cpp | 41 ++++++------ 16 files changed, 247 insertions(+), 199 deletions(-) diff --git a/configure.ac b/configure.ac index 75bcc51..86be8ec 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. AC_PREREQ([2.69]) -AC_INIT(extFilter, 0.95, max1976@mail.ru) +AC_INIT(extFilter, 0.97, max1976@mail.ru) DPDK_HOME= DPDK_TARGET= diff --git a/include/dtypes.h b/include/dtypes.h index 5bae49f..3126256 100644 --- a/include/dtypes.h +++ b/include/dtypes.h @@ -26,6 +26,12 @@ enum port_types P_TYPE_SENDER }; +enum operation_modes +{ + OP_MODE_MIRROR, + OP_MODE_INLINE +}; + struct rte_mempool; struct pool_holder_t diff --git a/include/flow.h b/include/flow.h index fa6154d..52bfb6d 100644 --- a/include/flow.h +++ b/include/flow.h @@ -36,6 +36,7 @@ #include "params.h" #include "arr.h" +#include "dtypes.h" //#define _SIMPLE_HASH 1 @@ -77,6 +78,10 @@ struct flow_base_t free(infos.tracking.ssl_information[0].pkt_buffer); if(infos.tracking.ssl_information[1].pkt_buffer != nullptr) free(infos.tracking.ssl_information[1].pkt_buffer); + if(infos.tracking.ssl_information[0].mempool != nullptr) + rte_mempool_put(((struct pool_holder_t*)infos.tracking.ssl_information[0].mempool)->mempool, infos.tracking.ssl_information[0].mempool); + if(infos.tracking.ssl_information[1].mempool != nullptr) + rte_mempool_put(((struct pool_holder_t*)infos.tracking.ssl_information[1].mempool)->mempool, infos.tracking.ssl_information[1].mempool); infos.tracking.flow_specific_user_data = nullptr; infos.tracking.http_informations[0].temp_buffer = nullptr; infos.tracking.http_informations[1].temp_buffer = nullptr; diff --git a/include/http.h b/include/http.h index a1de9a7..239ae0b 100644 --- a/include/http.h +++ b/include/http.h @@ -20,6 +20,8 @@ #pragma once #include +#include +#include #include #include "dtypes.h" diff --git a/include/main.h b/include/main.h index b174ea1..e0e3d03 100644 --- a/include/main.h +++ b/include/main.h @@ -47,6 +47,7 @@ struct lcore_params { uint8_t port_type; uint8_t queue_id; uint8_t lcore_id; + uint8_t mapto; } __rte_cache_aligned; struct lcore_rx_queue { @@ -228,8 +229,6 @@ class extFilter: public Poco::Util::ServerApplication static uint64_t _tsc_hz; - int _num_of_senders; - int _numa_on; uint32_t _enabled_port_mask; @@ -255,6 +254,7 @@ class extFilter: public Poco::Util::ServerApplication Poco::Net::IPAddress _cmdline_ip; uint8_t _dpdk_send_port; TriesManager _tries; + operation_modes _operation_mode; }; diff --git a/include/params.h b/include/params.h index faaad09..dcee8fd 100644 --- a/include/params.h +++ b/include/params.h @@ -19,6 +19,9 @@ #pragma once +#include +#include + class FlowStorage; struct rte_mempool; class NotifyManager; @@ -45,6 +48,7 @@ struct memory_configs_t memory_config_t ipv4; memory_config_t ipv6; uint32_t http_entries; + uint32_t ssl_entries; }; diff --git a/include/sender.h b/include/sender.h index 73878a3..b91c288 100644 --- a/include/sender.h +++ b/include/sender.h @@ -321,6 +321,8 @@ class ESender : public BSender struct params params; uint8_t *mac; uint8_t *to_mac; + int answer_duplication; + struct rte_mempool *clone_pool; }; ESender(struct nparams ¶ms, uint8_t port, struct rte_mempool *mp, WorkerThread *wt); ~ESender(); @@ -332,8 +334,8 @@ class ESender : public BSender return size; } - void sendPacketIPv4(const uint8_t *l3_pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh); - void sendPacketIPv6(const uint8_t *l3_pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh); + void sendPacketIPv4(const uint8_t *l3_pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, bool to_server = false); + void sendPacketIPv6(const uint8_t *l3_pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, bool to_server = false); void HTTPRedirectIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh, const char *redir_url, size_t r_len); void HTTPRedirectIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh, const char *redir_url, size_t r_len); void SendRSTIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum); @@ -345,7 +347,9 @@ class ESender : public BSender uint8_t _port; struct ether_hdr _eth_hdr; struct rte_mempool *_mp; + struct rte_mempool *_clone_pool; WorkerThread *_wt; + int _answer_duplication; }; #endif diff --git a/include/stats.h b/include/stats.h index 1bd8717..dbb45dd 100644 --- a/include/stats.h +++ b/include/stats.h @@ -19,12 +19,17 @@ #pragma once +#include +#include + struct LatencyCounters { uint64_t total_cycles; uint64_t total_pkts; uint64_t blocked_cycles; + uint64_t unblocked_cycles; uint64_t blocked_pkts; + uint64_t unblocked_pkts; }; struct ThreadStats @@ -45,6 +50,10 @@ struct ThreadStats uint64_t reassembled_flows; struct LatencyCounters latency_counters; uint64_t dpi_no_mempool_http; + uint64_t dpi_no_mempool_ssl; + uint64_t dpi_ssl_partial_packets; + uint64_t dpi_alloc_ssl; + uint64_t dpi_alloc_http; uint64_t ssl_packets; uint64_t http_packets; @@ -91,5 +100,5 @@ struct ThreadStats { memset(this, 0, sizeof(ThreadStats)); } -}; +} __rte_cache_aligned;; diff --git a/include/worker.h b/include/worker.h index 31dc9aa..8a479d6 100644 --- a/include/worker.h +++ b/include/worker.h @@ -34,11 +34,7 @@ #include "dpdk.h" #include "sender.h" #include "http.h" - -//#define EXTF_GC_INTERVAL 1000 // us -//#define EXTF_ALL_GC_INTERVAL 1 // seconds - -//#define EXT_DPI_FLOW_TABLE_MAX_IDLE_TIME 30 /** In seconds. **/ +#include "ssl.h" #define EXTFILTER_CAPTURE_BURST_SIZE 32 #define EXTFILTER_WORKER_BURST_SIZE 32 @@ -98,13 +94,15 @@ class WorkerThread : public DpdkWorkerThread struct rte_mbuf* _sender_buf[EXTFILTER_WORKER_BURST_SIZE]; ESender *_snd; struct rte_mempool *_dpi_http_mempool; + struct rte_mempool *_dpi_ssl_mempool; + struct rte_mempool *_pkt_info_mempool; uint8_t _worker_id; uint32_t ipv4_flow_mask; uint32_t ipv6_flow_mask; public: - WorkerThread(uint8_t worker_id, const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, int socketid, struct ESender::nparams &sp, struct rte_mempool *mp, struct rte_mempool *dpi_http_mempool); + WorkerThread(uint8_t worker_id, const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, int socketid, struct ESender::nparams &sp, struct rte_mempool *mp, struct rte_mempool *dpi_http_mempool, struct rte_mempool *dpi_ssl_mempool); ~WorkerThread(); bool checkURLBlocked(const char *host, size_t host_len, const char *uri, size_t uri_len, dpi_pkt_infos_t* pkt); @@ -159,9 +157,34 @@ class WorkerThread : public DpdkWorkerThread } res->init(); res->mempool = _dpi_http_mempool; + m_ThreadStats.dpi_alloc_http++; + return res; + } + + inline struct rte_mempool *getHTTPMempool() + { + return _dpi_http_mempool; + } + + inline struct ssl_state *allocateSSLState() + { + struct ssl_state *res; + if(rte_mempool_get(_dpi_ssl_mempool, (void **)&res) != 0) + { + _logger.error("Unable to allocate memory for the ssl buffer"); + return nullptr; + } + res->init(); + res->mempool = _dpi_ssl_mempool; + m_ThreadStats.dpi_alloc_ssl++; return res; } + inline struct rte_mempool *getSSLMempool() + { + return _dpi_ssl_mempool; + } + inline uint8_t getWorkerID() { return _worker_id; diff --git a/src/cmdlinetask.cpp b/src/cmdlinetask.cpp index f41b6e1..7a9af02 100644 --- a/src/cmdlinetask.cpp +++ b/src/cmdlinetask.cpp @@ -372,10 +372,15 @@ static void display_worker_stats(struct cmdline* cl,const ThreadStats &stats) cmdline_printf(cl, " Total bytes: %" PRIu64 "\n", stats.total_bytes); cmdline_printf(cl, " HTTP packets: %" PRIu64 "\n", stats.http_packets); cmdline_printf(cl, " SSL/TLS packets: %" PRIu64 "\n", stats.ssl_packets); + cmdline_printf(cl, " SSL/TLS partial packets: %" PRIu64 "\n", stats.dpi_ssl_partial_packets); - cmdline_printf(cl, " SSL/TLS max packet size: %" PRIu32 "\n", ssl_max_packet_size); - cmdline_printf(cl, " SSL/TLS mallocs: %" PRIu64 "\n", ssl_mallocs); - cmdline_printf(cl, " SSL/TLS reallocs: %" PRIu64 "\n", ssl_reallocs); + cmdline_printf(cl, " Allocs:\n"); + cmdline_printf(cl, " HTTP: %" PRIu64 "\n", stats.dpi_alloc_http); + cmdline_printf(cl, " SSL: %" PRIu64 "\n", stats.dpi_alloc_ssl); + +// cmdline_printf(cl, " SSL/TLS max packet size: %" PRIu32 "\n", ssl_max_packet_size); +// cmdline_printf(cl, " SSL/TLS mallocs: %" PRIu64 "\n", ssl_mallocs); +// cmdline_printf(cl, " SSL/TLS reallocs: %" PRIu64 "\n", ssl_reallocs); if(stats.ip_packets && stats.total_bytes) { @@ -412,6 +417,7 @@ static void display_worker_stats(struct cmdline* cl,const ThreadStats &stats) cmdline_printf(cl, " SSL : %" PRIu64 "\n", stats.seen_already_blocked_ssl_ipv6); cmdline_printf(cl, " DPI errors:\n"); cmdline_printf(cl, " No memory http: %" PRIu64 "\n",stats.dpi_no_mempool_http); + cmdline_printf(cl, " No memory ssl: %" PRIu64 "\n",stats.dpi_no_mempool_ssl); cmdline_printf(cl, " Flows:\n"); cmdline_printf(cl, " IPv4:\n"); cmdline_printf(cl, " New: %" PRIu64 "\n", stats.new_flow); diff --git a/src/http.cpp b/src/http.cpp index 877aaa1..bbc5e34 100644 --- a/src/http.cpp +++ b/src/http.cpp @@ -44,14 +44,10 @@ int on_url_ext (http_parser *p, const char* at, size_t length, dpi_pkt_infos_t* length = d->uri.buf_size - d->uri.length; if(likely(length > 0)) { - if(d->uri.length != 0) - { - rte_memcpy(d->uri.buf + d->uri.length, at, length); - d->uri.length += length; - } else { - rte_memcpy(d->uri.buf, at, length); - d->uri.length = length; - } + for(size_t i = 0; i < length; i++) + d->uri.buf[d->uri.length++] = at[i]; +// rte_memcpy(d->uri.buf + d->uri.length, at, length); +// d->uri.length += length; } } return 0; @@ -108,17 +104,10 @@ int on_header_value_ext(http_parser *p, const char *at, size_t length, dpi_pkt_i case http::hstate_host: if(d->host_r.length + length > d->host_r.buf_size) length = d->host_r.buf_size - d->host_r.length; - if(length > 0) - { - if(d->host_r.length != 0) - { - rte_memcpy(d->host_r.buf + d->host_r.length, at, length); - d->host_r.length += length; - } else { - rte_memcpy(d->host_r.buf, at, length); - d->host_r.length = length; - } - } + for(size_t i = 0; i < length; i++) + d->host_r.buf[d->host_r.length++] = at[i]; +// rte_memcpy(d->host_r.buf + d->host_r.length, at, length); +// d->host_r.length += length; break; default: break; diff --git a/src/main.cpp b/src/main.cpp index 4279a95..589f558 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -32,6 +32,7 @@ #include "notification.h" #include "config.h" #include "tries.h" +#include "ssli.h" #define MBUF_CACHE_SIZE 256 @@ -140,7 +141,7 @@ void extFilter::initParams() prm->memory_configs.ipv4.flows_number = config().getInt("dpi.max_active_flows_ipv4", 0); if(prm->memory_configs.ipv4.flows_number == 0) { - prm->memory_configs.ipv4.flows_number = _calc_scale(scale, 500000, 10000000); + prm->memory_configs.ipv4.flows_number = _calc_scale(scale, 500000, 20000000); } prm->memory_configs.ipv6.flows_number = config().getInt("dpi.max_active_flows_ipv6", 0); @@ -196,6 +197,7 @@ void extFilter::initParams() prm->flow_lifetime[1] = 300; prm->memory_configs.http_entries = _calc_scale(scale, 70000, 250000); + prm->memory_configs.ssl_entries = _calc_scale(scale, 120000, 360000); prm->answer_duplication = config().getInt("answer_duplication", 0); if(prm->answer_duplication > 3) @@ -279,6 +281,7 @@ static inline void em_parse_ptype(struct rte_mbuf *m) packet_type |= RTE_PTYPE_L3_IPV6_EXT_UNKNOWN; } m->packet_type = packet_type; +// m->udata64 = ACL::ACL_DEFAULT_POLICY; struct packet_info *pkt_info; if(rte_mempool_get(extFilter::getPktInfoPool(), (void **)&pkt_info) != 0) { @@ -723,85 +726,31 @@ void extFilter::initialize(Application& self) { loadConfiguration(); ServerApplication::initialize(self); -/* - std::string fl("/usr/local/etc/extfilter/domains"); - std::string ur("/usr/local/etc/extfilter/urls"); - std::string sn("/usr/local/etc/extfilter/ssl_host"); - - const char *bl = "notify.tushino.com/blacklist"; - if(_tries.getBLManager()->init(fl, ur, sn, bl, strlen(bl))) - { - std::cout << "error!" << std::endl; - exit(0); - } - std::cout << "everything is ok" << std::endl; - - const char *host = "archive.is"; - const char *uri = "/20150813064134/http://www.maxi24-az.com/ru/obyavlenie/amfetamin-skorost-89612877418-krasnodar-stimulyator-metamfetamin_1690872.html"; - char *redir_url = nullptr; - int z = 0; - if((z=_tries.checkURLBlocked(0, host, strlen(host), uri, strlen(uri), &redir_url))) - { - std::cout << "URL is blocked" << std::endl; - if(redir_url) - { - std::cout << "redir to: " << redir_url << ", length: " << z << std::endl; - } - } else { - std::cout << "URL is not blocked" << std::endl; - } - exit(0); -*/ - -/* std::string emp; - fl = "/usr/local/etc/extfilter/ssl_host"; - _tries.getSNIBlacklist()->load(fl, emp); - if(_tries.checkSNIBlocked(0, host, strlen(host))) - { - std::cout << "SNI is blocked" << std::endl; - } else { - std::cout << "SNI is not blocked" << std::endl; - } -*/ - -/*#include "utils.h" - const char *b = "f\x09ucked"; - char buf[4096]; - url_encode(buf, b, strlen(b)); - std::cout << "buf: '" << buf << "'" << std::endl; - - const char abc[]="abc"; - std::cout << "size: " << sizeof(abc) << std::endl; -*/ -// exit(0); - - _num_of_senders = config().getInt("num_of_senders", 1); _block_ssl_no_sni = config().getBool("block_ssl_no_sni", false); _statistic_interval = config().getInt("statistic_interval", 0); _urlsFile = config().getString("urllist",""); _domainsFile = config().getString("domainlist",""); - _sslIpsFile = config().getString("sslips",""); - if(!_block_ssl_no_sni) + if(_block_ssl_no_sni) { - _sslIpsFile.assign(""); + _sslIpsFile = config().getString("sslips",""); } _sslFile = config().getString("ssllist",""); _hostsFile = config().getString("hostlist",""); _statisticsFile = config().getString("statisticsfile",""); - std::string http_code=config().getString("http_code",""); + std::string http_code = config().getString("http_code",""); if(!http_code.empty()) { http_code.erase(std::remove(http_code.begin(), http_code.end(), '"'), http_code.end()); - _sender_params.code=http_code; + _sender_params.code = http_code; logger().debug("HTTP code set to %s", http_code); } std::string redirect_url = config().getString("redirect_url",""); _sender_params.redirect_url = redirect_url; - _sender_params.send_rst_to_server=config().getBool("rst_to_server",false); - _sender_params.mtu=config().getInt("out_mtu",1500); + _sender_params.send_rst_to_server = config().getBool("rst_to_server",false); + _sender_params.mtu = config().getInt("out_mtu",1500); _notify_enabled = config().getBool("notify_enabled", false); _notify_acl_file = config().getString("notify_acl_file",""); @@ -817,8 +766,14 @@ void extFilter::initialize(Application& self) int _mem_channels = config().getInt("memory_channels", 2); - int coreMaskToUse=config().getInt("core_mask", 0); + int coreMaskToUse = config().getInt("core_mask", 0); + std::string operation_mode = config().getString("operation_mode","mirror"); + _operation_mode = OP_MODE_MIRROR; + if(operation_mode == "inline") + { + _operation_mode = OP_MODE_INLINE; + } // initialize DPDK std::stringstream dpdkParamsStream; @@ -904,6 +859,8 @@ void extFilter::initialize(Application& self) _nb_lcore_params=0; int cnt_sender = 0; + int networks_ports = 0; + int subscribers_ports = 0; for(uint32_t i=0; i < n_ports; i++) { std::string key("port "+std::to_string(i)); @@ -915,8 +872,10 @@ void extFilter::initialize(Application& self) if(type == "network") { port_type = P_TYPE_NETWORK; + networks_ports++; } else if (type == "subscriber") { + subscribers_ports++; } else if (type == "sender") { if(cnt_sender > 0) @@ -950,7 +909,7 @@ void extFilter::initialize(Application& self) } continue; } - std::string p=config().getString(key+".queues", ""); + std::string p = config().getString(key+".queues", ""); if(p.empty()) { logger().fatal("Port IDs are not sequential (port %d missing)", (int) i); @@ -962,6 +921,16 @@ void extFilter::initialize(Application& self) logger().fatal("Exceeded max number of worker threads: %z", restTokenizer.count()); throw Poco::Exception("Configuration error"); } + uint8_t mapto = 255; + if(_operation_mode == OP_MODE_INLINE) + { + mapto = config().getInt(key + ".mapto", 255); + if(mapto == 255) + { + logger().fatal("In the inline operation mode 'mapto' must be defined int the port %d", (int) i); + throw Poco::Exception("Configuration error"); + } + } int nb_lcores_per_port = 0; for(auto itr=restTokenizer.begin(); itr!=restTokenizer.end(); ++itr) { @@ -977,6 +946,7 @@ void extFilter::initialize(Application& self) _lcore_params_array[_nb_lcore_params].port_type = port_type; _lcore_params_array[_nb_lcore_params].queue_id = queue_id; _lcore_params_array[_nb_lcore_params].lcore_id = lcore_id; + _lcore_params_array[_nb_lcore_params].mapto = mapto; _nb_lcore_params++; nb_lcores_per_port++; } @@ -988,12 +958,28 @@ void extFilter::initialize(Application& self) throw Poco::Exception("Configuration error"); } - if(!cnt_sender) + if(_operation_mode == OP_MODE_MIRROR && !cnt_sender) { - logger().fatal("The senders port is not defined"); + logger().fatal("The senders port is not defined in the mirror operation mode"); throw Poco::Exception("Configuration error"); } + if(_operation_mode == OP_MODE_INLINE) + { + if(subscribers_ports != networks_ports) + { + logger().fatal("Number of subscribers and networks ports not equal in the inline operation mode"); + throw Poco::Exception("Configuration error"); + } + // TODO fill array for determine in <-> out + // like: + // inout_ports[0] = 1 + // inout_ports[1] = 0 + // inout_ports[2] = 3 + // inout_ports[3] = 2 + // then we can extract port for the output packets... + } + initParams(); _nb_ports = rte_eth_dev_count(); @@ -1022,7 +1008,6 @@ void extFilter::initialize(Application& self) if(loadACL()) throw Poco::Exception("Can't init ACL"); - if(_tries.getBLManager()->init(_domainsFile, _urlsFile, _sslFile, redirect_url.empty() ? nullptr : redirect_url.c_str(), redirect_url.empty() ? 0 : redirect_url.length())) { logger().fatal("Unable to load blacklists"); @@ -1163,6 +1148,7 @@ int extFilter::main(const ArgVec& args) } struct rte_mempool *_mp = nullptr; + struct rte_mempool *clone_pool = nullptr; std::vector ports; for (uint8_t portid = 0; portid < _nb_ports; portid++) @@ -1176,10 +1162,22 @@ int extFilter::main(const ArgVec& args) logger().fatal("Cannot initialize port %d", (int) portid); return Poco::Util::Application::EXIT_CONFIG; } - _mp = rte_pktmbuf_pool_create("SenderBuffer", 8192, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); + unsigned n = global_prm->workers_number*2; // 1 to client + 1 to server + n = 8192; + logger().information("Set number of entries of the sender buffer to %u", n); + _mp = rte_pktmbuf_pool_create("SenderBuffer", n, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); if(_mp == nullptr) { - logger().fatal("Unable to allocate mempool for sender"); + logger().fatal("Unable to allocate mempool for sender: %d", (int) rte_errno); + return Poco::Util::Application::EXIT_CONFIG; + } + n = global_prm->answer_duplication * global_prm->workers_number; + n = 1024; + logger().information("Set number of entries of the clone buffer to %u", n); + clone_pool = rte_pktmbuf_pool_create("clone_pool", n, MBUF_CACHE_SIZE, 0, 0, rte_socket_id()); + if(clone_pool == nullptr) + { + logger().fatal("Unable to allocate mempool for clone buffer"); return Poco::Util::Application::EXIT_CONFIG; } } else { @@ -1209,6 +1207,15 @@ int extFilter::main(const ArgVec& args) return Poco::Util::Application::EXIT_CONFIG; } + pool_name.assign("DPISSLPool"); + logger().information("Create pool '%s' for the ssl dissector with number of entries: %u, element size %z size: %Lu bytes", pool_name, global_prm->memory_configs.ssl_entries, sizeof(ssl_state),(uint64_t)(global_prm->memory_configs.ssl_entries * sizeof(ssl_state))); + struct rte_mempool *dpi_ssl_mempool = rte_mempool_create(pool_name.c_str(), global_prm->memory_configs.ssl_entries, sizeof(ssl_state), 0, 0, NULL, NULL, NULL, NULL, 0, 0); + if(dpi_ssl_mempool == nullptr) + { + logger().fatal("Unable to create mempool for the ssl dissector."); + return Poco::Util::Application::EXIT_CONFIG; + } + initFlowStorages(); uint8_t worker_id = 0; uint16_t tx_queue_id = 0; @@ -1230,6 +1237,7 @@ int extFilter::main(const ArgVec& args) dpi_protocol_t protocol; protocol.l4prot = IPPROTO_TCP; + protocol.l7prot = DPI_PROTOCOL_TCP_HTTP; dpi_set_protocol(dpi_state, protocol); @@ -1261,10 +1269,11 @@ int extFilter::main(const ArgVec& args) prms.params = _sender_params; prms.mac = (uint8_t *)&ports_eth_addr[1]; prms.to_mac = &sender_mac[0]; + prms.answer_duplication = global_prm->answer_duplication; + prms.clone_pool = clone_pool; } - - WorkerThread* newWorker = new WorkerThread(worker_id, workerName, workerConfigArr[worker_id], dpi_state, rte_lcore_to_socket_id(lcore_id), prms, _mp, dpi_http_mempool); + WorkerThread* newWorker = new WorkerThread(worker_id, workerName, workerConfigArr[worker_id], dpi_state, rte_lcore_to_socket_id(lcore_id), prms, _mp, dpi_http_mempool, dpi_ssl_mempool); int err = rte_eal_remote_launch(dpdkWorkerThreadStart, newWorker, lcore_id); if (err != 0) diff --git a/src/sender.cpp b/src/sender.cpp index 9418f12..3b1a1c2 100644 --- a/src/sender.cpp +++ b/src/sender.cpp @@ -225,7 +225,9 @@ int DSender::Send(uint8_t *buffer, int size, void *addr, int addr_size) ESender::ESender(struct nparams &prm, uint8_t port, struct rte_mempool *mp, WorkerThread *wt) : BSender("ESender", prm.params), _port(port), _mp(mp), - _wt(wt) + _clone_pool(prm.clone_pool), + _wt(wt), + _answer_duplication(prm.answer_duplication) { rte_memcpy(&_eth_hdr.s_addr, prm.mac, 6); rte_memcpy(&_eth_hdr.d_addr, prm.to_mac, 6); @@ -274,7 +276,8 @@ void ESender::sendPacket(void *ip_from, void *ip_to, int ip_ver, int port_from, return; } -void ESender::sendPacketIPv4(const uint8_t *l3_pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh) + +void ESender::sendPacketIPv4(const uint8_t *l3_pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, bool to_server) { struct rte_mbuf *pkt = rte_pktmbuf_alloc(_mp); if(unlikely(pkt == nullptr)) @@ -291,12 +294,26 @@ void ESender::sendPacketIPv4(const uint8_t *l3_pkt, uint32_t acknum, uint32_t se ether_addr_copy(&_eth_hdr.d_addr, ð_hdr->d_addr); eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) pkt_buf; +/* ip_hdr->hdr_checksum = 0; + pkt->ol_flags |= PKT_TX_IPV4 | PKT_TX_IP_CKSUM;*/ ip_hdr->hdr_checksum = rte_ipv4_cksum(ip_hdr); if(likely(_wt->_n_send_pkts < EXTFILTER_WORKER_BURST_SIZE)) { - _wt->_sender_buf[_wt->_n_send_pkts] = pkt; - _wt->_n_send_pkts++; + if(!to_server && _answer_duplication > 0 && _wt->_n_send_pkts+_answer_duplication < EXTFILTER_WORKER_BURST_SIZE) + { + for(uint8_t z = 0; z < _answer_duplication; z++) + { + struct rte_mbuf *clone = rte_pktmbuf_clone(pkt, _clone_pool); + if(clone != nullptr) + _wt->_sender_buf[_wt->_n_send_pkts++] = clone; + else { + _logger.error("Unable to create clone packet."); + break; + } + } + } + _wt->_sender_buf[_wt->_n_send_pkts++] = pkt; } else { _logger.error("Can't send packet. Buffer is full."); rte_pktmbuf_free(pkt); @@ -305,7 +322,7 @@ void ESender::sendPacketIPv4(const uint8_t *l3_pkt, uint32_t acknum, uint32_t se return; } -void ESender::sendPacketIPv6(const uint8_t *l3_pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh) +void ESender::sendPacketIPv6(const uint8_t *l3_pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, bool to_server) { struct rte_mbuf *pkt = rte_pktmbuf_alloc(_mp); if(unlikely(pkt == nullptr)) @@ -324,8 +341,20 @@ void ESender::sendPacketIPv6(const uint8_t *l3_pkt, uint32_t acknum, uint32_t se if(likely(_wt->_n_send_pkts < EXTFILTER_WORKER_BURST_SIZE)) { - _wt->_sender_buf[_wt->_n_send_pkts] = pkt; - _wt->_n_send_pkts++; + if(!to_server && _answer_duplication > 0 && _wt->_n_send_pkts+_answer_duplication < EXTFILTER_WORKER_BURST_SIZE) + { + for(uint8_t z = 0; z < _answer_duplication; z++) + { + struct rte_mbuf *clone = rte_pktmbuf_clone(pkt, _clone_pool); + if(clone != nullptr) + _wt->_sender_buf[_wt->_n_send_pkts++] = clone; + else { + _logger.error("Unable to create clone packet."); + break; + } + } + } + _wt->_sender_buf[_wt->_n_send_pkts++] = pkt; } else { _logger.error("Can't send packet. Buffer is full."); rte_pktmbuf_free(pkt); @@ -341,36 +370,26 @@ void ESender::HTTPRedirectIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seq const char *payload_ptr = f_lines; if(redir_url != nullptr && r_len + OUR_REDIR_SIZE < OUR_PAYLOAD_SIZE) { - rte_memcpy(payload, r_line1, sizeof(r_line1)-1); - rte_memcpy(payload + sizeof(r_line1) - 1, r_line2, sizeof(r_line2) -1); + memcpy(payload, r_line1, sizeof(r_line1)-1); + memcpy(payload + sizeof(r_line1) - 1, r_line2, sizeof(r_line2) -1); rte_memcpy(payload + sizeof(r_line1) - 1 + sizeof(r_line2) - 1 , redir_url, r_len); - rte_memcpy(payload + sizeof(r_line1) - 1 + sizeof(r_line2) - 1 + r_len, r_line3, sizeof(r_line3) - 1); + memcpy(payload + sizeof(r_line1) - 1 + sizeof(r_line2) - 1 + r_len, r_line3, sizeof(r_line3) - 1); payload_size = sizeof(r_line1) - 1 + sizeof(r_line2) - 1 + sizeof(r_line3) - 1 + r_len; payload_ptr = payload; } sendPacketIPv4(pkt, acknum, seqnum, payload_ptr, payload_size, false, f_psh); - if(global_prm->answer_duplication > 0) - { - for(uint8_t z = 0; z < global_prm->answer_duplication; z++) - sendPacketIPv4(pkt, acknum, seqnum, payload_ptr, payload_size, false, f_psh); - } // sendPacketIPv4(pkt, rte_cpu_to_be_32(rte_be_to_cpu_32(acknum) + payload_size), 0, nullptr, 0, true, false); // send rst... // And reset session with server, if needed if(_parameters.send_rst_to_server) - this->sendPacketIPv4(pkt, seqnum, acknum, nullptr, 0, 1, 0); + this->sendPacketIPv4(pkt, seqnum, acknum, nullptr, 0, 1, 0, true); } void ESender::HTTPForbiddenIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh) { sendPacketIPv4(pkt, acknum, seqnum, f_lines, sizeof(f_lines)-1, 0, f_psh); - if(global_prm->answer_duplication > 0) - { - for(uint8_t z = 0; z < global_prm->answer_duplication; z++) - sendPacketIPv4(pkt, acknum, seqnum, f_lines, sizeof(f_lines)-1, 0, f_psh); - } // And reset session with server, if needed if(_parameters.send_rst_to_server) - this->sendPacketIPv4(pkt, seqnum, acknum, nullptr, 0, 1, 0); + this->sendPacketIPv4(pkt, seqnum, acknum, nullptr, 0, 1, 0, true); } void ESender::HTTPRedirectIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh, const char *redir_url, size_t r_len) @@ -380,36 +399,26 @@ void ESender::HTTPRedirectIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seq const char *payload_ptr = f_lines; if(redir_url != nullptr && r_len + OUR_REDIR_SIZE < OUR_PAYLOAD_SIZE) { - rte_memcpy(payload, r_line1, sizeof(r_line1)-1); - rte_memcpy(payload + sizeof(r_line1) - 1, r_line2, sizeof(r_line2) -1); + memcpy(payload, r_line1, sizeof(r_line1)-1); + memcpy(payload + sizeof(r_line1) - 1, r_line2, sizeof(r_line2) -1); rte_memcpy(payload + sizeof(r_line1) - 1 + sizeof(r_line2) - 1 , redir_url, r_len); - rte_memcpy(payload + sizeof(r_line1) - 1 + sizeof(r_line2) - 1 + r_len, r_line3, sizeof(r_line3) - 1); + memcpy(payload + sizeof(r_line1) - 1 + sizeof(r_line2) - 1 + r_len, r_line3, sizeof(r_line3) - 1); payload_size = sizeof(r_line1) - 1 + sizeof(r_line2) - 1 + sizeof(r_line3) - 1 + r_len; payload_ptr = payload; } sendPacketIPv6(pkt, acknum, seqnum, payload_ptr, payload_size, false, f_psh); - if(global_prm->answer_duplication > 0) - { - for(uint8_t z = 0; z < global_prm->answer_duplication; z++) - sendPacketIPv6(pkt, acknum, seqnum, payload_ptr, payload_size, false, f_psh); - } // sendPacketIPv6(pkt, rte_cpu_to_be_32(rte_be_to_cpu_32(acknum) + payload_size), 0, nullptr, 0, true, false); // send rst... // And reset session with server, if needed if(_parameters.send_rst_to_server) - sendPacketIPv6(pkt, seqnum, acknum, nullptr, 0, 1, 0); + sendPacketIPv6(pkt, seqnum, acknum, nullptr, 0, 1, 0, true); } void ESender::HTTPForbiddenIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh) { sendPacketIPv6(pkt, acknum, seqnum, f_lines, sizeof(f_lines)-1, 0, f_psh); - if(global_prm->answer_duplication > 0) - { - for(uint8_t z = 0; z < global_prm->answer_duplication; z++) - sendPacketIPv6(pkt, acknum, seqnum, f_lines, sizeof(f_lines)-1, 0, f_psh); - } // And reset session with server, if needed if(_parameters.send_rst_to_server) - this->sendPacketIPv6(pkt, seqnum, acknum, nullptr, 0, 1, 0); + this->sendPacketIPv6(pkt, seqnum, acknum, nullptr, 0, 1, 0, true); } @@ -417,26 +426,16 @@ void ESender::SendRSTIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum) { // send rst to the client sendPacketIPv4(pkt, acknum, seqnum, nullptr, 0, true, false); - if(global_prm->answer_duplication > 0) - { - for(uint8_t z = 0; z < global_prm->answer_duplication; z++) - sendPacketIPv4(pkt, acknum, seqnum, nullptr, 0, true, false); - } // send rst to the server if(_parameters.send_rst_to_server) - sendPacketIPv4(pkt, seqnum, acknum, nullptr, 0, true, false); + sendPacketIPv4(pkt, seqnum, acknum, nullptr, 0, true, false, true); } void ESender::SendRSTIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum) { // send rst to the client sendPacketIPv6(pkt, acknum, seqnum, nullptr, 0, true, false); - if(global_prm->answer_duplication > 0) - { - for(uint8_t z = 0; z < global_prm->answer_duplication; z++) - sendPacketIPv6(pkt, acknum, seqnum, nullptr, 0, true, false); - } // send rst to the server if(_parameters.send_rst_to_server) - sendPacketIPv6(pkt, seqnum, acknum, nullptr, 0, true, false); + sendPacketIPv6(pkt, seqnum, acknum, nullptr, 0, true, false, true); } diff --git a/src/statistictask.cpp b/src/statistictask.cpp index 5488dfa..2f21b6a 100644 --- a/src/statistictask.cpp +++ b/src/statistictask.cpp @@ -188,7 +188,13 @@ void StatisticTask::OutStatistic() app.logger().information("Thread matched by ip/port: %" PRIu64 ", ssl SNI: %" PRIu64 ", ssl/ip: %" PRIu64 ", http IPv4: %" PRIu64 ", http IPv6: %" PRIu64, stats.matched_ip_port, stats.matched_ssl_sni, stats.matched_ssl_ip, stats.matched_http_bl_ipv4, stats.matched_http_bl_ipv6); app.logger().information("Thread redirected blocked http IPv4: %" PRIu64 ", redirected http IPv6: %" PRIu64 ", sended forbidden IPv4: %" PRIu64 ", sended forbidden IPv6: %" PRIu64 ", rst sended IPv4: %" PRIu64 ", rst sended IPv6: %" PRIu64, stats.redirected_http_bl_ipv4, stats.redirected_http_bl_ipv6, stats.sended_forbidden_ipv4, stats.sended_forbidden_ipv6, stats.sended_rst_ipv4, stats.sended_rst_ipv6); if(stats.latency_counters.blocked_pkts != 0 && stats.latency_counters.total_pkts != 0) - app.logger().information("Thread packets latency all packets: %" PRIu64 " cycles (%.0f ns), blocked packets: %" PRIu64 " (%.0f ns)", (stats.latency_counters.total_cycles / stats.latency_counters.total_pkts), cycles_to_ns(stats.latency_counters.total_cycles / stats.latency_counters.total_pkts), (stats.latency_counters.blocked_cycles / stats.latency_counters.blocked_pkts), cycles_to_ns(stats.latency_counters.blocked_cycles / stats.latency_counters.blocked_pkts)); + app.logger().information("Thread packets latency all packets: %" PRIu64 " cycles (%.0f ns), unblocked packets: %" PRIu64 " cycles (%.0f ns), blocked packets: %" PRIu64 " (%.0f ns)", + (stats.latency_counters.total_cycles / stats.latency_counters.total_pkts), + cycles_to_ns(stats.latency_counters.total_cycles / stats.latency_counters.total_pkts), + (stats.latency_counters.unblocked_cycles / stats.latency_counters.unblocked_pkts), + cycles_to_ns(stats.latency_counters.unblocked_cycles / stats.latency_counters.unblocked_pkts), + (stats.latency_counters.blocked_cycles / stats.latency_counters.blocked_pkts), + cycles_to_ns(stats.latency_counters.blocked_cycles / stats.latency_counters.blocked_pkts)); if(!_statisticsFile.empty()) { std::string worker_name("worker.core."+std::to_string(core)); diff --git a/src/tries.cpp b/src/tries.cpp index b3c1a87..dda9806 100644 --- a/src/tries.cpp +++ b/src/tries.cpp @@ -87,7 +87,7 @@ TriesControl::TriesControl(): } -int read_keys(std::istream &input, marisa::Keyset *m_domains, marisa::Keyset *urls, bool is_domains = false) +int read_keys(std::istream &input, marisa::Keyset *m_domains, marisa::Keyset *urls) { int lines = 0; std::string line; @@ -96,34 +96,21 @@ int read_keys(std::istream &input, marisa::Keyset *m_domains, marisa::Keyset *ur lines++; if(line[0] == '#' || line[0] == ';') continue; - if(is_domains) + std::size_t pos = line.find("*."); + if(pos != line.npos) { - std::size_t pos = line.find("*."); - if(pos != line.npos) - { - std::string s = line.substr(pos+1, line.length()-1); - std::string s1(s.c_str()+1); - s1 += "/"; - urls->push_back(s1.c_str(), s1.length()); // store domain without previous dot - std::reverse(s.begin(), s.end()); - m_domains->push_back(s.c_str(), s.length()); // store reverse - } else { - if(is_domains) - { - std::string s(line.c_str()); - s += "/"; - urls->push_back(s.c_str(), s.length()); - } - } + std::string s = line.substr(pos+1, line.length()-1); + urls->push_back(s.c_str()+1, s.length()-1); // store domain without previous dot + std::reverse(s.begin(), s.end()); + m_domains->push_back(s.c_str(), s.length()); // store reverse } else { urls->push_back(line.c_str(), line.length()); } - } return lines; } -bool TriesControl::load(std::string &domains_f, std::string &urls_f, bool is_sni) +bool TriesControl::load(std::string &domains_f, std::string &urls_f) { marisa::Keyset m_domains; marisa::Keyset urls; @@ -149,7 +136,7 @@ bool TriesControl::load(std::string &domains_f, std::string &urls_f, bool is_sni _logger.error("Failed to open domains file '%s'", domains_f); return true; } - domains_lines = read_keys(domains_file, &m_domains, &urls, is_sni ? false : true); + domains_lines = read_keys(domains_file, &m_domains, &urls); } catch (const marisa::Exception &ex) { _logger.error("Working with domains failed: %s", std::string(ex.what())); @@ -376,7 +363,7 @@ bool BlacklistsManager::init(std::string &_domains_file, std::string &_urls_file fillProfile(0, _domains_file, _urls_file, _sni_file, redir_url, url_length); _http_bl.load(_domains_file, _urls_file); std::string empty_s; - _sni_bl.load(_sni_file, empty_s, true); + _sni_bl.load(_sni_file, empty_s); _active_profile = 0; return false; } @@ -386,5 +373,5 @@ bool BlacklistsManager::update() if(_http_bl.load(_sp[_active_profile].domains_file, _sp[_active_profile].urls_file)) return true; std::string empty_s; - return _sni_bl.load(_sp[_active_profile].sni_file, empty_s, true); + return _sni_bl.load(_sp[_active_profile].sni_file, empty_s); } diff --git a/src/worker.cpp b/src/worker.cpp index f73140f..ed898d7 100644 --- a/src/worker.cpp +++ b/src/worker.cpp @@ -35,6 +35,7 @@ #include #include #include +#include #include "worker.h" #include "main.h" @@ -46,6 +47,7 @@ #include "utils.h" #include "http.h" #include "dtypes.h" +#include "ssli.h" #define tcphdr(x) ((struct tcphdr *)(x)) @@ -67,14 +69,7 @@ int on_header_complete_ext(http_parser* p, dpi_pkt_infos_t* pkt_informations, vo return 1; // no need to check body... } - -void ssl_cert_cb(char *certificate, int size, void *user_data, dpi_pkt_infos_t *pkt) -{ - WorkerThread *obj = (WorkerThread *) user_data; - obj->setNeedBlock(obj->checkSNIBlocked((const char *)certificate, size > 255 ? 255 : size, pkt)); -} - -WorkerThread::WorkerThread(uint8_t worker_id,const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, int socketid, struct ESender::nparams &sp, struct rte_mempool *mp, struct rte_mempool *dpi_http_mempool) : +WorkerThread::WorkerThread(uint8_t worker_id,const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, int socketid, struct ESender::nparams &sp, struct rte_mempool *mp, struct rte_mempool *dpi_http_mempool, struct rte_mempool *dpi_ssl_mempool) : m_WorkerConfig(workerConfig), m_Stop(true), _logger(Poco::Logger::get(name)), dpi_state(state), @@ -90,8 +85,7 @@ WorkerThread::WorkerThread(uint8_t worker_id,const std::string& name, WorkerConf }; dpi_http_activate_ext_callbacks(dpi_state, &ext_callbacks, this); - static dpi_ssl_callbacks_t ssl_callback = {.certificate_callback = ssl_cert_cb }; - dpi_ssl_activate_callbacks(state, &ssl_callback, this); + dpi_ssl_activate_external_inspector(state, ssl_inspector, this); ipv4_flow_mask = global_prm->memory_configs.ipv4.mask_parts_flow; ipv6_flow_mask = global_prm->memory_configs.ipv6.mask_parts_flow; @@ -104,6 +98,8 @@ WorkerThread::WorkerThread(uint8_t worker_id,const std::string& name, WorkerConf throw Poco::Exception("ESender is null!"); } _dpi_http_mempool = dpi_http_mempool; + _dpi_ssl_mempool = dpi_ssl_mempool; + _pkt_info_mempool = extFilter::getPktInfoPool(); } WorkerThread::~WorkerThread() @@ -168,12 +164,9 @@ bool WorkerThread::checkURLBlocked(const char *host, size_t host_len, const char return false; } - - dpi_identification_result_t WorkerThread::identifyAppProtocol(const unsigned char* pkt, u_int32_t length, u_int32_t current_time, uint8_t *host_key, uint32_t sig) { dpi_identification_result_t r; - r.status = DPI_STATUS_OK; dpi_pkt_infos_t infos = { 0 }; u_int8_t l3_status; @@ -208,8 +201,6 @@ dpi_identification_result_t WorkerThread::identifyAppProtocol(const unsigned cha return r; } - - dpi_identification_result_t WorkerThread::getAppProtocol(uint8_t *host_key, uint64_t timestamp, uint32_t sig, dpi_pkt_infos_t *pkt_infos) { dpi_identification_result_t r; @@ -616,7 +607,9 @@ bool WorkerThread::analyzePacket(struct rte_mbuf* m, uint64_t timestamp) m_ThreadStats.analyzed_packets++; uint32_t acl_action = pkt_info->acl_res & ACL_POLICY_MASK; - if(payload_len > 0 && acl_action == ACL::ACL_DROP) +// uint32_t acl_action = m->udata64 & ACL_POLICY_MASK; + + if(unlikely(payload_len > 0 && acl_action == ACL::ACL_DROP)) { m_ThreadStats.matched_ip_port++; if(ip_version == 4) @@ -768,7 +761,6 @@ bool WorkerThread::run(uint32_t coreId) return true; } -// m_CoreId = coreId; m_Stop = false; struct rte_mbuf *buf; @@ -787,7 +779,6 @@ bool WorkerThread::run(uint32_t coreId) for (int i = 0; i < qconf->n_rx_queue; i++) { portid = qconf->rx_queue_list[i].port_id; -// stats[lcore_id].port_id = portid; queueid = qconf->rx_queue_list[i].queue_id; _logger.information("-- lcoreid=%d portid=%d rxqueueid=%d", (int)lcore_id, (int)portid, (int)queueid); } @@ -828,11 +819,11 @@ bool WorkerThread::run(uint32_t coreId) m_ThreadStats.total_packets += nb_rx; // prefetch packets... - for(uint16_t i = 0; i < PREFETCH_OFFSET && i < nb_rx; i++) +/* for(uint16_t i = 0; i < PREFETCH_OFFSET && i < nb_rx; i++) { rte_prefetch0(rte_pktmbuf_mtod(bufs[i], void *)); } - +*/ struct ACL::acl_search_t acl_search; prepare_acl_parameter(bufs, &acl_search, nb_rx); @@ -844,6 +835,7 @@ bool WorkerThread::run(uint32_t coreId) { if(acl_search.res_ipv4[acli] != 0) { +// acl_search.m_ipv4[acli]->udata64 = acl_search.res_ipv4[acli]; ((struct packet_info *)acl_search.m_ipv4[acli]->userdata)->acl_res=acl_search.res_ipv4[acli]; } } @@ -855,15 +847,18 @@ bool WorkerThread::run(uint32_t coreId) { if(acl_search.res_ipv6[acli] != 0) { +// acl_search.m_ipv6[acli]->udata64 = acl_search.res_ipv6[acli]; ((struct packet_info *)acl_search.m_ipv6[acli]->userdata)->acl_res=acl_search.res_ipv6[acli]; } } } uint64_t cycles = 0; uint64_t blocked_cycles = 0; + uint64_t unblocked_cycles = 0; for(uint16_t i = 0; i < nb_rx; i++) { buf = bufs[i]; + rte_prefetch0(rte_pktmbuf_mtod(buf, void *)); if(likely(buf->userdata && port_type == P_TYPE_SUBSCRIBER)) { bool need_block = analyzePacket(buf, last_sec); @@ -872,14 +867,18 @@ bool WorkerThread::run(uint32_t coreId) { blocked_cycles += now - ((struct packet_info *)buf->userdata)->timestamp; m_ThreadStats.latency_counters.blocked_pkts++; + } else { + unblocked_cycles += now - ((struct packet_info *)buf->userdata)->timestamp; + m_ThreadStats.latency_counters.unblocked_pkts++; } cycles += now - ((struct packet_info *)buf->userdata)->timestamp; - rte_mempool_put(extFilter::getPktInfoPool(), buf->userdata); // free packet_info + rte_mempool_put(_pkt_info_mempool, buf->userdata); // free packet_info } rte_pktmbuf_free(buf); } m_ThreadStats.latency_counters.total_cycles += cycles; m_ThreadStats.latency_counters.blocked_cycles += blocked_cycles; + m_ThreadStats.latency_counters.unblocked_cycles += unblocked_cycles; m_ThreadStats.latency_counters.total_pkts += nb_rx; if(_n_send_pkts > 0) {