Skip to content

Commit

Permalink
feature/3.0.x compatibility (#29)
Browse files Browse the repository at this point in the history
* more idiomatic go structure

* CoreServer

* log verbosity. work towards compatible metrics.

* switch to a new third party timer class, fixes some slowdowns

* single static rate timer thread

* fix modules, limit catch to runtime_error

* naming

* interface for pulling information

* info json work, and split out set up routes

* error handling and metric bucket info

* decidedly make front bucket the most recent

* refactor period shifts

* set buckets read only on period shift, stopping rate calculations

* compatible app info end point

* thread safety, rate events, read only

* rates

* fix rates, error msgs, entry work.

* docs and period shift fixes

* add dns transaction timeouts

* more info in the base common json

* fix docker build.
  • Loading branch information
weyrick authored Feb 26, 2021
1 parent b23473b commit 0392e49
Show file tree
Hide file tree
Showing 36 changed files with 1,203 additions and 573 deletions.
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
cmake-build*
tests/external*
Dockerfile
.dockerignore
1 change: 1 addition & 0 deletions 3rd/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ message(STATUS "3rd party libraries")

add_subdirectory(datasketches)
add_subdirectory(rng)
add_subdirectory(timer)
15 changes: 15 additions & 0 deletions 3rd/timer/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@

add_library(timer INTERFACE)

target_include_directories(rng
INTERFACE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
)

target_compile_features(timer INTERFACE cxx_std_17)

target_sources(timer
INTERFACE
${CMAKE_CURRENT_SOURCE_DIR}/timer.hpp
${CMAKE_CURRENT_SOURCE_DIR}/event.hpp
)
8 changes: 8 additions & 0 deletions 3rd/timer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
https://vorbrodt.blog/2019/02/25/better-timer-class/
https://github.com/mvorbrodt/blog

mvorbrodt/blog is licensed under the

BSD Zero Clause License The BSD Zero Clause license goes further than the BSD 2-Clause license to allow you unlimited
freedom with the software without requirements to include the copyright notice, license text, or disclaimer in either
source or binary forms.
99 changes: 99 additions & 0 deletions 3rd/timer/event.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#pragma once

#include <mutex>
#include <chrono>
#include <condition_variable>
#include <chrono>

class manual_event
{
public:
explicit manual_event(bool signaled = false)
: m_signaled(signaled) {}

void signal()
{
{
std::unique_lock lock(m_mutex);
m_signaled = true;
}
m_cv.notify_all();
}

void wait()
{
std::unique_lock lock(m_mutex);
m_cv.wait(lock, [&]() { return m_signaled != false; });
}

template<typename Rep, typename Period>
bool wait_for(const std::chrono::duration<Rep, Period>& t)
{
std::unique_lock lock(m_mutex);
return m_cv.wait_for(lock, t, [&]() { return m_signaled != false; });
}

template<typename Clock, typename Duration>
bool wait_until(const std::chrono::time_point<Clock, Duration>& t)
{
std::unique_lock lock(m_mutex);
return m_cv.wait_until(lock, t, [&]() { return m_signaled != false; });
}

void reset()
{
std::unique_lock lock(m_mutex);
m_signaled = false;
}

private:
bool m_signaled = false;
std::mutex m_mutex;
std::condition_variable m_cv;
};

class auto_event
{
public:
explicit auto_event(bool signaled = false)
: m_signaled(signaled) {}

void signal()
{
{
std::unique_lock lock(m_mutex);
m_signaled = true;
}
m_cv.notify_one();
}

void wait()
{
std::unique_lock lock(m_mutex);
m_cv.wait(lock, [&]() { return m_signaled != false; });
m_signaled = false;
}

template<typename Rep, typename Period>
bool wait_for(const std::chrono::duration<Rep, Period>& t)
{
std::unique_lock lock(m_mutex);
bool result = m_cv.wait_for(lock, t, [&]() { return m_signaled != false; });
if(result) m_signaled = false;
return result;
}

template<typename Clock, typename Duration>
bool wait_until(const std::chrono::time_point<Clock, Duration>& t)
{
std::unique_lock lock(m_mutex);
bool result = m_cv.wait_until(lock, t, [&]() { return m_signaled != false; });
if(result) m_signaled = false;
return result;
}

private:
bool m_signaled = false;
std::mutex m_mutex;
std::condition_variable m_cv;
};
193 changes: 193 additions & 0 deletions 3rd/timer/timer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
#pragma once

#include <list>
#include <mutex>
#include <tuple>
#include <atomic>
#include <thread>
#include <chrono>
#include <memory>
#include <iterator>
#include <stdexcept>
#include <functional>
#include <cstdint>
#include "event.hpp"

class timer
{
public:
template<typename R, typename P>
explicit timer(const std::chrono::duration<R, P>& tick)
: m_tick{ std::chrono::duration_cast<std::chrono::nanoseconds>(tick) }
{
if(m_tick.count() <= 0)
{
throw std::invalid_argument("Invalid tick value: must be greater than zero!");
}

m_tick_thread = std::make_unique<std::thread>([this]()
{
auto start = std::chrono::steady_clock::now();

while(!m_tick_event.wait_until(start + m_tick * ++m_ticks))
{
std::scoped_lock lock{ m_events_lock };

auto it = std::begin(m_events);
auto end = std::end(m_events);

while(it != end)
{
auto& e = *it;

if(e->elapsed += m_tick.count(); e->elapsed >= e->ticks)
{
if(auto remove = e->proc())
{
m_events.erase(it++);
continue;
}
else
{
e->elapsed = 0;
}
}

++it;
}
}
});
}

~timer()
{
m_tick_event.signal();
m_tick_thread->join();
}

using manual_event_ptr = std::shared_ptr<manual_event>;
using auto_event_ptr = std::shared_ptr<auto_event>;

template<typename C, typename W>
struct event_handle
{
event_handle(C cancel_event, W work_event)
: m_cancel_event{ cancel_event }, m_work_event{ work_event } {}

void cancel() { m_cancel_event->signal(); }

void wait() { m_work_event->wait(); }

template<typename Rep, typename Period>
bool wait_for(const std::chrono::duration<Rep, Period>& t) { return m_work_event->wait_for(t); }

template<typename Clock, typename Duration>
bool wait_until(const std::chrono::time_point<Clock, Duration>& t) { return m_work_event->wait_until(t); }

private:
C m_cancel_event;
W m_work_event;
};

using timeout_handle = event_handle<manual_event_ptr, manual_event_ptr>;
using interval_handle = event_handle<manual_event_ptr, auto_event_ptr>;

template<typename R, typename P, typename F, typename... Args>
[[nodiscard]] auto set_timeout(const std::chrono::duration<R, P>& timeout, F&& f, Args&&... args)
{
if(timeout.count() <= 0)
{
throw std::invalid_argument("Invalid timeout value: must be greater than zero!");
}

auto cancel_event = std::make_shared<manual_event>();
auto work_event = std::make_shared<manual_event>();
auto handle = std::make_shared<timeout_handle>(cancel_event, work_event);

auto ctx = std::make_shared<event_ctx>(
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count(),
[=, p = std::forward<F>(f), t = std::make_tuple(std::forward<Args>(args)...)]() mutable
{
if(cancel_event->wait_for(std::chrono::seconds{0}))
{
return true;
}

std::apply(p, t);
work_event->signal();

return true;
});

{
std::scoped_lock lock{ m_events_lock };
m_events.push_back(ctx);
}

return handle;
}

template<typename R, typename P, typename F, typename... Args>
[[nodiscard]] auto set_interval(const std::chrono::duration<R, P>& interval, F&& f, Args&&... args)
{
if(interval.count() <= 0)
{
throw std::invalid_argument("Invalid interval value: must be greater than zero!");
}

auto cancel_event = std::make_shared<manual_event>();
auto work_event = std::make_shared<auto_event>();
auto handle = std::make_shared<interval_handle>(cancel_event, work_event);

auto ctx = std::make_shared<event_ctx>(
std::chrono::duration_cast<std::chrono::nanoseconds>(interval).count(),
[=, p = std::forward<F>(f), t = std::make_tuple(std::forward<Args>(args)...)]() mutable
{
if(cancel_event->wait_for(std::chrono::seconds{0}))
{
return true;
}

std::apply(p, t);
work_event->signal();

return false;
});

{
std::scoped_lock lock{ m_events_lock };
m_events.push_back(ctx);
}

return handle;
}

private:
std::chrono::nanoseconds m_tick;
std::uint64_t m_ticks = 0;

using thread_ptr = std::unique_ptr<std::thread>;
thread_ptr m_tick_thread;
manual_event m_tick_event;

struct event_ctx
{
using proc_t = std::function<bool(void)>;

event_ctx(std::uint64_t t, proc_t&& p)
: ticks{ t }, proc{ std::move(p) } {}

std::uint32_t seq_num = s_next.fetch_add(1);
std::uint64_t ticks;
std::uint64_t elapsed = 0;
proc_t proc;

private:
static inline std::atomic_uint32_t s_next = 0;
};

using event_ctx_ptr = std::shared_ptr<event_ctx>;
using event_list = std::list<event_ctx_ptr>;
event_list m_events;
std::recursive_mutex m_events_lock;
};
17 changes: 9 additions & 8 deletions cmd/pktvisor-pcap/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ int main(int argc, char *argv[])
input_stream->config_set("host_spec", host_spec);

input_stream->parse_host_spec();
console->info("{}", input_stream->config_json().dump(4));
console->info("{}", input_stream->info_json().dump(4));
json j;
input_stream->info_json(j["info"]);
console->info("{}", j.dump(4));

input_manager->module_add(std::move(input_stream), false);
auto [input_stream_, stream_mgr_lock] = input_manager->module_get_locked("pcap");
Expand All @@ -162,15 +163,15 @@ int main(int argc, char *argv[])

handler::net::NetStreamHandler *net_handler{nullptr};
{
auto handler_module = std::make_unique<handler::net::NetStreamHandler>("net", pcap_stream, periods, sample_rate);
auto handler_module = std::make_unique<handler::net::NetStreamHandler>("net", pcap_stream, periods, sample_rate, false);
handler_manager->module_add(std::move(handler_module));
auto [handler, handler_mgr_lock] = handler_manager->module_get_locked("net");
handler_mgr_lock.unlock();
net_handler = dynamic_cast<handler::net::NetStreamHandler *>(handler);
}
handler::dns::DnsStreamHandler *dns_handler{nullptr};
{
auto handler_module = std::make_unique<handler::dns::DnsStreamHandler>("dns", pcap_stream, periods, sample_rate);
auto handler_module = std::make_unique<handler::dns::DnsStreamHandler>("dns", pcap_stream, periods, sample_rate, false);
handler_manager->module_add(std::move(handler_module));
auto [handler, handler_mgr_lock] = handler_manager->module_get_locked("dns");
handler_mgr_lock.unlock();
Expand All @@ -182,12 +183,12 @@ int main(int argc, char *argv[])
json result;
if (periods == 1) {
// in summary mode we output a single summary of stats
net_handler->to_json(result, 0, false);
dns_handler->to_json(result, 0, false);
net_handler->window_json(result, 0, false);
dns_handler->window_json(result, 0, false);
} else {
// otherwise, merge the max time window available
net_handler->to_json(result, periods, true);
dns_handler->to_json(result, periods, true);
net_handler->window_json(result, periods, true);
dns_handler->window_json(result, periods, true);
}
console->info("{}", result.dump());
shutdown_handler(SIGUSR1);
Expand Down
Loading

0 comments on commit 0392e49

Please sign in to comment.