Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infra - MQTT node to monitor health of the system #120

Draft
wants to merge 24 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c5f51cc
keyence tests
TomLonergan03 May 2, 2024
6330bd9
dummy updates
TomLonergan03 May 2, 2024
22e4f99
mqtt logger tests
TomLonergan03 May 9, 2024
7929330
Merge branch 'master' into sns-testing
TomLonergan03 May 9, 2024
93158de
continue mqtt logger tests
TomLonergan03 May 9, 2024
7d42f56
mqtt logger logs strings correctly
TomLonergan03 May 9, 2024
20fba43
mqtt logger logs formats properly
TomLonergan03 May 9, 2024
6aff728
fix lint
TomLonergan03 May 9, 2024
d5dae73
fix lint pt 2
TomLonergan03 May 9, 2024
a2b0476
Merge branch 'master' into sns-testing
TomLonergan03 May 24, 2024
7092758
Merge branch 'master' into sns-testing
TomLonergan03 May 24, 2024
62d8765
fix keyence test
TomLonergan03 May 24, 2024
c8e81e9
port spi changes from #60 because i leave branches open for far too l…
TomLonergan03 May 24, 2024
9287ca6
dummy spi
TomLonergan03 May 24, 2024
9820629
change mqtt timestamps to be automatically converted to and from Time…
TomLonergan03 May 24, 2024
2e670b4
start of the health monitor node
TomLonergan03 May 24, 2024
1d8911d
do the state transition request properly
TomLonergan03 May 24, 2024
a247594
state machine subs to states
TomLonergan03 May 24, 2024
f717182
fix lint
TomLonergan03 May 24, 2024
f847a32
cmake format
TomLonergan03 May 25, 2024
068dbd1
fix build
TomLonergan03 May 25, 2024
f4430ba
publish messages
TomLonergan03 May 25, 2024
5fc5631
cmake format
TomLonergan03 May 25, 2024
a217c91
start tests, somehow unchanged tests also segfault????
TomLonergan03 May 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion config/pod.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
[hostnames]
hostnames = ["raspberry", "banoffee", "pumpkin", "rhubarb"]

[ips]
raspberry = "192.168.1.0"
banoffee = "192.168.1.1"
pumpkin = "192.168.1.2"
Expand Down
10 changes: 7 additions & 3 deletions lib/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ set(target hyped_core)
file(GLOB headers "${CMAKE_CURRENT_SOURCE_DIR}/*.hpp")
file(GLOB code "${CMAKE_CURRENT_SOURCE_DIR}/*.cpp")
add_library(${target} STATIC ${headers} ${code})
target_include_directories(
${target} INTERFACE "${CMAKE_SOURCE_DIR}/lib" ${rapidjson_SOURCE_DIR}/include
include_directories(
${target}
INTERFACE
"${CMAKE_SOURCE_DIR}/lib"
${CMAKE_CURRENT_SOURCE_DIR}
${rapidjson_SOURCE_DIR}/include
)
target_link_libraries(${target} PahoMqttCpp::paho-mqttpp3)
target_link_libraries(${target} PahoMqttCpp::paho-mqttpp3 hyped_statemachine)
158 changes: 158 additions & 0 deletions lib/core/health_monitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#include "health_monitor.hpp"
#include "logger.hpp"
#include "time.hpp"
#include "types.hpp"

#include <utility>

namespace hyped::core {

std::optional<HealthMonitor> HealthMonitor::create(ILogger &logger,
ITimeSource &time,
std::shared_ptr<core::IMqtt> mqtt,
toml::table &config)
{
const auto *const optional_hostnames = config["hostnames"].as_array();
if (optional_hostnames == nullptr) {
logger.log(core::LogLevel::kFatal, "No hostnames specified in config");
return std::nullopt;
}
const auto hostnames = *optional_hostnames;
std::vector<std::string> nodes;
// Get all nodes running across all hosts
for (const auto &hostname : hostnames) {
const auto optional_hostname = hostname.value<std::string>();
if (!optional_hostname) {
logger.log(core::LogLevel::kFatal, "Invalid hostname in config");
return std::nullopt;
}
const std::string &hostname_str = *optional_hostname;
const auto *const optional_nodes = config[hostname_str]["nodes"].as_array();
if (optional_nodes == nullptr) {
logger.log(core::LogLevel::kFatal, "No nodes for host %s", hostname_str.c_str());
return std::nullopt;
}
const auto nodes_for_host = *optional_nodes;
for (const auto &node : nodes_for_host) {
const auto optional_node = node.value<std::string>();
if (!optional_node) {
logger.log(core::LogLevel::kFatal, "Invalid node in config");
return std::nullopt;
}
const auto node_str = hostname_str + "." + *optional_node;
nodes.push_back(node_str);
}
}
return HealthMonitor(logger, time, std::move(mqtt), nodes);
}

HealthMonitor::HealthMonitor(ILogger &logger,
ITimeSource &time,
std::shared_ptr<core::IMqtt> mqtt,
std::vector<std::string> &nodes)
: logger_(logger),
time_(time),
mqtt_(std::move(mqtt))
{
for (const auto &node : nodes) {
checkins_[node] = std::nullopt;
}
}

void HealthMonitor::run()
{
{
const auto result = startup();
if (result == core::Result::kFailure) {
logger_.log(core::LogLevel::kFatal, "Failed to start up");
publishTransitionRequest(state_machine::State::kFailure);
return;
}
publishTransitionRequest(state_machine::State::kReady);
}
while (true) {
const auto result = processBatch();
if (result == core::Result::kFailure) {
logger_.log(core::LogLevel::kFatal, "Failed to process batch");
publishTransitionRequest(state_machine::State::kFailure);
return;
}
const auto current_time = time_.now();
for (auto &[name, checkin] : checkins_) {
if (current_time - *checkin > kCheckinTimeout) {
logger_.log(core::LogLevel::kFatal, "System %s has not checked in", name.c_str());
publishTransitionRequest(state_machine::State::kFailure);
return;
}
}
}
}

core::Result HealthMonitor::startup()
{
const auto start_time = time_.now();
while (time_.now() - start_time < kStartupTimeout) {
const auto result = processBatch();
if (result == core::Result::kFailure) {
logger_.log(core::LogLevel::kFatal, "Failed to process batch");
return core::Result::kFailure;
}
bool all_nodes_checked_in = true;
for (auto &[_, checkin] : checkins_) {
if (!checkin) {
all_nodes_checked_in = false;
break;
}
}
if (all_nodes_checked_in) {
logger_.log(core::LogLevel::kInfo,
"All nodes checked in in %d seconds",
(time_.now() - start_time).count());
publishTransitionRequest(state_machine::State::kCalibrate);
return core::Result::kSuccess;
}
}
std::string missing_nodes;
for (auto &[name, checkin] : checkins_) {
if (!checkin) { missing_nodes += name + ", "; }
}
logger_.log(core::LogLevel::kFatal, "%s failed to check in", missing_nodes.c_str());
return core::Result::kFailure;
}

core::Result HealthMonitor::processBatch()
{
const auto consume_result = mqtt_->consume();
if (consume_result == core::Result::kFailure) {
logger_.log(core::LogLevel::kFatal, "Invalid MQTT message received");
return core::Result::kFailure;
}
for (auto i = 0; i < 200; i++) {
mqtt::const_message_ptr received_msg;
auto optional_message = mqtt_->getMessage();
if (!optional_message) { break; }
const auto message = *optional_message;
const auto payload = message.payload;
const std::string node_name = payload->GetString();
if (checkins_.find(node_name) == checkins_.end()) {
logger_.log(core::LogLevel::kFatal, "Invalid node %s", node_name.c_str());
return core::Result::kFailure;
}
checkins_.emplace(node_name, time_.now());
}
return core::Result::kSuccess;
}

void HealthMonitor::publishTransitionRequest(state_machine::State state)
{
auto message_payload = std::make_shared<rapidjson::Document>();
message_payload->SetObject();
const std::string &state_str = state_machine::state_to_string.at(state);
rapidjson::Value requested_state(state_str.c_str(), message_payload->GetAllocator());
message_payload->AddMember("transition", requested_state, message_payload->GetAllocator());
const core::MqttMessage::Header header{time_.now(), core::MqttMessagePriority::kCritical};
const core::MqttMessage message{core::MqttTopic::kStateRequest, header, message_payload};
mqtt_->publish(message, kExactlyOnce);
}

} // namespace hyped::core
56 changes: 56 additions & 0 deletions lib/core/health_monitor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include "logger.hpp"
#include "mqtt.hpp"
#include "time.hpp"

#include <optional>
#include <string>
#include <unordered_map>

#include "state_machine/state.hpp"
#include <toml++/toml.hpp>

namespace hyped::core {

static constexpr Duration kCheckinTimeout = TimePoint::duration(1);
static constexpr Duration kStartupTimeout = TimePoint::duration(60);

/**
* @brief HealthMonitor tracks the status of all nodes running on the pod, and if any fail to check
* in within a certain period it will send a critical failure message. It also ensures all nodes
* have started up before the pod leaves the initialisation state.
*/
class HealthMonitor {
public:
static std::optional<HealthMonitor> create(ILogger &logger,
ITimeSource &time,
std::shared_ptr<core::IMqtt> mqtt,
toml::table &config);

HealthMonitor(core::ILogger &logger,
core::ITimeSource &time,
std::shared_ptr<core::IMqtt> mqtt,
std::vector<std::string> &nodes);

void run();

private:
/**
* @brief Checks if all nodes have checked in, then publishes a message saying we are safe to
* start
*/
core::Result startup();
/**
* @brief Updates the checkin times for all pending checkin messages
*/
core::Result processBatch();
void publishTransitionRequest(state_machine::State state);

core::ILogger &logger_;
core::ITimeSource &time_;
std::shared_ptr<core::IMqtt> mqtt_;
std::unordered_map<std::string, std::optional<TimePoint>> checkins_;
};

} // namespace hyped::core
6 changes: 4 additions & 2 deletions lib/core/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ mqtt::message_ptr Mqtt::messageToMessagePtr(const MqttMessage &message)
rapidjson::Document header;
header.SetObject();
header.AddMember("priority", message.header.priority, header.GetAllocator());
header.AddMember("timestamp", message.header.timestamp, header.GetAllocator());
const auto timestamp = message.header.timestamp.time_since_epoch().count();
header.AddMember("timestamp", timestamp, header.GetAllocator());
payload.AddMember("header", header, payload.GetAllocator());
rapidjson::Document payload_json;
payload_json.CopyFrom(*message.payload, payload_json.GetAllocator());
Expand Down Expand Up @@ -151,6 +152,7 @@ std::optional<MqttMessage> Mqtt::messagePtrToMessage(std::shared_ptr<const mqtt:
logger_.log(core::LogLevel::kFatal, "Failed to parse MQTT message: missing priority");
}
const auto timestamp = header["timestamp"].GetUint64();
const auto timepoint = TimePoint(std::chrono::nanoseconds(timestamp));
const auto priority = header["priority"].GetUint();
if (!message_contents_json.HasMember("payload")) {
logger_.log(core::LogLevel::kFatal, "Failed to parse MQTT message: missing payload");
Expand All @@ -162,7 +164,7 @@ std::optional<MqttMessage> Mqtt::messagePtrToMessage(std::shared_ptr<const mqtt:
auto mqtt_priority = static_cast<MqttMessagePriority>(priority);
std::shared_ptr mqtt_payload = std::make_shared<rapidjson::Document>();
mqtt_payload->CopyFrom(message_contents_json["payload"], mqtt_payload->GetAllocator());
MqttMessage::Header mqtt_header{timestamp, mqtt_priority};
MqttMessage::Header mqtt_header{timepoint, mqtt_priority};
return MqttMessage{mqtt_topic->second, mqtt_header, mqtt_payload};
}

Expand Down
3 changes: 2 additions & 1 deletion lib/core/mqtt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "logger.hpp"
#include "mqtt_topics.hpp"
#include "time.hpp"
#include "types.hpp"

#include <optional>
Expand All @@ -28,7 +29,7 @@ enum MqttMessageQos {
struct MqttMessage {
MqttTopic topic;
struct Header {
std::uint64_t timestamp;
TimePoint timestamp;
MqttMessagePriority priority;
};
Header header;
Expand Down
20 changes: 16 additions & 4 deletions lib/core/mqtt_logger.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#include "mqtt.hpp"
#include "mqtt_logger.hpp"

#include <cstdarg>
#include <cstdio>
#include <utility>

namespace hyped::core {

MqttLogger::MqttLogger(const char *const label,
const LogLevel level,
const core::ITimeSource &timer,
Expand All @@ -19,13 +22,22 @@ MqttLogger::MqttLogger(const char *const label,

void MqttLogger::log(const LogLevel level, const char *format, ...)
{
logger_.log(level, format);
const auto topic = MqttTopic::kTest;
char buffer[256]; // NOLINT
va_list args;
va_start(args, format);
vsprintf(buffer, format, args);
va_end(args);
logger_.log(level, "%s", buffer);
const auto topic = MqttTopic::kLog;
std::shared_ptr<rapidjson::Document> message_payload = std::make_shared<rapidjson::Document>();
message_payload->SetObject();
message_payload->AddMember("log", *format, message_payload->GetAllocator());
const MqttMessage::Header header{.timestamp = 0, .priority = MqttMessagePriority::kCritical};
rapidjson::Value log_string_value;
log_string_value.SetString(buffer, message_payload->GetAllocator());
message_payload->AddMember("log", log_string_value, message_payload->GetAllocator());
const MqttMessage::Header header{.timestamp = time_source_.now(),
.priority = MqttMessagePriority::kCritical};
const MqttMessage message{topic, header, message_payload};
mqtt_->publish(message, MqttMessageQos::kAtLeastOnce);
}

} // namespace hyped::core
17 changes: 13 additions & 4 deletions lib/core/mqtt_topics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,20 @@

namespace hyped::core {

enum class MqttTopic { kTest, kState };
enum class MqttTopic { kTest, kState, kLog, kNodeCheckin, kHeartbeat, kStateRequest };

const std::unordered_map<MqttTopic, std::string> mqtt_topic_to_string
= {{MqttTopic::kTest, "test"}, {MqttTopic::kState, "state"}};
= {{MqttTopic::kTest, "test"},
{MqttTopic::kState, "state"},
{MqttTopic::kLog, "log"},
{MqttTopic::kNodeCheckin, "node_checkin"},
{MqttTopic::kStateRequest, "state_request"}};

const std::unordered_map<std::string, MqttTopic> mqtt_string_to_topic
= {{"test", MqttTopic::kTest}, {"state", MqttTopic::kState}};
= {{"test", MqttTopic::kTest},
{"state", MqttTopic::kState},
{"log", MqttTopic::kLog},
{"node_checkin", MqttTopic::kNodeCheckin},
{"state_request", MqttTopic::kStateRequest}};

} // namespace hyped::core
} // namespace hyped::core
17 changes: 8 additions & 9 deletions lib/debug/commands/spi_commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ core::Result SpiCommands::addCommands(core::ILogger &logger, std::shared_ptr<Rep
const auto spi = std::move(*optional_spi);

std::uint16_t register_address = std::stoi(args[5], nullptr, 16);
std::uint8_t read_buffer;
const core::Result result = spi->read(register_address, &read_buffer, 1);
if (result == core::Result::kFailure) {
auto optional_read_byte = spi->read(register_address, 1);
if (!optional_read_byte) {
logger.log(core::LogLevel::kFatal, "Failed to read from SPI bus %d", bus);
return;
}
logger.log(core::LogLevel::kDebug, "SPI value from bus %d: %d", bus, read_buffer);
const auto read_byte = *optional_read_byte;
logger.log(core::LogLevel::kDebug, "SPI value from bus %d: %d", bus, read_byte[0]);
};
auto spi_read_byte_command = std::make_unique<Command>(spi_read_byte_command_name,
spi_read_byte_command_description,
Expand Down Expand Up @@ -66,11 +66,10 @@ core::Result SpiCommands::addCommands(core::ILogger &logger, std::shared_ptr<Rep
}
const auto spi = std::move(*optional_spi);

std::uint16_t register_address = std::stoi(args[5], nullptr, 16);
std::uint8_t data = std::stoi(args[6], nullptr, 16);

const auto *data_ptr = reinterpret_cast<const std::uint8_t *>(&data);
const core::Result result = spi->write(register_address, data_ptr, 1);
std::uint16_t register_address = std::stoi(args[5], nullptr, 16);
std::uint8_t data = std::stoi(args[6], nullptr, 16);
const std::vector<std::uint8_t> data_array = {data};
const core::Result result = spi->write(register_address, data_array);
if (result == core::Result::kFailure) {
logger.log(core::LogLevel::kFatal, "Failed to write to SPI bus: %d", bus);
return;
Expand Down
Loading
Loading