From 636774b7b4356b8b99645a38d9de05e5f655473f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jerry=20Lundstr=C3=B6m?= Date: Wed, 25 Apr 2018 19:22:48 +0200 Subject: [PATCH] Producer - `examples/test_pcap_read.lua`: Add `-p` to test using producer interface - `filter.layer` Issue #41: Add producer interface - `output.null`: Change `run()` so it can consume all objects --- examples/test_pcap_read.lua | 226 ++++++++++++++++------- src/filter/layer.c | 348 +++++++++++++++++++++--------------- src/filter/layer.h | 14 ++ src/filter/layer.hh | 34 ++++ src/filter/layer.lua | 12 ++ src/output/null.c | 27 ++- src/output/null.lua | 7 +- 7 files changed, 452 insertions(+), 216 deletions(-) diff --git a/examples/test_pcap_read.lua b/examples/test_pcap_read.lua index fe8f3464..801a63f7 100755 --- a/examples/test_pcap_read.lua +++ b/examples/test_pcap_read.lua @@ -3,6 +3,7 @@ local clock = require("dnsjit.lib.clock") local getopt = require("dnsjit.lib.getopt").new({ { "t", "thread", 0, "Test also with dnsjit.filter.thread, give number of threads to run", "?" }, { "l", "layer", false, "Test also with dnsjit.filter.layer", "?" }, + { "p", "producer", false, "Test with the producer interface rather then receiver interface", "?" }, }) local pcap, runs = unpack(getopt:parse()) if getopt:val("help") then @@ -15,7 +16,11 @@ if pcap == nil then return end -inputs = { "fpcap", "mmpcap", "pcap", "pcapthread" } +if getopt:val("p") then + inputs = { "fpcap", "mmpcap", "pcap" } +else + inputs = { "fpcap", "mmpcap", "pcap", "pcapthread" } +end result = {} results = {} highest = nil @@ -26,74 +31,69 @@ else runs = tonumber(runs) end -for _, name in pairs(inputs) do - rt = 0.0 - p = 0 +if getopt:val("p") then + for _, name in pairs(inputs) do + rt = 0.0 + p = 0 - print("run", name) - for n = 1, runs do - t = nil - tos = nil - if getopt:val("t") > 1 then - local nn - tos = {} - o = require("dnsjit.filter.thread").new() - for nn = 1, getopt:val("t") do - local oo = require("dnsjit.output.null").new() - o:receiver(oo) - table.insert(tos, oo) - end - o:start() - t = o - else - o = require("dnsjit.output.null").new() - end - i = require("dnsjit.input."..name).new() - if name == "pcap" then - i:open_offline(pcap) - if getopt:val("l") then - f = require("dnsjit.filter.layer").new() - f:receiver(o) - i:receiver(f) + print("run", name) + for n = 1, runs do + t = nil + tos = nil + -- if getopt:val("t") > 1 then + -- local nn + -- tos = {} + -- o = require("dnsjit.filter.thread").new() + -- for nn = 1, getopt:val("t") do + -- local oo = require("dnsjit.output.null").new() + -- o:receiver(oo) + -- table.insert(tos, oo) + -- end + -- o:start() + -- t = o + -- else + o = require("dnsjit.output.null").new() + -- end + i = require("dnsjit.input."..name).new() + if name == "pcap" then + i:open_offline(pcap) + if getopt:val("l") then + f = require("dnsjit.filter.layer").new() + f:producer(i) + o:producer(f) + else + o:producer(i) + end + ss, sns = clock:monotonic() + -- i:dispatch() + o:run(0) else - i:receiver(o) + -- if t then + -- i:use_shared(true) + -- end + i:open(pcap) + if getopt:val("l") then + f = require("dnsjit.filter.layer").new() + f:producer(i) + o:producer(f) + else + o:producer(i) + end + ss, sns = clock:monotonic() + -- i:run() + o:run(0) end - ss, sns = clock:monotonic() - i:dispatch() - elseif name == "pcapthread" then - i:open_offline(pcap) - i:receiver(o) - ss, sns = clock:monotonic() - i:run() - else if t then - i:use_shared(true) - end - i:open(pcap) - if getopt:val("l") then - f = require("dnsjit.filter.layer").new() - f:receiver(o) - i:receiver(f) - else - i:receiver(o) + t:stop() end - ss, sns = clock:monotonic() - i:run() - end - if t then - t:stop() - end - es, ens = clock:monotonic() + es, ens = clock:monotonic() - if es > ss then - rt = rt + ((es - ss) - 1) + ((1000000000 - sns + ens)/1000000000) - elseif es == ss and ens > sns then - rt = rt + (ens - sns) / 1000000000 - end + if es > ss then + rt = rt + ((es - ss) - 1) + ((1000000000 - sns + ens)/1000000000) + elseif es == ss and ens > sns then + rt = rt + (ens - sns) / 1000000000 + end - if name == "pcapthread" then - p = p + i:packets() - else if tos then for _, oo in pairs(tos) do p = p + oo:packets() @@ -102,16 +102,104 @@ for _, name in pairs(inputs) do p = p + o:packets() end end + + result[name] = { + rt = rt, + p = p + } + if highest == nil or rt > result[highest].rt then + highest = name + end + table.insert(results, name) end +else + for _, name in pairs(inputs) do + rt = 0.0 + p = 0 - result[name] = { - rt = rt, - p = p - } - if highest == nil or rt > result[highest].rt then - highest = name + print("run", name) + for n = 1, runs do + t = nil + tos = nil + if getopt:val("t") > 1 then + local nn + tos = {} + o = require("dnsjit.filter.thread").new() + for nn = 1, getopt:val("t") do + local oo = require("dnsjit.output.null").new() + o:receiver(oo) + table.insert(tos, oo) + end + o:start() + t = o + else + o = require("dnsjit.output.null").new() + end + i = require("dnsjit.input."..name).new() + if name == "pcap" then + i:open_offline(pcap) + if getopt:val("l") then + f = require("dnsjit.filter.layer").new() + f:receiver(o) + i:receiver(f) + else + i:receiver(o) + end + ss, sns = clock:monotonic() + i:dispatch() + elseif name == "pcapthread" then + i:open_offline(pcap) + i:receiver(o) + ss, sns = clock:monotonic() + i:run() + else + if t then + i:use_shared(true) + end + i:open(pcap) + if getopt:val("l") then + f = require("dnsjit.filter.layer").new() + f:receiver(o) + i:receiver(f) + else + i:receiver(o) + end + ss, sns = clock:monotonic() + i:run() + end + if t then + t:stop() + end + es, ens = clock:monotonic() + + if es > ss then + rt = rt + ((es - ss) - 1) + ((1000000000 - sns + ens)/1000000000) + elseif es == ss and ens > sns then + rt = rt + (ens - sns) / 1000000000 + end + + if name == "pcapthread" then + p = p + i:packets() + else + if tos then + for _, oo in pairs(tos) do + p = p + oo:packets() + end + else + p = p + o:packets() + end + end + end + + result[name] = { + rt = rt, + p = p + } + if highest == nil or rt > result[highest].rt then + highest = name + end + table.insert(results, name) end - table.insert(results, name) end print("name", "runtime", "pps", "x", "pkts") diff --git a/src/filter/layer.c b/src/filter/layer.c index e5d3b7a3..129552b4 100644 --- a/src/filter/layer.c +++ b/src/filter/layer.c @@ -21,19 +21,6 @@ #include "config.h" #include "filter/layer.h" -#include "core/object/pcap.h" -#include "core/object/null.h" -#include "core/object/ether.h" -#include "core/object/loop.h" -#include "core/object/linuxsll.h" -#include "core/object/ieee802.h" -#include "core/object/ip.h" -#include "core/object/ip6.h" -#include "core/object/gre.h" -#include "core/object/icmp.h" -#include "core/object/icmp6.h" -#include "core/object/udp.h" -#include "core/object/tcp.h" #include #include @@ -49,10 +36,26 @@ #include #endif +#define N_IEEE802 3 + static core_log_t _log = LOG_T_INIT("filter.layer"); static filter_layer_t _defaults = { LOG_T_INIT_OBJ("filter.layer"), - 0, 0 + 0, 0, + 0, 0, 0, + 0, + CORE_OBJECT_NULL_INIT(0), + CORE_OBJECT_ETHER_INIT(0), + CORE_OBJECT_LOOP_INIT(0), + CORE_OBJECT_LINUXSLL_INIT(0), + 0, { CORE_OBJECT_IEEE802_INIT(0), CORE_OBJECT_IEEE802_INIT(0), CORE_OBJECT_IEEE802_INIT(0) }, + CORE_OBJECT_IP_INIT(0), + CORE_OBJECT_IP6_INIT(0), + CORE_OBJECT_GRE_INIT(0), + CORE_OBJECT_ICMP_INIT(0), + CORE_OBJECT_ICMP6_INIT(0), + CORE_OBJECT_UDP_INIT(0), + CORE_OBJECT_TCP_INIT(0) }; core_log_t* filter_layer_log() @@ -154,14 +157,15 @@ static int _proto(filter_layer_t* self, uint8_t proto, const core_object_t* obj, { switch (proto) { case IPPROTO_GRE: { - core_object_gre_t gre = CORE_OBJECT_GRE_INIT(obj); + core_object_gre_t* gre = &self->gre; + gre->obj_prev = obj; - need16(gre.gre_flags, pkt, len); - need16(gre.ether_type, pkt, len); + need16(gre->gre_flags, pkt, len); + need16(gre->ether_type, pkt, len); /* TODO: Incomplete, check RFC 1701 */ - self->recv(self->ctx, (core_object_t*)&gre); + self->produced = (core_object_t*)gre; // if (gre.gre_flags & 0x1) { // need16(gre.checksum, pkt, len); @@ -184,64 +188,70 @@ static int _proto(filter_layer_t* self, uint8_t proto, const core_object_t* obj, break; } case IPPROTO_ICMP: { - core_object_icmp_t icmp = CORE_OBJECT_ICMP_INIT(obj); + core_object_icmp_t* icmp = &self->icmp; + icmp->obj_prev = obj; - need8(icmp.type, pkt, len); - need8(icmp.code, pkt, len); - need16(icmp.cksum, pkt, len); + need8(icmp->type, pkt, len); + need8(icmp->code, pkt, len); + need16(icmp->cksum, pkt, len); - self->recv(self->ctx, (core_object_t*)&icmp); + self->produced = (core_object_t*)icmp; break; } case IPPROTO_ICMPV6: { - core_object_icmp6_t icmp6 = CORE_OBJECT_ICMP_INIT(obj); + core_object_icmp6_t* icmp6 = &self->icmp6; + icmp6->obj_prev = obj; - need8(icmp6.type, pkt, len); - need8(icmp6.code, pkt, len); - need16(icmp6.cksum, pkt, len); + need8(icmp6->type, pkt, len); + need8(icmp6->code, pkt, len); + need16(icmp6->cksum, pkt, len); - self->recv(self->ctx, (core_object_t*)&icmp6); + self->produced = (core_object_t*)icmp6; break; } case IPPROTO_UDP: { - core_object_udp_t udp = CORE_OBJECT_UDP_INIT(obj); + core_object_udp_t* udp = &self->udp; + udp->obj_prev = obj; - need16(udp.sport, pkt, len); - need16(udp.dport, pkt, len); - need16(udp.ulen, pkt, len); - need16(udp.sum, pkt, len); + need16(udp->sport, pkt, len); + need16(udp->dport, pkt, len); + need16(udp->ulen, pkt, len); + need16(udp->sum, pkt, len); - udp.payload = (uint8_t*)pkt; - udp.len = len; + udp->payload = (uint8_t*)pkt; + udp->len = len; - self->recv(self->ctx, (core_object_t*)&udp); + self->produced = (core_object_t*)udp; break; } case IPPROTO_TCP: { - core_object_tcp_t tcp = CORE_OBJECT_TCP_INIT(obj); - - need16(tcp.sport, pkt, len); - need16(tcp.dport, pkt, len); - need32(tcp.seq, pkt, len); - need32(tcp.ack, pkt, len); - need4x2(tcp.off, tcp.x2, pkt, len); - need8(tcp.flags, pkt, len); - need16(tcp.win, pkt, len); - need16(tcp.sum, pkt, len); - need16(tcp.urp, pkt, len); - if (tcp.off > 5) { - tcp.opts_len = (tcp.off - 5) * 4; - needxb(tcp.opts, tcp.opts_len, pkt, len); + core_object_tcp_t* tcp = &self->tcp; + tcp->obj_prev = obj; + + need16(tcp->sport, pkt, len); + need16(tcp->dport, pkt, len); + need32(tcp->seq, pkt, len); + need32(tcp->ack, pkt, len); + need4x2(tcp->off, tcp->x2, pkt, len); + need8(tcp->flags, pkt, len); + need16(tcp->win, pkt, len); + need16(tcp->sum, pkt, len); + need16(tcp->urp, pkt, len); + if (tcp->off > 5) { + tcp->opts_len = (tcp->off - 5) * 4; + needxb(tcp->opts, tcp->opts_len, pkt, len); + } else { + tcp->opts_len = 0; } - tcp.payload = (uint8_t*)pkt; - tcp.len = len; + tcp->payload = (uint8_t*)pkt; + tcp->len = len; - self->recv(self->ctx, (core_object_t*)&tcp); + self->produced = (core_object_t*)tcp; break; } default: - self->recv(self->ctx, obj); + self->produced = obj; break; } @@ -253,76 +263,79 @@ static int _ip(filter_layer_t* self, const core_object_t* obj, const unsigned ch if (len) { switch ((*pkt >> 4)) { case 4: { - core_object_ip_t ip = CORE_OBJECT_IP_INIT(obj); - - need4x2(ip.v, ip.hl, pkt, len); - need8(ip.tos, pkt, len); - need16(ip.len, pkt, len); - need16(ip.id, pkt, len); - need16(ip.off, pkt, len); - need8(ip.ttl, pkt, len); - need8(ip.p, pkt, len); - need16(ip.sum, pkt, len); - needxb(&ip.src, 4, pkt, len); - needxb(&ip.dst, 4, pkt, len); + core_object_ip_t* ip = &self->ip; + ip->obj_prev = obj; + + need4x2(ip->v, ip->hl, pkt, len); + need8(ip->tos, pkt, len); + need16(ip->len, pkt, len); + need16(ip->id, pkt, len); + need16(ip->off, pkt, len); + need8(ip->ttl, pkt, len); + need8(ip->p, pkt, len); + need16(ip->sum, pkt, len); + needxb(&ip->src, 4, pkt, len); + needxb(&ip->dst, 4, pkt, len); /* TODO: IPv4 options */ - if (ip.hl < 5) + if (ip->hl < 5) break; - if (ip.hl > 5) { - advancexb((ip.hl - 5) * 4, pkt, len); + if (ip->hl > 5) { + advancexb((ip->hl - 5) * 4, pkt, len); } /* Check reported length for missing payload or padding */ - if (ip.len < (ip.hl * 4)) { + if (ip->len < (ip->hl * 4)) { break; } - if (len < (ip.len - (ip.hl * 4))) { + if (len < (ip->len - (ip->hl * 4))) { break; } - if (len > (ip.len - (ip.hl * 4))) { - // TODO: Padding - // layer_trace("have_ippadding"); - // packet->ippadding = len - (ip.len - (ip.hl * 4)); - // packet->have_ippadding = 1; - // len -= packet->ippadding; - } - - if (ip.off & 0x2000 || ip.off & 0x1fff) { - ip.payload = (uint8_t*)pkt; - ip.plen = len; - self->recv(self->ctx, (core_object_t*)&ip); + // TODO: Padding + // if (len > (ip->len - (ip->hl * 4))) { + // layer_trace("have_ippadding"); + // packet->ippadding = len - (ip->len - (ip->hl * 4)); + // packet->have_ippadding = 1; + // len -= packet->ippadding; + // } + + if (ip->off & 0x2000 || ip->off & 0x1fff) { + ip->payload = (uint8_t*)pkt; + ip->plen = len; + self->produced = (core_object_t*)ip; return 0; } - return _proto(self, ip.p, (core_object_t*)&ip, pkt, len); + return _proto(self, ip->p, (core_object_t*)ip, pkt, len); } case 6: { - core_object_ip6_t ip6 = CORE_OBJECT_IP6_INIT(obj); - struct ip6_ext ext; - size_t already_advanced = 0; + core_object_ip6_t* ip6 = &self->ip6; + struct ip6_ext ext; + size_t already_advanced = 0; + + ip6->obj_prev = obj; - need32(ip6.flow, pkt, len); - need16(ip6.plen, pkt, len); - need8(ip6.nxt, pkt, len); - need8(ip6.hlim, pkt, len); - needxb(&ip6.src, 16, pkt, len); - needxb(&ip6.dst, 16, pkt, len); + need32(ip6->flow, pkt, len); + need16(ip6->plen, pkt, len); + need8(ip6->nxt, pkt, len); + need8(ip6->hlim, pkt, len); + needxb(&ip6->src, 16, pkt, len); + needxb(&ip6->dst, 16, pkt, len); /* Check reported length for missing payload or padding */ - if (len < ip6.plen) { + if (len < ip6->plen) { break; } - if (len > ip6.plen) { + if (len > ip6->plen) { // TODO: Padding // layer_trace("have_ip6padding"); - // packet->ip6padding = len - ip6.ip6_plen; + // packet->ip6padding = len - ip6->ip6_plen; // packet->have_ip6padding = 1; // len -= packet->ip6padding; } - ext.ip6e_nxt = ip6.nxt; + ext.ip6e_nxt = ip6->nxt; ext.ip6e_len = 0; while (ext.ip6e_nxt != IPPROTO_NONE && ext.ip6e_nxt != IPPROTO_GRE @@ -402,45 +415,51 @@ static int _ip(filter_layer_t* self, const core_object_t* obj, const unsigned ch } if (ext.ip6e_nxt == IPPROTO_NONE || ext.ip6e_nxt == IPPROTO_FRAGMENT) { - ip6.payload = (uint8_t*)pkt; - ip6.len = len; - self->recv(self->ctx, (core_object_t*)&ip6); + ip6->payload = (uint8_t*)pkt; + ip6->len = len; + self->produced = (core_object_t*)ip6; return 0; } - return _proto(self, ext.ip6e_nxt, (core_object_t*)&ip6, pkt, len); + return _proto(self, ext.ip6e_nxt, (core_object_t*)ip6, pkt, len); } default: break; } } - self->recv(self->ctx, obj); + self->produced = obj; return 0; } static int _ieee802(filter_layer_t* self, uint16_t tpid, const core_object_t* obj, const unsigned char* pkt, size_t len) { - core_object_ieee802_t ieee802 = CORE_OBJECT_IEEE802_INIT(obj); - uint16_t tci; + core_object_ieee802_t* ieee802 = &self->ieee802[self->n_ieee802]; + uint16_t tci; + + ieee802->obj_prev = obj; for (;;) { - ieee802.tpid = tpid; + ieee802->tpid = tpid; need16(tci, pkt, len); - ieee802.pcp = (tci & 0xe000) >> 13; - ieee802.dei = (tci & 0x1000) >> 12; - ieee802.vid = tci & 0x0fff; - need16(ieee802.ether_type, pkt, len); + ieee802->pcp = (tci & 0xe000) >> 13; + ieee802->dei = (tci & 0x1000) >> 12; + ieee802->vid = tci & 0x0fff; + need16(ieee802->ether_type, pkt, len); - switch (ieee802.ether_type) { + switch (ieee802->ether_type) { case 0x88a8: /* 802.1ad */ case 0x9100: /* 802.1 QinQ non-standard */ - return _ieee802(self, ieee802.ether_type, (core_object_t*)&ieee802, pkt, len); + self->n_ieee802++; + if (self->n_ieee802 < N_IEEE802) { + return _ieee802(self, ieee802->ether_type, (core_object_t*)ieee802, pkt, len); + } + return 1; case ETHERTYPE_IP: case ETHERTYPE_IPV6: - return _ip(self, (core_object_t*)&ieee802, pkt, len); + return _ip(self, (core_object_t*)ieee802, pkt, len); default: break; @@ -448,7 +467,7 @@ static int _ieee802(filter_layer_t* self, uint16_t tpid, const core_object_t* ob break; } - self->recv(self->ctx, obj); + self->produced = obj; return 0; } @@ -458,25 +477,28 @@ static int _link(filter_layer_t* self, const core_object_pcap_t* pcap) const unsigned char* pkt; size_t len; + self->n_ieee802 = 0; + pkt = pcap->bytes; len = pcap->caplen; switch (pcap->linktype) { case DLT_NULL: { - core_object_null_t null = CORE_OBJECT_NULL_INIT(pcap); + core_object_null_t* null = &self->null; + null->obj_prev = (core_object_t*)pcap; if (pcap->is_swapped) { - needr32(null.family, pkt, len); + needr32(null->family, pkt, len); } else { - need32(null.family, pkt, len); + need32(null->family, pkt, len); } - switch (null.family) { + switch (null->family) { case 2: case 24: case 28: case 30: - return _ip(self, (core_object_t*)&null, pkt, len); + return _ip(self, (core_object_t*)null, pkt, len); default: break; @@ -484,21 +506,22 @@ static int _link(filter_layer_t* self, const core_object_pcap_t* pcap) break; } case DLT_EN10MB: { - core_object_ether_t ether = CORE_OBJECT_ETHER_INIT(pcap); + core_object_ether_t* ether = &self->ether; + ether->obj_prev = (core_object_t*)pcap; - needxb(ether.dhost, 6, pkt, len); - needxb(ether.shost, 6, pkt, len); - need16(ether.type, pkt, len); + needxb(ether->dhost, 6, pkt, len); + needxb(ether->shost, 6, pkt, len); + need16(ether->type, pkt, len); - switch (ether.type) { + switch (ether->type) { case 0x8100: /* 802.1q */ case 0x88a8: /* 802.1ad */ case 0x9100: /* 802.1 QinQ non-standard */ - return _ieee802(self, ether.type, (core_object_t*)ðer, pkt, len); + return _ieee802(self, ether->type, (core_object_t*)ether, pkt, len); case ETHERTYPE_IP: case ETHERTYPE_IPV6: - return _ip(self, (core_object_t*)ðer, pkt, len); + return _ip(self, (core_object_t*)ether, pkt, len); default: break; @@ -506,16 +529,17 @@ static int _link(filter_layer_t* self, const core_object_pcap_t* pcap) break; } case DLT_LOOP: { - core_object_loop_t loop = CORE_OBJECT_LOOP_INIT(pcap); + core_object_loop_t* loop = &self->loop; + loop->obj_prev = (core_object_t*)pcap; - need32(loop.family, pkt, len); + need32(loop->family, pkt, len); - switch (loop.family) { + switch (loop->family) { case 2: case 24: case 28: case 30: - return _ip(self, (core_object_t*)&loop, pkt, len); + return _ip(self, (core_object_t*)loop, pkt, len); default: break; @@ -531,23 +555,24 @@ static int _link(filter_layer_t* self, const core_object_pcap_t* pcap) #endif return _ip(self, (core_object_t*)&pcap, pkt, len); case DLT_LINUX_SLL: { - core_object_linuxsll_t linuxsll = CORE_OBJECT_LINUXSLL_INIT(pcap); + core_object_linuxsll_t* linuxsll = &self->linuxsll; + linuxsll->obj_prev = (core_object_t*)pcap; - need16(linuxsll.packet_type, pkt, len); - need16(linuxsll.arp_hardware, pkt, len); - need16(linuxsll.link_layer_address_length, pkt, len); - needxb(linuxsll.link_layer_address, 8, pkt, len); - need16(linuxsll.ether_type, pkt, len); + need16(linuxsll->packet_type, pkt, len); + need16(linuxsll->arp_hardware, pkt, len); + need16(linuxsll->link_layer_address_length, pkt, len); + needxb(linuxsll->link_layer_address, 8, pkt, len); + need16(linuxsll->ether_type, pkt, len); - switch (linuxsll.ether_type) { + switch (linuxsll->ether_type) { case 0x8100: /* 802.1q */ case 0x88a8: /* 802.1ad */ case 0x9100: /* 802.1 QinQ non-standard */ - return _ieee802(self, linuxsll.ether_type, (core_object_t*)&linuxsll, pkt, len); + return _ieee802(self, linuxsll->ether_type, (core_object_t*)linuxsll, pkt, len); case ETHERTYPE_IP: case ETHERTYPE_IPV6: - return _ip(self, (core_object_t*)&linuxsll, pkt, len); + return _ip(self, (core_object_t*)linuxsll, pkt, len); default: break; @@ -562,7 +587,7 @@ static int _link(filter_layer_t* self, const core_object_pcap_t* pcap) break; } - self->recv(self->ctx, (core_object_t*)pcap); + self->produced = (core_object_t*)pcap; return 0; } @@ -571,6 +596,7 @@ static int _receive(void* ctx, const core_object_t* obj) { filter_layer_t* self = (filter_layer_t*)ctx; const core_object_pcap_t* pcap = (core_object_pcap_t*)obj; + int ret; if (!self || !obj || obj->obj_type != CORE_OBJECT_PCAP || !self->recv) { return 1; @@ -578,13 +604,17 @@ static int _receive(void* ctx, const core_object_t* obj) if (pcap->is_multiple) { while (pcap) { - int ret = _link(self, pcap); - if (ret) + if ((ret = _link(self, pcap))) return ret; + self->recv(self->ctx, self->produced); pcap = (const core_object_pcap_t*)pcap->obj_prev; + if (pcap && pcap->obj_type != CORE_OBJECT_PCAP) + return 1; } } else { - return _link(self, pcap); + if ((ret = _link(self, pcap))) + return ret; + self->recv(self->ctx, self->produced); } return 0; @@ -594,3 +624,37 @@ core_receiver_t filter_layer_receiver() { return _receive; } + +static const core_object_t* _produce(void* ctx) +{ + filter_layer_t* self = (filter_layer_t*)ctx; + + if (!self) { + return 0; + } + + if (self->prod_obj) { + if (self->prod_obj->obj_type != CORE_OBJECT_PCAP || _link(self, self->prod_obj)) { + self->prod_obj = 0; + return 0; + } + self->prod_obj = (const core_object_pcap_t*)self->prod_obj->obj_prev; + } else { + const core_object_pcap_t* obj = (core_object_pcap_t*)self->prod(self->prod_ctx); + + if (!obj || obj->obj_type != CORE_OBJECT_PCAP || _link(self, obj)) { + return 0; + } + + if (obj->is_multiple) { + self->prod_obj = (const core_object_pcap_t*)obj->obj_prev; + } + } + + return self->produced; +} + +core_producer_t filter_layer_producer() +{ + return _produce; +} diff --git a/src/filter/layer.h b/src/filter/layer.h index 59e6cad9..e0589dac 100644 --- a/src/filter/layer.h +++ b/src/filter/layer.h @@ -20,6 +20,20 @@ #include "core/log.h" #include "core/receiver.h" +#include "core/producer.h" +#include "core/object/pcap.h" +#include "core/object/null.h" +#include "core/object/ether.h" +#include "core/object/loop.h" +#include "core/object/linuxsll.h" +#include "core/object/ieee802.h" +#include "core/object/ip.h" +#include "core/object/ip6.h" +#include "core/object/gre.h" +#include "core/object/icmp.h" +#include "core/object/icmp6.h" +#include "core/object/udp.h" +#include "core/object/tcp.h" #ifndef __dnsjit_filter_layer_h #define __dnsjit_filter_layer_h diff --git a/src/filter/layer.hh b/src/filter/layer.hh index 206c898d..cc31e11e 100644 --- a/src/filter/layer.hh +++ b/src/filter/layer.hh @@ -20,11 +20,44 @@ //lua:require("dnsjit.core.log") //lua:require("dnsjit.core.receiver_h") +//lua:require("dnsjit.core.producer_h") +//lua:require("dnsjit.core.object.pcap_h") +//lua:require("dnsjit.core.object.null_h") +//lua:require("dnsjit.core.object.ether_h") +//lua:require("dnsjit.core.object.loop_h") +//lua:require("dnsjit.core.object.linuxsll_h") +//lua:require("dnsjit.core.object.ieee802_h") +//lua:require("dnsjit.core.object.ip_h") +//lua:require("dnsjit.core.object.ip6_h") +//lua:require("dnsjit.core.object.gre_h") +//lua:require("dnsjit.core.object.icmp_h") +//lua:require("dnsjit.core.object.icmp6_h") +//lua:require("dnsjit.core.object.udp_h") +//lua:require("dnsjit.core.object.tcp_h") typedef struct filter_layer { core_log_t _log; core_receiver_t recv; void* ctx; + + core_producer_t prod; + void* prod_ctx; + const core_object_pcap_t* prod_obj; + + const core_object_t* produced; + core_object_null_t null; + core_object_ether_t ether; + core_object_loop_t loop; + core_object_linuxsll_t linuxsll; + size_t n_ieee802; + core_object_ieee802_t ieee802[3]; // N_IEEE802 + core_object_ip_t ip; + core_object_ip6_t ip6; + core_object_gre_t gre; + core_object_icmp_t icmp; + core_object_icmp6_t icmp6; + core_object_udp_t udp; + core_object_tcp_t tcp; } filter_layer_t; core_log_t* filter_layer_log(); @@ -33,3 +66,4 @@ int filter_layer_init(filter_layer_t* self); int filter_layer_destroy(filter_layer_t* self); core_receiver_t filter_layer_receiver(); +core_producer_t filter_layer_producer(); diff --git a/src/filter/layer.lua b/src/filter/layer.lua index eea8405b..a68d57d5 100644 --- a/src/filter/layer.lua +++ b/src/filter/layer.lua @@ -67,6 +67,18 @@ function Layer:receiver(o) self._receiver = o end +-- Return the C functions and context for producing objects. +function Layer:produce() + return C.filter_layer_producer(), self.obj +end + +-- Set the producer to get objects from. +function Layer:producer(o) + self.obj._log:debug("producer()") + self.obj.prod, self.obj.prod_ctx = o:produce() + self._producer = o +end + -- dnsjit.core.object.pcap (3), -- dnsjit.core.object.ether (3), -- dnsjit.core.object.null (3), diff --git a/src/output/null.c b/src/output/null.c index a7012220..112f39b9 100644 --- a/src/output/null.c +++ b/src/output/null.c @@ -97,9 +97,30 @@ int output_null_run(output_null_t* self, uint64_t num) p = self->prod; c = self->ctx; - while (num--) { - const core_object_t* obj = p(c); - if (obj) { + if (num) { + while (num--) { + const core_object_t* obj = p(c); + if (!obj) + continue; + + if (obj->obj_type == CORE_OBJECT_PCAP) { + const core_object_pcap_t* pkt = (core_object_pcap_t*)obj; + if (pkt->is_multiple) { + while (pkt) { + self->pkts++; + pkt = (core_object_pcap_t*)pkt->obj_prev; + } + continue; + } + } + self->pkts++; + } + } else { + for (;;) { + const core_object_t* obj = p(c); + if (!obj) + break; + if (obj->obj_type == CORE_OBJECT_PCAP) { const core_object_pcap_t* pkt = (core_object_pcap_t*)obj; if (pkt->is_multiple) { diff --git a/src/output/null.lua b/src/output/null.lua index 961ce46d..d06fccc3 100644 --- a/src/output/null.lua +++ b/src/output/null.lua @@ -55,7 +55,7 @@ function Null:receive() return C.output_null_receiver(), self.obj end --- Set the producer to pass objects to. +-- Set the producer to get objects from. function Null:producer(o) self.obj._log:debug("producer()") self.obj.prod, self.obj.ctx = o:produce() @@ -64,7 +64,10 @@ end -- Retrieve -- .I num --- objects from the producer, return 0 if successful. +-- objects from the producer, if +-- .I num +-- is zero or less then retrieve all objects. +-- Returns 0 if successful. function Null:run(num) return C.output_null_run(self.obj, num) end