From cb95e36f799d2fc81b73239aece178ef0a3e3bf3 Mon Sep 17 00:00:00 2001 From: max197616 Date: Thu, 21 Jun 2018 12:33:07 +0300 Subject: [PATCH] Version 0.99a --- configure.ac | 2 +- include/Makefile.am | 2 +- include/acl.h | 5 +- include/bworker.h | 14 +++ include/cfg.h | 20 +++-- include/main.h | 59 ++----------- include/params.h | 62 +++++++++++++ include/sender.h | 207 ++++++++++++++++++++++++++++---------------- include/stats.h | 1 + include/worker.h | 198 ++++++++++++++++++++++++++++++++++++++++-- src/Makefile.am | 2 +- src/acl.cpp | 2 + src/bworker.cpp | 188 ++++++++++++++++++++++++++++++++++++++++ src/cmdlinetask.cpp | 23 ++--- src/main.cpp | 113 ++++++++++++++---------- src/sender.cpp | 115 ++++++++++-------------- src/worker.cpp | 192 ++++++---------------------------------- 17 files changed, 774 insertions(+), 431 deletions(-) create mode 100644 include/bworker.h create mode 100644 src/bworker.cpp diff --git a/configure.ac b/configure.ac index 86be8ec..fd963c9 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. AC_PREREQ([2.69]) -AC_INIT(extFilter, 0.97, max1976@mail.ru) +AC_INIT(extFilter, 0.99a, max1976@mail.ru) DPDK_HOME= DPDK_TARGET= diff --git a/include/Makefile.am b/include/Makefile.am index aa3014a..1caf44f 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -1,2 +1,2 @@ -noinst_HEADERS = main.h worker.h statistictask.h sender.h sendertask.h stats.h reloadtask.h flow.h dtypes.h dpdk.h acl.h cmdlinetask.h notification.h tries.h cfg.h utils.h http.h params.h arr.h ssl.h ssli.h +noinst_HEADERS = main.h worker.h statistictask.h sender.h sendertask.h stats.h reloadtask.h flow.h dtypes.h dpdk.h acl.h cmdlinetask.h notification.h tries.h cfg.h utils.h http.h params.h arr.h ssl.h ssli.h bworker.h diff --git a/include/acl.h b/include/acl.h index 8f1c360..ba76963 100644 --- a/include/acl.h +++ b/include/acl.h @@ -24,8 +24,9 @@ #include #include #include -#include "worker.h" -#include "main.h" +#include +//#include "worker.h" +#include "cfg.h" #define MAX_ACL_RULE_NUM 100000 #define DEFAULT_MAX_CATEGORIES 1 diff --git a/include/bworker.h b/include/bworker.h new file mode 100644 index 0000000..756ec50 --- /dev/null +++ b/include/bworker.h @@ -0,0 +1,14 @@ + +#pragma once + +#include "worker.h" + +class BWorkerThread: public WorkerThread +{ +public: + BWorkerThread(uint8_t worker_id, const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, struct ESender::nparams &sp, struct rte_mempool *mp); + ~BWorkerThread() {} + + bool run(uint32_t coreId); +}; + diff --git a/include/cfg.h b/include/cfg.h index 226069b..cf31c06 100644 --- a/include/cfg.h +++ b/include/cfg.h @@ -19,23 +19,33 @@ #pragma once -#include "params.h" - // maximum active threads #define MAX_WORKER_THREADS 10 +#define DEFAULT_MBUF_POOL_SIZE 8191 +#define MAX_RX_QUEUE_PER_LCORE 16 +#define MAX_LCORE_PARAMS 1024 +#define NB_SOCKETS 4 +#define MAX_RX_QUEUE_PER_PORT 128 +#define EXTF_RX_DESC_DEFAULT 256 +#define EXTF_TX_DESC_DEFAULT 512 +#define PERCENT_URL_ENTRIES 0.20 + +#define EXTFILTER_CAPTURE_BURST_SIZE 32 +#define EXTFILTER_WORKER_BURST_SIZE EXTFILTER_CAPTURE_BURST_SIZE +#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */ + #define MAX_REDIRECT_URL_SIZE 1189 const char r_line1[] = "HTTP/1.1 302 Moved Temporarily\r\n"; const char r_line2[] = "Location: "; const char r_line3[] = "\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"; -const char f_lines[] = "HTTP/1.1 403 Forbidden\r\nConnection: close\r\n\r\n"; +const char f_lines[] = "HTTP/1.1 403 Forbidden\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"; const char uri_p[] = "uri=http%3A%2F%2F"; #define OUR_REDIR_SIZE (sizeof(r_line1) + sizeof(r_line2) + sizeof(r_line3) - 3) #define OUR_PAYLOAD_SIZE 1400 -extern const global_params_t *global_prm; -extern worker_params_t worker_params[MAX_WORKER_THREADS]; + diff --git a/include/main.h b/include/main.h index e0e3d03..094bd74 100644 --- a/include/main.h +++ b/include/main.h @@ -28,52 +28,11 @@ #include "notification.h" #include "tries.h" -#define DEFAULT_MBUF_POOL_SIZE 8191 -#define MAX_RX_QUEUE_PER_LCORE 16 -#define MAX_LCORE_PARAMS 1024 -#define NB_SOCKETS 4 -#define MAX_RX_QUEUE_PER_PORT 128 -#define EXTF_RX_DESC_DEFAULT 256 -#define EXTF_TX_DESC_DEFAULT 512 -#define PERCENT_URL_ENTRIES 0.20 -#define EXTF_MAX_PKT_BURST 32 class AhoCorasickPlus; class Patricia; class ACL; -struct lcore_params { - uint8_t port_id; - uint8_t port_type; - uint8_t queue_id; - uint8_t lcore_id; - uint8_t mapto; -} __rte_cache_aligned; - -struct lcore_rx_queue { - uint8_t port_id; - uint8_t port_type; - uint8_t queue_id; -} __rte_cache_aligned; - -struct mbuf_table -{ - uint16_t len; - struct rte_mbuf* m_table[EXTF_MAX_PKT_BURST]; -}; - -struct lcore_conf { - uint16_t n_rx_queue; - struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE]; - struct rte_acl_ctx *cur_acx_ipv4, *new_acx_ipv4; - struct rte_acl_ctx *cur_acx_ipv6, *new_acx_ipv6; - uint8_t sender_port; - uint16_t tx_queue; - uint16_t tx_queue_id[RTE_MAX_ETHPORTS]; - struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS]; -} __rte_cache_aligned; - - class extFilter: public Poco::Util::ServerApplication { @@ -126,16 +85,6 @@ class extFilter: public Poco::Util::ServerApplication return _notify_acl_file; } - static inline uint64_t getTscHz() - { - return _tsc_hz; - } - - static inline struct rte_mempool *getPktInfoPool() - { - return packet_info_pool[rte_socket_id()]; - } - static inline struct lcore_conf *getLcoreConf(uint32_t lcore_id) { return &_lcore_conf[lcore_id]; @@ -195,7 +144,10 @@ class extFilter: public Poco::Util::ServerApplication return &_tries; } - static rte_mempool *packet_info_pool[NB_SOCKETS]; + inline operation_modes getOperationMode() + { + return _operation_mode; + } static struct ether_addr ports_eth_addr[RTE_MAX_ETHPORTS]; private: @@ -209,6 +161,7 @@ class extFilter: public Poco::Util::ServerApplication int _init_lcore_rx_queues(void); int _check_lcore_params(void); int _check_port_config(const unsigned nb_ports); + int initDPIMemPools(); bool _helpRequested; bool _listDPDKPorts; @@ -226,8 +179,6 @@ class extFilter: public Poco::Util::ServerApplication int _statistic_interval; struct CSender::params _sender_params; - - static uint64_t _tsc_hz; int _numa_on; uint32_t _enabled_port_mask; diff --git a/include/params.h b/include/params.h index dcee8fd..a163c02 100644 --- a/include/params.h +++ b/include/params.h @@ -21,6 +21,8 @@ #include #include +#include "dtypes.h" +#include "cfg.h" class FlowStorage; struct rte_mempool; @@ -64,6 +66,18 @@ struct fragmentation_configs_t fragmentation_config_t ipv6; }; +struct memory_pool_t +{ + rte_mempool *mempool; + uint32_t entries; // количество элементов +}; + +struct memory_pools_t +{ + memory_pool_t ssl_entries; + memory_pool_t http_entries; +}; + struct global_params_t { memory_configs_t memory_configs; @@ -72,6 +86,12 @@ struct global_params_t uint8_t workers_number; uint64_t flow_lifetime[2]; // [0] для flows, который завершены или установлены без данных, [1] для flows с данными uint8_t answer_duplication; + operation_modes operation_mode; +}; + +struct common_data_t +{ + memory_pools_t mempools; }; struct flow_storage_t @@ -89,4 +109,46 @@ struct worker_params_t uint16_t tx_queue_id; } __rte_cache_aligned; +struct lcore_params { + uint8_t port_id; + uint8_t port_type; + uint8_t queue_id; + uint8_t lcore_id; + uint8_t mapto; +} __rte_cache_aligned; + +struct lcore_rx_queue { + uint8_t port_id; + uint8_t port_type; + uint8_t queue_id; +} __rte_cache_aligned; + +struct mbuf_table +{ + uint16_t len; + struct rte_mbuf* m_table[EXTFILTER_CAPTURE_BURST_SIZE]; +}; + +struct lcore_conf { + uint16_t n_rx_queue; + struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE]; + struct rte_acl_ctx *cur_acx_ipv4, *new_acx_ipv4; + struct rte_acl_ctx *cur_acx_ipv6, *new_acx_ipv6; + uint8_t sender_port; + uint16_t n_tx_port; + uint16_t tx_queue; + uint16_t tx_port_id[RTE_MAX_ETHPORTS]; + uint16_t tx_queue_id[RTE_MAX_ETHPORTS]; + struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS]; +} __rte_cache_aligned; + +struct filter_tx +{ + struct mbuf_table *mbuf; + bool to_client; +}; + +extern const global_params_t *global_prm; +extern common_data_t *common_data; +extern worker_params_t worker_params[MAX_WORKER_THREADS]; diff --git a/include/sender.h b/include/sender.h index b91c288..25d7fc7 100644 --- a/include/sender.h +++ b/include/sender.h @@ -41,6 +41,7 @@ #include #include #include +#include #include "cfg.h" class BSender @@ -150,24 +151,96 @@ class BSender return pkt_len; } - inline int makeSwapPacketIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, uint8_t *buffer) + + virtual int Send(uint8_t *buffer, int size, void *addr, int addr_size) = 0; + + Poco::Logger& _logger; + struct params _parameters; + uint16_t pkt_id; +}; + +class CSender : public BSender +{ +public: + CSender(struct BSender::params &prm); + ~CSender(); + int Send(uint8_t *buffer, int size, void *addr, int addr_size); +private: + int s; + int s6; +}; + +class DSender : public BSender +{ +public: + DSender(struct BSender::params &prm, uint8_t port, uint8_t *mac, uint8_t *to_mac, struct rte_mempool *mp); + ~DSender(); + int Send(uint8_t *buffer, int size, void *addr, int addr_size); +private: + uint8_t _port; + struct ether_hdr _eth_hdr; + struct rte_mempool *_mp; +}; + +class WorkerThread; + +class ESender : public BSender +{ +public: + struct nparams { - int pkt_len; + struct params params; + uint8_t *mac; + uint8_t *to_mac; + int answer_duplication; + struct rte_mempool *clone_pool; + }; + ESender(struct nparams ¶ms, uint8_t port, struct rte_mempool *mp, WorkerThread *wt, bool keep_l2_hdr = false); + ~ESender(); + + void sendPacket(void *ip_from, void *ip_to, int ip_ver, int port_from, int port_to, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, int f_reset, int f_psh); + int Send(uint8_t *buffer, int size, void *addr, int addr_size) + { + return size; + } + + inline int makeSwapPacketIPv4(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, struct rte_mbuf *m, bool to_server = false) + { + int pkt_len; + const uint8_t *pkt = pkt_infos->pkt; struct ipv4_hdr *ipv4_header = (struct ipv4_hdr *)pkt; struct tcphdr *tcph_orig = (struct tcphdr *)(pkt + sizeof(struct ipv4_hdr)); + // ethernet header + std::size_t l2_hdr_size = _keep_l2_hdr ? (pkt_infos->pkt - pkt_infos->l2_pkt) : sizeof(struct ether_hdr); + struct ether_hdr *eth_hdr = (struct ether_hdr *) rte_pktmbuf_append(m, l2_hdr_size); + if(_keep_l2_hdr) + { + rte_memcpy(eth_hdr, pkt_infos->l2_pkt, l2_hdr_size); + if(!to_server) + { + struct ether_addr addr; + ether_addr_copy(ð_hdr->d_addr, &addr); + ether_addr_copy(ð_hdr->s_addr, ð_hdr->d_addr); + ether_addr_copy(&addr, ð_hdr->s_addr); + } + } else { + *eth_hdr = _eth_hdr; + } + // IP header - struct iphdr *iph = (struct iphdr *) buffer; + struct iphdr *iph = (struct iphdr *) rte_pktmbuf_append(m, sizeof(struct iphdr)); // TCP header - struct tcphdr *tcph = (struct tcphdr *) (buffer + sizeof(struct iphdr)); + struct tcphdr *tcph = (struct tcphdr *) rte_pktmbuf_append(m, sizeof(struct tcphdr)); // Data part - uint8_t *data = (uint8_t *)tcph + sizeof(struct tcphdr); - if(dt_buf != nullptr && dt_len != 0) + { + uint8_t *data = (uint8_t *) rte_pktmbuf_append(m, dt_len); rte_memcpy(data, dt_buf, dt_len); + } if(_logger.getLevel() == Poco::Message::PRIO_DEBUG) { @@ -184,8 +257,14 @@ class BSender iph->ttl = _parameters.ttl; iph->protocol = IPPROTO_TCP; iph->check = 0; - iph->saddr = ipv4_header->dst_addr; - iph->daddr = ipv4_header->src_addr; + if(to_server) + { + iph->saddr = ipv4_header->src_addr; + iph->daddr = ipv4_header->dst_addr; + } else { + iph->saddr = ipv4_header->dst_addr; + iph->daddr = ipv4_header->src_addr; + } pkt_len = sizeof(struct iphdr) + sizeof(struct tcphdr) + dt_len; // TCP Header @@ -215,27 +294,48 @@ class BSender tcph->urg_ptr = 0; tcph->check = rte_ipv4_udptcp_cksum((const ipv4_hdr*)iph, tcph); + iph->check = rte_ipv4_cksum((const ipv4_hdr*)iph); + return pkt_len; } - inline int makeSwapPacketIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, uint8_t *buffer) + inline int makeSwapPacketIPv6(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, struct rte_mbuf *m, bool to_server = false) { int pkt_len; + const uint8_t *pkt = pkt_infos->pkt; struct ipv6_hdr *ipv6_header = (struct ipv6_hdr *)pkt; struct tcphdr *tcph_orig = (struct tcphdr *)(pkt + sizeof(struct ipv6_hdr)); + // ethernet header + std::size_t l2_hdr_size = _keep_l2_hdr ? (pkt_infos->pkt - pkt_infos->l2_pkt) : sizeof(struct ether_hdr); + struct ether_hdr *eth_hdr = (struct ether_hdr *) rte_pktmbuf_append(m, l2_hdr_size); + if(_keep_l2_hdr) + { + rte_memcpy(eth_hdr, pkt_infos->l2_pkt, l2_hdr_size); + if(!to_server) + { + struct ether_addr addr; + ether_addr_copy(ð_hdr->d_addr, &addr); + ether_addr_copy(ð_hdr->s_addr, ð_hdr->d_addr); + ether_addr_copy(&addr, ð_hdr->s_addr); + } + } else { + *eth_hdr = _eth_hdr_ipv6; + } + // IP header - struct ip6_hdr *iph6 = (struct ip6_hdr *) buffer; + struct ip6_hdr *iph6 = (struct ip6_hdr *) rte_pktmbuf_append(m, sizeof(struct ip6_hdr)); // TCP header - struct tcphdr *tcph = (struct tcphdr *) (buffer + sizeof(struct ipv6_hdr)); + struct tcphdr *tcph = (struct tcphdr *) rte_pktmbuf_append(m, sizeof(struct tcphdr)); // Data part - uint8_t *data = (uint8_t *)tcph + sizeof(struct tcphdr); - if(dt_buf != nullptr && dt_len != 0) + { + uint8_t *data = (uint8_t *) rte_pktmbuf_append(m, dt_len); rte_memcpy(data, dt_buf, dt_len); + } if(_logger.getLevel() == Poco::Message::PRIO_DEBUG) { @@ -250,8 +350,14 @@ class BSender iph6->ip6_nxt = IPPROTO_TCP; // Hop limit (8 bits): default to maximum value iph6->ip6_hops = 250; - rte_mov16((uint8_t *)&iph6->ip6_src, (uint8_t *)&ipv6_header->dst_addr); - rte_mov16((uint8_t *)&iph6->ip6_dst, (uint8_t *)&ipv6_header->src_addr); + if(to_server) + { + rte_mov16((uint8_t *)&iph6->ip6_src, (uint8_t *)&ipv6_header->src_addr); + rte_mov16((uint8_t *)&iph6->ip6_dst, (uint8_t *)&ipv6_header->dst_addr); + } else { + rte_mov16((uint8_t *)&iph6->ip6_src, (uint8_t *)&ipv6_header->dst_addr); + rte_mov16((uint8_t *)&iph6->ip6_dst, (uint8_t *)&ipv6_header->src_addr); + } pkt_len = (sizeof(struct ip6_hdr) + sizeof(struct tcphdr) + dt_len); // TCP Header tcph->source = tcph_orig->dest; @@ -281,75 +387,24 @@ class BSender return pkt_len; } - virtual int Send(uint8_t *buffer, int size, void *addr, int addr_size) = 0; - - Poco::Logger& _logger; - struct params _parameters; - uint16_t pkt_id; -}; - -class CSender : public BSender -{ -public: - CSender(struct BSender::params &prm); - ~CSender(); - int Send(uint8_t *buffer, int size, void *addr, int addr_size); -private: - int s; - int s6; -}; - -class DSender : public BSender -{ -public: - DSender(struct BSender::params &prm, uint8_t port, uint8_t *mac, uint8_t *to_mac, struct rte_mempool *mp); - ~DSender(); - int Send(uint8_t *buffer, int size, void *addr, int addr_size); -private: - uint8_t _port; - struct ether_hdr _eth_hdr; - struct rte_mempool *_mp; -}; - -class WorkerThread; - -class ESender : public BSender -{ -public: - struct nparams - { - struct params params; - uint8_t *mac; - uint8_t *to_mac; - int answer_duplication; - struct rte_mempool *clone_pool; - }; - ESender(struct nparams ¶ms, uint8_t port, struct rte_mempool *mp, WorkerThread *wt); - ~ESender(); - - void sendPacket(void *ip_from, void *ip_to, int ip_ver, int port_from, int port_to, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, int f_reset, int f_psh); - - int Send(uint8_t *buffer, int size, void *addr, int addr_size) - { - return size; - } - - void sendPacketIPv4(const uint8_t *l3_pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, bool to_server = false); - void sendPacketIPv6(const uint8_t *l3_pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, bool to_server = false); - void HTTPRedirectIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh, const char *redir_url, size_t r_len); - void HTTPRedirectIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh, const char *redir_url, size_t r_len); - void SendRSTIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum); - void SendRSTIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum); - void HTTPForbiddenIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh); - void HTTPForbiddenIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh); + void sendPacketIPv4(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, bool to_server = false); + void SendRSTIPv4(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum); + void HTTPRedirectIPv4(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, bool f_psh, const char *redir_url, size_t r_len); + void HTTPForbiddenIPv4(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, bool f_psh); + void sendPacketIPv6(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, bool to_server = false); + void HTTPRedirectIPv6(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, bool f_psh, const char *redir_url, size_t r_len); + void SendRSTIPv6(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum); + void HTTPForbiddenIPv6(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, bool f_psh); private: uint8_t _port; struct ether_hdr _eth_hdr; + struct ether_hdr _eth_hdr_ipv6; struct rte_mempool *_mp; struct rte_mempool *_clone_pool; WorkerThread *_wt; int _answer_duplication; + bool _keep_l2_hdr; }; #endif diff --git a/include/stats.h b/include/stats.h index dbb45dd..f319643 100644 --- a/include/stats.h +++ b/include/stats.h @@ -90,6 +90,7 @@ struct ThreadStats uint64_t matched_http_bl_ipv6; uint64_t redirected_http_bl_ipv6; + uint64_t tx_dropped; ThreadStats() { diff --git a/include/worker.h b/include/worker.h index b61bcd0..a3a03ff 100644 --- a/include/worker.h +++ b/include/worker.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include "flow.h" #include "stats.h" @@ -35,9 +36,8 @@ #include "sender.h" #include "http.h" #include "ssl.h" - -#define EXTFILTER_CAPTURE_BURST_SIZE 32 -#define EXTFILTER_WORKER_BURST_SIZE 32 +#include "acl.h" +#include "cfg.h" #define CERT_RESERVATION_SIZE 1024 @@ -67,6 +67,7 @@ struct WorkerConfig class WorkerThread : public DpdkWorkerThread { friend class ESender; + friend class BWorkerThread; private: WorkerConfig m_WorkerConfig; bool m_Stop; @@ -83,7 +84,7 @@ class WorkerThread : public DpdkWorkerThread // bool analyzePacketIPv4(struct rte_mbuf* mBuf, uint64_t timestamp); dpi_identification_result_t getAppProtocol(uint8_t *host_key, uint64_t timestamp, uint32_t sig, dpi_pkt_infos_t *pkt_infos); - dpi_identification_result_t identifyAppProtocol(const unsigned char* pkt, u_int32_t length, u_int32_t current_time, struct packet_info *pkt_info, uint32_t sig); + dpi_identification_result_t identifyAppProtocol(const unsigned char* pkt, u_int32_t length, const uint8_t *l2_pkt, u_int32_t current_time, struct packet_info *pkt_info, uint32_t sig); bool checkSSL(); std::string _name; @@ -91,7 +92,9 @@ class WorkerThread : public DpdkWorkerThread /// for sender through dpdk int _n_send_pkts; +// struct filter_tx _sender_buf[EXTFILTER_WORKER_BURST_SIZE]; struct rte_mbuf* _sender_buf[EXTFILTER_WORKER_BURST_SIZE]; + bool _sender_buf_flags[EXTFILTER_WORKER_BURST_SIZE]; ESender *_snd; struct rte_mempool *_dpi_http_mempool; struct rte_mempool *_dpi_ssl_mempool; @@ -103,7 +106,7 @@ class WorkerThread : public DpdkWorkerThread struct packet_info _pkt_infos[EXTFILTER_CAPTURE_BURST_SIZE]; public: - WorkerThread(uint8_t worker_id, const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, int socketid, struct ESender::nparams &sp, struct rte_mempool *mp, struct rte_mempool *dpi_http_mempool, struct rte_mempool *dpi_ssl_mempool); + WorkerThread(uint8_t worker_id, const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, struct ESender::nparams &sp, struct rte_mempool *mp); ~WorkerThread(); bool checkURLBlocked(const char *host, size_t host_len, const char *uri, size_t uri_len, dpi_pkt_infos_t* pkt); @@ -191,5 +194,190 @@ class WorkerThread : public DpdkWorkerThread return _worker_id; } + // qconf - core config + // n - number of packets + // port - port to send + inline int send_burst(struct lcore_conf *qconf, uint16_t n, uint16_t port) + { + struct rte_mbuf **m_table; + int ret; + uint16_t queueid; + + queueid = qconf->tx_queue_id[port]; + m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table; + + ret = rte_eth_tx_burst(port, queueid, m_table, n); + if (unlikely(ret < n)) + { + m_ThreadStats.tx_dropped += (n - ret); + do { + rte_pktmbuf_free(m_table[ret]); + } while (++ret < n); + } + return 0; + } + + /* Enqueue a single packet, and send burst if queue is filled */ + inline int send_single_packet(struct lcore_conf *qconf, struct rte_mbuf *m, uint16_t port) + { + uint16_t len; + + len = qconf->tx_mbufs[port].len; + qconf->tx_mbufs[port].m_table[len] = m; + len++; + + /* enough pkts to be sent */ + if (unlikely(len == EXTFILTER_WORKER_BURST_SIZE)) + { + send_burst(qconf, EXTFILTER_WORKER_BURST_SIZE, port); + len = 0; + } + + qconf->tx_mbufs[port].len = len; + return 0; + } + +}; + +static inline void parsePtype(struct rte_mbuf *m, struct packet_info *pkt_info) +{ + struct ether_hdr *eth_hdr; + uint32_t packet_type = RTE_PTYPE_UNKNOWN; + uint16_t ether_type; + uint8_t *l3; + int hdr_len; + struct ipv4_hdr *ipv4_hdr; + struct ipv6_hdr *ipv6_hdr; + + pkt_info->timestamp = rte_rdtsc(); // timestamp + + eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *); + ether_type = rte_be_to_cpu_16(eth_hdr->ether_type); + l3 = (uint8_t *)eth_hdr + sizeof(struct ether_hdr); + + if(ether_type == ETHER_TYPE_VLAN || ether_type == 0x8847) + { + while(1) + { + if(ether_type == ETHER_TYPE_VLAN) + { + struct vlan_hdr *vlan_hdr = (struct vlan_hdr *)(l3); + ether_type = rte_be_to_cpu_16(vlan_hdr->eth_proto); + l3 += sizeof(struct vlan_hdr); + } else if(ether_type == 0x8847) + { + uint8_t bos = ((uint8_t *)l3)[2] & 0x1; + l3 += 4; + if(bos) + { + ether_type = ETHER_TYPE_IPv4; + break; + } + } else + break; + } + } + + if (ether_type == ETHER_TYPE_IPv4) + { + ipv4_hdr = (struct ipv4_hdr *)l3; + hdr_len = (ipv4_hdr->version_ihl & IPV4_HDR_IHL_MASK) * IPV4_IHL_MULTIPLIER; + if (hdr_len == sizeof(struct ipv4_hdr)) + { + packet_type |= RTE_PTYPE_L3_IPV4; + if (ipv4_hdr->next_proto_id == IPPROTO_TCP) + packet_type |= RTE_PTYPE_L4_TCP; + else if (ipv4_hdr->next_proto_id == IPPROTO_UDP) + packet_type |= RTE_PTYPE_L4_UDP; + } else + packet_type |= RTE_PTYPE_L3_IPV4_EXT; + } else if (ether_type == ETHER_TYPE_IPv6) + { + ipv6_hdr = (struct ipv6_hdr *)l3; + if (ipv6_hdr->proto == IPPROTO_TCP) + packet_type |= RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_TCP; + else if (ipv6_hdr->proto == IPPROTO_UDP) + packet_type |= RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP; + else + packet_type |= RTE_PTYPE_L3_IPV6_EXT_UNKNOWN; + } + m->packet_type = packet_type; + pkt_info->l3 = l3; + pkt_info->acl_res = ACL::ACL_DEFAULT_POLICY; + m->userdata = pkt_info; + uint32_t tcp_or_udp = packet_type & (RTE_PTYPE_L4_TCP | RTE_PTYPE_L4_UDP); + uint32_t l3_ptypes = packet_type & RTE_PTYPE_L3_MASK; + if(tcp_or_udp && (l3_ptypes == RTE_PTYPE_L3_IPV6)) + { + void *ipv6_hdr = l3 + offsetof(struct ipv6_hdr, payload_len); + void *data0 = ipv6_hdr; + void *data1 = ((uint8_t *)ipv6_hdr) + sizeof(xmm_t); + void *data2 = ((uint8_t *)ipv6_hdr) + sizeof(xmm_t) + sizeof(xmm_t); + pkt_info->keys.ipv6_key.xmm[0] = em_mask_key(data0, mask1.x); + pkt_info->keys.ipv6_key.xmm[1] = _mm_loadu_si128((__m128i *)(data1)); + pkt_info->keys.ipv6_key.xmm[2] = em_mask_key(data2, mask2.x); + m->hash.usr = ipv6_hash_crc(&pkt_info->keys.ipv6_key,0,0); + } else if (tcp_or_udp && (l3_ptypes == RTE_PTYPE_L3_IPV4)) + { + void *ipv4_hdr = l3 + offsetof(struct ipv4_hdr, time_to_live); + pkt_info->keys.ipv4_key.xmm = em_mask_key(ipv4_hdr, mask0.x); + m->hash.usr = ipv4_hash_crc(&pkt_info->keys.ipv4_key,0,0); + } +}; + +/* + * Put one packet in acl_search struct according to the packet ol_flags + */ +static inline void prepare_one_packet(struct rte_mbuf** pkts_in, struct ACL::acl_search_t* acl, int index, struct packet_info *pkt_info) +{ + struct rte_mbuf* pkt = pkts_in[index]; + + parsePtype(pkt, pkt_info); + uint32_t l3_ptypes = pkt->packet_type & RTE_PTYPE_L3_MASK; + + // XXX we cannot filter non IP packet yet + if (l3_ptypes == RTE_PTYPE_L3_IPV4) + { + /* Fill acl structure */ + acl->data_ipv4[acl->num_ipv4] = ((struct packet_info *)pkt->userdata)->l3 + offsetof(struct ipv4_hdr, next_proto_id); + acl->m_ipv4[(acl->num_ipv4)++] = pkt; + } else if (l3_ptypes == RTE_PTYPE_L3_IPV6) + { + /* Fill acl structure */ + acl->data_ipv6[acl->num_ipv6] = ((struct packet_info *)pkt->userdata)->l3 + offsetof(struct ipv6_hdr, proto); + acl->m_ipv6[(acl->num_ipv6)++] = pkt; + } }; +/* + * Loop through all packets and classify them if acl_search if possible. + */ +static inline void prepare_acl_parameter(struct rte_mbuf** pkts_in, struct ACL::acl_search_t* acl, int nb_rx, struct packet_info *pkt_infos) +{ + int i = 0, j = 0; + + acl->num_ipv4 = 0; + acl->num_ipv6 = 0; + +#define PREFETCH() \ + rte_prefetch0(rte_pktmbuf_mtod(pkts_in[i], void*)); \ + i++; \ + j++; + + // we prefetch0 packets 3 per 3 + switch (nb_rx % PREFETCH_OFFSET) { + while (nb_rx != i) { + case 0: + PREFETCH(); + case 2: + PREFETCH(); + case 1: + PREFETCH(); + while (j > 0) + { + prepare_one_packet(pkts_in, acl, i - j, &pkt_infos[i-j]); + --j; + } + } + } +}; diff --git a/src/Makefile.am b/src/Makefile.am index e8b0bf2..d71fa21 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -6,5 +6,5 @@ bin_PROGRAMS = extFilter extFilter_LDFLAGS = -Wl,--whole-archive -lrte_acl -lrte_pmd_bond -lrte_pmd_vmxnet3_uio -lrte_pmd_virtio -lrte_pmd_enic -lrte_pmd_i40e -lrte_pmd_fm10k -lrte_pmd_ixgbe -lrte_pmd_e1000 -lrte_pmd_ring -lrte_pmd_af_packet -lrte_ethdev -lrte_eal -lrte_mbuf -lrte_mempool -lrte_mempool_ring -lrte_mempool_stack -lrte_ring -lrte_kvargs -lrte_hash -lrte_cmdline -lrte_net -Wl,--no-whole-archive -extFilter_SOURCES = main.cpp worker.cpp sender.cpp statistictask.cpp reloadtask.cpp flow.cpp acl.cpp cmdlinetask.cpp notification.cpp tries.cpp utils.cpp http.cpp ssl.cpp ssli.cpp +extFilter_SOURCES = main.cpp worker.cpp sender.cpp statistictask.cpp reloadtask.cpp flow.cpp acl.cpp cmdlinetask.cpp notification.cpp tries.cpp utils.cpp http.cpp ssl.cpp ssli.cpp bworker.cpp diff --git a/src/acl.cpp b/src/acl.cpp index d2410c8..e7856d6 100644 --- a/src/acl.cpp +++ b/src/acl.cpp @@ -20,7 +20,9 @@ #define __STDC_FORMAT_MACROS #include #include +#include #include "acl.h" +#include "main.h" struct rte_acl_ctx* ACL::ipv4_acx[NB_SOCKETS]; struct rte_acl_ctx* ACL::ipv6_acx[NB_SOCKETS]; diff --git a/src/bworker.cpp b/src/bworker.cpp new file mode 100644 index 0000000..3019cc4 --- /dev/null +++ b/src/bworker.cpp @@ -0,0 +1,188 @@ + +#include "bworker.h" +#include "main.h" +#include +#include "acl.h" + +BWorkerThread::BWorkerThread(uint8_t worker_id, const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, struct ESender::nparams &sp, struct rte_mempool *mp) : + WorkerThread(worker_id, name, workerConfig, state, sp, mp) +{ +} + + +bool BWorkerThread::run(uint32_t coreId) +{ + setCoreId(coreId); + uint8_t portid = 0, queueid, port_type; + uint32_t lcore_id; + struct lcore_conf* qconf; + uint16_t nb_rx; + struct rte_mbuf *bufs[EXTFILTER_CAPTURE_BURST_SIZE]; + + lcore_id = rte_lcore_id(); + qconf = extFilter::getLcoreConf(lcore_id); + + if (qconf->n_rx_queue == 0) + { + _logger.information("Lcore %d has nothing to do", (int) lcore_id); + return true; + } + + m_Stop = false; + struct rte_mbuf *buf; + + const uint64_t timer_interval = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * (1000*1000); + const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * BURST_TX_DRAIN_US; + + uint64_t last_sec = 0; + + uint64_t cur_tsc, diff_timer_tsc, diff_tsc; + uint64_t prev_timer_tsc = 0, prev_tsc = 0; + + uint8_t sender_port = qconf->sender_port; + uint16_t tx_queue_id = qconf->tx_queue_id[sender_port]; + _logger.information("Output port for the worker %d is %d (tx_queue_id %d) n_tx_port %d", (int)_worker_id, (int)sender_port, (int)tx_queue_id, (int) qconf->n_tx_port); + + _logger.debug("Starting bridge working thread on core %u", coreId); + + for (int i = 0; i < qconf->n_rx_queue; i++) + { + portid = qconf->rx_queue_list[i].port_id; + queueid = qconf->rx_queue_list[i].queue_id; + _logger.information("-- lcoreid=%d portid=%d rxqueueid=%d", (int)lcore_id, (int)portid, (int)queueid); + } + + // main loop, runs until be told to stop + while (!m_Stop) + { + if(m_Stop) + break; + + cur_tsc = rte_rdtsc(); +//#define ATOMIC_ACL +#ifdef ATOMIC_ACL +#define SWAP_ACX(cur_acx, new_acx) \ + rte_atomic64_cmpswap((uintptr_t*)&new_acx, (uintptr_t*)&cur_acx, \ + (uintptr_t)new_acx)) +#else +#define SWAP_ACX(cur_acx, new_acx) \ + if (unlikely(cur_acx != new_acx)) { \ + cur_acx = new_acx; \ + } +#endif + SWAP_ACX(qconf->cur_acx_ipv4, qconf->new_acx_ipv4); + SWAP_ACX(qconf->cur_acx_ipv6, qconf->new_acx_ipv6); +#undef SWAP_ACX + + diff_tsc = cur_tsc - prev_tsc; + if (unlikely(diff_tsc > drain_tsc)) + { + for (uint16_t i = 0; i < qconf->n_tx_port; ++i) + { + portid = qconf->tx_port_id[i]; + if (qconf->tx_mbufs[portid].len == 0) + continue; + send_burst(qconf, qconf->tx_mbufs[portid].len, portid); + qconf->tx_mbufs[portid].len = 0; + } + prev_tsc = cur_tsc; + } + + /* + * Read packet from RX queues + */ + for (int i = 0; i < qconf->n_rx_queue; i++) + { + portid = qconf->rx_queue_list[i].port_id; + port_type = qconf->rx_queue_list[i].port_type; + queueid = qconf->rx_queue_list[i].queue_id; + nb_rx = rte_eth_rx_burst(portid, queueid, bufs, EXTFILTER_CAPTURE_BURST_SIZE); + if (unlikely(nb_rx == 0)) + continue; + + m_ThreadStats.total_packets += nb_rx; + + switch (port_type) + { + case P_TYPE_NETWORK: + for (uint16_t z = 0; z < nb_rx; z++) + { + send_single_packet(qconf, bufs[z], sender_port); + } + break; + + case P_TYPE_SUBSCRIBER: + struct ACL::acl_search_t acl_search; + prepare_acl_parameter(bufs, &acl_search, nb_rx, &_pkt_infos[0]); + if(likely(qconf->cur_acx_ipv4 && acl_search.num_ipv4)) + { + rte_acl_classify(qconf->cur_acx_ipv4, acl_search.data_ipv4, acl_search.res_ipv4, acl_search.num_ipv4, DEFAULT_MAX_CATEGORIES); + for(int acli=0; acli < acl_search.num_ipv4; acli++) + { + if(unlikely(acl_search.res_ipv4[acli] != 0)) + { + ((struct packet_info *)acl_search.m_ipv4[acli]->userdata)->acl_res=acl_search.res_ipv4[acli]; + } + } + } + if(qconf->cur_acx_ipv6 && acl_search.num_ipv6) + { + rte_acl_classify(qconf->cur_acx_ipv6, acl_search.data_ipv6, acl_search.res_ipv6, acl_search.num_ipv6, DEFAULT_MAX_CATEGORIES); + for(int acli=0; acli < acl_search.num_ipv6; acli++) + { + if(unlikely(acl_search.res_ipv6[acli] != 0)) + { + ((struct packet_info *)acl_search.m_ipv6[acli]->userdata)->acl_res=acl_search.res_ipv6[acli]; + } + } + } + uint64_t cycles = 0; + uint64_t blocked_cycles = 0; + uint64_t unblocked_cycles = 0; + for(uint16_t i = 0; i < nb_rx; i++) + { + buf = bufs[i]; + rte_prefetch0(rte_pktmbuf_mtod(buf, void *)); + if(likely(buf->userdata != nullptr)) + { + bool need_block = analyzePacket(buf, last_sec); + uint64_t now = rte_rdtsc(); + if(need_block) + { + blocked_cycles += now - ((struct packet_info *)buf->userdata)->timestamp; + m_ThreadStats.latency_counters.blocked_pkts++; + rte_pktmbuf_free(buf); + } else { + unblocked_cycles += now - ((struct packet_info *)buf->userdata)->timestamp; + m_ThreadStats.latency_counters.unblocked_pkts++; + send_single_packet(qconf, buf, sender_port); + } + cycles += now - ((struct packet_info *)buf->userdata)->timestamp; + } + } + m_ThreadStats.latency_counters.total_cycles += cycles; + m_ThreadStats.latency_counters.blocked_cycles += blocked_cycles; + m_ThreadStats.latency_counters.unblocked_cycles += unblocked_cycles; + m_ThreadStats.latency_counters.total_pkts += nb_rx; + if(unlikely(_n_send_pkts != 0)) + { + for(int z = 0; z < _n_send_pkts; z++) + { + send_single_packet(qconf, _sender_buf[z], _sender_buf_flags[z] ? sender_port : portid); + } + _n_send_pkts = 0; + } + break; + } + } + + diff_timer_tsc = cur_tsc - prev_timer_tsc; + if (unlikely(diff_timer_tsc >= timer_interval)) + { + last_sec++; + prev_timer_tsc = cur_tsc; + } + } + _logger.debug("Worker thread on core %u terminated", coreId); + return true; +} diff --git a/src/cmdlinetask.cpp b/src/cmdlinetask.cpp index 7a9af02..6fadb9a 100644 --- a/src/cmdlinetask.cpp +++ b/src/cmdlinetask.cpp @@ -374,13 +374,19 @@ static void display_worker_stats(struct cmdline* cl,const ThreadStats &stats) cmdline_printf(cl, " SSL/TLS packets: %" PRIu64 "\n", stats.ssl_packets); cmdline_printf(cl, " SSL/TLS partial packets: %" PRIu64 "\n", stats.dpi_ssl_partial_packets); - cmdline_printf(cl, " Allocs:\n"); - cmdline_printf(cl, " HTTP: %" PRIu64 "\n", stats.dpi_alloc_http); - cmdline_printf(cl, " SSL: %" PRIu64 "\n", stats.dpi_alloc_ssl); - -// cmdline_printf(cl, " SSL/TLS max packet size: %" PRIu32 "\n", ssl_max_packet_size); -// cmdline_printf(cl, " SSL/TLS mallocs: %" PRIu64 "\n", ssl_mallocs); -// cmdline_printf(cl, " SSL/TLS reallocs: %" PRIu64 "\n", ssl_reallocs); + cmdline_printf(cl, " DPI parsers:\n"); + cmdline_printf(cl, " Allocs:\n"); + cmdline_printf(cl, " HTTP: %" PRIu64 "\n", stats.dpi_alloc_http); + cmdline_printf(cl, " SSL: %" PRIu64 "\n", stats.dpi_alloc_ssl); + cmdline_printf(cl, " Memory pools usage:\n"); + cmdline_printf(cl, " HTTP: %d\n", rte_mempool_in_use_count(common_data->mempools.http_entries.mempool)); + cmdline_printf(cl, " SSL: %d\n", rte_mempool_in_use_count(common_data->mempools.ssl_entries.mempool)); + cmdline_printf(cl, " Memory pools free:\n"); + cmdline_printf(cl, " HTTP: %d\n", rte_mempool_avail_count(common_data->mempools.http_entries.mempool)); + cmdline_printf(cl, " SSL: %d\n", rte_mempool_avail_count(common_data->mempools.ssl_entries.mempool)); + cmdline_printf(cl, " DPI errors:\n"); + cmdline_printf(cl, " No memory http: %" PRIu64 "\n",stats.dpi_no_mempool_http); + cmdline_printf(cl, " No memory ssl: %" PRIu64 "\n",stats.dpi_no_mempool_ssl); if(stats.ip_packets && stats.total_bytes) { @@ -415,9 +421,6 @@ static void display_worker_stats(struct cmdline* cl,const ThreadStats &stats) cmdline_printf(cl, " IPv6:\n"); cmdline_printf(cl, " HTTP : %" PRIu64 "\n", stats.seen_already_blocked_http_ipv6); cmdline_printf(cl, " SSL : %" PRIu64 "\n", stats.seen_already_blocked_ssl_ipv6); - cmdline_printf(cl, " DPI errors:\n"); - cmdline_printf(cl, " No memory http: %" PRIu64 "\n",stats.dpi_no_mempool_http); - cmdline_printf(cl, " No memory ssl: %" PRIu64 "\n",stats.dpi_no_mempool_ssl); cmdline_printf(cl, " Flows:\n"); cmdline_printf(cl, " IPv4:\n"); cmdline_printf(cl, " New: %" PRIu64 "\n", stats.new_flow); diff --git a/src/main.cpp b/src/main.cpp index e77b65c..5c0645c 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -33,6 +33,7 @@ #include "config.h" #include "tries.h" #include "ssli.h" +#include "bworker.h" #define MBUF_CACHE_SIZE 256 @@ -44,8 +45,6 @@ #define DPDK_CONFIG_HW_STRIP_CRC 0 /**< CRC stripped by hardware disabled */ #define DPDK_CONFIG_MQ_MODE ETH_MQ_RX_RSS -uint64_t extFilter::_tsc_hz; - extFilter *extFilter::_instance = NULL; struct ether_addr extFilter::ports_eth_addr[RTE_MAX_ETHPORTS]; @@ -56,6 +55,7 @@ uint8_t sender_mac[6]; const global_params_t *global_prm = nullptr; // основные параметры системы worker_params_t worker_params[MAX_WORKER_THREADS] __rte_cache_aligned; // параметры для worker'ов +common_data_t *common_data; // данные для работы фильтра uint8_t m_RSSKey[40] = { 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, @@ -93,8 +93,13 @@ extFilter::extFilter(): _helpRequested(false), if(global_prm == nullptr) { global_prm = new global_params_t; + memset((void *)global_prm, 0, sizeof(global_params_t)); + } + if(common_data == nullptr) + { + common_data = new common_data_t; + memset(common_data, 0, sizeof(common_data_t)); } - // Poco::ErrorHandler::set(&_errorHandler); } @@ -102,6 +107,7 @@ extFilter::extFilter(): _helpRequested(false), extFilter::~extFilter() { delete global_prm; + delete common_data; } int _calc_scale(int scale, int min_val, int max_val) @@ -126,7 +132,6 @@ int _calc_number_recs(int n_workers, int num_flows) void extFilter::initParams() { global_params_t *prm = (global_params_t *)global_prm; - memset(prm, 0, sizeof(global_params_t)); int scale = config().getInt("dpi.scale", 10); if(scale < 1 || scale > 10) @@ -211,6 +216,8 @@ void extFilter::initParams() { logger().warning("answer_duplication set to 3, it must be between 0 and 3"); } + + prm->operation_mode = _operation_mode; } static inline unsigned get_port_max_rx_queues(uint8_t port_id) @@ -337,7 +344,8 @@ int extFilter::initPort(uint8_t port, struct ether_addr *addr, bool no_promisc) int retval; nb_rx_queue = _get_port_n_rx_queues(port); - nb_tx_queue = nb_rx_queue; + nb_tx_queue = rte_lcore_count(); + if (nb_rx_queue > get_port_max_rx_queues(port)) { logger().fatal("Number of rx queues %d exceeds max number of rx queues %d for port %d", (int)nb_rx_queue, (int)get_port_max_rx_queues(port), (int)port); @@ -387,12 +395,11 @@ int extFilter::initPort(uint8_t port, struct ether_addr *addr, bool no_promisc) return retval; } } - if (queueid == -1) { +// if (queueid == -1) { // no rx_queue set, don't need to setup tx_queue for // that core - continue; - } -// retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, rte_eth_dev_socket_id(port), NULL); +// continue; +// } logger().information("port=%d tx_queueid=%d nb_txd=%d core=%d", (int) port, (int) nb_tx_queue, (int) _nb_txd, (int) lcore_id); retval = rte_eth_tx_queue_setup(port, nb_tx_queue, _nb_txd, socketid, NULL); @@ -402,6 +409,8 @@ int extFilter::initPort(uint8_t port, struct ether_addr *addr, bool no_promisc) return retval; } qconf->tx_queue_id[port] = nb_tx_queue++; + qconf->tx_port_id[qconf->n_tx_port] = port; + qconf->n_tx_port++; } retval = rte_eth_dev_start(port); @@ -838,6 +847,10 @@ void extFilter::initialize(Application& self) _lcore_params_array[_nb_lcore_params].queue_id = queue_id; _lcore_params_array[_nb_lcore_params].lcore_id = lcore_id; _lcore_params_array[_nb_lcore_params].mapto = mapto; + if(_operation_mode == OP_MODE_INLINE) + { + _lcore_conf[lcore_id].sender_port = mapto; + } _nb_lcore_params++; nb_lcores_per_port++; } @@ -862,13 +875,14 @@ void extFilter::initialize(Application& self) logger().fatal("Number of subscribers and networks ports not equal in the inline operation mode"); throw Poco::Exception("Configuration error"); } - // TODO fill array for determine in <-> out - // like: - // inout_ports[0] = 1 - // inout_ports[1] = 0 - // inout_ports[2] = 3 - // inout_ports[3] = 2 - // then we can extract port for the output packets... + for(auto i = 0; i < _nb_lcore_params; i++) + { + if(_lcore_params_array[i].mapto > n_ports) + { + logger().fatal("Port %d mapped to the unexisting port %d", (int) _lcore_params_array[i].port_id, (int) _lcore_params_array[i].mapto); + throw Poco::Exception("Configuration error"); + } + } } initParams(); @@ -904,9 +918,6 @@ void extFilter::initialize(Application& self) logger().fatal("Unable to load blacklists"); throw Poco::Exception("Unable to load blacklists"); } - - // init value... - _tsc_hz = rte_get_tsc_hz(); } void extFilter::uninitialize() @@ -1012,6 +1023,31 @@ namespace } } +int extFilter::initDPIMemPools() +{ + std::string pool_name("DPIHTTPPool"); + logger().information("Create pool '%s' for the http dissector with number of entries: %u, element size %z size: %Lu bytes", pool_name, global_prm->memory_configs.http_entries, sizeof(http::http_req_buf),(uint64_t)(global_prm->memory_configs.http_entries * sizeof(http::http_req_buf))); + common_data->mempools.http_entries.entries = global_prm->memory_configs.http_entries; + struct rte_mempool *dpi_http_mempool = rte_mempool_create(pool_name.c_str(), global_prm->memory_configs.http_entries, sizeof(http::http_req_buf), 0, 0, NULL, NULL, NULL, NULL, 0, 0); + common_data->mempools.http_entries.mempool = dpi_http_mempool; + if(dpi_http_mempool == nullptr) + { + logger().fatal("Unable to create mempool for the http dissector."); + return -1; + } + + pool_name.assign("DPISSLPool"); + logger().information("Create pool '%s' for the ssl dissector with number of entries: %u, element size %z size: %Lu bytes", pool_name, global_prm->memory_configs.ssl_entries, sizeof(ssl_state),(uint64_t)(global_prm->memory_configs.ssl_entries * sizeof(ssl_state))); + common_data->mempools.ssl_entries.entries = global_prm->memory_configs.ssl_entries; + struct rte_mempool *dpi_ssl_mempool = rte_mempool_create(pool_name.c_str(), global_prm->memory_configs.ssl_entries, sizeof(ssl_state), 0, 0, NULL, NULL, NULL, NULL, 0, 0); + common_data->mempools.ssl_entries.mempool = dpi_ssl_mempool; + if(dpi_ssl_mempool == nullptr) + { + logger().fatal("Unable to create mempool for the ssl dissector."); + return -1; + } + return 0; +} int extFilter::main(const ArgVec& args) { @@ -1041,6 +1077,11 @@ int extFilter::main(const ArgVec& args) struct rte_mempool *_mp = nullptr; struct rte_mempool *clone_pool = nullptr; + unsigned n = global_prm->workers_number*2; // 1 to client + 1 to server + n = 8192; + logger().information("Set number of entries of the sender buffer to %u", n); + _mp = rte_pktmbuf_pool_create("SenderBuffer", n, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); + std::vector ports; for (uint8_t portid = 0; portid < _nb_ports; portid++) { @@ -1053,10 +1094,6 @@ int extFilter::main(const ArgVec& args) logger().fatal("Cannot initialize port %d", (int) portid); return Poco::Util::Application::EXIT_CONFIG; } - unsigned n = global_prm->workers_number*2; // 1 to client + 1 to server - n = 8192; - logger().information("Set number of entries of the sender buffer to %u", n); - _mp = rte_pktmbuf_pool_create("SenderBuffer", n, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id()); if(_mp == nullptr) { logger().fatal("Unable to allocate mempool for sender: %d", (int) rte_errno); @@ -1081,6 +1118,10 @@ int extFilter::main(const ArgVec& args) ports.push_back(portid); } + if(initDPIMemPools()) + { + return Poco::Util::Application::EXIT_CONFIG; + } WorkerConfig workerConfigArr[RTE_MAX_LCORE]; Poco::TaskManager tm; @@ -1088,25 +1129,6 @@ int extFilter::main(const ArgVec& args) // NotifyManager *nm = new NotifyManager(20000, _notify_groups); // tm.start(nm); - - std::string pool_name("DPIHTTPPool"); - logger().information("Create pool '%s' for the http dissector with number of entries: %u, element size %z size: %Lu bytes", pool_name, global_prm->memory_configs.http_entries, sizeof(http::http_req_buf),(uint64_t)(global_prm->memory_configs.http_entries * sizeof(http::http_req_buf))); - struct rte_mempool *dpi_http_mempool = rte_mempool_create(pool_name.c_str(), global_prm->memory_configs.http_entries, sizeof(http::http_req_buf), 0, 0, NULL, NULL, NULL, NULL, 0, 0); - if(dpi_http_mempool == nullptr) - { - logger().fatal("Unable to create mempool for the http dissector."); - return Poco::Util::Application::EXIT_CONFIG; - } - - pool_name.assign("DPISSLPool"); - logger().information("Create pool '%s' for the ssl dissector with number of entries: %u, element size %z size: %Lu bytes", pool_name, global_prm->memory_configs.ssl_entries, sizeof(ssl_state),(uint64_t)(global_prm->memory_configs.ssl_entries * sizeof(ssl_state))); - struct rte_mempool *dpi_ssl_mempool = rte_mempool_create(pool_name.c_str(), global_prm->memory_configs.ssl_entries, sizeof(ssl_state), 0, 0, NULL, NULL, NULL, NULL, 0, 0); - if(dpi_ssl_mempool == nullptr) - { - logger().fatal("Unable to create mempool for the ssl dissector."); - return Poco::Util::Application::EXIT_CONFIG; - } - initFlowStorages(); uint8_t worker_id = 0; uint16_t tx_queue_id = 0; @@ -1163,8 +1185,11 @@ int extFilter::main(const ArgVec& args) prms.answer_duplication = global_prm->answer_duplication; prms.clone_pool = clone_pool; } - - WorkerThread* newWorker = new WorkerThread(worker_id, workerName, workerConfigArr[worker_id], dpi_state, rte_lcore_to_socket_id(lcore_id), prms, _mp, dpi_http_mempool, dpi_ssl_mempool); + WorkerThread* newWorker; + if(_operation_mode == OP_MODE_MIRROR) + newWorker = new WorkerThread(worker_id, workerName, workerConfigArr[worker_id], dpi_state, prms, _mp); + else + newWorker = new BWorkerThread(worker_id, workerName, workerConfigArr[worker_id], dpi_state, prms, _mp); int err = rte_eal_remote_launch(dpdkWorkerThreadStart, newWorker, lcore_id); if (err != 0) diff --git a/src/sender.cpp b/src/sender.cpp index 3b1a1c2..0e57c03 100644 --- a/src/sender.cpp +++ b/src/sender.cpp @@ -24,24 +24,6 @@ #include #include "worker.h" -struct pseudo_header -{ - u_int32_t source_address; - u_int32_t dest_address; - u_int8_t placeholder; - u_int8_t protocol; - u_int16_t tcp_length; -}; - -struct ipv6_pseudo_hdr -{ - struct in6_addr source_address; - struct in6_addr dest_address; - u_int32_t tcp_length; - u_int32_t zero: 24, - nexthdr: 8; -}; - BSender::BSender(const char *cn, struct params &prm) : _logger(Poco::Logger::get(cn)), _parameters(prm) { pkt_id = 1; @@ -222,15 +204,20 @@ int DSender::Send(uint8_t *buffer, int size, void *addr, int addr_size) return pkt_size; } -ESender::ESender(struct nparams &prm, uint8_t port, struct rte_mempool *mp, WorkerThread *wt) : BSender("ESender", prm.params), +ESender::ESender(struct nparams &prm, uint8_t port, struct rte_mempool *mp, WorkerThread *wt, bool keep_l2_hdr) : BSender("ESender", prm.params), _port(port), _mp(mp), _clone_pool(prm.clone_pool), _wt(wt), - _answer_duplication(prm.answer_duplication) + _answer_duplication(prm.answer_duplication), + _keep_l2_hdr(keep_l2_hdr) { rte_memcpy(&_eth_hdr.s_addr, prm.mac, 6); rte_memcpy(&_eth_hdr.d_addr, prm.to_mac, 6); + _eth_hdr.ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); + rte_memcpy(&_eth_hdr_ipv6.s_addr, prm.mac, 6); + rte_memcpy(&_eth_hdr_ipv6.d_addr, prm.to_mac, 6); + _eth_hdr_ipv6.ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv6); } ESender::~ESender() @@ -277,7 +264,7 @@ void ESender::sendPacket(void *ip_from, void *ip_to, int ip_ver, int port_from, } -void ESender::sendPacketIPv4(const uint8_t *l3_pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, bool to_server) +void ESender::sendPacketIPv4(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, bool to_server) { struct rte_mbuf *pkt = rte_pktmbuf_alloc(_mp); if(unlikely(pkt == nullptr)) @@ -285,35 +272,31 @@ void ESender::sendPacketIPv4(const uint8_t *l3_pkt, uint32_t acknum, uint32_t se _logger.error("Unable to allocate buffer for the packet"); return; } - struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(pkt, struct ether_hdr *); - uint8_t *pkt_buf = ((uint8_t *)eth_hdr + sizeof(struct ether_hdr)); - int pkt_len = makeSwapPacketIPv4(l3_pkt, acknum, seqnum, dt_buf, dt_len, f_reset, f_psh, pkt_buf) + sizeof(struct ether_hdr); - pkt->data_len = pkt_len; - pkt->pkt_len = pkt_len; - ether_addr_copy(&_eth_hdr.s_addr, ð_hdr->s_addr); - ether_addr_copy(&_eth_hdr.d_addr, ð_hdr->d_addr); - eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4); - struct ipv4_hdr *ip_hdr = (struct ipv4_hdr *) pkt_buf; -/* ip_hdr->hdr_checksum = 0; - pkt->ol_flags |= PKT_TX_IPV4 | PKT_TX_IP_CKSUM;*/ - ip_hdr->hdr_checksum = rte_ipv4_cksum(ip_hdr); + makeSwapPacketIPv4(pkt_infos, acknum, seqnum, dt_buf, dt_len, f_reset, f_psh, pkt, to_server); if(likely(_wt->_n_send_pkts < EXTFILTER_WORKER_BURST_SIZE)) { - if(!to_server && _answer_duplication > 0 && _wt->_n_send_pkts+_answer_duplication < EXTFILTER_WORKER_BURST_SIZE) + if(!_keep_l2_hdr && !to_server && _answer_duplication > 0 && _wt->_n_send_pkts+_answer_duplication < EXTFILTER_WORKER_BURST_SIZE) { for(uint8_t z = 0; z < _answer_duplication; z++) { struct rte_mbuf *clone = rte_pktmbuf_clone(pkt, _clone_pool); if(clone != nullptr) - _wt->_sender_buf[_wt->_n_send_pkts++] = clone; + { + _wt->_sender_buf[_wt->_n_send_pkts] = clone; + _wt->_sender_buf_flags[_wt->_n_send_pkts] = false; + _wt->_n_send_pkts++; + } else { _logger.error("Unable to create clone packet."); break; } } } - _wt->_sender_buf[_wt->_n_send_pkts++] = pkt; + _wt->_sender_buf[_wt->_n_send_pkts] = pkt; + _wt->_sender_buf_flags[_wt->_n_send_pkts] = to_server; + _wt->_n_send_pkts++; + } else { _logger.error("Can't send packet. Buffer is full."); rte_pktmbuf_free(pkt); @@ -322,7 +305,7 @@ void ESender::sendPacketIPv4(const uint8_t *l3_pkt, uint32_t acknum, uint32_t se return; } -void ESender::sendPacketIPv6(const uint8_t *l3_pkt, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, bool to_server) +void ESender::sendPacketIPv6(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, const char *dt_buf, size_t dt_len, bool f_reset, bool f_psh, bool to_server) { struct rte_mbuf *pkt = rte_pktmbuf_alloc(_mp); if(unlikely(pkt == nullptr)) @@ -330,31 +313,29 @@ void ESender::sendPacketIPv6(const uint8_t *l3_pkt, uint32_t acknum, uint32_t se _logger.error("Unable to allocate buffer for the packet"); return; } - struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(pkt, struct ether_hdr *); - uint8_t *pkt_buf = ((uint8_t *)eth_hdr + sizeof(struct ether_hdr)); - int pkt_len = makeSwapPacketIPv6(l3_pkt, acknum, seqnum, dt_buf, dt_len, f_reset, f_psh, pkt_buf) + sizeof(struct ether_hdr); - pkt->data_len = pkt_len; - pkt->pkt_len = pkt_len; - ether_addr_copy(&_eth_hdr.s_addr, ð_hdr->s_addr); - ether_addr_copy(&_eth_hdr.d_addr, ð_hdr->d_addr); - eth_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv6); - + makeSwapPacketIPv6(pkt_infos, acknum, seqnum, dt_buf, dt_len, f_reset, f_psh, pkt, to_server); if(likely(_wt->_n_send_pkts < EXTFILTER_WORKER_BURST_SIZE)) { - if(!to_server && _answer_duplication > 0 && _wt->_n_send_pkts+_answer_duplication < EXTFILTER_WORKER_BURST_SIZE) + if(!_keep_l2_hdr && !to_server && _answer_duplication > 0 && _wt->_n_send_pkts+_answer_duplication < EXTFILTER_WORKER_BURST_SIZE) { for(uint8_t z = 0; z < _answer_duplication; z++) { struct rte_mbuf *clone = rte_pktmbuf_clone(pkt, _clone_pool); if(clone != nullptr) - _wt->_sender_buf[_wt->_n_send_pkts++] = clone; + { + _wt->_sender_buf[_wt->_n_send_pkts] = clone; + _wt->_sender_buf_flags[_wt->_n_send_pkts] = false; + _wt->_n_send_pkts++; + } else { _logger.error("Unable to create clone packet."); break; } } } - _wt->_sender_buf[_wt->_n_send_pkts++] = pkt; + _wt->_sender_buf[_wt->_n_send_pkts] = pkt; + _wt->_sender_buf_flags[_wt->_n_send_pkts] = to_server; + _wt->_n_send_pkts++; } else { _logger.error("Can't send packet. Buffer is full."); rte_pktmbuf_free(pkt); @@ -363,7 +344,7 @@ void ESender::sendPacketIPv6(const uint8_t *l3_pkt, uint32_t acknum, uint32_t se return; } -void ESender::HTTPRedirectIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh, const char *redir_url, size_t r_len) +void ESender::HTTPRedirectIPv4(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, bool f_psh, const char *redir_url, size_t r_len) { char payload[OUR_PAYLOAD_SIZE]; size_t payload_size = sizeof(f_lines) - 1; @@ -377,22 +358,22 @@ void ESender::HTTPRedirectIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seq payload_size = sizeof(r_line1) - 1 + sizeof(r_line2) - 1 + sizeof(r_line3) - 1 + r_len; payload_ptr = payload; } - sendPacketIPv4(pkt, acknum, seqnum, payload_ptr, payload_size, false, f_psh); + sendPacketIPv4(pkt_infos, acknum, seqnum, payload_ptr, payload_size, false, f_psh); // sendPacketIPv4(pkt, rte_cpu_to_be_32(rte_be_to_cpu_32(acknum) + payload_size), 0, nullptr, 0, true, false); // send rst... // And reset session with server, if needed if(_parameters.send_rst_to_server) - this->sendPacketIPv4(pkt, seqnum, acknum, nullptr, 0, 1, 0, true); + this->sendPacketIPv4(pkt_infos, seqnum, acknum, nullptr, 0, 1, 0, true); } -void ESender::HTTPForbiddenIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh) +void ESender::HTTPForbiddenIPv4(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, bool f_psh) { - sendPacketIPv4(pkt, acknum, seqnum, f_lines, sizeof(f_lines)-1, 0, f_psh); + sendPacketIPv4(pkt_infos, acknum, seqnum, f_lines, sizeof(f_lines)-1, 0, f_psh); // And reset session with server, if needed if(_parameters.send_rst_to_server) - this->sendPacketIPv4(pkt, seqnum, acknum, nullptr, 0, 1, 0, true); + this->sendPacketIPv4(pkt_infos, seqnum, acknum, nullptr, 0, 1, 0, true); } -void ESender::HTTPRedirectIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh, const char *redir_url, size_t r_len) +void ESender::HTTPRedirectIPv6(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, bool f_psh, const char *redir_url, size_t r_len) { char payload[OUR_PAYLOAD_SIZE]; size_t payload_size = sizeof(f_lines) - 1; @@ -406,36 +387,36 @@ void ESender::HTTPRedirectIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seq payload_size = sizeof(r_line1) - 1 + sizeof(r_line2) - 1 + sizeof(r_line3) - 1 + r_len; payload_ptr = payload; } - sendPacketIPv6(pkt, acknum, seqnum, payload_ptr, payload_size, false, f_psh); + sendPacketIPv6(pkt_infos, acknum, seqnum, payload_ptr, payload_size, false, f_psh); // sendPacketIPv6(pkt, rte_cpu_to_be_32(rte_be_to_cpu_32(acknum) + payload_size), 0, nullptr, 0, true, false); // send rst... // And reset session with server, if needed if(_parameters.send_rst_to_server) - sendPacketIPv6(pkt, seqnum, acknum, nullptr, 0, 1, 0, true); + sendPacketIPv6(pkt_infos, seqnum, acknum, nullptr, 0, 1, 0, true); } -void ESender::HTTPForbiddenIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum, bool f_psh) +void ESender::HTTPForbiddenIPv6(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum, bool f_psh) { - sendPacketIPv6(pkt, acknum, seqnum, f_lines, sizeof(f_lines)-1, 0, f_psh); + sendPacketIPv6(pkt_infos, acknum, seqnum, f_lines, sizeof(f_lines)-1, 0, f_psh); // And reset session with server, if needed if(_parameters.send_rst_to_server) - this->sendPacketIPv6(pkt, seqnum, acknum, nullptr, 0, 1, 0, true); + this->sendPacketIPv6(pkt_infos, seqnum, acknum, nullptr, 0, 1, 0, true); } -void ESender::SendRSTIPv4(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum) +void ESender::SendRSTIPv4(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum) { // send rst to the client - sendPacketIPv4(pkt, acknum, seqnum, nullptr, 0, true, false); + sendPacketIPv4(pkt_infos, acknum, seqnum, nullptr, 0, true, false); // send rst to the server if(_parameters.send_rst_to_server) - sendPacketIPv4(pkt, seqnum, acknum, nullptr, 0, true, false, true); + sendPacketIPv4(pkt_infos, seqnum, acknum, nullptr, 0, true, false, true); } -void ESender::SendRSTIPv6(const uint8_t *pkt, uint32_t acknum, uint32_t seqnum) +void ESender::SendRSTIPv6(dpi_pkt_infos_t *pkt_infos, uint32_t acknum, uint32_t seqnum) { // send rst to the client - sendPacketIPv6(pkt, acknum, seqnum, nullptr, 0, true, false); + sendPacketIPv6(pkt_infos, acknum, seqnum, nullptr, 0, true, false); // send rst to the server if(_parameters.send_rst_to_server) - sendPacketIPv6(pkt, seqnum, acknum, nullptr, 0, true, false, true); + sendPacketIPv6(pkt_infos, seqnum, acknum, nullptr, 0, true, false, true); } diff --git a/src/worker.cpp b/src/worker.cpp index 76ee215..72672ac 100644 --- a/src/worker.cpp +++ b/src/worker.cpp @@ -69,7 +69,7 @@ int on_header_complete_ext(http_parser* p, dpi_pkt_infos_t* pkt_informations, vo return 1; // no need to check body... } -WorkerThread::WorkerThread(uint8_t worker_id,const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, int socketid, struct ESender::nparams &sp, struct rte_mempool *mp, struct rte_mempool *dpi_http_mempool, struct rte_mempool *dpi_ssl_mempool) : +WorkerThread::WorkerThread(uint8_t worker_id,const std::string& name, WorkerConfig &workerConfig, dpi_library_state_t* state, struct ESender::nparams &sp, struct rte_mempool *mp) : m_WorkerConfig(workerConfig), m_Stop(true), _logger(Poco::Logger::get(name)), dpi_state(state), @@ -93,12 +93,12 @@ WorkerThread::WorkerThread(uint8_t worker_id,const std::string& name, WorkerConf if(mp != nullptr) { // setup sender - _snd = new ESender(sp, m_WorkerConfig.sender_port, mp, this); + _snd = new ESender(sp, m_WorkerConfig.sender_port, mp, this, global_prm->operation_mode == OP_MODE_INLINE ? true : false); } else { throw Poco::Exception("ESender is null!"); } - _dpi_http_mempool = dpi_http_mempool; - _dpi_ssl_mempool = dpi_ssl_mempool; + _dpi_http_mempool = common_data->mempools.http_entries.mempool; + _dpi_ssl_mempool = common_data->mempools.ssl_entries.mempool; } WorkerThread::~WorkerThread() @@ -116,11 +116,10 @@ bool WorkerThread::checkSNIBlocked(const char *sni, size_t sni_len, dpi_pkt_info m_ThreadStats.matched_ssl_sni++; if(pkt->ip_version == 4) { - _snd->SendRSTIPv4(pkt->pkt, tcph->ack_seq, tcph->seq); + _snd->SendRSTIPv4(pkt, tcph->ack_seq, tcph->seq); m_ThreadStats.sended_rst_ipv4++; } else { - - _snd->SendRSTIPv6(pkt->pkt, tcph->ack_seq, tcph->seq); + _snd->SendRSTIPv6(pkt, tcph->ack_seq, tcph->seq); m_ThreadStats.sended_rst_ipv6++; } return true; @@ -139,22 +138,22 @@ bool WorkerThread::checkURLBlocked(const char *host, size_t host_len, const char { if(pkt->ip_version == 4) { - _snd->HTTPRedirectIPv4(pkt->pkt, tcph->ack_seq, rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+pkt->data_length), true, redir_url, redir_size); + _snd->HTTPRedirectIPv4(pkt, tcph->ack_seq, rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+pkt->data_length), true, redir_url, redir_size); m_ThreadStats.matched_http_bl_ipv4++; m_ThreadStats.redirected_http_bl_ipv4++; } else { - _snd->HTTPRedirectIPv6(pkt->pkt, tcph->ack_seq, rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+pkt->data_length), true, redir_url, redir_size); + _snd->HTTPRedirectIPv6(pkt, tcph->ack_seq, rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+pkt->data_length), true, redir_url, redir_size); m_ThreadStats.matched_http_bl_ipv6++; m_ThreadStats.redirected_http_bl_ipv6++; } } else { if(pkt->ip_version == 4) { - _snd->HTTPForbiddenIPv4(pkt->pkt, tcph->ack_seq, rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+pkt->data_length), true); + _snd->HTTPForbiddenIPv4(pkt, tcph->ack_seq, rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+pkt->data_length), true); m_ThreadStats.sended_forbidden_ipv4++; m_ThreadStats.matched_http_bl_ipv4++; } else { - _snd->HTTPForbiddenIPv6(pkt->pkt, tcph->ack_seq, rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+pkt->data_length), true); + _snd->HTTPForbiddenIPv6(pkt, tcph->ack_seq, rte_cpu_to_be_32(rte_be_to_cpu_32(tcph->seq)+pkt->data_length), true); m_ThreadStats.sended_forbidden_ipv6++; m_ThreadStats.matched_http_bl_ipv6++; } @@ -164,7 +163,7 @@ bool WorkerThread::checkURLBlocked(const char *host, size_t host_len, const char return false; } -dpi_identification_result_t WorkerThread::identifyAppProtocol(const unsigned char* pkt, u_int32_t length, u_int32_t current_time, struct packet_info *pkt_info, uint32_t sig) +dpi_identification_result_t WorkerThread::identifyAppProtocol(const unsigned char* pkt, u_int32_t length, const uint8_t *l2_pkt, u_int32_t current_time, struct packet_info *pkt_info, uint32_t sig) { uint8_t *host_key = (uint8_t *)&pkt_info->keys; dpi_identification_result_t r; @@ -172,7 +171,7 @@ dpi_identification_result_t WorkerThread::identifyAppProtocol(const unsigned cha u_int8_t l3_status; r.status = dpi_parse_L3_L4_headers(dpi_state, pkt, length, &infos, current_time); - + infos.l2_pkt = l2_pkt; if(unlikely(r.status==DPI_STATUS_IP_FRAGMENT || r.status<0)) { return r; @@ -458,12 +457,12 @@ dpi_identification_result_t WorkerThread::getAppProtocol(uint8_t *host_key, uint switch (r.protocol.l7prot) { case DPI_PROTOCOL_TCP_SSL: - _snd->SendRSTIPv4(pkt_infos->pkt, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); + _snd->SendRSTIPv4(pkt_infos, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); m_ThreadStats.seen_already_blocked_ssl_ipv4++; m_ThreadStats.sended_rst_ipv4++; break; case DPI_PROTOCOL_TCP_HTTP: - _snd->SendRSTIPv4(pkt_infos->pkt, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); + _snd->SendRSTIPv4(pkt_infos, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); m_ThreadStats.seen_already_blocked_http_ipv4++; m_ThreadStats.sended_rst_ipv4++; break; @@ -484,12 +483,12 @@ dpi_identification_result_t WorkerThread::getAppProtocol(uint8_t *host_key, uint switch (r.protocol.l7prot) { case DPI_PROTOCOL_TCP_SSL: - _snd->SendRSTIPv6(pkt_infos->pkt, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); + _snd->SendRSTIPv6(pkt_infos, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); m_ThreadStats.seen_already_blocked_ssl_ipv6++; m_ThreadStats.sended_rst_ipv6++; break; case DPI_PROTOCOL_TCP_HTTP: - _snd->SendRSTIPv6(pkt_infos->pkt, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); + _snd->SendRSTIPv6(pkt_infos, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); m_ThreadStats.seen_already_blocked_http_ipv6++; m_ThreadStats.sended_rst_ipv6++; break; @@ -611,14 +610,17 @@ bool WorkerThread::analyzePacket(struct rte_mbuf* m, uint64_t timestamp) if(unlikely(payload_len > 0 && acl_action == ACL::ACL_DROP)) { m_ThreadStats.matched_ip_port++; + dpi_pkt_infos_t pkt_infos; + pkt_infos.pkt = l3; + pkt_infos.l2_pkt = rte_pktmbuf_mtod(m, const uint8_t *); if(ip_version == 4) { - _snd->SendRSTIPv4(l3, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); + _snd->SendRSTIPv4(&pkt_infos, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); m_ThreadStats.sended_rst_ipv4++; } else { - _snd->SendRSTIPv6(l3, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); + _snd->SendRSTIPv6(&pkt_infos, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); m_ThreadStats.sended_rst_ipv6++; } return true; @@ -626,7 +628,7 @@ bool WorkerThread::analyzePacket(struct rte_mbuf* m, uint64_t timestamp) dpi_identification_result_t r; - r = identifyAppProtocol(l3, ip_len, timestamp, (struct packet_info *)m->userdata, m->hash.usr); + r = identifyAppProtocol(l3, ip_len, rte_pktmbuf_mtod(m, const uint8_t *), timestamp, (struct packet_info *)m->userdata, m->hash.usr); switch (r.protocol.l7prot) { @@ -649,14 +651,17 @@ bool WorkerThread::analyzePacket(struct rte_mbuf* m, uint64_t timestamp) if(unlikely(r.protocol.l7prot == DPI_PROTOCOL_TCP_SSL && m_WorkerConfig.block_ssl_no_sni && acl_action == ACL::ACL_SSL)) { m_ThreadStats.matched_ssl_ip++; + dpi_pkt_infos_t pkt_infos; + pkt_infos.pkt = l3; + pkt_infos.l2_pkt = rte_pktmbuf_mtod(m, const uint8_t *); if(ip_version == 4) { - _snd->SendRSTIPv4(l3, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); + _snd->SendRSTIPv4(&pkt_infos, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); m_ThreadStats.sended_rst_ipv4++; } else { - _snd->SendRSTIPv6(l3, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); + _snd->SendRSTIPv6(&pkt_infos, /*acknum*/ tcph->ack_seq, /*seqnum*/ tcph->seq); m_ThreadStats.sended_rst_ipv6++; } return true; @@ -679,149 +684,6 @@ bool WorkerThread::analyzePacket(struct rte_mbuf* m, uint64_t timestamp) return false; } -static inline void parsePtype(struct rte_mbuf *m, struct packet_info *pkt_info) -{ - struct ether_hdr *eth_hdr; - uint32_t packet_type = RTE_PTYPE_UNKNOWN; - uint16_t ether_type; - uint8_t *l3; - int hdr_len; - struct ipv4_hdr *ipv4_hdr; - struct ipv6_hdr *ipv6_hdr; - - pkt_info->timestamp = rte_rdtsc(); // timestamp - - eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *); - ether_type = rte_be_to_cpu_16(eth_hdr->ether_type); - l3 = (uint8_t *)eth_hdr + sizeof(struct ether_hdr); - - if(ether_type == ETHER_TYPE_VLAN || ether_type == 0x8847) - { - while(1) - { - if(ether_type == ETHER_TYPE_VLAN) - { - struct vlan_hdr *vlan_hdr = (struct vlan_hdr *)(l3); - ether_type = rte_be_to_cpu_16(vlan_hdr->eth_proto); - l3 += sizeof(struct vlan_hdr); - } else if(ether_type == 0x8847) - { - uint8_t bos = ((uint8_t *)l3)[2] & 0x1; - l3 += 4; - if(bos) - { - ether_type = ETHER_TYPE_IPv4; - break; - } - } else - break; - } - } - - if (ether_type == ETHER_TYPE_IPv4) - { - ipv4_hdr = (struct ipv4_hdr *)l3; - hdr_len = (ipv4_hdr->version_ihl & IPV4_HDR_IHL_MASK) * IPV4_IHL_MULTIPLIER; - if (hdr_len == sizeof(struct ipv4_hdr)) - { - packet_type |= RTE_PTYPE_L3_IPV4; - if (ipv4_hdr->next_proto_id == IPPROTO_TCP) - packet_type |= RTE_PTYPE_L4_TCP; - else if (ipv4_hdr->next_proto_id == IPPROTO_UDP) - packet_type |= RTE_PTYPE_L4_UDP; - } else - packet_type |= RTE_PTYPE_L3_IPV4_EXT; - } else if (ether_type == ETHER_TYPE_IPv6) - { - ipv6_hdr = (struct ipv6_hdr *)l3; - if (ipv6_hdr->proto == IPPROTO_TCP) - packet_type |= RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_TCP; - else if (ipv6_hdr->proto == IPPROTO_UDP) - packet_type |= RTE_PTYPE_L3_IPV6 | RTE_PTYPE_L4_UDP; - else - packet_type |= RTE_PTYPE_L3_IPV6_EXT_UNKNOWN; - } - m->packet_type = packet_type; - pkt_info->l3 = l3; - pkt_info->acl_res = ACL::ACL_DEFAULT_POLICY; - m->userdata = pkt_info; - uint32_t tcp_or_udp = packet_type & (RTE_PTYPE_L4_TCP | RTE_PTYPE_L4_UDP); - uint32_t l3_ptypes = packet_type & RTE_PTYPE_L3_MASK; - if(tcp_or_udp && (l3_ptypes == RTE_PTYPE_L3_IPV6)) - { - void *ipv6_hdr = l3 + offsetof(struct ipv6_hdr, payload_len); - void *data0 = ipv6_hdr; - void *data1 = ((uint8_t *)ipv6_hdr) + sizeof(xmm_t); - void *data2 = ((uint8_t *)ipv6_hdr) + sizeof(xmm_t) + sizeof(xmm_t); - pkt_info->keys.ipv6_key.xmm[0] = em_mask_key(data0, mask1.x); - pkt_info->keys.ipv6_key.xmm[1] = _mm_loadu_si128((__m128i *)(data1)); - pkt_info->keys.ipv6_key.xmm[2] = em_mask_key(data2, mask2.x); - m->hash.usr = ipv6_hash_crc(&pkt_info->keys.ipv6_key,0,0); - } else if (tcp_or_udp && (l3_ptypes == RTE_PTYPE_L3_IPV4)) - { - void *ipv4_hdr = l3 + offsetof(struct ipv4_hdr, time_to_live); - pkt_info->keys.ipv4_key.xmm = em_mask_key(ipv4_hdr, mask0.x); - m->hash.usr = ipv4_hash_crc(&pkt_info->keys.ipv4_key,0,0); - } -} - -/* - * Put one packet in acl_search struct according to the packet ol_flags - */ -static inline void prepare_one_packet(struct rte_mbuf** pkts_in, struct ACL::acl_search_t* acl, int index, struct packet_info *pkt_info) -{ - struct rte_mbuf* pkt = pkts_in[index]; - - parsePtype(pkt, pkt_info); - uint32_t l3_ptypes = pkt->packet_type & RTE_PTYPE_L3_MASK; - - // XXX we cannot filter non IP packet yet - if (l3_ptypes == RTE_PTYPE_L3_IPV4) - { - /* Fill acl structure */ - acl->data_ipv4[acl->num_ipv4] = ((struct packet_info *)pkt->userdata)->l3 + offsetof(struct ipv4_hdr, next_proto_id); - acl->m_ipv4[(acl->num_ipv4)++] = pkt; - } else if (l3_ptypes == RTE_PTYPE_L3_IPV6) - { - /* Fill acl structure */ - acl->data_ipv6[acl->num_ipv6] = ((struct packet_info *)pkt->userdata)->l3 + offsetof(struct ipv6_hdr, proto); - acl->m_ipv6[(acl->num_ipv6)++] = pkt; - } -} - -/* - * Loop through all packets and classify them if acl_search if possible. - */ -static inline void prepare_acl_parameter(struct rte_mbuf** pkts_in, struct ACL::acl_search_t* acl, int nb_rx, struct packet_info *pkt_infos) -{ - int i = 0, j = 0; - - acl->num_ipv4 = 0; - acl->num_ipv6 = 0; - -#define PREFETCH() \ - rte_prefetch0(rte_pktmbuf_mtod(pkts_in[i], void*)); \ - i++; \ - j++; - - // we prefetch0 packets 3 per 3 - switch (nb_rx % PREFETCH_OFFSET) { - while (nb_rx != i) { - case 0: - PREFETCH(); - case 2: - PREFETCH(); - case 1: - PREFETCH(); - while (j > 0) - { - prepare_one_packet(pkts_in, acl, i - j, &pkt_infos[i-j]); - --j; - } - } - } -} - bool WorkerThread::run(uint32_t coreId) { setCoreId(coreId);