Skip to content

Commit

Permalink
Add log metrics (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
taglines authored Feb 11, 2021
1 parent bea7476 commit 2be0248
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 17 deletions.
4 changes: 2 additions & 2 deletions EventProcessorTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ BOOST_AUTO_TEST_CASE( basic_test ) {
auto filtersEngine = std::make_shared<FiltersEngine>();
auto processTree = std::make_shared<ProcessTree>(user_db, filtersEngine);

auto metrics = std::make_shared<Metrics>(metrics_builder);
auto metrics = std::make_shared<Metrics>("test", metrics_builder);

auto cmdline_redactor = std::make_shared<CmdlineRedactor>();

Expand Down Expand Up @@ -323,7 +323,7 @@ BOOST_AUTO_TEST_CASE( oversized_event_test ) {
auto filtersEngine = std::make_shared<FiltersEngine>();
auto processTree = std::make_shared<ProcessTree>(user_db, filtersEngine);

auto metrics = std::make_shared<Metrics>(metrics_builder);
auto metrics = std::make_shared<Metrics>("test", metrics_builder);

auto cmdline_redactor = std::make_shared<CmdlineRedactor>();

Expand Down
2 changes: 1 addition & 1 deletion ExecveConverterTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ BOOST_AUTO_TEST_CASE( basic_test ) {
auto metrics_queue = new TestEventQueue();
auto metrics_allocator = std::shared_ptr<IEventBuilderAllocator>(metrics_queue);
auto metrics_builder = std::make_shared<EventBuilder>(metrics_allocator, prioritizer);
auto metrics = std::make_shared<Metrics>(metrics_builder);
auto metrics = std::make_shared<Metrics>("test", metrics_builder);

RawEventAccumulator accumulator(raw_builder, metrics);

Expand Down
76 changes: 66 additions & 10 deletions Logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ extern "C" {
#include <syslog.h>
}

// Cap on the number of completed log metrics kept in _past_metrics:
// _log_write only appends a rolled-over entry while the vector is below this size.
#define MAX_PAST_METRICS 10000

// Definitions of Logger's static data members.
// _mutex is locked by OpenSyslog, _log_write and GetMetrics.
std::mutex Logger::_mutex;
// Identifier string whose c_str() is handed to openlog() in OpenSyslog.
std::string Logger::_ident;
// When true, _log_write sends messages to syslog instead of writing to fd 2.
bool Logger::_enable_syslog = false;
// Optional callback; when set, _log_write invokes it with the formatted message and its length.
std::function<void(const char* ptr, size_t size)> Logger::_log_fn;
// In-progress metrics, keyed by the printf-style format string passed to the log call.
std::unordered_map<std::string, std::shared_ptr<LogMetric>> Logger::_current_metrics;
// Completed metrics waiting to be collected by GetMetrics.
std::vector<std::shared_ptr<LogMetric>> Logger::_past_metrics;

void Logger::OpenSyslog(const std::string& ident, int facility)
{
std::lock_guard<std::mutex> lock(_mutex);
_ident = ident;
openlog(_ident.c_str(), LOG_PERROR, LOG_DAEMON);
_enable_syslog = true;
Expand All @@ -38,47 +44,97 @@ void Logger::Info(const char* fmt, ...)
va_list args;
va_start(args, fmt);
_log_write(LOG_INFO, fmt, args);
va_end(args);
}

// Log a printf-style message at LOG_WARNING severity.
// The variadic arguments are packaged into a va_list and forwarded to _log_write.
void Logger::Warn(const char* fmt, ...)
{
    va_list ap;
    va_start(ap, fmt);
    _log_write(LOG_WARNING, fmt, ap);
    va_end(ap);
}

// Log a printf-style message at LOG_ERR severity.
// The variadic arguments are packaged into a va_list and forwarded to _log_write.
void Logger::Error(const char* fmt, ...)
{
    va_list ap;
    va_start(ap, fmt);
    _log_write(LOG_ERR, fmt, ap);
    va_end(ap);
}

// Log a printf-style message at LOG_DEBUG severity.
// The variadic arguments are packaged into a va_list and forwarded to _log_write.
void Logger::Debug(const char* fmt, ...)
{
    va_list ap;
    va_start(ap, fmt);
    _log_write(LOG_DEBUG, fmt, ap);
    va_end(ap);
}

// Format a log message, emit it, and update the per-format-string log metrics.
//
// level: syslog priority passed through to syslog()/vsyslog().
// fmt:   printf-style format string; it is also the key used in _current_metrics.
// ap:    arguments for fmt.
//
// NOTE(review): the statements between the opening brace and the metrics
// bookkeeping appear to mix pre-change and post-change lines of the diff
// (`buffer` and `nr` are each declared twice and the braces do not balance).
// Verify against the real Logger.cpp before relying on this text.
void Logger::_log_write(int level, const char* fmt, va_list ap)
{
if (_enable_syslog) {
vsyslog(level, fmt, ap);
} else {
char buffer[64*1024];
auto nr = vsnprintf(buffer, sizeof(buffer), fmt, ap);
if (nr > 0) {
if (buffer[nr - 1] != '\n') {
std::lock_guard<std::mutex> lock(_mutex);
char buffer[64*1024];
auto nr = vsnprintf(buffer, sizeof(buffer), fmt, ap);
if (nr > 0) {
// Clamp nr to the last usable index of buffer.
if (nr > sizeof(buffer)-1) {
nr = sizeof(buffer)-1;
}
// Make the message end in '\n': append one if there is room, otherwise
// overwrite the last character.
if (nr > 1 && buffer[nr - 1] != '\n') {
if (nr < sizeof(buffer)-1) {
buffer[nr] = '\n';
nr++;
} else {
buffer[nr - 1] = '\n';
}
buffer[nr] = 0;
}
buffer[nr] = 0;
// Emit the already-formatted text: via syslog when enabled, else to fd 2.
if (_enable_syslog) {
syslog(level, "%s", buffer);
} else {
(void)write(2, buffer, nr);
if (_log_fn) {
_log_fn(buffer, nr);
}
if (_log_fn) {
_log_fn(buffer, nr);
}
// Metrics bookkeeping: one LogMetric per format string.
auto now = std::chrono::system_clock::now();
std::string fmt_str = fmt;
auto itr = _current_metrics.find(fmt_str);
if (itr == _current_metrics.end()) {
auto ret = _current_metrics.emplace(fmt_str, std::make_shared<LogMetric>(now));
itr = ret.first;
} else if (std::chrono::duration_cast<std::chrono::milliseconds>(now - itr->second->_start_time).count() > 60000 ) {
// The existing entry started more than 60000 ms ago: retire it to
// _past_metrics (dropped if the cap is reached) and start a new one.
if (_past_metrics.size() < MAX_PAST_METRICS) {
_past_metrics.emplace_back(itr->second);
}
_current_metrics.erase(itr);
auto ret = _current_metrics.emplace(fmt_str, std::make_shared<LogMetric>(now));
itr = ret.first;
}
itr->second->_count += 1;
itr->second->_end_time = now;
// On the first occurrence, remember the format string and the formatted message.
if (itr->second->_count == 1) {
itr->second->_fmt = fmt_str;
itr->second->_first_msg.assign(buffer, nr);
}
}
}

size_t Logger::GetMetrics(std::vector<std::shared_ptr<LogMetric>>& metrics, bool flush_all) {
std::lock_guard<std::mutex> lock(_mutex);

auto now = std::chrono::system_clock::now();
for (auto itr = _current_metrics.begin(); itr != _current_metrics.end(); ) {
if (flush_all || std::chrono::duration_cast<std::chrono::milliseconds>(now - itr->second->_start_time).count() > 60000 ) {
_past_metrics.emplace_back(itr->second);
itr = _current_metrics.erase(itr);
} else {
itr++;
}
}

metrics = _past_metrics;
_past_metrics.clear();

return metrics.size();
}
20 changes: 20 additions & 0 deletions Logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,22 @@

#include <string>
#include <cstdarg>
#include <unordered_map>
#include <functional>
#include <memory>
#include <chrono>
#include <mutex>

// Counters for one log format string over one collection window.
// All members are public and are filled in by Logger::_log_write.
class LogMetric {
public:
    // Start a new window at `time`: both timestamps are set to it, the strings
    // are empty and the count is zero.
    LogMetric(std::chrono::system_clock::time_point time)
        : _start_time(time), _end_time(time) {}

    // Time the window was opened.
    std::chrono::system_clock::time_point _start_time;
    // Time of the most recent message counted in this window.
    std::chrono::system_clock::time_point _end_time;

    // The printf-style format string the messages share.
    std::string _fmt;
    // The first formatted message seen in this window.
    std::string _first_msg;
    // Number of messages counted in this window.
    size_t _count = 0;
};

class Logger {
public:
Expand All @@ -28,12 +43,17 @@ class Logger {
static void Warn(const char* fmt, ...) __attribute__ ((format (printf, 1, 2)));
static void Error(const char* fmt, ...) __attribute__ ((format (printf, 1, 2)));
static void Debug(const char* fmt, ...) __attribute__ ((format (printf, 1, 2)));

static size_t GetMetrics(std::vector<std::shared_ptr<LogMetric>>& metrics, bool flush_all);
private:
static void _log_write(int level, const char* fmt, va_list ap);

static std::mutex _mutex;
static std::string _ident;
static bool _enable_syslog;
static std::function<void(const char* ptr, size_t size)> _log_fn;
static std::unordered_map<std::string, std::shared_ptr<LogMetric>> _current_metrics;
static std::vector<std::shared_ptr<LogMetric>> _past_metrics;
};


Expand Down
90 changes: 90 additions & 0 deletions Metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,18 @@ void Metrics::run() {
if (!send_metrics()) {
return;
}
if (!send_log_metrics(false)) {
return;
}
}
}

// Split a system_clock time point into whole seconds (as returned by
// to_time_t) and the remaining milliseconds past that second.
//
// st:   time point to split.
// sec:  output, the time_t value of `st`.
// msec: output, milliseconds between that whole second and `st`.
void system_time_sec_msec(const std::chrono::system_clock::time_point st, uint64_t& sec, uint32_t& msec) {
    using std::chrono::milliseconds;
    using std::chrono::system_clock;

    sec = system_clock::to_time_t(st);

    // Convert the whole-second value back to a time point so the sub-second
    // remainder can be taken as a difference.
    const auto second_boundary = system_clock::from_time_t(sec);
    const auto remainder = std::chrono::duration_cast<milliseconds>(st - second_boundary);
    msec = remainder.count();
}

std::string system_time_to_iso3339(const std::chrono::system_clock::time_point st) {
time_t secs = std::chrono::system_clock::to_time_t(st);
auto sec_st = std::chrono::system_clock::from_time_t(secs);
Expand Down Expand Up @@ -136,5 +145,86 @@ bool Metrics::send_metrics() {
}
}
}

return true;
}

// Drain the log metrics collected by Logger and emit one AUOMS_METRIC event
// (with a single record) per metric through _builder.
//
// flush_all: forwarded to Logger::GetMetrics; when true, metrics that are
//            still within their collection window are emitted too.
// Returns false as soon as any builder call reports failure, true otherwise.
// Metrics already taken from Logger are not re-queued when this returns false.
bool Metrics::send_log_metrics(bool flush_all) {
    auto rec_type = RecordType::AUOMS_METRIC;
    auto rec_type_name = RecordTypeToName(RecordType::AUOMS_METRIC);
    const auto unclassified = field_type_t::UNCLASSIFIED;

    std::vector<std::shared_ptr<LogMetric>> log_metrics;
    Logger::GetMetrics(log_metrics, flush_all);

    // These field values are the same for every metric, so build them once.
    const std::string sample_period = std::to_string(static_cast<uint64_t>(MetricPeriod::MINUTE));
    const std::string num_samples = std::to_string(1);

    for (const auto& lm : log_metrics) {
        uint64_t sec;
        uint32_t msec;
        system_time_sec_msec(lm->_start_time, sec, msec);

        const std::string& fmt = lm->_fmt;
        const std::string& first_msg = lm->_first_msg;

        // Compare the first message, minus its final character, against the
        // format string (presumably the final character is the newline the
        // logger appends -- TODO confirm). The "Data" field is emitted only when
        // they differ. The length is computed explicitly so an empty message
        // does not rely on unsigned wrap-around of size() - 1.
        const size_t cmp_len = first_msg.empty() ? 0 : first_msg.size() - 1;
        const bool include_first_msg = first_msg.compare(0, cmp_len, fmt) != 0;
        const int num_fields = include_first_msg ? 12 : 11;

        // Min, Max and Avg all carry the message count of this single sample.
        const std::string count = std::to_string(static_cast<double>(lm->_count));

        if (!_builder->BeginEvent(sec, msec, 0, 1)) {
            return false;
        }
        if (!_builder->BeginRecord(static_cast<uint32_t>(rec_type), rec_type_name, "", num_fields)) {
            return false;
        }

        // Short-circuit evaluation keeps the original order and stops at the
        // first field that fails to be added.
        const bool fields_ok =
            _builder->AddField("version", AUOMS_VERSION, nullptr, unclassified)
            && _builder->AddField("StartTime", system_time_to_iso3339(lm->_start_time), nullptr, unclassified)
            && _builder->AddField("EndTime", system_time_to_iso3339(lm->_end_time), nullptr, unclassified)
            && _builder->AddField("Namespace", _proc_name, nullptr, unclassified)
            && _builder->AddField("Name", "log", nullptr, unclassified)
            && _builder->AddField("SamplePeriod", sample_period, nullptr, unclassified)
            && _builder->AddField("NumSamples", num_samples, nullptr, unclassified)
            && _builder->AddField("Min", count, nullptr, unclassified)
            && _builder->AddField("Max", count, nullptr, unclassified)
            && _builder->AddField("Avg", count, nullptr, unclassified)
            && _builder->AddField("Message", fmt, nullptr, unclassified);
        if (!fields_ok) {
            return false;
        }

        if (include_first_msg && !_builder->AddField("Data", first_msg, nullptr, unclassified)) {
            return false;
        }

        if (!_builder->EndRecord()) {
            return false;
        }
        // NOTE(review): this was written `!_builder->EndEvent() != 0`, which
        // evaluates exactly like `!_builder->EndEvent()`. If EndEvent() returns
        // an int status where 0 means success, the intended test may have been
        // `EndEvent() != 0` -- confirm against EventBuilder.
        if (!_builder->EndEvent()) {
            return false;
        }
    }

    return true;
}
7 changes: 5 additions & 2 deletions Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,17 +248,20 @@ class MetricFromTotal: public Metric {

class Metrics: public RunBase {
public:
explicit Metrics(std::shared_ptr<EventBuilder> builder): _builder(std::move(builder)) {}
explicit Metrics(std::shared_ptr<PriorityQueue> queue): _builder(std::make_shared<EventBuilder>(std::make_shared<EventQueue>(std::move(queue)), nullptr)) {}
explicit Metrics(const std::string& proc_name, std::shared_ptr<EventBuilder> builder): _proc_name(proc_name), _builder(std::move(builder)) {}
explicit Metrics(const std::string& proc_name, std::shared_ptr<PriorityQueue> queue): _proc_name(proc_name), _builder(std::make_shared<EventBuilder>(std::make_shared<EventQueue>(std::move(queue)), nullptr)) {}

std::shared_ptr<Metric> AddMetric(MetricType metric_type, const std::string& namespace_name, const std::string& name, MetricPeriod sample_period, MetricPeriod agg_period);

void FlushLogMetrics() { send_log_metrics(true); }
protected:
void run() override;

private:
bool send_metrics();
bool send_log_metrics(bool flush_all);

std::string _proc_name;
std::shared_ptr<EventBuilder> _builder;
std::mutex _mutex;
std::unordered_map<std::string, std::shared_ptr<Metric>> _metrics;
Expand Down
3 changes: 2 additions & 1 deletion auoms.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ int main(int argc, char**argv) {
}
operational_status->Start();

auto metrics = std::make_shared<Metrics>(queue);
auto metrics = std::make_shared<Metrics>("auoms", queue);
metrics->Start();

auto syscall_metrics = std::make_shared<SyscallMetrics>(metrics);
Expand Down Expand Up @@ -533,6 +533,7 @@ int main(int argc, char**argv) {
inputs.Stop();
outputs.Stop(false); // Trigger outputs shutdown but don't block
user_db->Stop(); // Stop user db monitoring
metrics->FlushLogMetrics();
queue->Close(); // Close queue, this will trigger exit of autosave thread
outputs.Wait(); // Wait for outputs to finish shutdown
autosave_thread.join(); // Wait for autosave thread to exit
Expand Down
3 changes: 2 additions & 1 deletion auomscollect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ int main(int argc, char**argv) {
auto event_queue = std::make_shared<EventQueue>(queue);
auto builder = std::make_shared<EventBuilder>(event_queue, event_prioritizer);

auto metrics = std::make_shared<Metrics>(queue);
auto metrics = std::make_shared<Metrics>("auomscollect", queue);
metrics->Start();

auto proc_metrics = std::make_shared<ProcMetrics>("auomscollect", queue, metrics, rss_limit, virt_limit, rss_pct_limit, []() {
Expand Down Expand Up @@ -674,6 +674,7 @@ int main(int argc, char**argv) {
sleep(stop_delay);
}
output.Stop();
metrics->FlushLogMetrics();
queue->Close(); // Close queue, this will trigger exit of autosave thread
autosave_thread.join(); // Wait for autosave thread to exit
} catch (const std::exception& ex) {
Expand Down

0 comments on commit 2be0248

Please sign in to comment.