From 82a2664b0a55dbcd26a019fc2f2ca9a0bd6b7347 Mon Sep 17 00:00:00 2001 From: Max Date: Sun, 6 Aug 2017 20:21:37 +0300 Subject: [PATCH] Version 0.85 --- configure.ac | 4 +- etc/extfilter.ini | 15 +- include/config.h | 6 +- include/dpi.h | 1 - include/dtypes.h | 5 +- include/flow.h | 100 ++++++ include/main.h | 30 +- include/sender.h | 73 +++- include/sendertask.h | 28 +- include/stats.h | 5 +- include/worker.h | 71 +++- src/Makefile.am | 2 +- src/cmdlinetask.cpp | 2 + src/flow.cpp | 95 +++++ src/main.cpp | 323 +++++++++++------ src/notification.cpp | 2 +- src/reloadtask.cpp | 26 +- src/sender.cpp | 275 ++++++++++---- src/sendertask.cpp | 10 +- src/statistictask.cpp | 4 +- src/worker.cpp | 816 ++++++++++++++++++++++++++++++------------ 21 files changed, 1402 insertions(+), 491 deletions(-) diff --git a/configure.ac b/configure.ac index e968eff..e132c11 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. AC_PREREQ([2.69]) -AC_INIT(extFilter, 0.80, max1976@mail.ru) +AC_INIT(extFilter, 0.85, max1976@mail.ru) DPDK_HOME= DPDK_TARGET= @@ -49,7 +49,7 @@ AC_MSG_CHECKING([for debug enabled]) if test x"$debug" = x"true"; then CXXFLAGS="$CXXFLAGS -std=c++11 -O0 -g -Wall -pthread -msse -msse2 -msse3 -mssse3" else - CXXFLAGS="$CXXFLAGS -std=c++11 -O3 -Wall -fno-stack-protector -pthread -msse -msse2 -msse3 -mssse3" + CXXFLAGS="$CXXFLAGS -std=c++11 -O3 -Wall -fno-stack-protector -pthread -msse -msse2 -msse3 -mssse3 -march=native" fi AC_COMPILE_IFELSE([AC_LANG_SOURCE( diff --git a/etc/extfilter.ini b/etc/extfilter.ini index feb34ab..fa560ac 100644 --- a/etc/extfilter.ini +++ b/etc/extfilter.ini @@ -1,5 +1,5 @@ ; Переводить имя хоста в прописные буквы. Если url_normalization установлен в true, то не имеет значения. -lower_host = true +;lower_host = false domainlist = /usr/local/etc/extfilter/domains urllist = /usr/local/etc/extfilter/urls @@ -20,8 +20,7 @@ redirect_url = http://notify.example.com? http_code = 302 Found ; Что добавлять в redirect_url, line - строка из файла url, url - запрещенный url, none - ничего -url_additional_info=line - +url_additional_info = none ; посылать tcp rst в сторону сервера от имени клиента. Default: false rst_to_server = false @@ -32,8 +31,8 @@ statistic_interval = 300 ; Default: false match_url_exactly = false -; Default: false -block_undetected_ssl = false +; Блокировать ssl по ip из файла с ip адресами в случае отсутствия SNI. Default: false +block_ssl_no_sni = false ; Какие ядра использовать. Default: все ядра, кроме management. ; core_mask = 7 @@ -79,6 +78,12 @@ block_undetected_ssl = false ;[port 0] ;queues = 0,1; 1,2 +; Порт для отправки уведомлений через dpdk +;[port 1] +;type = sender +; На какой mac адрес отправлять пакеты +;mac = 00:01:02:03:04:05 + ; Группа оповещения 0 ;[notify 0] ;http_code = 302 Found diff --git a/include/config.h b/include/config.h index caa2394..fc0ccce 100644 --- a/include/config.h +++ b/include/config.h @@ -56,7 +56,7 @@ #define PACKAGE_NAME "extFilter" /* Define to the full name and version of this package. */ -#define PACKAGE_STRING "extFilter 0.80" +#define PACKAGE_STRING "extFilter 0.85" /* Define to the one symbol short name of this package. */ #define PACKAGE_TARNAME "extfilter" @@ -65,10 +65,10 @@ #define PACKAGE_URL "" /* Define to the version of this package. */ -#define PACKAGE_VERSION "0.80" +#define PACKAGE_VERSION "0.85" /* Define to 1 if you have the ANSI C header files. */ #define STDC_HEADERS 1 /* Version number of package */ -#define VERSION "0.80" +#define VERSION "0.85" diff --git a/include/dpi.h b/include/dpi.h index 704fd30..5d4e8b5 100644 --- a/include/dpi.h +++ b/include/dpi.h @@ -18,4 +18,3 @@ struct dpi_flow_info } }; -dpi_identification_result_t dpi_stateful_identify_application_protocol_new(dpi_library_state_t* state, const unsigned char* pkt, u_int32_t length, u_int32_t current_time, uint32_t hash); \ No newline at end of file diff --git a/include/dtypes.h b/include/dtypes.h index 7ba4032..727e190 100644 --- a/include/dtypes.h +++ b/include/dtypes.h @@ -8,14 +8,15 @@ enum entry_types { - E_TYPE_DOMAIN, + E_TYPE_DOMAIN=0, E_TYPE_URL }; enum port_types { P_TYPE_SUBSCRIBER, - P_TYPE_NETWORK + P_TYPE_NETWORK, + P_TYPE_SENDER }; struct entry_data diff --git a/include/flow.h b/include/flow.h index 5b39106..4aa8f82 100644 --- a/include/flow.h +++ b/include/flow.h @@ -12,9 +12,52 @@ #include #include #include +#include + +//#define _SIMPLE_HASH 1 + +extern "C" void dpi_reordering_tcp_delete_all_fragments(dpi_tracking_informations_t *victim); #define IPV6_ADDR_LEN 16 +struct ext_dpi_flow_info +{ + + u_int16_t srcport; + u_int16_t dstport; + u_int8_t l4prot; + + union src_addr{ /** Addresses mantained in network byte order. **/ + struct in6_addr ipv6_srcaddr; + u_int32_t ipv4_srcaddr; + } src_addr_t; + union dst_addr{ + struct in6_addr ipv6_dstaddr; + u_int32_t ipv4_dstaddr; + } dst_addr_t; + + dpi_flow_infos_t infos; + uint64_t last_timestamp; +// u_int64_t bytes; +// u_int32_t packets; + + inline void free_mem(dpi_flow_cleaner_callback* flow_cleaner_callback) + { + if(flow_cleaner_callback != nullptr) + (*(flow_cleaner_callback))(infos.tracking.flow_specific_user_data); + if(infos.tracking.http_informations[0].temp_buffer != nullptr) + free(infos.tracking.http_informations[0].temp_buffer); + if(infos.tracking.http_informations[1].temp_buffer != nullptr) + free(infos.tracking.http_informations[1].temp_buffer); + if(infos.tracking.ssl_information[0].pkt_buffer != nullptr) + free(infos.tracking.ssl_information[0].pkt_buffer); + if(infos.tracking.ssl_information[1].pkt_buffer != nullptr) + free(infos.tracking.ssl_information[1].pkt_buffer); + dpi_reordering_tcp_delete_all_fragments(&(infos.tracking)); + } +}; + + union ipv4_5tuple_host { struct { uint8_t pad0; @@ -66,6 +109,29 @@ struct ip_5tuple uint8_t proto; } __attribute__((__packed__)); + +#ifdef __SIMPLE_HASH +static inline uint32_t ipv4_hash_crc(const void *data, __rte_unused uint32_t data_len, uint32_t init_val) +{ + const union ipv4_5tuple_host *in = (const union ipv4_5tuple_host *)data; + return in->port_src+in->port_dst+in->ip_src+in->ip_dst+in->proto+init_val; +} + + +static inline uint32_t ipv6_hash_crc(const void *data, __rte_unused uint32_t data_len, uint32_t init_val) +{ + const union ipv6_5tuple_host *in = (const union ipv6_5tuple_host *)data; + u_int8_t i; + u_int32_t partsrc = 0, partdst = 0; + for(i=0; i< 16; i++){ + partsrc += in->ip_src[i]; + partdst += in->ip_dst[i]; + } + return in->port_src+in->port_dst+partsrc+partdst+in->proto+init_val; +} + +#else + static inline uint32_t ipv4_hash_crc(const void *data, __rte_unused uint32_t data_len, uint32_t init_val) { const union ipv4_5tuple_host *k; @@ -113,6 +179,40 @@ static inline uint32_t ipv6_hash_crc(const void *data, __rte_unused uint32_t dat return init_val; } +#endif + +/// rte_hash holder +class flowHash +{ +private: + Poco::Logger& _logger; + struct rte_hash *ipv4_FlowHash; + struct rte_hash *ipv6_FlowHash; + uint32_t _flowHashSizeIPv4; + uint32_t _flowHashSizeIPv6; +public: + flowHash(int socket_id, int thread_id, uint32_t flowHashSizeIPv4, uint32_t flowHashSizeIPv6); + ~flowHash(); + inline struct rte_hash *getIPv4Hash() + { + return ipv4_FlowHash; + } + inline struct rte_hash *getIPv6Hash() + { + return ipv6_FlowHash; + } + inline uint32_t getHashSizeIPv4() + { + return _flowHashSizeIPv4; + } + inline uint32_t getHashSizeIPv6() + { + return _flowHashSizeIPv6; + } + +}; + + #if defined(__SSE2__) static inline xmm_t em_mask_key(void *key, xmm_t mask) diff --git a/include/main.h b/include/main.h index 0b391bd..fe38603 100644 --- a/include/main.h +++ b/include/main.h @@ -17,6 +17,8 @@ #define RTE_TEST_RX_DESC_DEFAULT 128 #define RTE_TEST_TX_DESC_DEFAULT 512 +#define EXTF_MAX_PKT_BURST 32 + class AhoCorasickPlus; class Patricia; class ACL; @@ -34,14 +36,21 @@ struct lcore_rx_queue { uint8_t queue_id; } __rte_cache_aligned; +struct mbuf_table +{ + uint16_t len; + struct rte_mbuf* m_table[EXTF_MAX_PKT_BURST]; +}; + struct lcore_conf { uint16_t n_rx_queue; struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE]; - uint16_t tx_queue_id[RTE_MAX_ETHPORTS]; struct rte_acl_ctx *cur_acx_ipv4, *new_acx_ipv4; struct rte_acl_ctx *cur_acx_ipv6, *new_acx_ipv6; - // TODO add WorkerConfig??? -/* struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];*/ + uint8_t sender_port; + uint16_t tx_queue; + uint16_t tx_queue_id[RTE_MAX_ETHPORTS]; + struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS]; } __rte_cache_aligned; @@ -66,17 +75,12 @@ class extFilter: public Poco::Util::ServerApplication /** Load domains for blocking. **/ - void loadDomains(std::string &fn, AhoCorasickPlus *_dm_atm,DomainsMatchType *_dm_map); - - /** - Load URLs for blocking. - **/ - void loadURLs(std::string &fn, AhoCorasickPlus *dm_atm); + void loadDomains(std::string &fn, AhoCorasickPlus *_dm_atm); /** Load domains and urls into one database. **/ - void loadDomainsURLs(std::string &domains, std::string &urls, AhoCorasickPlus *dm_atm, EntriesData *ed); + void loadDomainsURLs(std::string &domains, std::string &urls, AhoCorasickPlus *dm_atm); std::string &getSSLFile() { @@ -176,7 +180,8 @@ class extFilter: public Poco::Util::ServerApplication static struct ether_addr ports_eth_addr[RTE_MAX_ETHPORTS]; private: - int initPort(uint8_t port, struct ether_addr *addr); + int initPort(uint8_t port, struct ether_addr *addr, bool no_promisc = false); + int initSenderPort(uint8_t port, struct ether_addr *addr, uint8_t nb_tx_queue); int initMemory(uint8_t nb_ports); int initACL(); @@ -199,7 +204,7 @@ class extFilter: public Poco::Util::ServerApplication bool _lower_host; bool _match_url_exactly; - bool _block_undetected_ssl; + bool _block_ssl_no_sni; bool _http_redirect; bool _url_normalization; bool _remove_dot; @@ -243,6 +248,7 @@ class extFilter: public Poco::Util::ServerApplication std::string _notify_acl_file; int _cmdline_port; Poco::Net::IPAddress _cmdline_ip; + uint8_t _dpdk_send_port; }; diff --git a/include/sender.h b/include/sender.h index 59bb870..49a7852 100644 --- a/include/sender.h +++ b/include/sender.h @@ -31,12 +31,13 @@ #include #include #include - +#include +#include #include #include - -class CSender { +class BSender +{ public: struct params { @@ -48,19 +49,69 @@ class CSender { int mtu; params() : code("302 Moved Temporarily"), send_rst_to_server(false), ttl(250), ip6_hops(250), mtu(1500) { } }; - CSender( std::string url ); - CSender(struct params &prm); - ~CSender(); - void Redirect(int user_port, int dst_port, void *ip_from, void *ip_to, int ip_ver, uint32_t acknum, uint32_t seqnum, int f_psh, std::string &additional_param); - void sendPacket(void *ip_from, void *ip_to, int ip_ver, int port_from, int port_to, uint32_t acknum, uint32_t seqnum, std::string &dt, int f_reset, int f_psh); + + BSender(const char *, struct params &prm); + virtual ~BSender(); + + void Redirect(int user_port, int dst_port, void *ip_from, void *ip_to, int ip_ver, uint32_t acknum, uint32_t seqnum, int f_psh, const char *add_prm); + virtual void sendPacket(void *ip_from, void *ip_to, int ip_ver, int port_from, int port_to, uint32_t acknum, uint32_t seqnum, std::string &dt, int f_reset, int f_psh); void SendRST(int user_port, int dst_port, void *ip_from, void *ip_to, int ip_ver, uint32_t acknum, uint32_t seqnum, int f_psh); + int makePacket(void *ip_from, void *ip_to, int ip_ver, int port_from, int port_to, uint32_t acknum, uint32_t seqnum, std::string &dt, int f_reset, int f_psh, uint8_t *buffer); + + virtual int Send(uint8_t *buffer, int size, void *addr, int addr_size) = 0; + + Poco::Logger& _logger; + std::string rHeader; + struct params _parameters; + uint16_t pkt_id; +}; + +class CSender : public BSender +{ +public: + CSender(struct BSender::params &prm); + ~CSender(); + int Send(uint8_t *buffer, int size, void *addr, int addr_size); private: int s; int s6; - std::string rHeader; - Poco::Logger& _logger; - struct params _parameters; }; +class DSender : public BSender +{ +public: + DSender(struct BSender::params &prm, uint8_t port, uint8_t *mac, uint8_t *to_mac, struct rte_mempool *mp); + ~DSender(); + int Send(uint8_t *buffer, int size, void *addr, int addr_size); +private: + uint8_t _port; + struct ether_hdr _eth_hdr; + struct rte_mempool *_mp; +}; + +class WorkerThread; + +class ESender : public BSender +{ +public: + struct nparams + { + struct params params; + uint8_t *mac; + uint8_t *to_mac; + }; + ESender(struct nparams ¶ms, uint8_t port, struct rte_mempool *mp, WorkerThread *wt); + ~ESender(); + void sendPacket(void *ip_from, void *ip_to, int ip_ver, int port_from, int port_to, uint32_t acknum, uint32_t seqnum, std::string &dt, int f_reset, int f_psh); + int Send(uint8_t *buffer, int size, void *addr, int addr_size) + { + return size; + } +private: + uint8_t _port; + struct ether_hdr _eth_hdr; + struct rte_mempool *_mp; + WorkerThread *_wt; +}; #endif diff --git a/include/sendertask.h b/include/sendertask.h index 2d58bc0..5f25fc3 100644 --- a/include/sendertask.h +++ b/include/sendertask.h @@ -186,21 +186,43 @@ class RedirectNotificationG: public Poco::Notification bool _is_rst; }; +struct redirectEvent +{ + uint16_t _user_port; + uint16_t _dst_port; + union + { + uint32_t ipv4; + __m128i ipv6; + } _user_ip; + union + { + uint32_t ipv4; + __m128i ipv6; + } _dst_ip; + uint8_t _ip_version; + uint32_t _acknum; + uint32_t _seqnum; + uint8_t _f_psh; + char *_additional_param; + uint8_t _is_rst; +}; + + /// Данная задача отсылает редирект заданному клиенту class SenderTask: public Poco::Task { public: - SenderTask(struct CSender::params &prm, int instance); + SenderTask(BSender *snd, int instance); ~SenderTask(); void runTask(); // очередь, куда необходимо писать отправные данные... static Poco::NotificationQueue queue; - private: - CSender *sender; + BSender *sender; Poco::Logger& _logger; }; diff --git a/include/stats.h b/include/stats.h index 5136f75..58b4242 100644 --- a/include/stats.h +++ b/include/stats.h @@ -39,9 +39,10 @@ class ThreadStats uint64_t already_detected_blocked; uint64_t reassembled_flows; struct LatencyCounters latency_counters; - ThreadStats() : redirected_domains(0), redirected_urls(0), sended_rst(0), ip_packets(0), total_bytes(0), matched_ssl(0), matched_ssl_ip(0), matched_ip_port(0),total_packets(0), analyzed_packets(0), matched_domains(0), matched_urls(0), ipv4_packets(0), ipv6_packets(0), ndpi_flows_count(0), ndpi_ipv4_flows_count(0), ndpi_ipv6_flows_count(0), max_ipv4_flows(0), max_ipv6_flows(0), ndpi_flows_deleted(0), missed_packets(0), enqueued_packets(0), ipv4_short_packets(0), ipv4_fragments(0), ipv6_fragments(0), already_detected_blocked(0), reassembled_flows(0), latency_counters{0,0,0,0} {} + uint64_t ndpi_flows_expired; + ThreadStats() : redirected_domains(0), redirected_urls(0), sended_rst(0), ip_packets(0), total_bytes(0), matched_ssl(0), matched_ssl_ip(0), matched_ip_port(0),total_packets(0), analyzed_packets(0), matched_domains(0), matched_urls(0), ipv4_packets(0), ipv6_packets(0), ndpi_flows_count(0), ndpi_ipv4_flows_count(0), ndpi_ipv6_flows_count(0), max_ipv4_flows(0), max_ipv6_flows(0), ndpi_flows_deleted(0), missed_packets(0), enqueued_packets(0), ipv4_short_packets(0), ipv4_fragments(0), ipv6_fragments(0), already_detected_blocked(0), reassembled_flows(0), latency_counters{0,0,0,0}, ndpi_flows_expired(0) {} - void clear() { redirected_domains = 0; redirected_urls = 0; sended_rst = 0; ip_packets = 0; total_bytes = 0; matched_ssl = 0; matched_ssl_ip = 0; matched_ip_port = 0; total_packets = 0; analyzed_packets = 0; matched_domains = 0; matched_urls = 0; ipv4_packets = 0; ipv6_packets = 0; ndpi_flows_count = 0; ndpi_flows_deleted = 0; missed_packets = 0; enqueued_packets = 0; ipv4_short_packets = 0; ipv4_fragments = 0; ipv6_fragments = 0; ndpi_ipv4_flows_count = 0; ndpi_ipv6_flows_count = 0; max_ipv4_flows = 0; max_ipv6_flows = 0; already_detected_blocked = 0; reassembled_flows = 0; latency_counters = {0, 0, 0, 0}; } + void clear() { redirected_domains = 0; redirected_urls = 0; sended_rst = 0; ip_packets = 0; total_bytes = 0; matched_ssl = 0; matched_ssl_ip = 0; matched_ip_port = 0; total_packets = 0; analyzed_packets = 0; matched_domains = 0; matched_urls = 0; ipv4_packets = 0; ipv6_packets = 0; ndpi_flows_count = 0; ndpi_flows_deleted = 0; missed_packets = 0; enqueued_packets = 0; ipv4_short_packets = 0; ipv4_fragments = 0; ipv6_fragments = 0; ndpi_ipv4_flows_count = 0; ndpi_ipv6_flows_count = 0; max_ipv4_flows = 0; max_ipv6_flows = 0; already_detected_blocked = 0; reassembled_flows = 0; latency_counters = {0, 0, 0, 0}; ndpi_flows_expired = 0; } }; diff --git a/include/worker.h b/include/worker.h index 9741c94..212da8d 100644 --- a/include/worker.h +++ b/include/worker.h @@ -15,9 +15,12 @@ #include "flow.h" #include "stats.h" #include "dpdk.h" +#include "sender.h" -#define SIZE_IPv4_FLOW_TABLE 32767 -#define SIZE_IPv6_FLOW_TABLE 32767 +#define EXTF_GC_INTERVAL 1000 // us +#define EXTF_ALL_GC_INTERVAL 1 // seconds + +#define EXT_DPI_FLOW_TABLE_MAX_IDLE_TIME 30 /** In seconds. **/ #define EXTFILTER_CAPTURE_BURST_SIZE 32 #define EXTFILTER_WORKER_BURST_SIZE 32 @@ -29,53 +32,58 @@ #define PREFETCH_OFFSET 3 class NotifyManager; +class ESender; struct WorkerConfig { uint32_t CoreId; - int port; + + uint8_t port; + AhoCorasickPlus *atm; - Poco::FastMutex atmLock; // для загрузки url + Poco::FastMutex *atmLock; // для загрузки url AhoCorasickPlus *atmSSLDomains; - DomainsMatchType *SSLdomainsMatchType; - Poco::FastMutex atmSSLDomainsLock; // для загрузки domains + Poco::FastMutex *atmSSLDomainsLock; // для загрузки domains bool match_url_exactly; bool lower_host; - bool block_undetected_ssl; + bool block_ssl_no_sni; bool http_redirect; enum ADD_P_TYPES add_p_type; - EntriesData *entriesData; - bool url_normalization; bool remove_dot; bool notify_enabled; NotifyManager *nm; + uint8_t sender_port; + uint16_t tx_queue_id; + WorkerConfig() { CoreId = RTE_MAX_LCORE+1; atm = NULL; atmSSLDomains = NULL; - SSLdomainsMatchType = NULL; match_url_exactly = false; lower_host = false; - block_undetected_ssl = false; + block_ssl_no_sni = false; http_redirect = true; add_p_type = A_TYPE_NONE; url_normalization = true; remove_dot = true; notify_enabled = false; nm = nullptr; + atmLock = nullptr; + atmSSLDomainsLock = nullptr; } }; class WorkerThread : public DpdkWorkerThread { + friend class ESender; private: - WorkerConfig &m_WorkerConfig; + WorkerConfig m_WorkerConfig; bool m_Stop; Poco::Logger& _logger; ThreadStats m_ThreadStats; @@ -89,13 +97,48 @@ class WorkerThread : public DpdkWorkerThread std::string certificate; bool analyzePacket(struct rte_mbuf* mBuf, uint64_t timestamp); + ext_dpi_flow_info *getFlow(uint8_t *host_key, uint64_t timestamp, int32_t *idx, uint32_t sig, dpi_pkt_infos_t *pkt_infos); + dpi_identification_result_t getAppProtocol(uint8_t *host_key, uint64_t timestamp, uint32_t sig, dpi_pkt_infos_t *pkt_infos); + dpi_identification_result_t identifyAppProtocol(const unsigned char* pkt, u_int32_t length, u_int32_t current_time, uint8_t *host_key, uint32_t sig); + + bool checkSSL(); std::string _name; - + bool _need_block; + uint16_t _partition_id; + + struct ext_dpi_flow_info **ipv4_flows; + struct ext_dpi_flow_info **ipv6_flows; + + struct rte_mempool *flows_pool; + + flowHash *m_FlowHash; + /// for sender through dpdk + int _n_send_pkts; + struct rte_mbuf* _sender_buf[EXTFILTER_WORKER_BURST_SIZE]; + ESender *_snd; public: - WorkerThread(const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, int socketid); + WorkerThread(const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, int socketid, flowHash *fh, struct ESender::nparams &sp, struct rte_mempool *mp); ~WorkerThread(); + bool checkHTTP(std::string &uri, dpi_pkt_infos_t *pkt); + bool checkSSL(std::string &certificate, dpi_pkt_infos_t *pkt); + + inline std::string &getUri() + { + return uri; + } + + inline std::string &getCert() + { + return certificate; + } + + inline void setNeedBlock(bool b) + { + _need_block = b; + } + bool run(uint32_t coreId); void stop() diff --git a/src/Makefile.am b/src/Makefile.am index bc75df2..507d921 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -6,5 +6,5 @@ bin_PROGRAMS = extFilter extFilter_LDFLAGS = -Wl,--whole-archive -lrte_acl -lrte_pmd_bond -lrte_pmd_vmxnet3_uio -lrte_pmd_virtio -lrte_pmd_enic -lrte_pmd_i40e -lrte_pmd_fm10k -lrte_pmd_ixgbe -lrte_pmd_e1000 -lrte_pmd_ring -lrte_pmd_af_packet -lrte_ethdev -lrte_eal -lrte_mbuf -lrte_mempool -lrte_mempool_ring -lrte_mempool_stack -lrte_ring -lrte_kvargs -lrte_hash -lrte_cmdline -lrte_net -Wl,--no-whole-archive -extFilter_SOURCES = main.cpp worker.cpp AhoCorasickPlus.cpp ahocorasick.cpp node.cpp mpool.cpp replace.cpp sender.cpp sendertask.cpp statistictask.cpp reloadtask.cpp flow.cpp acl.cpp cmdlinetask.cpp notification.cpp dpi.cpp +extFilter_SOURCES = main.cpp worker.cpp AhoCorasickPlus.cpp ahocorasick.cpp node.cpp mpool.cpp replace.cpp sender.cpp sendertask.cpp statistictask.cpp reloadtask.cpp flow.cpp acl.cpp cmdlinetask.cpp notification.cpp diff --git a/src/cmdlinetask.cpp b/src/cmdlinetask.cpp index 60449b9..fa0f6b3 100644 --- a/src/cmdlinetask.cpp +++ b/src/cmdlinetask.cpp @@ -371,6 +371,8 @@ static void display_worker_stats(struct cmdline* cl,const ThreadStats &stats) cmdline_printf(cl, " Active flows: %" PRIu64 "\n", stats.ndpi_flows_count); cmdline_printf(cl, " IPv4: %" PRIu64 "\n", stats.ndpi_ipv4_flows_count); cmdline_printf(cl, " IPv6: %" PRIu64 "\n", stats.ndpi_ipv6_flows_count); + cmdline_printf(cl, " Flows deleted: %" PRIu64 "\n", stats.ndpi_flows_deleted); + cmdline_printf(cl, " Flows expired: %" PRIu64 "\n", stats.ndpi_flows_expired); cmdline_printf(cl, " Reassembled flows: %" PRIu64 "\n", stats.reassembled_flows); } diff --git a/src/flow.cpp b/src/flow.cpp index 21f2c3d..ed99357 100644 --- a/src/flow.cpp +++ b/src/flow.cpp @@ -17,4 +17,99 @@ rte_xmm_t mask0 = {.u32 = {BIT_8_TO_15, ALL_32_BITS, ALL_32_BITS, ALL_32_BITS} } rte_xmm_t mask1 = {.u32 = {BIT_16_TO_23, ALL_32_BITS, ALL_32_BITS, ALL_32_BITS} }; rte_xmm_t mask2 = {.u32 = {ALL_32_BITS, ALL_32_BITS, 0, 0} }; +static int compare_ipv4(const void *key1, const void *key2, size_t key_len) +{ + ipv4_5tuple_host *flow = (ipv4_5tuple_host *) key1; + ipv4_5tuple_host *pkt_infos = (ipv4_5tuple_host *) key2; + + return !((flow->ip_src == pkt_infos->ip_src && + flow->ip_dst == pkt_infos->ip_dst && + flow->port_src == pkt_infos->port_src && + flow->port_dst == pkt_infos->port_dst) || + (flow->ip_src == pkt_infos->ip_dst && + flow->ip_dst == pkt_infos->ip_src && + flow->port_src == pkt_infos->port_dst && + flow->port_dst == pkt_infos->port_src)) && + flow->proto == pkt_infos->proto; +} + +static int compare_ipv6(const void *key1, const void *key2, size_t key_len) +{ + ipv6_5tuple_host *flow = (ipv6_5tuple_host *) key1; + ipv6_5tuple_host *pkt_infos = (ipv6_5tuple_host *) key2; + + u_int8_t i; + + /*1: src=src and dst=dst. 2: src=dst and dst=src. */ + u_int8_t direction=0; + + for(i=0; i< 16; i++) + { + if(direction!=2 && + pkt_infos->ip_src[i] == flow->ip_src[i] && + pkt_infos->ip_dst[i] == flow->ip_dst[i]) + { + direction=1; + }else if(direction!=1 && + pkt_infos->ip_src[i] == flow->ip_dst[i] && + pkt_infos->ip_dst[i] == flow->ip_src[i]) + { + direction=2; + }else + return 1; + } + + if(direction==1) + return !(flow->port_src == pkt_infos->port_src && + flow->port_dst == pkt_infos->port_dst && + flow->proto == pkt_infos->proto); + else if(direction==2) + return !(flow->port_src == pkt_infos->port_dst && + flow->port_src == pkt_infos->port_dst && + flow->proto ==pkt_infos->proto); + else + return 1; + +} + +flowHash::flowHash(int socket_id, int thread_id, uint32_t flowHashSizeIPv4, uint32_t flowHashSizeIPv6) : _logger(Poco::Logger::get("FlowHash_" + std::to_string(thread_id))), + _flowHashSizeIPv4(flowHashSizeIPv4), + _flowHashSizeIPv6(flowHashSizeIPv6) +{ + struct rte_hash_parameters ipv4_hash_params = {0}; + std::string ipv4_hash_name("ipv4_flow_hash_" + std::to_string(thread_id)); + ipv4_hash_params.entries = _flowHashSizeIPv4; + ipv4_hash_params.key_len = sizeof(union ipv4_5tuple_host); + ipv4_hash_params.hash_func = ipv4_hash_crc; + ipv4_hash_params.hash_func_init_val = 0; + ipv4_hash_params.name = ipv4_hash_name.c_str(); + ipv4_FlowHash = rte_hash_create(&ipv4_hash_params); + if(!ipv4_FlowHash) + { + _logger.fatal("Unable to create ipv4 flow hash"); + throw Poco::Exception("Unable to create ipv4 flow hash"); + } + rte_hash_set_cmp_func(ipv4_FlowHash, compare_ipv4); + std::string ipv6_hash_name("ipv6_flow_hash_" + std::to_string(thread_id)); + struct rte_hash_parameters ipv6_hash_params = {0}; + ipv6_hash_params.entries = _flowHashSizeIPv6; + ipv6_hash_params.key_len = sizeof(union ipv6_5tuple_host); + ipv6_hash_params.hash_func = ipv6_hash_crc; + ipv6_hash_params.hash_func_init_val = 0; + ipv6_hash_params.name = ipv6_hash_name.c_str(); + ipv6_FlowHash = rte_hash_create(&ipv6_hash_params); + if(!ipv4_FlowHash) + { + _logger.fatal("Unable to create ipv6 flow hash"); + throw Poco::Exception("Unable to create ipv6 flow hash"); + } + rte_hash_set_cmp_func(ipv6_FlowHash, compare_ipv6); +} + + +flowHash::~flowHash() +{ + rte_hash_free(ipv4_FlowHash); + rte_hash_free(ipv6_FlowHash); +} diff --git a/src/main.cpp b/src/main.cpp index 155b5b5..ce8f8ad 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -18,7 +19,6 @@ #include #include "worker.h" #include "main.h" -#include "dpi.h" #include #include "AhoCorasickPlus.h" @@ -28,6 +28,7 @@ #include "acl.h" #include "cmdlinetask.h" #include "notification.h" +#include "dpi.h" #define MBUF_CACHE_SIZE 256 @@ -48,8 +49,11 @@ extFilter *extFilter::_instance = NULL; struct rte_mempool *extFilter::packet_info_pool[NB_SOCKETS]; struct ether_addr extFilter::ports_eth_addr[RTE_MAX_ETHPORTS]; +uint8_t port_types[RTE_MAX_ETHPORTS]; struct lcore_conf extFilter::_lcore_conf[RTE_MAX_LCORE]; +uint8_t sender_mac[6]; + void flow_delete_cb(void* flow_specific_user_data) { if(flow_specific_user_data) @@ -190,12 +194,12 @@ static inline void em_parse_ptype(struct rte_mbuf *m) pkt_info->keys.ipv6_key.xmm[0] = em_mask_key(data0, mask1.x); pkt_info->keys.ipv6_key.xmm[1] = _mm_loadu_si128((__m128i *)(data1)); pkt_info->keys.ipv6_key.xmm[2] = em_mask_key(data2, mask2.x); -// m->hash.usr = ipv6_hash_crc(&pkt_info->keys.ipv6_key,0,0); + m->hash.usr = ipv6_hash_crc(&pkt_info->keys.ipv6_key,0,0); } else if (tcp_or_udp && (l3_ptypes == RTE_PTYPE_L3_IPV4)) { void *ipv4_hdr = l3 + offsetof(struct ipv4_hdr, time_to_live); pkt_info->keys.ipv4_key.xmm = em_mask_key(ipv4_hdr, mask0.x); -// m->hash.usr = ipv4_hash_crc(&pkt_info->keys.ipv4_key,0,0); + m->hash.usr = ipv4_hash_crc(&pkt_info->keys.ipv4_key,0,0); } } @@ -207,8 +211,82 @@ uint16_t cb_parse_ptype(uint8_t port __rte_unused, uint16_t queue __rte_unused, return nb_pkts; } +int extFilter::initSenderPort(uint8_t port, struct ether_addr *addr, uint8_t nb_tx_queue) +{ + int retval; + struct rte_eth_conf portConf; + memset(&portConf,0,sizeof(rte_eth_conf)); + portConf.rxmode.split_hdr_size = DPDK_CONFIG_SPLIT_HEADER_SIZE; + portConf.rxmode.header_split = DPDK_CONFIG_HEADER_SPLIT; + portConf.rxmode.hw_ip_checksum = DPDK_CONFIG_HW_IP_CHECKSUM; + portConf.rxmode.hw_vlan_filter = DPDK_CONFIG_HW_VLAN_FILTER; + portConf.rxmode.jumbo_frame = DPDK_CONFIG_JUMBO_FRAME; + portConf.rxmode.hw_strip_crc = DPDK_CONFIG_HW_STRIP_CRC; + portConf.rxmode.mq_mode = DPDK_CONFIG_MQ_MODE; + portConf.rx_adv_conf.rss_conf.rss_key = m_RSSKey; + portConf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_IPV4 | ETH_RSS_IPV6; + portConf.txmode.mq_mode = ETH_MQ_TX_NONE; + + retval = rte_eth_dev_configure(port, 1, nb_tx_queue, &portConf); + if (retval != 0) + return retval; + + struct rte_mempool *mpool = rte_pktmbuf_pool_create("Sender TX", 1000, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_eth_dev_socket_id(port)); -int extFilter::initPort(uint8_t port, struct ether_addr *addr) + logger().information("sender port=%d rx_queueid=%d nb_rxd=%d", (int) port, (int) 0, (int) _nb_rxd); + retval = rte_eth_rx_queue_setup(port, 0, _nb_rxd, rte_eth_dev_socket_id(port), NULL, mpool); + if (retval < 0) + { + logger().fatal("rte_eth_rx_queue_setup: err=%d (%s) port=%d", (int) retval, rte_strerror(-retval), (int)port); + return retval; + } + + + for(auto z=0; z < nb_tx_queue; z++) + { + logger().information("sender port=%d tx_queueid=%d tb_rxd=%d", (int) port, (int) z, (int) _nb_txd); + retval = rte_eth_tx_queue_setup(port, z, _nb_txd, rte_eth_dev_socket_id(port), NULL); + if (retval < 0) + { + logger().fatal("rte_eth_tx_queue_setup: err=%d (%s), port=%d, nb_tx_queue=%d, nb_txd=%d, socketid=%d", retval, rte_strerror(-retval), (int)port, (int)z, (int)_nb_txd, (int) rte_eth_dev_socket_id(port)); + return retval; + } + } + + rte_eth_macaddr_get(port, addr); + char buffer[100]; + sprintf(buffer,"%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8, addr->addr_bytes[0], addr->addr_bytes[1], addr->addr_bytes[2], addr->addr_bytes[3],addr->addr_bytes[4], addr->addr_bytes[5]); + std::string mac_addr(buffer); + logger().information("Port %d MAC: %s", (int)port, mac_addr); + + retval = rte_eth_dev_start(port); + if (retval < 0) + return retval; + +#define CHECK_INTERVAL 100 /* 100ms */ +#define MAX_CHECK_TIME 90 /* 9s (90 * 100ms) in total */ + struct rte_eth_link link; + + for (int count = 0; count <= MAX_CHECK_TIME; count++) + { + rte_eth_link_get_nowait(port, &link); + if(!link.link_status) + { + rte_delay_ms(CHECK_INTERVAL); + } else { + break; + } + } + + if (!link.link_status) + { + logger().warning("Link down on port %d", (int) port); + } + + return 0; +} + +int extFilter::initPort(uint8_t port, struct ether_addr *addr, bool no_promisc) { int16_t queueid; unsigned lcore_id; @@ -284,7 +362,7 @@ int extFilter::initPort(uint8_t port, struct ether_addr *addr) } queueid = qconf->rx_queue_list[queue].queue_id; - logger().debug("port=%d rx_queueid=%d nb_rxd=%d core=%d", (int) port, (int) queueid, (int) nb_rx_queue, (int) lcore_id); + logger().information("port=%d rx_queueid=%d nb_rxd=%d core=%d", (int) port, (int) queueid, (int) _nb_rxd, (int) lcore_id); retval = rte_eth_rx_queue_setup(port, queueid, _nb_rxd, socketid, NULL, _pktmbuf_pool[socketid]); if (retval < 0) { @@ -300,11 +378,12 @@ int extFilter::initPort(uint8_t port, struct ether_addr *addr) } if (queueid == -1) { // no rx_queue set, don't need to setup tx_queue for - // that clore + // that core continue; } // retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, rte_eth_dev_socket_id(port), NULL); + logger().information("port=%d tx_queueid=%d nb_txd=%d core=%d", (int) port, (int) nb_tx_queue, (int) _nb_txd, (int) lcore_id); retval = rte_eth_tx_queue_setup(port, nb_tx_queue, _nb_txd, socketid, NULL); if (retval < 0) { @@ -345,7 +424,8 @@ int extFilter::initPort(uint8_t port, struct ether_addr *addr) std::string mac_addr(buffer); logger().information("Port %d MAC: %s", (int)port, mac_addr); - rte_eth_promiscuous_enable(port); + if(!no_promisc) + rte_eth_promiscuous_enable(port); return 0; @@ -539,7 +619,7 @@ void extFilter::initialize(Application& self) ServerApplication::initialize(self); _dpi_max_active_flows_ipv4 = config().getInt("dpi.max_active_flows_ipv4", 1000000); - _dpi_max_active_flows_ipv6 = config().getInt("dpi.max_active_flows_ipv6", 1000000); + _dpi_max_active_flows_ipv6 = config().getInt("dpi.max_active_flows_ipv6", 20000); _dpi_fragmentation_ipv6_state = config().getBool("dpi.fragmentation_ipv6_state", true); _dpi_fragmentation_ipv4_state = config().getBool("dpi.fragmentation_ipv4_state", true); if(_dpi_fragmentation_ipv4_state) @@ -551,7 +631,7 @@ void extFilter::initialize(Application& self) _num_of_senders = config().getInt("num_of_senders", 1); _lower_host = config().getBool("lower_host", false); _match_url_exactly = config().getBool("match_url_exactly", false); - _block_undetected_ssl = config().getBool("block_undetected_ssl", false); + _block_ssl_no_sni = config().getBool("block_ssl_no_sni", false); _http_redirect = config().getBool("http_redirect", true); _url_normalization = config().getBool("url_normalization", true); _remove_dot = config().getBool("remove_dot", true); @@ -559,7 +639,7 @@ void extFilter::initialize(Application& self) _urlsFile = config().getString("urllist",""); _domainsFile = config().getString("domainlist",""); _sslIpsFile = config().getString("sslips",""); - if(!_block_undetected_ssl) + if(!_block_ssl_no_sni) { _sslIpsFile.assign(""); } @@ -693,15 +773,10 @@ void extFilter::initialize(Application& self) } _nb_lcore_params=0; + int cnt_sender = 0; for(uint32_t i=0; i < n_ports; i++) { std::string key("port "+std::to_string(i)); - std::string p=config().getString(key+".queues", ""); - if(p.empty()) - { - logger().fatal("Port IDs are not sequential (port %d missing)", (int) i); - throw Poco::Exception("Congfiguration error"); - } _enabled_port_mask |= (1 << i); std::string type = config().getString(key+".type", ""); uint8_t port_type = P_TYPE_SUBSCRIBER; @@ -712,11 +787,46 @@ void extFilter::initialize(Application& self) port_type = P_TYPE_NETWORK; } else if (type == "subscriber") { + } else if (type == "sender") + { + if(cnt_sender > 0) + { + logger().fatal("Too many sender ports"); + throw Poco::Exception("Congfiguration error"); + } + ++cnt_sender; + port_type = P_TYPE_SENDER; + _dpdk_send_port = i; } else { logger().fatal("Unknown port type %s", type); throw Poco::Exception("Congfiguration error"); } } + port_types[i] = port_type; + if(port_type == P_TYPE_SENDER) + { + std::string mac = config().getString(key+".mac",""); + if(mac.empty()) + { + logger().fatal("Destination mac address not found for port %d", (int)i); + throw Poco::Exception("Congfiguration error"); + + } + int last = 0; + int rc = sscanf(mac.c_str(), "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx%n", sender_mac + 0, sender_mac + 1, sender_mac + 2, sender_mac + 3, sender_mac + 4, sender_mac + 5, &last); + if(rc != 6 || mac.size() != last) + { + logger().fatal("Invalid mac address '%s' for port %d", mac, (int)i); + throw Poco::Exception("Congfiguration error"); + } + continue; + } + std::string p=config().getString(key+".queues", ""); + if(p.empty()) + { + logger().fatal("Port IDs are not sequential (port %d missing)", (int) i); + throw Poco::Exception("Congfiguration error"); + } Poco::StringTokenizer restTokenizer(p, ";"); std::vector lcores; int nb_lcores_per_port = 0; @@ -891,15 +1001,28 @@ int extFilter::main(const ArgVec& args) return Poco::Util::Application::EXIT_CONFIG; } + struct rte_mempool *_mp = nullptr; + std::vector ports; for (uint8_t portid = 0; portid < _nb_ports; portid++) { if ((_enabled_port_mask & (1 << portid)) == 0) continue; - if(initPort(portid, &ports_eth_addr[portid]) != 0) + if(port_types[portid] == P_TYPE_SENDER) { - logger().fatal("Cannot initialize port %d", (int) portid); - return Poco::Util::Application::EXIT_CONFIG; + if(initSenderPort(portid, &ports_eth_addr[portid], _nb_lcore_params) != 0) + { + logger().fatal("Cannot initialize port %d", (int) portid); + return Poco::Util::Application::EXIT_CONFIG; + } + + _mp = rte_pktmbuf_pool_create("SenderBuffer", 1000, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); + } else { + if(initPort(portid, &ports_eth_addr[portid]) != 0) + { + logger().fatal("Cannot initialize port %d", (int) portid); + return Poco::Util::Application::EXIT_CONFIG; + } } ports.push_back(portid); } @@ -907,12 +1030,22 @@ int extFilter::main(const ArgVec& args) WorkerConfig workerConfigArr[RTE_MAX_LCORE]; Poco::TaskManager tm; - for(int i=1; i <= _num_of_senders; i++) - tm.start(new SenderTask(_sender_params,i)); + if(_mp == nullptr) + { + for(int i=1; i <= _num_of_senders; i++) + { + tm.start(new SenderTask(new CSender(_sender_params), i)); + } + } NotifyManager *nm = new NotifyManager(20000, _notify_groups); tm.start(nm); + + int max_ipv4_flows_per_core = ceil((float)_dpi_max_active_flows_ipv4/(float)(_nb_lcore_params)); + int max_ipv6_flows_per_core = ceil((float)_dpi_max_active_flows_ipv6/(float)(_nb_lcore_params)); + + uint16_t tx_queue_id = 0; /* launch per-lcore init on every lcore */ RTE_LCORE_FOREACH(lcore_id) { @@ -922,16 +1055,14 @@ int extFilter::main(const ArgVec& args) if(!_domainsFile.empty() && !_urlsFile.empty()) { workerConfigArr[lcore_id].atm = new AhoCorasickPlus(); - workerConfigArr[lcore_id].entriesData = new EntriesData(); - loadDomainsURLs(_domainsFile, _urlsFile, workerConfigArr[lcore_id].atm, workerConfigArr[lcore_id].entriesData); + loadDomainsURLs(_domainsFile, _urlsFile, workerConfigArr[lcore_id].atm); workerConfigArr[lcore_id].atm->finalize(); } - workerConfigArr[lcore_id].block_undetected_ssl = _block_undetected_ssl; + workerConfigArr[lcore_id].block_ssl_no_sni = _block_ssl_no_sni; if(!_sslFile.empty()) { workerConfigArr[lcore_id].atmSSLDomains = new AhoCorasickPlus(); - workerConfigArr[lcore_id].SSLdomainsMatchType = new DomainsMatchType; - loadDomains(_sslFile, workerConfigArr[lcore_id].atmSSLDomains, workerConfigArr[lcore_id].SSLdomainsMatchType); + loadDomains(_sslFile, workerConfigArr[lcore_id].atmSSLDomains); workerConfigArr[lcore_id].atmSSLDomains->finalize(); } workerConfigArr[lcore_id].match_url_exactly = _match_url_exactly; @@ -942,27 +1073,58 @@ int extFilter::main(const ArgVec& args) workerConfigArr[lcore_id].add_p_type = _add_p_type; workerConfigArr[lcore_id].notify_enabled = _notify_enabled; workerConfigArr[lcore_id].nm = nm; + workerConfigArr[lcore_id].atmLock = new Poco::FastMutex(); + workerConfigArr[lcore_id].atmSSLDomainsLock = new Poco::FastMutex(); + workerConfigArr[lcore_id].sender_port = _dpdk_send_port; + workerConfigArr[lcore_id].tx_queue_id = tx_queue_id; + + logger().information("Initializing dpi flow hash with ipv4 max flows %d, ipv6 max flows %d.", max_ipv4_flows_per_core, max_ipv6_flows_per_core); + flowHash *mFlowHash = new flowHash(rte_lcore_to_socket_id(lcore_id), lcore_id, max_ipv4_flows_per_core, max_ipv6_flows_per_core); + +// dpi_library_state_t* dpi_state = dpi_init_stateful(ipv4_buckets, ipv6_buckets, _dpi_max_active_flows_ipv4, _dpi_max_active_flows_ipv6); + dpi_library_state_t* dpi_state = dpi_init_stateless(); + dpi_set_max_trials(dpi_state, 1); + dpi_inspect_nothing(dpi_state); + + dpi_protocol_t protocol; + protocol.l4prot = IPPROTO_TCP; + protocol.l7prot = DPI_PROTOCOL_TCP_HTTP; + dpi_set_protocol(dpi_state, protocol); + + protocol.l7prot = DPI_PROTOCOL_TCP_SSL; + dpi_set_protocol(dpi_state, protocol); + - dpi_library_state_t* dpi_state=dpi_init_stateful(SIZE_IPv4_FLOW_TABLE, SIZE_IPv6_FLOW_TABLE, _dpi_max_active_flows_ipv4, _dpi_max_active_flows_ipv6); dpi_set_flow_cleaner_callback(dpi_state, &flow_delete_cb); if(!_dpi_tcp_reordering) dpi_tcp_reordering_disable(dpi_state); else dpi_tcp_reordering_enable(dpi_state); + if(_dpi_fragmentation_ipv4_state) dpi_ipv4_fragmentation_enable(dpi_state, _dpi_fragmentation_ipv4_table_size); else dpi_ipv4_fragmentation_disable(dpi_state); + if(_dpi_fragmentation_ipv6_state) dpi_ipv6_fragmentation_enable(dpi_state, _dpi_fragmentation_ipv6_table_size); else dpi_ipv6_fragmentation_disable(dpi_state); + + std::string workerName("WorkerThread-" + std::to_string(lcore_id)); logger().debug("Preparing thread '%s'", workerName); - WorkerThread* newWorker = new WorkerThread(workerName, workerConfigArr[lcore_id], dpi_state, rte_lcore_to_socket_id(lcore_id)); + ESender::nparams prms; + if(_mp != nullptr) + { + prms.params = _sender_params; + prms.mac = (uint8_t *)&ports_eth_addr[1]; + prms.to_mac = &sender_mac[0]; + } + WorkerThread* newWorker = new WorkerThread(workerName, workerConfigArr[lcore_id], dpi_state, rte_lcore_to_socket_id(lcore_id), mFlowHash, prms, _mp); int err = rte_eal_remote_launch(dpdkWorkerThreadStart, newWorker, lcore_id); if (err != 0) @@ -972,6 +1134,7 @@ int extFilter::main(const ArgVec& args) } _workerThreadVec.push_back(newWorker); pthread_setname_np(lcore_config[lcore_id].thread_id, workerName.c_str()); + tx_queue_id++; } } @@ -1011,7 +1174,7 @@ int extFilter::main(const ArgVec& args) return Poco::Util::Application::EXIT_OK; } -void extFilter::loadDomainsURLs(std::string &domains, std::string &urls, AhoCorasickPlus *dm_atm, EntriesData *ed) +void extFilter::loadDomainsURLs(std::string &domains, std::string &urls, AhoCorasickPlus *dm_atm) { logger().debug("Loading domains from file %s",domains); Poco::FileInputStream df(domains); @@ -1029,15 +1192,18 @@ void extFilter::loadDomainsURLs(std::string &domains, std::string &urls, AhoCora if(str[0] == '#' || str[0] == ';') continue; AhoCorasickPlus::EnumReturnStatus status; - AhoCorasickPlus::PatternId patId = entry_id; + AhoCorasickPlus::PatternId patId = lineno; std::size_t pos = str.find("*."); bool exact_match=true; std::string insert=str; + patId <<= 2; if(pos != std::string::npos) { exact_match=false; insert=str.substr(pos+2,str.length()-2); } + patId |= exact_match; + patId |= E_TYPE_DOMAIN << 1; status = dm_atm->addPattern(insert, patId); if (status != AhoCorasickPlus::RETURNSTATUS_SUCCESS) { @@ -1048,17 +1214,6 @@ void extFilter::loadDomainsURLs(std::string &domains, std::string &urls, AhoCora } else { logger().error("Failed to add '%s' from line %d from file %s",insert,lineno,domains); } - } else { - entry_data e; - e.type = E_TYPE_DOMAIN; - e.match_exactly = exact_match; - e.lineno = lineno; - std::pair res=ed->insert(EntriesData::ValueType(entry_id,e)); - if(!res.second) - { - logger().fatal("Logic error: found duplicate in the EntriesData. Domain '%s' line %d from file '%s'", str, lineno, domains); - throw Poco::Exception("Logic error: found duplicate in the EntriesData."); - } } } entry_id++; @@ -1082,13 +1237,31 @@ void extFilter::loadDomainsURLs(std::string &domains, std::string &urls, AhoCora if(str[0] == '#' || str[0] == ';') continue; AhoCorasickPlus::EnumReturnStatus status; - AhoCorasickPlus::PatternId patId = entry_id; + AhoCorasickPlus::PatternId patId = lineno; + if(_url_normalization) + { + std::string url = "http://" + str; + try + { + Poco::URI url_p(url); + url_p.normalize(); + url.assign(url_p.toString()); + } catch (Poco::SyntaxException &ex) + { + logger().error("An SyntaxException occured: '%s' on URI: '%s'", ex.displayText(), url); + } + str.assign(url.c_str()+7, url.length()-7); + } /* std::string url = str; std::size_t http_pos = url.find("http://"); if(http_pos == std::string::npos || http_pos > 0) { url.insert(0,"http://"); }*/ + patId <<= 2; + patId |= 0; + patId |= E_TYPE_URL << 1; + status = dm_atm->addPattern(str, patId); if (status != AhoCorasickPlus::RETURNSTATUS_SUCCESS) { @@ -1099,17 +1272,6 @@ void extFilter::loadDomainsURLs(std::string &domains, std::string &urls, AhoCora } else { logger().error("Failed to add '%s' from line %d from file %s",str,lineno,urls); } - } else { - entry_data e; - e.type = E_TYPE_URL; - e.match_exactly = false; - e.lineno = lineno; - std::pair res=ed->insert(EntriesData::ValueType(entry_id,e)); - if(!res.second) - { - logger().fatal("Logic error: found duplicate in the EntriesData. URL '%s' line %d from file '%s'", str, lineno, urls); - throw Poco::Exception("Logic error: found duplicate in the EntriesData."); - } } } entry_id++; @@ -1120,49 +1282,7 @@ void extFilter::loadDomainsURLs(std::string &domains, std::string &urls, AhoCora logger().debug("Finish loading URLS"); } -void extFilter::loadURLs(std::string &fn, AhoCorasickPlus *dm_atm) -{ - logger().debug("Loading URLS from file %s",fn); - Poco::FileInputStream uf(fn); - if(uf.good()) - { - int lineno=1; - while(!uf.eof()) - { - std::string str; - getline(uf,str); - if(!str.empty()) - { - if(str[0] == '#' || str[0] == ';') - continue; - AhoCorasickPlus::EnumReturnStatus status; - AhoCorasickPlus::PatternId patId = lineno; - std::string url = str; - std::size_t http_pos = url.find("http://"); - if(http_pos == std::string::npos || http_pos > 0) - { - url.insert(0,"http://"); - } - status = dm_atm->addPattern(url, patId); - if (status!=AhoCorasickPlus::RETURNSTATUS_SUCCESS) - { - if(status == AhoCorasickPlus::RETURNSTATUS_DUPLICATE_PATTERN) - { - logger().warning("Pattern '%s' already present in the URL database from file %s",str,fn); - } else { - logger().error("Failed to add '%s' from line %d from file %s",str,lineno,fn); - } - } - } - lineno++; - } - } else - throw Poco::OpenFileException(fn); - uf.close(); - logger().debug("Finish loading URLS"); -} - -void extFilter::loadDomains(std::string &fn, AhoCorasickPlus *dm_atm,DomainsMatchType *dm_map) +void extFilter::loadDomains(std::string &fn, AhoCorasickPlus *dm_atm) { logger().debug("Loading domains from file %s",fn); Poco::FileInputStream df(fn); @@ -1187,6 +1307,9 @@ void extFilter::loadDomains(std::string &fn, AhoCorasickPlus *dm_atm,DomainsMatc exact_match=false; insert=str.substr(pos+2,str.length()-2); } + patId <<= 2; + patId |= exact_match; + patId |= E_TYPE_DOMAIN << 1; status = dm_atm->addPattern(insert, patId); if (status!=AhoCorasickPlus::RETURNSTATUS_SUCCESS) { @@ -1196,14 +1319,6 @@ void extFilter::loadDomains(std::string &fn, AhoCorasickPlus *dm_atm,DomainsMatc } else { logger().error("Failed to add '%s' from line %d from file %s",insert,lineno,fn); } - } else { - std::pair res=dm_map->insert(DomainsMatchType::ValueType(lineno,exact_match)); - if(res.second) - { -// logger().debug("Inserted domain: '%s' from line %d from file %s",str,lineno,fn); - } else { - logger().debug("Updated domain: '%s' from line %d from file %s",str,lineno,fn); - } } } lineno++; diff --git a/src/notification.cpp b/src/notification.cpp index 73c992e..0ec48e2 100644 --- a/src/notification.cpp +++ b/src/notification.cpp @@ -99,7 +99,7 @@ void NotifyManager::runTask() { _logger.error("Unable to find sender for group with id %d", notify_group); } else { - r->second->Redirect(pNotifyNf->user_port(), pNotifyNf->dst_port(), pNotifyNf->user_ip(), pNotifyNf->dst_ip(), pNotifyNf->ip_version(), pNotifyNf->acknum(), pNotifyNf->seqnum(), pNotifyNf->f_psh(), pNotifyNf->additional_param()); + r->second->Redirect(pNotifyNf->user_port(), pNotifyNf->dst_port(), pNotifyNf->user_ip(), pNotifyNf->dst_ip(), pNotifyNf->ip_version(), pNotifyNf->acknum(), pNotifyNf->seqnum(), pNotifyNf->f_psh(), pNotifyNf->additional_param().c_str()); // struct redirect_params rp = r->second; // std::string full_url("@HTTP/1.1 "+rp.code+"\r\nLocation: " + rp.redirect_url + pNotifyNf->additional_param() + "\r\nConnection: close\r\n"); // sender->Redirect(pNotifyNf->user_port(), pNotifyNf->dst_port(), pNotifyNf->user_ip(), pNotifyNf->dst_ip(), pNotifyNf->ip_version(), pNotifyNf->acknum(), pNotifyNf->seqnum(), pNotifyNf->f_psh(), full_url); diff --git a/src/reloadtask.cpp b/src/reloadtask.cpp index 50568a6..a55903e 100644 --- a/src/reloadtask.cpp +++ b/src/reloadtask.cpp @@ -64,57 +64,43 @@ void ReloadTask::runTask() continue; WorkerConfig& config=(static_cast(*it))->getConfig(); AhoCorasickPlus *to_del_atm; - DomainsMatchType *to_del_dm; AhoCorasickPlus *atm_new; - DomainsMatchType *dm_new; - EntriesData *datas_new; if(!_parent->getSSLFile().empty()) { atm_new = new AhoCorasickPlus(); - dm_new = new DomainsMatchType; try { - _parent->loadDomains(_parent->getSSLFile(), atm_new, dm_new); + _parent->loadDomains(_parent->getSSLFile(), atm_new); atm_new->finalize(); - config.atmSSLDomainsLock.lock(); + config.atmSSLDomainsLock->lock(); to_del_atm = config.atmSSLDomains; - to_del_dm = config.SSLdomainsMatchType; config.atmSSLDomains = atm_new; - config.SSLdomainsMatchType = dm_new; - config.atmSSLDomainsLock.unlock(); + config.atmSSLDomainsLock->unlock(); delete to_del_atm; - delete to_del_dm; _logger.information("Reloaded data for ssl domains list for core %u", (*it)->getCoreId()); } catch (Poco::Exception &excep) { _logger.error("Got exception while reload ssl data: %s", excep.displayText()); delete atm_new; - delete dm_new; } } if(!_parent->getDomainsFile().empty() && !_parent->getURLsFile().empty()) { atm_new = new AhoCorasickPlus(); - datas_new = new EntriesData(); - EntriesData *datas_del; try { - _parent->loadDomainsURLs(_parent->getDomainsFile(), _parent->getURLsFile(), atm_new, datas_new); + _parent->loadDomainsURLs(_parent->getDomainsFile(), _parent->getURLsFile(), atm_new); atm_new->finalize(); - config.atmLock.lock(); + config.atmLock->lock(); to_del_atm = config.atm; config.atm = atm_new; - datas_del = config.entriesData; - config.entriesData = datas_new; - config.atmLock.unlock(); + config.atmLock->unlock(); delete to_del_atm; - delete datas_del; _logger.information("Reloaded data for domains and urls list for core %u", (*it)->getCoreId()); } catch (Poco::Exception &excep) { _logger.error("Got exception while reload domains and urls data: %s", excep.displayText()); delete atm_new; - delete datas_new; } } } diff --git a/src/sender.cpp b/src/sender.cpp index 1b846e6..b19abf3 100644 --- a/src/sender.cpp +++ b/src/sender.cpp @@ -17,13 +17,18 @@ * */ +#define __STDC_FORMAT_MACROS +#include #include "sender.h" #include #include #include #include #include +#include #include +#include +#include "worker.h" struct pseudo_header { @@ -43,106 +48,74 @@ struct ipv6_pseudo_hdr nexthdr: 8; }; -CSender::CSender(struct params &prm) : _logger(Poco::Logger::get("CSender")), _parameters(prm) +BSender::BSender(const char *cn, struct params &prm) : _logger(Poco::Logger::get(cn)), _parameters(prm) { - this->s = ::socket( PF_INET, SOCK_RAW, IPPROTO_RAW ); - if( s == -1 ) { - _logger.error("Failed to create IPv4 socket!"); - return; - } - this->s6 = ::socket( PF_INET6, SOCK_RAW, IPPROTO_RAW ); - if( s6 == -1 ) { - _logger.error("Failed to create IPv6 socket!"); - return; - } - - int one = 1; - const int *val = &one; - if( ::setsockopt(this->s, IPPROTO_IP, IP_HDRINCL, val, sizeof(one)) < 0 ) - { - _logger.error("Error setting IP_HDRINCL for IPv4 socket"); - return; - } - this->rHeader = "HTTP/1.1 "+_parameters.code+"\r\nLocation: " + _parameters.redirect_url + "\r\nConnection: close\r\n\r\n"; _logger.debug("Default header is %s", rHeader); + pkt_id = 1; } -CSender::~CSender() +BSender::~BSender() { - ::close(s); - ::close(s6); + } -void CSender::sendPacket(void *ip_from, void *ip_to, int ip_ver, int port_from, int port_to, uint32_t acknum, uint32_t seqnum, std::string &dt, int f_reset, int f_psh) +int BSender::makePacket(void *ip_from, void *ip_to, int ip_ver, int port_from, int port_to, uint32_t acknum, uint32_t seqnum, std::string &dt, int f_reset, int f_psh, uint8_t *buffer) { - char datagram[4096], *data; - - // zero out the packet buffer - memset(datagram, 0, sizeof(datagram)); - + int pkt_len; + pkt_id++; + // IP header - struct iphdr *iph = (struct iphdr *) datagram; - struct ip6_hdr *iph6 = (struct ip6_hdr *) datagram; + struct iphdr *iph = (struct iphdr *) buffer; + struct ip6_hdr *iph6 = (struct ip6_hdr *) buffer; // TCP header - struct tcphdr *tcph = (struct tcphdr *) (datagram + (ip_ver == 4 ? sizeof(struct iphdr) : sizeof(struct ip6_hdr))); + struct tcphdr *tcph = (struct tcphdr *) (buffer + (ip_ver == 4 ? sizeof(struct iphdr) : sizeof(struct ip6_hdr))); - struct sockaddr_in sin; - struct sockaddr_in6 sin6; int payloadlen=dt.size(); - if(payloadlen > (_parameters.mtu - (ip_ver == 4 ? sizeof(struct iphdr) : sizeof(struct ip6_hdr)) + sizeof(struct tcphdr) - 12)) - { - _logger.warning("Size of the outgoing packet bigger than the MTU. Removing all additional data in the redirect packet. Payload: %s", dt); - dt = rHeader; - payloadlen = rHeader.size(); - } // Data part - data = (char *)tcph + sizeof(struct tcphdr); - rte_memcpy(data, dt.c_str(), payloadlen); + uint8_t *data = (uint8_t *)tcph + sizeof(struct tcphdr); + + if(!dt.empty()) + rte_memcpy(data, dt.c_str(), payloadlen); if(_logger.getLevel() == Poco::Message::PRIO_DEBUG) { Poco::Net::IPAddress ipa(ip_to, ip_ver == 4 ? sizeof(in_addr) : sizeof(in6_addr)); _logger.debug("Trying to send packet to %s port %d", ipa.toString(), port_to); } - if(ip_ver == 4) { - sin.sin_family = AF_INET; - sin.sin_port = htons(port_to); - sin.sin_addr.s_addr = ((in_addr *)ip_to)->s_addr; // Fill the IPv4 header iph->ihl = 5; iph->version = 4; iph->tos=0; - iph->tot_len = sizeof(struct iphdr) + sizeof(struct tcphdr) + payloadlen; - iph->id = htons(random()); + iph->tot_len = rte_cpu_to_be_16(sizeof(struct iphdr) + sizeof(struct tcphdr) + payloadlen); + iph->id = rte_cpu_to_be_16(pkt_id); iph->frag_off = 0; iph->ttl = _parameters.ttl; iph->protocol = IPPROTO_TCP; iph->check = 0; iph->saddr = ((in_addr *)ip_from)->s_addr; - iph->daddr = sin.sin_addr.s_addr; + iph->daddr = ((in_addr *)ip_to)->s_addr;; + pkt_len = sizeof(struct iphdr) + sizeof(struct tcphdr) + payloadlen; } else { - sin6.sin6_family = AF_INET6; - sin6.sin6_port = 0; // not filled in ipv6 - rte_mov16((uint8_t *)&sin6.sin6_addr, (uint8_t *)ip_to); // IPv6 version (4 bits), Traffic class (8 bits), Flow label (20 bits) iph6->ip6_flow = htonl ((6 << 28) | (0 << 20) | 0); // Payload length (16 bits): TCP header + TCP data - iph6->ip6_plen = htons (sizeof(struct tcphdr) + payloadlen); + iph6->ip6_plen = rte_cpu_to_be_16 (sizeof(struct tcphdr) + payloadlen); // Next header (8 bits): 6 for TCP iph6->ip6_nxt = IPPROTO_TCP; // Hop limit (8 bits): default to maximum value iph6->ip6_hops = 250; rte_mov16((uint8_t *)&iph6->ip6_src, (uint8_t *)ip_from); rte_mov16((uint8_t *)&iph6->ip6_dst, (uint8_t *)ip_to); + pkt_len = (sizeof(struct ip6_hdr) + sizeof(struct tcphdr) + payloadlen); } // TCP Header - tcph->source = htons(port_from); - tcph->dest = htons(port_to); + tcph->source = port_from; + tcph->dest = port_to; tcph->seq = acknum; tcph->doff = 5; tcph->syn = 0; @@ -153,54 +126,76 @@ void CSender::sendPacket(void *ip_from, void *ip_to, int ip_ver, int port_from, tcph->ack = 1; tcph->ack_seq = seqnum; tcph->fin = 0; - tcph->window = htons(0xEF); + tcph->window = rte_cpu_to_be_16(0xEF); } else { tcph->ack_seq = seqnum; tcph->ack = 1; tcph->fin = 1; - tcph->window = htons(5880); + tcph->window = rte_cpu_to_be_16(5885); } tcph->urg = 0; tcph->check = 0; tcph->urg_ptr = 0; + if(ip_ver == 4) + tcph->check = rte_ipv4_udptcp_cksum((const ipv4_hdr*)iph,tcph); + else + tcph->check = rte_ipv6_udptcp_cksum((const ipv6_hdr*)iph6,tcph); + return pkt_len; +} + +void BSender::sendPacket(void *ip_from, void *ip_to, int ip_ver, int port_from, int port_to, uint32_t acknum, uint32_t seqnum, std::string &dt, int f_reset, int f_psh) +{ + uint8_t datagram[4096]; + int pkt_len = makePacket(ip_from, ip_to, ip_ver, port_from, port_to, acknum, seqnum, dt, f_reset, f_psh, &datagram[0]); + + struct sockaddr_in sin; + struct sockaddr_in6 sin6; + + if(ip_ver == 4) + { + sin.sin_family = AF_INET; + sin.sin_port = port_to; + sin.sin_addr.s_addr = ((in_addr *)ip_to)->s_addr; + } else { + sin6.sin6_family = AF_INET6; + sin6.sin6_port = 0; // not filled in ipv6 + rte_mov16((uint8_t *)&sin6.sin6_addr, (uint8_t *)ip_to); + } if(ip_ver == 4) { - iph->tot_len = rte_cpu_to_be_16(iph->tot_len); - tcph->check = rte_ipv4_udptcp_cksum((const ipv4_hdr*)iph,tcph); - iph->tot_len = rte_be_to_cpu_16(iph->tot_len); // Send the packet - if( ::sendto( this->s, datagram, iph->tot_len, 0, (struct sockaddr *)&sin, sizeof(sin)) < 0 ) + if(Send((uint8_t *)&datagram, pkt_len,(struct sockaddr *)&sin, sizeof(sin)) < 0 ) { Poco::Net::IPAddress ipa(ip_to, ip_ver == 4 ? sizeof(in_addr) : sizeof(in6_addr)); _logger.error("sendto() failed to %s:%d errno: %d",ipa.toString(), port_to, errno); } } else { - tcph->check = rte_ipv6_udptcp_cksum((const ipv6_hdr*)iph6,tcph); // Send the packet - if( ::sendto( this->s6, datagram, (sizeof(struct ip6_hdr) + sizeof(struct tcphdr) + payloadlen), 0, (struct sockaddr *)&sin6, sizeof(sin6)) < 0 ) + if(Send((uint8_t *)&datagram, pkt_len, (struct sockaddr *)&sin6, sizeof(sin6)) < 0 ) { Poco::Net::IPAddress ipa(ip_to, ip_ver == 4 ? sizeof(in_addr) : sizeof(in6_addr)); - _logger.error("sendto() failed to [%s]:%d errno: %d",ipa.toString(), port_to, errno); + Poco::Net::IPAddress ipb(ip_from, ip_ver == 4 ? sizeof(in_addr) : sizeof(in6_addr)); + _logger.error("sendto() failed to [%s]:%d from [%s]:%d errno: %d", ipa.toString(), ntohs(port_to), ipb.toString(), ntohs(port_from), errno); } } return; } -void CSender::Redirect(int user_port, int dst_port, void *user_ip, void *dst_ip, int ip_ver, uint32_t acknum, uint32_t seqnum, int f_psh, std::string &additional_param) +void BSender::Redirect(int user_port, int dst_port, void *user_ip, void *dst_ip, int ip_ver, uint32_t acknum, uint32_t seqnum, int f_psh, const char *additional_param) { // формируем дополнительные параметры std::string tstr = rHeader; - if(!additional_param.empty() && additional_param[0] == '@') + if(additional_param != nullptr && additional_param[0] == '@' && (tstr.length() < (_parameters.mtu - (ip_ver == 4 ? sizeof(struct iphdr) : sizeof(struct ip6_hdr)) + sizeof(struct tcphdr) - sizeof(struct ether_hdr)))) { - tstr = additional_param.substr(1, additional_param.length()); + tstr.assign(additional_param+1); } else { - if(!additional_param.empty() && _parameters.redirect_url[_parameters.redirect_url.length()-1] == '?') + if(additional_param != nullptr && _parameters.redirect_url[_parameters.redirect_url.length()-1] == '?' && (tstr.length() < (_parameters.mtu - (ip_ver == 4 ? sizeof(struct iphdr) : sizeof(struct ip6_hdr)) + sizeof(struct tcphdr) - sizeof(struct ether_hdr)))) { - tstr = "HTTP/1.1 "+_parameters.code+"\r\nLocation: " + _parameters.redirect_url + additional_param + "\r\nConnection: close\r\n"; + tstr = "HTTP/1.1 "+_parameters.code+"\r\nLocation: " + _parameters.redirect_url + additional_param + "\r\nConnection: close\r\n\r\n"; } } this->sendPacket(dst_ip, user_ip, ip_ver, dst_port, user_port, acknum, seqnum, tstr, 0, f_psh); @@ -214,7 +209,9 @@ void CSender::Redirect(int user_port, int dst_port, void *user_ip, void *dst_ip, return; } -void CSender::SendRST(int user_port, int dst_port, void *user_ip, void *dst_ip, int ip_ver, uint32_t acknum, uint32_t seqnum, int f_psh) + + +void BSender::SendRST(int user_port, int dst_port, void *user_ip, void *dst_ip, int ip_ver, uint32_t acknum, uint32_t seqnum, int f_psh) { std::string empty_str; // send rst to the client @@ -224,4 +221,142 @@ void CSender::SendRST(int user_port, int dst_port, void *user_ip, void *dst_ip, this->sendPacket(user_ip, dst_ip, ip_ver, user_port, dst_port, seqnum, acknum, empty_str, 1, 0); } +CSender::CSender(struct params &prm) : BSender("CSender", prm) +{ + this->s = ::socket( PF_INET, SOCK_RAW, IPPROTO_RAW ); + if( s == -1 ) { + _logger.error("Failed to create IPv4 socket!"); + return; + } + this->s6 = ::socket( PF_INET6, SOCK_RAW, IPPROTO_RAW ); + if( s6 == -1 ) { + _logger.error("Failed to create IPv6 socket!"); + return; + } + + int one = 1; + const int *val = &one; + if( ::setsockopt(this->s, IPPROTO_IP, IP_HDRINCL, val, sizeof(one)) < 0 ) + { + _logger.error("Error setting IP_HDRINCL for IPv4 socket"); + return; + } + +} + +CSender::~CSender() +{ + ::close(s); + ::close(s6); +} + + +int CSender::Send(uint8_t *buffer, int size, void *addr, int addr_size) +{ + if(addr_size == sizeof(sockaddr_in)) + { + return ::sendto(this->s, buffer, size, 0, (struct sockaddr *)addr, addr_size); + } else { + return ::sendto(this->s6, buffer, size, 0, (struct sockaddr *)&addr, addr_size); + } +} + + +DSender::DSender(struct BSender::params &prm, uint8_t port, uint8_t *mac, uint8_t *to_mac, struct rte_mempool *mp) : BSender("DSender", prm), + _port(port), + _mp(mp) +{ + memcpy(&_eth_hdr.s_addr, mac, 6); + memcpy(&_eth_hdr.d_addr, to_mac, 6); +} + +DSender::~DSender() +{ + +} + +int DSender::Send(uint8_t *buffer, int size, void *addr, int addr_size) +{ + struct rte_mbuf *pkt; + pkt = rte_pktmbuf_alloc(_mp); + if(pkt == nullptr) + { + _logger.error("Unable to allocate buffer for the packet"); + return -1; + } + int pkt_size = size + sizeof(struct ether_hdr); + pkt->data_len = pkt_size; + pkt->pkt_len = pkt_size; + struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(pkt, struct ether_hdr *); + rte_memcpy(eth_hdr, &_eth_hdr, sizeof(struct ether_hdr)); + if(addr_size == sizeof(sockaddr_in)) + { + eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) buffer; + ip_hdr->hdr_checksum = rte_ipv4_cksum(ip_hdr); + } else { + eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv6); + } + char *data = ((char *)eth_hdr + sizeof(struct ether_hdr)); + rte_memcpy(data, buffer, size); + if(rte_eth_tx_burst(_port, 0, &pkt, 1) != 1) + { + _logger.error("Unable to send packet with size %d to port %d", pkt_size, (int) _port); + rte_pktmbuf_free(pkt); + return -1; + } + return pkt_size; +} + +ESender::ESender(struct nparams &prm, uint8_t port, struct rte_mempool *mp, WorkerThread *wt) : BSender("DSender", prm.params), + _port(port), + _mp(mp), + _wt(wt) +{ + memcpy(&_eth_hdr.s_addr, prm.mac, 6); + memcpy(&_eth_hdr.d_addr, prm.to_mac, 6); +} + +ESender::~ESender() +{ + +} + +void ESender::sendPacket(void *ip_from, void *ip_to, int ip_ver, int port_from, int port_to, uint32_t acknum, uint32_t seqnum, std::string &dt, int f_reset, int f_psh) +{ + pkt_id++; + struct rte_mbuf *pkt = rte_pktmbuf_alloc(_mp); + if(unlikely(pkt == nullptr)) + { + _logger.error("Unable to allocate buffer for the packet"); + return; + } + struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(pkt, struct ether_hdr *); + uint8_t *pkt_buf = ((uint8_t *)eth_hdr + sizeof(struct ether_hdr)); + + int pkt_len = makePacket(ip_from, ip_to, ip_ver, port_from, port_to, acknum, seqnum, dt, f_reset, f_psh, pkt_buf) + sizeof(struct ether_hdr); + pkt->data_len = pkt_len; + pkt->pkt_len = pkt_len; + rte_memcpy(eth_hdr, &_eth_hdr, sizeof(struct ether_hdr)); + if(ip_ver == 4) + { + eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) pkt_buf; + ip_hdr->hdr_checksum = rte_ipv4_cksum(ip_hdr); + } else { + eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv6); + } + + if(likely(_wt->_n_send_pkts < EXTFILTER_WORKER_BURST_SIZE)) + { + _wt->_sender_buf[_wt->_n_send_pkts] = pkt; + _wt->_n_send_pkts += 1; + } else { + _logger.error("Can't send packet. Buffer is full."); + rte_pktmbuf_free(pkt); + return; + } + + return; +} diff --git a/src/sendertask.cpp b/src/sendertask.cpp index 01b9b17..b3e78d5 100644 --- a/src/sendertask.cpp +++ b/src/sendertask.cpp @@ -18,15 +18,15 @@ */ #include "sendertask.h" - #include "sender.h" +#include Poco::NotificationQueue SenderTask::queue; -SenderTask::SenderTask(struct CSender::params &prm, int instance): +SenderTask::SenderTask(BSender *snd, int instance): Task("SenderTask-"+std::to_string(instance)), - sender(new CSender(prm)), - _logger(Poco::Logger::get("SenderTask"+std::to_string(instance))) + sender(snd), + _logger(Poco::Logger::get("SenderTask-"+std::to_string(instance))) { } @@ -53,7 +53,7 @@ void SenderTask::runTask() if(pRedirectNf->is_rst()) sender->SendRST(pRedirectNf->user_port(), pRedirectNf->dst_port(),pRedirectNf->user_ip(),pRedirectNf->dst_ip(), pRedirectNf->ip_version(), pRedirectNf->acknum(), pRedirectNf->seqnum(), pRedirectNf->f_psh()); else - sender->Redirect(pRedirectNf->user_port(), pRedirectNf->dst_port(),pRedirectNf->user_ip(),pRedirectNf->dst_ip(), pRedirectNf->ip_version(), pRedirectNf->acknum(), pRedirectNf->seqnum(), pRedirectNf->f_psh(), pRedirectNf->additional_param()); + sender->Redirect(pRedirectNf->user_port(), pRedirectNf->dst_port(),pRedirectNf->user_ip(),pRedirectNf->dst_ip(), pRedirectNf->ip_version(), pRedirectNf->acknum(), pRedirectNf->seqnum(), pRedirectNf->f_psh(), pRedirectNf->additional_param().c_str()); } } } diff --git a/src/statistictask.cpp b/src/statistictask.cpp index 9926902..f8872cf 100644 --- a/src/statistictask.cpp +++ b/src/statistictask.cpp @@ -194,7 +194,7 @@ void StatisticTask::OutStatistic() app.logger().information("Thread IPv4 fragments: %" PRIu64 ", IPv6 fragments: %" PRIu64 ", IPv4 short packets: %" PRIu64, stats.ipv4_fragments, stats.ipv6_fragments, stats.ipv4_short_packets); app.logger().information("Thread matched by ip/port: %" PRIu64 ", matched by ssl: %" PRIu64 ", matched by ssl/ip: %" PRIu64 ", matched by domain: %" PRIu64 ", matched by url: %" PRIu64, stats.matched_ip_port, stats.matched_ssl, stats.matched_ssl_ip, stats.matched_domains, stats.matched_urls); app.logger().information("Thread redirected domains: %" PRIu64 ", redirected urls: %" PRIu64 ", rst sended: %" PRIu64, stats.redirected_domains,stats.redirected_urls,stats.sended_rst); - app.logger().information("Thread active flows: %" PRIu64 " (IPv4 flows: %" PRIu64 ", IPv6 flows: %" PRIu64 "), already detected blocked: %" PRIu64, stats.ndpi_flows_count, stats.ndpi_ipv4_flows_count, stats.ndpi_ipv6_flows_count, stats.already_detected_blocked); + app.logger().information("Thread active flows: %" PRIu64 " (IPv4 flows: %" PRIu64 ", IPv6 flows: %" PRIu64 "), deleted flows: %" PRIu64 " already detected blocked: %" PRIu64, stats.ndpi_flows_count, stats.ndpi_ipv4_flows_count, stats.ndpi_ipv6_flows_count, stats.ndpi_flows_deleted, stats.already_detected_blocked); if(stats.latency_counters.blocked_pkts != 0 && stats.latency_counters.total_pkts != 0) app.logger().information("Thread packets latency all packets: %" PRIu64 " cycles (%.0f ns), blocked packets: %" PRIu64 " (%.0f ns)", (stats.latency_counters.total_cycles / stats.latency_counters.total_pkts), cycles_to_ns(stats.latency_counters.total_cycles / stats.latency_counters.total_pkts), (stats.latency_counters.blocked_cycles / stats.latency_counters.blocked_pkts), cycles_to_ns(stats.latency_counters.blocked_cycles / stats.latency_counters.blocked_pkts)); if(!_statisticsFile.empty()) @@ -222,7 +222,7 @@ void StatisticTask::OutStatistic() app.logger().information("All worker IPv4 fragments: %" PRIu64 ", IPv6 fragments: %" PRIu64 ", IPv4 short packets: %" PRIu64, ipv4_fragments, ipv6_fragments, ipv4_short_packets); app.logger().information("All worker threads matched by ip/port: %" PRIu64 ", matched by ssl: %" PRIu64 ", matched by ssl/ip: %" PRIu64 ", matched by domain: %" PRIu64 ", matched by url: %" PRIu64, matched_ip_port, matched_ssl, matched_ssl_ip, matched_domains, matched_urls); app.logger().information("All worker threads redirected domains: %" PRIu64 ", redirected urls: %" PRIu64 ", rst sended: %" PRIu64, redirected_domains, redirected_urls, sended_rst); - app.logger().information("All worker threads active flows: %" PRIu64 " (IPv4 flows: %" PRIu64 ", IPv6 flows: %" PRIu64 ")", active_flows, ndpi_ipv4_flows_count, ndpi_ipv6_flows_count); + app.logger().information("All worker threads active flows: %" PRIu64 " (IPv4 flows: %" PRIu64 ", IPv6 flows: %" PRIu64 "), deletet flows: %" PRIu64 , active_flows, ndpi_ipv4_flows_count, ndpi_ipv6_flows_count, deleted_flows); if(!_statisticsFile.empty()) { std::string worker_name("allworkers"); diff --git a/src/worker.cpp b/src/worker.cpp index 819a973..e2cb612 100644 --- a/src/worker.cpp +++ b/src/worker.cpp @@ -28,17 +28,24 @@ #define tcphdr(x) ((struct tcphdr *)(x)) +inline u_int8_t ext_dpi_v6_addresses_equal(uint64_t *x, uint64_t *y) +{ + if(*x == *y && *(x+1) == *(y+1)) + return 1; + return 0; +} + void host_cb(dpi_http_message_informations_t* http_informations, const u_char* app_data, u_int32_t data_length, dpi_pkt_infos_t* pkt, void** flow_specific_user_data, void* user_data) { - if(*flow_specific_user_data != NULL && data_length > 0) + if(*flow_specific_user_data != NULL && data_length > 0 && (http_informations->method_or_code == DPI_HTTP_POST || http_informations->method_or_code == DPI_HTTP_GET || http_informations->method_or_code == DPI_HTTP_HEAD)) { struct dpi_flow_info *u = (struct dpi_flow_info *)*flow_specific_user_data; - if(u->host == NULL) - { - u->host = (char *)calloc(1, data_length+1); - memcpy(u->host, app_data, data_length); - u->host_size = data_length; - } + WorkerThread *obj = (WorkerThread *) user_data; + std::string &uri = obj->getUri(); + uri.assign("http://", 7); + uri.append((char *)app_data, data_length); + uri.append(u->url, u->url_size); + obj->setNeedBlock(obj->checkHTTP(uri, pkt)); } } @@ -54,63 +61,484 @@ void url_cb(const unsigned char* url, u_int32_t url_length, dpi_pkt_infos_t* pkt } } -void header_cb(dpi_http_message_informations_t* m, dpi_pkt_infos_t* pkt_informations, void** flow_specific_user_data, void* user_data) +void ssl_cert_cb(char *certificate, int size, void *user_data, dpi_pkt_infos_t *pkt) { - if(user_data != NULL && *flow_specific_user_data != NULL && (m->method_or_code == DPI_HTTP_GET || m->method_or_code == DPI_HTTP_PUT || m->method_or_code == DPI_HTTP_HEAD)) - { - struct dpi_flow_info *u = (struct dpi_flow_info *)*flow_specific_user_data; - if(u->url != NULL && u->host != NULL) - { - std::string *uri = (std::string *)user_data; - uri->assign("http://"); - uri->append(u->host, u->host_size); - uri->append(u->url, u->url_size); - } - } -} - -void ssl_cert_cb(char *certificate, int size, void *user_data) -{ - std::string *cert=(std::string *)user_data; - cert->assign(certificate, size > 255 ? 255 : size); + WorkerThread *obj = (WorkerThread *) user_data; + std::string &cert = obj->getCert(); + cert.assign(certificate, size > 255 ? 255 : size); + obj->setNeedBlock(obj->checkSSL(cert, pkt)); } -WorkerThread::WorkerThread(const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, int socketid) : +WorkerThread::WorkerThread(const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, int socketid, flowHash *fh, struct ESender::nparams &sp, struct rte_mempool *mp) : m_WorkerConfig(workerConfig), m_Stop(true), _logger(Poco::Logger::get(name)), dpi_state(state), - _name(name) + _name(name), + m_FlowHash(fh), + _n_send_pkts(0) { uri.reserve(URI_RESERVATION_SIZE); certificate.reserve(CERT_RESERVATION_SIZE); + + // setup peafowl static dpi_http_header_field_callback* single_cb[1]={&host_cb}; static const char* headers[1]={"host"}; - - static dpi_http_callbacks_t callback={.header_url_callback = url_cb, .header_names = headers, .num_header_types = 1, .header_types_callbacks = single_cb, .header_completion_callback = header_cb, .http_body_callback = 0}; - dpi_http_activate_callbacks(dpi_state, &callback, &uri); + static dpi_http_callbacks_t callback={.header_url_callback = url_cb, .header_names = headers, .num_header_types = 1, .header_types_callbacks = single_cb, .header_completion_callback = 0, .http_body_callback = 0}; + dpi_http_activate_callbacks(dpi_state, &callback, this); static dpi_ssl_callbacks_t ssl_callback = {.certificate_callback = ssl_cert_cb }; - dpi_ssl_activate_callbacks(state, &ssl_callback, &certificate); + dpi_ssl_activate_callbacks(state, &ssl_callback, this); + + // setup hash + std::string mem_name("IPv4Flows_"+name); + ipv4_flows = (struct ext_dpi_flow_info **)rte_zmalloc_socket(mem_name.c_str(), fh->getHashSizeIPv4()*sizeof(struct ext_dpi_flow_info *), RTE_CACHE_LINE_SIZE, socketid); + if(ipv4_flows == nullptr) + { + _logger.fatal("Not enough memory for ipv4 flows"); + throw Poco::Exception("Not enough memory for ipv4 flows"); + } + mem_name.assign("IPv6Flows_"+name); + ipv6_flows = (struct ext_dpi_flow_info **)rte_zmalloc_socket(mem_name.c_str(), fh->getHashSizeIPv6()*sizeof(struct ext_dpi_flow_info *), RTE_CACHE_LINE_SIZE, socketid); + if(ipv6_flows == nullptr) + { + _logger.fatal("Not enough memory for ipv6 flows"); + throw Poco::Exception("Not enough memory for ipv6 flows"); + } + _logger.debug("Allocating %d bytes for flow pool", (int) ((fh->getHashSizeIPv4() + fh->getHashSizeIPv6())*sizeof(struct ext_dpi_flow_info))); + std::string mempool_name("flows_pool_" + name); + flows_pool = rte_mempool_create(mempool_name.c_str(), (fh->getHashSizeIPv4() + fh->getHashSizeIPv6()), sizeof(struct ext_dpi_flow_info), 0, 0, NULL, NULL, NULL, NULL, socketid, 0); + if(flows_pool == nullptr) + { + _logger.fatal("Not enough memory for flows pool. Tried to allocate %d bytes", (int) ((fh->getHashSizeIPv4() + fh->getHashSizeIPv6())*sizeof(struct ext_dpi_flow_info))); + throw Poco::Exception("Not enough memory for flows pool"); + } + if(mp != nullptr) + { + // setup sender + _snd = new ESender(sp, m_WorkerConfig.sender_port, mp, this); + } else { + _snd = nullptr; + } } WorkerThread::~WorkerThread() { dpi_terminate(dpi_state); + delete m_WorkerConfig.atmLock; + delete m_WorkerConfig.atmSSLDomainsLock; + if(_snd != nullptr) + delete _snd; } const ThreadStats& WorkerThread::getStats() { - struct flow_table_stat stat; - get_flow_stat_v4(dpi_state->db4, &stat); - m_ThreadStats.ndpi_flows_count = stat.active_flows; - m_ThreadStats.ndpi_ipv4_flows_count = stat.active_flows; - m_ThreadStats.max_ipv4_flows = stat.max_active_flows; - get_flow_stat_v6(dpi_state->db6, &stat); - m_ThreadStats.ndpi_ipv6_flows_count = stat.active_flows; - m_ThreadStats.max_ipv6_flows = stat.max_active_flows; - m_ThreadStats.ndpi_flows_count += stat.active_flows; return m_ThreadStats; } +bool WorkerThread::checkSSL(std::string &certificate, dpi_pkt_infos_t *pkt) +{ + struct ipv4_hdr *ipv4_header = (struct ipv4_hdr *) pkt->pkt; + struct ipv6_hdr *ipv6_header = (struct ipv6_hdr *) pkt->pkt; + struct tcphdr* tcph; + tcph = (struct tcphdr *)((uint8_t *) pkt->pkt + (pkt->ip_version == 4 ? sizeof(struct ipv4_hdr) : sizeof(struct ipv6_hdr))); + + if(m_WorkerConfig.atmSSLDomains != nullptr) + { + if(!m_WorkerConfig.atmSSLDomainsLock->tryLock()) + return false; + if(m_WorkerConfig.lower_host) + std::transform(certificate.begin(), certificate.end(), certificate.begin(), ::tolower); + AhoCorasickPlus::Match match; + std::size_t host_len=certificate.length(); + bool found=false; + m_WorkerConfig.atmSSLDomains->search(certificate, false); + while(m_WorkerConfig.atmSSLDomains->findNext(match) && !found) + { + if(match.pattern.ptext.length != host_len) + { + bool exact_match=match.id & 0x01; + if(exact_match) + continue; + if(certificate[host_len-match.pattern.ptext.length-1] != '.') + continue; + } + found=true; + } + m_WorkerConfig.atmSSLDomainsLock->unlock(); + if(found) + { + m_ThreadStats.matched_ssl++; + if(likely(_snd != nullptr)) + { + _snd->SendRST(pkt->srcport, pkt->dstport, pkt->ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, pkt->ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, pkt->ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0); + } else { + SenderTask::queue.enqueueNotification(new RedirectNotificationG(pkt->srcport, pkt->dstport, pkt->ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, pkt->ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, pkt->ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0, nullptr, true)); + } + m_ThreadStats.sended_rst++; + return true; + } + } + return false; +} + +bool WorkerThread::checkHTTP(std::string &uri, dpi_pkt_infos_t *pkt) +{ + struct ipv4_hdr *ipv4_header = (struct ipv4_hdr *) pkt->pkt; + struct ipv6_hdr *ipv6_header = (struct ipv6_hdr *) pkt->pkt; + struct tcphdr* tcph; + tcph = (struct tcphdr *)((uint8_t *) pkt->pkt + (pkt->ip_version == 4 ? sizeof(struct ipv4_hdr) : sizeof(struct ipv6_hdr))); + if(m_WorkerConfig.atm != nullptr) + { + if(m_WorkerConfig.atmLock->tryLock()) + { + if(m_WorkerConfig.url_normalization) + { + try + { + Poco::URI uri_p(uri); + uri_p.normalize(); + uri.assign(uri_p.toString()); + } catch (Poco::SyntaxException &ex) + { + _logger.debug("An SyntaxException occured: '%s' on URI: '%s'", ex.displayText(), uri); + } + } + if(m_WorkerConfig.remove_dot || (!m_WorkerConfig.url_normalization && m_WorkerConfig.lower_host)) + { + // remove dot after domain... + size_t f_slash_pos=uri.find('/',10); + if(!m_WorkerConfig.url_normalization && m_WorkerConfig.lower_host && f_slash_pos != std::string::npos) + { + std::transform(uri.begin()+7, uri.begin()+f_slash_pos, uri.begin()+7, ::tolower); + } + if(m_WorkerConfig.remove_dot && f_slash_pos != std::string::npos) + { + if(uri[f_slash_pos-1] == '.') + uri.erase(f_slash_pos-1,1); + } + } + AhoCorasickPlus::Match match; + bool found=false; + size_t uri_length=uri.length() - 7; + char const *uri_ptr=uri.c_str() + 7; + m_WorkerConfig.atm->search((char *)uri_ptr, uri_length, false); + while(m_WorkerConfig.atm->findNext(match) && !found) + { + if(match.pattern.ptext.length != uri_length) + { + int r=match.position-match.pattern.ptext.length; + if(((match.id & 0x02) >> 1) == E_TYPE_DOMAIN) + { + if(r > 0) + { + if(match.id & 0x01) + continue; + if(*(uri_ptr+r-1) != '.') + continue; + } + } else if(((match.id & 0x02) >> 1) == E_TYPE_URL) + { + if(m_WorkerConfig.match_url_exactly) + continue; + if(r > 0) + { + if(*(uri_ptr+r-1) != '.') + continue; + } + } + } + found=true; + } + m_WorkerConfig.atmLock->unlock(); + if(found) + { + if(((match.id & 0x02) >> 1) == E_TYPE_DOMAIN) // block by domain... + { + m_ThreadStats.matched_domains++; + if(m_WorkerConfig.http_redirect) + { + std::string add_param; + switch (m_WorkerConfig.add_p_type) + { + case A_TYPE_ID: add_param="id="+std::to_string(match.id >> 2); + break; + case A_TYPE_URL: add_param="url="+uri; + break; + default: break; + } + if(likely(_snd != nullptr)) + { + _snd->Redirect(pkt->srcport, pkt->dstport, pkt->ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, pkt->ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, pkt->ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+pkt->data_length), 1, add_param.empty() ? nullptr : (char *)add_param.c_str()); + } else { + SenderTask::queue.enqueueNotification(new RedirectNotificationG(pkt->srcport, pkt->dstport, pkt->ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, pkt->ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, pkt->ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+pkt->data_length), 1, add_param.empty() ? nullptr : (char *)add_param.c_str())); + } + m_ThreadStats.redirected_domains++; + } else { + if(likely(_snd != nullptr)) + { + _snd->SendRST(pkt->srcport, pkt->dstport, pkt->ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, pkt->ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, pkt->ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0); + } else { + SenderTask::queue.enqueueNotification(new RedirectNotificationG(pkt->srcport, pkt->dstport, pkt->ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, pkt->ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, pkt->ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0, nullptr, true)); + } + m_ThreadStats.sended_rst++; + } + return true; + } else if(((match.id & 0x02) >> 1) == E_TYPE_URL) // block by url... + { + m_ThreadStats.matched_urls++; + if(m_WorkerConfig.http_redirect) + { + std::string add_param; + switch (m_WorkerConfig.add_p_type) + { + case A_TYPE_ID: add_param="id="+std::to_string(match.id >> 2); + break; + case A_TYPE_URL: add_param="url="+uri; + break; + default: break; + } + if(likely(_snd != nullptr)) + { + _snd->Redirect(pkt->srcport, pkt->dstport, pkt->ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, pkt->ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, pkt->ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+pkt->data_length), 1, add_param.empty() ? nullptr : (char *)add_param.c_str()); + } else { + SenderTask::queue.enqueueNotification(new RedirectNotificationG(pkt->srcport, pkt->dstport, pkt->ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, pkt->ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, pkt->ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+pkt->data_length), 1, add_param.empty() ? nullptr : (char *)add_param.c_str())); + } + m_ThreadStats.redirected_urls++; + } else { + if(likely(_snd != nullptr)) + { + _snd->SendRST(pkt->srcport, pkt->dstport, pkt->ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, pkt->ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, pkt->ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0); + } else { + SenderTask::queue.enqueueNotification(new RedirectNotificationG(pkt->srcport, pkt->dstport, pkt->ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, pkt->ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, pkt->ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0, nullptr, true)); + } + m_ThreadStats.sended_rst++; + } + return true; + } + } + } + } + return false; +} + + + +dpi_identification_result_t WorkerThread::identifyAppProtocol(const unsigned char* pkt, u_int32_t length, u_int32_t current_time, uint8_t *host_key, uint32_t sig) +{ + dpi_identification_result_t r; + r.status = DPI_STATUS_OK; + dpi_pkt_infos_t infos = { 0 }; + u_int8_t l3_status; + + r.status = dpi_parse_L3_L4_headers(dpi_state, pkt, length, &infos, current_time); + + if(unlikely(r.status==DPI_STATUS_IP_FRAGMENT || r.status<0)) + { + return r; + } + + if(infos.l4prot != IPPROTO_TCP && infos.l4prot != IPPROTO_UDP) + { + r.status=DPI_ERROR_TRANSPORT_PROTOCOL_NOTSUPPORTED; + return r; + } + + l3_status = r.status; + r.status = DPI_STATUS_OK; + /** + * We return the status of dpi_stateful_get_app_protocol call, + * without giving informations on status returned + * by dpi_parse_L3_L4_headers. Basically we return the status which + * provides more informations. + */ + r = getAppProtocol(host_key, current_time, sig, &infos); + + if(l3_status == DPI_STATUS_IP_LAST_FRAGMENT) + { + free((unsigned char*) infos.pkt); + } + + return r; +} + + + +dpi_identification_result_t WorkerThread::getAppProtocol(uint8_t *host_key, uint64_t timestamp, uint32_t sig, dpi_pkt_infos_t *pkt_infos) +{ + dpi_identification_result_t r; + r.status = DPI_STATUS_OK; + + dpi_flow_infos_t* flow_infos=NULL; + + int32_t hash_idx = 0; + + ext_dpi_flow_info *fi = getFlow(host_key, timestamp, &hash_idx, sig, pkt_infos); + + if(unlikely(fi==NULL)) + { + r.status=DPI_ERROR_MAX_FLOWS; + return r; + } + + flow_infos = &(fi->infos); + + r = dpi_stateless_get_app_protocol(dpi_state, flow_infos, pkt_infos); + + if(r.status == DPI_STATUS_TCP_CONNECTION_TERMINATED) + { + if(pkt_infos->ip_version == 4) + { + int32_t delr=rte_hash_del_key(m_FlowHash->getIPv4Hash(), host_key); + if(delr < 0) + { + _logger.error("Error (%d) occured while delete data from the ipv4 flow hash table", (int)delr); + } else { + ipv4_flows[hash_idx]->free_mem(dpi_state->flow_cleaner_callback); + rte_mempool_put(flows_pool, ipv4_flows[hash_idx]); + ipv4_flows[hash_idx] = nullptr; + m_ThreadStats.ndpi_flows_count--; + m_ThreadStats.ndpi_ipv4_flows_count--; + m_ThreadStats.ndpi_flows_deleted++; + } + } else { + int32_t delr=rte_hash_del_key(m_FlowHash->getIPv6Hash(), host_key); + if(delr < 0) + { + _logger.error("Error (%d) occured while delete data from the ipv6 flow hash table", (int)delr); + } else { + ipv6_flows[hash_idx]->free_mem(dpi_state->flow_cleaner_callback); + rte_mempool_put(flows_pool,ipv6_flows[hash_idx]); + ipv6_flows[hash_idx] = nullptr; + m_ThreadStats.ndpi_flows_count--; + m_ThreadStats.ndpi_ipv6_flows_count--; + m_ThreadStats.ndpi_flows_deleted++; + } + } + } + return r; +} + + + +ext_dpi_flow_info *WorkerThread::getFlow(uint8_t *host_key, uint64_t timestamp, int32_t *idx, uint32_t sig, dpi_pkt_infos_t *pkt_infos) +{ + if(pkt_infos->ip_version == 6) + { + int32_t ret = rte_hash_lookup_with_hash(m_FlowHash->getIPv6Hash(), host_key, sig); + if(ret >= 0) + { + *idx = ret; + if(ext_dpi_v6_addresses_equal((uint64_t *)&(ipv6_flows[ret]->src_addr_t.ipv6_srcaddr),(uint64_t *) &pkt_infos->src_addr_t.ipv6_srcaddr) && ipv6_flows[ret]->srcport == pkt_infos->srcport) + pkt_infos->direction=0; + else + pkt_infos->direction=1; + ipv6_flows[ret]->last_timestamp = timestamp; + return ipv6_flows[ret]; + } + if(ret == -EINVAL) + { + _logger.error("Bad parameter in ipv6 hash lookup"); + return NULL; + } + if(ret == -ENOENT) + { + struct ext_dpi_flow_info *newflow; + if(rte_mempool_get(flows_pool, (void **)&newflow) != 0) + { + _logger.fatal("Not enough memory for the flow in the flows_pool"); + return NULL; + } + memset(newflow, 0, sizeof(struct ext_dpi_flow_info)); + newflow->last_timestamp = timestamp; + rte_memcpy(&newflow->src_addr_t.ipv6_srcaddr, &pkt_infos->src_addr_t.ipv6_srcaddr, IPV6_ADDR_LEN * 2); + newflow->srcport=pkt_infos->srcport; + newflow->dstport=pkt_infos->dstport; + newflow->l4prot=pkt_infos->l4prot; + + dpi_init_flow_infos(dpi_state, &(newflow->infos), pkt_infos->l4prot); + + pkt_infos->direction = 0; + ret = rte_hash_add_key_with_hash(m_FlowHash->getIPv6Hash(), host_key, sig); + if(ret == -EINVAL) + { + rte_mempool_put(flows_pool,newflow); + _logger.fatal("Bad parameters in hash add"); + return NULL; + } + if(ret == -ENOSPC) + { + rte_mempool_put(flows_pool,newflow); + _logger.fatal("There is no space in the ipv6 flow hash"); + return NULL; + } + ipv6_flows[ret] = newflow; + *idx = ret; + m_ThreadStats.ndpi_ipv6_flows_count++; + m_ThreadStats.ndpi_flows_count++; + return newflow; + } + return NULL; + } + if(pkt_infos->ip_version == 4) + { + int32_t ret = rte_hash_lookup_with_hash(m_FlowHash->getIPv4Hash(), host_key, sig); + if(ret >= 0) + { + *idx = ret; + if(ipv4_flows[ret]->src_addr_t.ipv4_srcaddr == pkt_infos->src_addr_t.ipv4_srcaddr && ipv4_flows[ret]->srcport == pkt_infos->srcport) + pkt_infos->direction=0; + else + pkt_infos->direction=1; + ipv4_flows[ret]->last_timestamp = timestamp; + return ipv4_flows[ret]; + } + if(ret == -EINVAL) + { + _logger.error("Bad parameter in ipv4 hash lookup"); + return NULL; + } + if(ret == -ENOENT) + { + struct ext_dpi_flow_info *newflow; + if(rte_mempool_get(flows_pool, (void **)&newflow) != 0) + { + _logger.fatal("Not enough memory for the flow in the flows_pool"); + return NULL; + } + memset(newflow, 0, sizeof(struct ext_dpi_flow_info)); + newflow->last_timestamp = timestamp; + + newflow->src_addr_t.ipv4_srcaddr = pkt_infos->src_addr_t.ipv4_srcaddr; + newflow->dst_addr_t.ipv4_dstaddr = pkt_infos->dst_addr_t.ipv4_dstaddr; + newflow->srcport=pkt_infos->srcport; + newflow->dstport=pkt_infos->dstport; + newflow->l4prot=pkt_infos->l4prot; + + dpi_init_flow_infos(dpi_state, &(newflow->infos), pkt_infos->l4prot); + + pkt_infos->direction = 0; + ret = rte_hash_add_key_with_hash(m_FlowHash->getIPv4Hash(), host_key, sig); + if(ret == -EINVAL) + { + rte_mempool_put(flows_pool,newflow); + _logger.fatal("Bad parameters in hash add"); + return NULL; + } + if(ret == -ENOSPC) + { + rte_mempool_put(flows_pool,newflow); + _logger.fatal("There is no space in the ipv4 flow hash"); + return NULL; + } + ipv4_flows[ret] = newflow; + *idx = ret; + m_ThreadStats.ndpi_ipv4_flows_count++; + m_ThreadStats.ndpi_flows_count++; + return newflow; + } + return NULL; + } + return NULL; +} bool WorkerThread::analyzePacket(struct rte_mbuf* m, uint64_t timestamp) { @@ -121,6 +549,7 @@ bool WorkerThread::analyzePacket(struct rte_mbuf* m, uint64_t timestamp) struct ipv6_hdr *ipv6_header=nullptr; int size=rte_pktmbuf_pkt_len(m); + _need_block = false; int ip_version=0; uint32_t ip_len; int iphlen=0; @@ -205,11 +634,11 @@ bool WorkerThread::analyzePacket(struct rte_mbuf* m, uint64_t timestamp) m_ThreadStats.analyzed_packets++; - int tcp_src_port=rte_be_to_cpu_16(tcph->source); - int tcp_dst_port=rte_be_to_cpu_16(tcph->dest); + uint16_t tcp_src_port = tcph->source; + uint16_t tcp_dst_port = tcph->dest; uint32_t acl_action = pkt_info->acl_res & ACL_POLICY_MASK; - if(acl_action == ACL::ACL_DROP) + if(payload_len > 0 && acl_action == ACL::ACL_DROP) { m_ThreadStats.matched_ip_port++; SenderTask::queue.enqueueNotification(new RedirectNotificationG(tcp_src_port, tcp_dst_port, ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0, nullptr, true)); @@ -220,205 +649,31 @@ bool WorkerThread::analyzePacket(struct rte_mbuf* m, uint64_t timestamp) dpi_identification_result_t r; uri.clear(); - certificate.clear(); -// r = dpi_stateful_identify_application_protocol_new(dpi_state, l3, ip_len, timestamp, m->hash.usr); - r = dpi_stateful_identify_application_protocol_new(dpi_state, l3, ip_len, timestamp, m->hash.rss); + + r = identifyAppProtocol(l3, ip_len, timestamp, (uint8_t *)&((struct packet_info *)m->userdata)->keys, m->hash.usr); + + if(_need_block) + return true; + + if(payload_len == 0) + return false; if(r.protocol.l7prot == DPI_PROTOCOL_TCP_SSL) { - if(m_WorkerConfig.atmSSLDomains) + if(m_WorkerConfig.block_ssl_no_sni) { - if(!certificate.empty()) + if(acl_action == ACL::ACL_SSL && payload_len > 0) { - // если не можем выставить lock, то нет смысла продолжать... - if(!m_WorkerConfig.atmSSLDomainsLock.tryLock()) - return false; - if(m_WorkerConfig.lower_host) - std::transform(certificate.begin(), certificate.end(), certificate.begin(), ::tolower); - AhoCorasickPlus::Match match; - std::size_t host_len=certificate.length(); - bool found=false; - { - m_WorkerConfig.atmSSLDomains->search(certificate,false); - while(m_WorkerConfig.atmSSLDomains->findNext(match) && !found) - { - if(match.pattern.ptext.length != host_len) - { - DomainsMatchType::Iterator it=m_WorkerConfig.SSLdomainsMatchType->find(match.id); - bool exact_match=false; - if(it != m_WorkerConfig.SSLdomainsMatchType->end()) - exact_match = it->second; - if(exact_match) - continue; - if(certificate[host_len-match.pattern.ptext.length-1] != '.') - continue; - } - found=true; - } - } - m_WorkerConfig.atmSSLDomainsLock.unlock(); -#ifdef DEBUG_TIME - sw.stop(); - _logger.debug("SSL Host seek occupied %ld us, host: %s",sw.elapsed(),certificate); -#endif - if(found) - { - m_ThreadStats.matched_ssl++; -#ifdef _DEBUG_WORKER - _logger.debug("SSL host %s present in SSL domain (file line %u) list from ip %s:%d to ip %s:%d", certificate, match.id, src_ip->toString(),tcp_src_port,dst_ip->toString(),tcp_dst_port); -#endif - SenderTask::queue.enqueueNotification(new RedirectNotificationG(tcp_src_port, tcp_dst_port, ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0, nullptr, true)); - m_ThreadStats.sended_rst++; -// flow_info->block=true; - return true; - } else { - return false; - } - } else if(m_WorkerConfig.block_undetected_ssl) - { - if(acl_action == ACL::ACL_SSL) - { - m_ThreadStats.matched_ssl_ip++; -#ifdef _DEBUG_WORKER - _logger.debug("Blocking/Marking SSL client hello packet from %s:%d to %s:%d", src_ip->toString(),tcp_src_port,dst_ip->toString(),tcp_dst_port); -#endif - m_ThreadStats.sended_rst++; - SenderTask::queue.enqueueNotification(new RedirectNotificationG(tcp_src_port, tcp_dst_port, ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0, nullptr, true)); -// flow_info->block=true; - return true; - } + m_ThreadStats.matched_ssl_ip++; + m_ThreadStats.sended_rst++; + SenderTask::queue.enqueueNotification(new RedirectNotificationG(tcp_src_port, tcp_dst_port, ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0, nullptr, true)); + return true; } } - return false; } - if(r.protocol.l7prot == DPI_PROTOCOL_TCP_HTTP && !uri.empty()) { - if(m_WorkerConfig.atm) - { - if(m_WorkerConfig.atmLock.tryLock()) - { - std::string orig_uri(uri); - if(m_WorkerConfig.url_normalization) - { - try - { - Poco::URI uri_p(uri); - uri_p.normalize(); - uri.assign(uri_p.toString()); - } catch (Poco::SyntaxException &ex) - { - _logger.debug("An SyntaxException occured: '%s' on URI: '%s'", ex.displayText(), uri); - } - } - if(m_WorkerConfig.remove_dot || (!m_WorkerConfig.url_normalization && m_WorkerConfig.lower_host)) - { - // remove dot after domain... - size_t f_slash_pos=uri.find('/',10); - if(!m_WorkerConfig.url_normalization && m_WorkerConfig.lower_host && f_slash_pos != std::string::npos) - { - std::transform(uri.begin()+7, uri.begin()+f_slash_pos, uri.begin()+7, ::tolower); - } - if(m_WorkerConfig.remove_dot && f_slash_pos != std::string::npos) - { - if(uri[f_slash_pos-1] == '.') - uri.erase(f_slash_pos-1,1); - } - } - AhoCorasickPlus::Match match; - bool found=false; - size_t uri_length=uri.length() - 7; - char const *uri_ptr=uri.c_str() + 7; - m_WorkerConfig.atm->search((char *)uri_ptr, uri_length, false); // skip http:// - EntriesData::Iterator it; - while(m_WorkerConfig.atm->findNext(match) && !found) - { - it=m_WorkerConfig.entriesData->find(match.id); - if(match.pattern.ptext.length != uri_length) - { - int r=match.position-match.pattern.ptext.length; - if(it->second.type == E_TYPE_DOMAIN) - { - if(r > 0) - { - if(it->second.match_exactly) - continue; - if(*(uri_ptr+r-1) != '.') - continue; - } - } else if(it->second.type == E_TYPE_URL) - { - if(m_WorkerConfig.match_url_exactly) - continue; - if(r > 0) - { - if(*(uri_ptr+r-1) != '.') - continue; - } - } - } - found=true; - } - m_WorkerConfig.atmLock.unlock(); - if(found) - { - if(it->second.type == E_TYPE_DOMAIN) // block by domain... - { - m_ThreadStats.matched_domains++; -// _logger.debug("Host %s present in domain (file line %u) list from ip %s to ip %s", host, match.id, src_ip->toString(), dst_ip->toString()); - if(m_WorkerConfig.http_redirect) - { - std::string add_param; - switch (m_WorkerConfig.add_p_type) - { - case A_TYPE_ID: add_param="id="+std::to_string(it->second.lineno); - break; - case A_TYPE_URL: add_param="url="+uri; - break; - default: break; - } - - SenderTask::queue.enqueueNotification(new RedirectNotificationG(tcp_src_port, tcp_dst_port, ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+payload_len), 1, add_param.empty() ? nullptr : (char *)add_param.c_str())); - -// SenderTask::queue.enqueueNotification(new RedirectNotification(tcp_src_port, tcp_dst_port, src_ip.get(), dst_ip.get(), /*acknum*/ tcph->ack_seq, /*seqnum*/ rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+payload_len),/* flag psh */ 1, add_param)); - m_ThreadStats.redirected_domains++; - } else { - SenderTask::queue.enqueueNotification(new RedirectNotificationG(tcp_src_port, tcp_dst_port, ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0, nullptr, true)); - m_ThreadStats.sended_rst++; - } - return true; - } else if(it->second.type == E_TYPE_URL) // block by url... - { - m_ThreadStats.matched_urls++; -// _logger.debug("URL %s present in url (file pos %u) list from ip %s to ip %s", uri, match.id, src_ip->toString(), dst_ip->toString()); - if(m_WorkerConfig.http_redirect) - { - std::string add_param; - switch (m_WorkerConfig.add_p_type) - { - case A_TYPE_ID: add_param="id="+std::to_string(it->second.lineno); - break; - case A_TYPE_URL: add_param="url="+uri; - break; - default: break; - } - - SenderTask::queue.enqueueNotification(new RedirectNotificationG(tcp_src_port, tcp_dst_port, ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+payload_len), 1, add_param.empty() ? nullptr : (char *)add_param.c_str())); - -// SenderTask::queue.enqueueNotification(new RedirectNotification(tcp_src_port, tcp_dst_port, src_ip.get(), dst_ip.get(), /*acknum*/ tcph->ack_seq, /*seqnum*/ rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+payload_len),/* flag psh */ 1, add_param)); - m_ThreadStats.redirected_urls++; - } else { - SenderTask::queue.enqueueNotification(new RedirectNotificationG(tcp_src_port, tcp_dst_port, ip_version == 4 ? (void *)&ipv4_header->src_addr : (void *)&ipv6_header->src_addr, ip_version == 4 ? (void *)&ipv4_header->dst_addr : (void *)&ipv6_header->dst_addr, ip_version, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0, nullptr, true)); -// SenderTask::queue.enqueueNotification(new RedirectNotification(tcp_src_port, tcp_dst_port,src_ip.get(), dst_ip.get(), /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq, 0, empty_str, true)); - m_ThreadStats.sended_rst++; - } -// flow_info->block=true; - return true; - } - } - } - } if(ip_version == 4 && m_WorkerConfig.nm && m_WorkerConfig.notify_enabled && acl_action == ACL::ACL_NOTIFY) { uint32_t notify_group = (pkt_info->acl_res & ACL_NOTIFY_GROUP) >> 4; @@ -500,6 +755,7 @@ bool WorkerThread::run(uint32_t coreId) struct lcore_conf* qconf; uint16_t nb_rx; struct rte_mbuf *bufs[EXTFILTER_CAPTURE_BURST_SIZE]; + int tx_ret; lcore_id = rte_lcore_id(); qconf = extFilter::getLcoreConf(lcore_id); @@ -516,10 +772,27 @@ bool WorkerThread::run(uint32_t coreId) const uint64_t timer_interval = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * (1000*1000); + const uint64_t gc_int_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * EXTF_GC_INTERVAL; + + int gc_budget_ipv4 = ((double)m_FlowHash->getHashSizeIPv4()/(EXTF_ALL_GC_INTERVAL*1000*1000))*EXTF_GC_INTERVAL; + + int gc_budget_ipv6 = ((double)m_FlowHash->getHashSizeIPv6()/(EXTF_ALL_GC_INTERVAL*1000*1000))*EXTF_GC_INTERVAL; + + _logger.information("gc_budget_ipv4: %d, gc_budget_ipv6: %d", gc_budget_ipv4, gc_budget_ipv6); + + _logger.information("Running gc clean every %" PRIu64 " cycles. Cycles per second %" PRIu64, gc_int_tsc, rte_get_timer_hz()); + uint64_t last_sec = 0; - uint64_t cur_tsc, diff_timer_tsc; + uint64_t cur_tsc, diff_timer_tsc, diff_gc_tsc; uint64_t prev_timer_tsc = 0; + uint64_t prev_gc_tsc=0; + + uint32_t iter_flows_ipv4 = 0; + uint32_t iter_flows_ipv6 = 0; + + uint8_t sender_port = m_WorkerConfig.sender_port; + uint16_t tx_queue_id = m_WorkerConfig.tx_queue_id; _logger.debug("Starting working thread on core %u", coreId); @@ -538,7 +811,6 @@ bool WorkerThread::run(uint32_t coreId) break; cur_tsc = rte_rdtsc(); - last_time = cur_tsc; #ifdef ATOMIC_ACL #define SWAP_ACX(cur_acx, new_acx) \ @@ -625,8 +897,86 @@ bool WorkerThread::run(uint32_t coreId) m_ThreadStats.latency_counters.total_cycles += cycles; m_ThreadStats.latency_counters.blocked_cycles += blocked_cycles; m_ThreadStats.latency_counters.total_pkts += nb_rx; + if(_n_send_pkts > 0) + { + tx_ret = rte_eth_tx_burst(sender_port, tx_queue_id, _sender_buf, _n_send_pkts); + if (unlikely(tx_ret < _n_send_pkts)) + { + do { + rte_pktmbuf_free(_sender_buf[tx_ret]); + } while (++tx_ret < _n_send_pkts); + } + _n_send_pkts = 0; + } } + diff_gc_tsc = cur_tsc - prev_gc_tsc; + if (unlikely(diff_gc_tsc >= gc_int_tsc)) + { + int z=0; + while(z < gc_budget_ipv4 && iter_flows_ipv4 < m_FlowHash->getHashSizeIPv4()) + { + if(ipv4_flows[iter_flows_ipv4] && (last_sec - (ipv4_flows[iter_flows_ipv4]->last_timestamp) > EXT_DPI_FLOW_TABLE_MAX_IDLE_TIME)) + { + void *key_ptr; + int fr=rte_hash_get_key_with_position(m_FlowHash->getIPv4Hash(), iter_flows_ipv4, &key_ptr); + if(fr < 0) + { + _logger.error("Key not found in the hash for the position %d", (int) iter_flows_ipv4); + } else { + int32_t delr=rte_hash_del_key(m_FlowHash->getIPv4Hash(), key_ptr); + if(delr < 0) + { + _logger.error("Error (%d) occured while delete data from the ipv4 flow hash table", (int)delr); + } else { + ipv4_flows[iter_flows_ipv4]->free_mem(dpi_state->flow_cleaner_callback); + rte_mempool_put(flows_pool, ipv4_flows[iter_flows_ipv4]); + ipv4_flows[iter_flows_ipv4] = nullptr; + m_ThreadStats.ndpi_flows_count--; + m_ThreadStats.ndpi_ipv4_flows_count--; + m_ThreadStats.ndpi_flows_deleted++; + m_ThreadStats.ndpi_flows_expired++; + } + } + } + z++; + iter_flows_ipv4++; + } + if(iter_flows_ipv4 >= m_FlowHash->getHashSizeIPv4()) + iter_flows_ipv4 = 0; + z=0; + while(z < gc_budget_ipv6 && iter_flows_ipv6 < m_FlowHash->getHashSizeIPv6()) + { + if(ipv6_flows[iter_flows_ipv6] && ((last_sec - ipv6_flows[iter_flows_ipv6]->last_timestamp) > EXT_DPI_FLOW_TABLE_MAX_IDLE_TIME)) + { + void *key_ptr; + int fr=rte_hash_get_key_with_position(m_FlowHash->getIPv6Hash(), iter_flows_ipv6, &key_ptr); + if(fr < 0) + { + _logger.error("Key not found in the hash for the position %d", (int) iter_flows_ipv6); + } else { + int32_t delr=rte_hash_del_key(m_FlowHash->getIPv6Hash(), key_ptr); + if(delr < 0) + { + _logger.error("Error (%d) occured while delete data from the ipv6 flow hash table", (int)delr); + } else { + ipv6_flows[iter_flows_ipv6]->free_mem(dpi_state->flow_cleaner_callback); + rte_mempool_put(flows_pool,ipv6_flows[iter_flows_ipv6]); + ipv6_flows[iter_flows_ipv6] = nullptr; + m_ThreadStats.ndpi_flows_count--; + m_ThreadStats.ndpi_ipv6_flows_count--; + m_ThreadStats.ndpi_flows_deleted++; + m_ThreadStats.ndpi_flows_expired++; + } + } + } + z++; + iter_flows_ipv6++; + } + if(iter_flows_ipv6 >= m_FlowHash->getHashSizeIPv6()) + iter_flows_ipv6 = 0; + prev_gc_tsc = cur_tsc; + } diff_timer_tsc = cur_tsc - prev_timer_tsc; if (unlikely(diff_timer_tsc >= timer_interval)) {