Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions src/bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class BridgeClass {

struct k_mutex read_mutex{};
struct k_mutex write_mutex{};
struct k_mutex bridge_mutex{};

k_tid_t upd_tid{};
k_thread_stack_t *upd_stack_area{};
Expand All @@ -119,17 +120,27 @@ class BridgeClass {
serial_ptr = &serial;
}

operator bool() const {
return started;
operator bool() {
return is_started();
}

bool is_started() {
k_mutex_lock(&bridge_mutex, K_FOREVER);
bool out = started;
k_mutex_unlock(&bridge_mutex);
return out;
}

// Initialize the bridge
bool begin(unsigned long baud=DEFAULT_SERIAL_BAUD) {
serial_ptr->begin(baud);
transport = new SerialTransport(*serial_ptr);

k_mutex_init(&read_mutex);
k_mutex_init(&write_mutex);
k_mutex_init(&bridge_mutex);

if (is_started()) return true;

serial_ptr->begin(baud);
transport = new SerialTransport(*serial_ptr);

client = new RPCClient(*transport);
server = new RPCServer(*transport);
Expand All @@ -141,32 +152,29 @@ class BridgeClass {
NULL, NULL, NULL,
UPDATE_THREAD_PRIORITY, 0, K_NO_WAIT);

bool res;
call(RESET_METHOD).result(res);
if (res) {
started = true;
}
k_mutex_lock(&bridge_mutex, K_FOREVER);
bool res = false;
started = call(RESET_METHOD).result(res) && res;
k_mutex_unlock(&bridge_mutex);
return res;
}

template<typename F>
bool provide(const MsgPack::str_t& name, F&& func) {
k_mutex_lock(&bridge_mutex, K_FOREVER);
bool res;
if (!call(BIND_METHOD, name).result(res)) {
return false;
}
return server->bind(name, func);
bool out = call(BIND_METHOD, name).result(res) && res && server->bind(name, func);
k_mutex_unlock(&bridge_mutex);
return out;
}

template<typename F>
bool provide_safe(const MsgPack::str_t& name, F&& func) {
k_mutex_lock(&bridge_mutex, K_FOREVER);
bool res;
if (!call(BIND_METHOD, name).result(res)) {
return false;
}

return server->bind(name, func, "__safe__");

bool out = call(BIND_METHOD, name).result(res) && res && server->bind(name, func, "__safe__");
k_mutex_unlock(&bridge_mutex);
return out;
}

void update() {
Expand Down
56 changes: 35 additions & 21 deletions src/monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class BridgeMonitor: public Stream {
BridgeClass* bridge;
RingBufferN<BufferSize> temp_buffer;
struct k_mutex monitor_mutex{};
bool is_connected = false;
bool _connected = false;

public:
explicit BridgeMonitor(BridgeClass& bridge): bridge(&bridge) {}
Expand All @@ -40,15 +40,31 @@ class BridgeMonitor: public Stream {
bool begin(unsigned long _legacy_baud=0, uint16_t _legacy_config=0) {
k_mutex_init(&monitor_mutex);

if (is_connected()) return true;

bool bridge_started = (*bridge);
if (!bridge_started) {
bridge_started = bridge->begin();
}
return bridge_started && bridge->call(MON_CONNECTED_METHOD).result(is_connected);

if (!bridge_started) return false;

k_mutex_lock(&monitor_mutex, K_FOREVER);
bool out = false;
_connected = bridge->call(MON_CONNECTED_METHOD).result(out) && out;
k_mutex_unlock(&monitor_mutex);
return out;
}

explicit operator bool() const {
return is_connected;
bool is_connected() {
k_mutex_lock(&monitor_mutex, K_FOREVER);
bool out = _connected;
k_mutex_unlock(&monitor_mutex);
return out;
}

explicit operator bool() {
return is_connected();
}

int read() override {
Expand Down Expand Up @@ -78,12 +94,12 @@ class BridgeMonitor: public Stream {

int peek() override {
k_mutex_lock(&monitor_mutex, K_FOREVER);
int out = -1;
if (temp_buffer.available()) {
k_mutex_unlock(&monitor_mutex);
return temp_buffer.peek();
out = temp_buffer.peek();
}
k_mutex_unlock(&monitor_mutex);
return -1;
return out;
}

size_t write(uint8_t c) override {
Expand All @@ -100,43 +116,41 @@ class BridgeMonitor: public Stream {

size_t written;
const bool ret = bridge->call(MON_WRITE_METHOD, send_buffer).result(written);
if (ret) {
return written;
}

return 0;
return ret? written : 0;
}

bool reset() {
bool res;
bool ok = bridge->call(MON_RESET_METHOD).result(res);
if (ok && res) {
is_connected = false;
}
return (ok && res);
bool ok = bridge->call(MON_RESET_METHOD).result(res) && res;
k_mutex_lock(&monitor_mutex, K_FOREVER);
_connected = !ok;
k_mutex_unlock(&monitor_mutex);
return ok;
}

private:
void _read(size_t size) {

if (size == 0) return;

k_mutex_lock(&monitor_mutex, K_FOREVER);

MsgPack::arr_t<uint8_t> message;
RpcResult async_rpc = bridge->call(MON_READ_METHOD, size);

const bool ret = async_rpc.result(message);
const bool ret = _connected && async_rpc.result(message);

if (ret) {
k_mutex_lock(&monitor_mutex, K_FOREVER);
for (size_t i = 0; i < message.size(); ++i) {
temp_buffer.store_char(static_cast<char>(message[i]));
}
k_mutex_unlock(&monitor_mutex);
}

// if (async_rpc.error.code > NO_ERR) {
// is_connected = false;
// _connected = false;
// }

k_mutex_unlock(&monitor_mutex);
}

};
Expand Down
80 changes: 30 additions & 50 deletions src/tcp_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,73 +55,55 @@ class BridgeTCPClient : public Client {

int connect(const char *host, uint16_t port) override {

if (_connected) return 0;

String hostname = host;

k_mutex_lock(&client_mutex, K_FOREVER);

const bool resp = bridge->call(TCP_CONNECT_METHOD, hostname, port).result(connection_id);

if (!resp) {
_connected = false;
k_mutex_unlock(&client_mutex);
return -1;
}
_connected = true;
String hostname = host;
const bool ok = _connected || bridge->call(TCP_CONNECT_METHOD, hostname, port).result(connection_id);
_connected = ok;

k_mutex_unlock(&client_mutex);

return 0;
return ok? 0 : -1;
}

int connectSSL(const char *host, uint16_t port, const char *ca_cert) {

if (_connected) return 0;
k_mutex_lock(&client_mutex, K_FOREVER);

String hostname = host;
String ca_cert_str = ca_cert;

k_mutex_lock(&client_mutex, K_FOREVER);

const bool resp = bridge->call(TCP_CONNECT_SSL_METHOD, hostname, port, ca_cert_str).result(connection_id);

if (!resp) {
_connected = false;
k_mutex_unlock(&client_mutex);
return -1;
}
_connected = true;

const bool ok = _connected || bridge->call(TCP_CONNECT_SSL_METHOD, hostname, port, ca_cert_str).result(connection_id);
_connected = ok;
k_mutex_unlock(&client_mutex);
return 0;

return ok? 0 : -1;
}

uint32_t getId() const {
return connection_id;
uint32_t getId() {
k_mutex_lock(&client_mutex, K_FOREVER);
const uint32_t out = connection_id;
k_mutex_unlock(&client_mutex);
return out;
}

size_t write(uint8_t c) override {
return write(&c, 1);
}

size_t write(const uint8_t *buf, size_t size) override {
size_t write(const uint8_t *buffer, size_t size) override {

if (!_connected) return 0;
if (!connected()) return 0;

MsgPack::arr_t<uint8_t> payload;

for (size_t i = 0; i < size; ++i) {
payload.push_back(buf[i]);
payload.push_back(buffer[i]);
}

size_t written;
const bool ret = bridge->call(TCP_WRITE_METHOD, connection_id, payload).result(written);
if (ret) {
return written;
}

return 0;
const bool ok = bridge->call(TCP_WRITE_METHOD, connection_id, payload).result(written);
return ok? written : 0;
}

int available() override {
Expand Down Expand Up @@ -151,12 +133,12 @@ class BridgeTCPClient : public Client {

int peek() override {
k_mutex_lock(&client_mutex, K_FOREVER);
int out = -1;
if (temp_buffer.available()) {
k_mutex_unlock(&client_mutex);
return temp_buffer.peek();
out = temp_buffer.peek();
}
k_mutex_unlock(&client_mutex);
return -1;
return out;
}

void flush() override {
Expand All @@ -170,37 +152,35 @@ class BridgeTCPClient : public Client {
void stop() override {
k_mutex_lock(&client_mutex, K_FOREVER);
String msg;
const bool resp = bridge->call(TCP_CLOSE_METHOD, connection_id).result(msg);
if (resp) {
_connected = false;
if (_connected) {
_connected = !bridge->call(TCP_CLOSE_METHOD, connection_id).result(msg);
}
k_mutex_unlock(&client_mutex);
}

uint8_t connected() override {
if (_connected) return 1;
return 0;
k_mutex_lock(&client_mutex, K_FOREVER);
const uint8_t out = _connected? 1 : 0;
k_mutex_unlock(&client_mutex);
return out;
}

operator bool() override {
return available() || connected();
}

//friend class BridgeTCPServer;

using Print::write;

private:
void _read(size_t size) {

if (size == 0 || !_connected) return;
if (size == 0) return;

k_mutex_lock(&client_mutex, K_FOREVER);

MsgPack::arr_t<uint8_t> message;
RpcResult async_rpc = bridge->call(TCP_READ_METHOD, connection_id, size);

const bool ret = async_rpc.result(message);
const bool ret = _connected && async_rpc.result(message);

if (ret) {
for (size_t i = 0; i < message.size(); ++i) {
Expand Down
Loading