Skip to content

Commit

Permalink
Bug fixes! ESP32 BLIP now works!!
Browse files Browse the repository at this point in the history
- Stub versions of CoroLifecycle functions (with CROUTON_LIFECYCLES
  off) were returning some wrong values, leading to crashes. This was
  causing ESP32 to crash. (It also caused the regular test suite to
  crash, but I hadn't tried running it with lifecycle logging disabled
  in a while...)
- Fixed a race condition in Scheduler where an instance might not
  have an EventLoop in time.
- Added a mutex to ESPTCPSocket to deal with the concurrency of
  input arriving on a background thread.
- Log messages now identify the thread. Thread numbers are just
  allocated consecutively; they don't mean anything.
  • Loading branch information
snej committed Nov 26, 2023
1 parent 658a0da commit 96e257a
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 29 deletions.
4 changes: 2 additions & 2 deletions crouton.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@
277B65332ADA07E0006F053D /* sdkconfig */ = {isa = PBXFileReference; lastKnownFileType = text; path = sdkconfig; sourceTree = "<group>"; };
277B65362ADA07E0006F053D /* CMakeLists.txt */ = {isa = PBXFileReference; lastKnownFileType = text; path = CMakeLists.txt; sourceTree = "<group>"; };
277B6C122ADA07E3006F053D /* CMakeLists.txt */ = {isa = PBXFileReference; lastKnownFileType = text; path = CMakeLists.txt; sourceTree = "<group>"; };
277B6C132ADA07E3006F053D /* main.cc */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = main.cc; sourceTree = "<group>"; };
277B6C132ADA07E3006F053D /* test_esp32.cc */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = test_esp32.cc; sourceTree = "<group>"; };
277B6C142ADA0873006F053D /* ESPEventLoop.cc */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = ESPEventLoop.cc; sourceTree = "<group>"; };
277B6C152ADDB6B7006F053D /* ESPTCPSocket.hh */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = ESPTCPSocket.hh; sourceTree = "<group>"; };
277B6C162ADDB6B7006F053D /* ESPTCPSocket.cc */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = ESPTCPSocket.cc; sourceTree = "<group>"; };
Expand Down Expand Up @@ -1209,7 +1209,7 @@
children = (
277B6C122ADA07E3006F053D /* CMakeLists.txt */,
277B6C212ADDEEF3006F053D /* Kconfig.projbuild */,
277B6C132ADA07E3006F053D /* main.cc */,
277B6C132ADA07E3006F053D /* test_esp32.cc */,
);
path = main;
sourceTree = "<group>";
Expand Down
12 changes: 8 additions & 4 deletions include/crouton/CoroLifecycle.hh
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,16 @@ namespace crouton {
inline void suspendInitial(coro_handle cur) { }
inline coro_handle suspendingTo(coro_handle cur,
std::type_info const& toType, const void* to,
coro_handle next = CORO_NS::noop_coroutine()) {return next;}
coro_handle next = CORO_NS::noop_coroutine()) {
return next ? next : CORO_NS::noop_coroutine();}
inline coro_handle suspendingTo(coro_handle cur,
coro_handle awaiting,
coro_handle next) {return next;}
inline coro_handle yieldingTo(coro_handle cur, coro_handle next, bool) {return next;}
inline coro_handle finalSuspend(coro_handle cur, coro_handle next) {return next;}
coro_handle next) {
return next ? next : CORO_NS::noop_coroutine();}
inline coro_handle yieldingTo(coro_handle cur, coro_handle next, bool) {
return next ? next : CORO_NS::noop_coroutine();}
inline coro_handle finalSuspend(coro_handle cur, coro_handle next) {
return next ? next : CORO_NS::noop_coroutine();}
inline void threw(coro_handle) { }
inline void returning(coro_handle) { }
inline void ended(coro_handle) { }
Expand Down
1 change: 1 addition & 0 deletions src/Scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ namespace crouton {
LSched->debug("suspend {}", logCoro{h});
precondition(isCurrent());
assert(!isReady(h));
(void)eventLoop(); // Must have an event loop in order to wake up (see _wakeUp)
auto [i, added] = _suspended->try_emplace(h.address(), h, this);
i->second._visible = true;
return Suspension(&i->second);
Expand Down
31 changes: 21 additions & 10 deletions src/io/esp32/ESPTCPSocket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ namespace crouton::io::esp {
Future<void> TCPSocket::close() {
if (_isOpen) {
LNet->info("Closing TCPSocket");
unique_lock lock(_mutex);
err_t err = tcp_close(_tcp);
_tcp = nullptr;
_isOpen = false;
Expand Down Expand Up @@ -149,7 +150,8 @@ namespace crouton::io::esp {
/// Low-level read method that points `_inputBuf` to the next input buffer.
Future<ConstBytes> TCPSocket::fillInputBuf() {
precondition(isOpen() && _inputBuf.empty());
//FIXME: Needs a mutex accessing _readBufs?

unique_lock lock(_mutex);
_readBlocker.reset();
if (_readBufs) {
// Clean up the pbuf I just completed:
Expand All @@ -162,40 +164,49 @@ namespace crouton::io::esp {
}

if (!_readBufs && !_readErr) {
// Wait for buffers to arrive:
// Wait for buffers to arrive. (Don't hold the mutex while waiting!)
LNet->debug("TCPSocket: waiting to receive data...");
lock.unlock();
AWAIT _readBlocker;
LNet->debug("...TCPSocket: received data");
assert(_readBufs || _readErr);
lock.lock();
LNet->debug("...TCPSocket: end wait");
}

assert(_readBufs || _readErr);
if (_readBufs) {
_inputBuf = {_readBufs->payload, _readBufs->len};
LNet->debug("TCPSocket: received {} bytes", _inputBuf.size());
RETURN _inputBuf;
} else if (_readErr == CroutonError::UnexpectedEOF) {
LNet->debug("TCPSocket: EOF, no data to read");
LNet->info("TCPSocket: EOF, no data to read");
RETURN ConstBytes{};
} else {
LNet->error("TCPSocket: error {}", _readErr);
RETURN _readErr;
}
}


int TCPSocket::_readCallback(::pbuf *pb, int err) {
// Warning: This is called on the lwip thread.
//FIXME: Needs a mutex accessing _readBufs?
unique_lock lock(_mutex);
LNet->debug("---- entering TCPSocket::_readCallback ...");
if (pb) {
LNet->debug("read completed, {} bytes", pb->tot_len);
LNet->debug(" read completed, {} bytes", pb->tot_len);
if (_readBufs == nullptr)
_readBufs = pb; // Note: I take over the reference to pb
else
pbuf_cat(_readBufs, pb);
} else if (err) {
_readErr = Error(LWIPError(err));
LNet->debug(" read completed: error {}", _readErr);
} else {
_readErr = err ? Error(LWIPError(err)) : Error(CroutonError::UnexpectedEOF);
LNet->error("read completed, error {}", _readErr);
_readErr = Error(CroutonError::UnexpectedEOF);
LNet->debug(" read completed: EOF", _readErr);
}
assert(_readBufs || _readErr);
_readBlocker.notify();
LNet->debug("---- ... exiting TCPSocket::_readCallback");
return ERR_OK;
}

Expand Down Expand Up @@ -233,7 +244,7 @@ namespace crouton::io::esp {

int TCPSocket::_writeCallback(uint16_t len) {
// Warning: This is called on the lwip thread.
LNet->debug("write completed, {} bytes", len);
LNet->debug("TCPSocket write completed, {} bytes", len);
_writeBlocker.notify();
return 0;
}
Expand Down
5 changes: 3 additions & 2 deletions src/io/esp32/ESPTCPSocket.hh
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ namespace crouton::io::esp {
ASYNC<ConstBytes> fillInputBuf();

tcp_pcb* _tcp;
bool _isOpen = false;

std::mutex _mutex;
ConstBytes _inputBuf;
pbuf* _readBufs = nullptr;
Error _readErr;

Blocker<void> _readBlocker;
Blocker<void> _writeBlocker;

bool _isOpen = false;
};

}
19 changes: 12 additions & 7 deletions src/support/MiniLogger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "crouton/io/Process.hh"
#include "support/StringUtils.hh"

#include <atomic>
#include <cstring>
#include <mutex>

Expand All @@ -43,11 +44,13 @@ namespace crouton::mini {
};


static std::mutex sLogMutex; // Thread-safety & prevents overlapping msgs
static std::mutex sLogMutex; // Thread-safety & prevents overlapping msgs
static std::vector<logger*>* sLoggers; // All registered Loggers
static logger::Sink sLogSink = nullptr; // Optional function to write messages
static const char* sEnvLevelsStr;
static logger::Sink sLogSink = nullptr; // Optional function to write messages
static const char* sEnvLevelsStr;

static std::atomic_int sThreadCounter = 0;
thread_local int tThreadID = ++sThreadCounter;



Expand Down Expand Up @@ -164,8 +167,8 @@ namespace crouton::mini {
else if (lvl == level::warn)
color = tty.yellow;

format_to(cerr, "{}{:06d}{} {}{}| <{}> ",
sTimeBuf, now.tv_nsec / 1000, tty.reset,
format_to(cerr, "{}{:06d} ⇅{:02}{} {}{}| <{}> ",
sTimeBuf, now.tv_nsec / 1000, tThreadID, tty.reset,
color, kLevelDisplayName[int(lvl)], _name);
}

Expand Down Expand Up @@ -224,18 +227,20 @@ namespace crouton::mini {
default: color = ""; break;
}
#if CONFIG_LOG_TIMESTAMP_SOURCE_RTOS
esp_log_write(kESPLevel[lvl], "Crouton", "%s%c (%4ld) <%s> %.*s%s\n",
esp_log_write(kESPLevel[lvl], "Crouton", "%s%c (%4ld) t%02d <%s> %.*s%s\n",
color,
kESPLevelChar[lvl],
esp_log_timestamp(),
tThreadID,
_name.c_str(),
int(msg.size()), msg.data(),
tty.reset);
#else
esp_log_write(kESPLevel[lvl], "Crouton", "%s%c (%s) <%s> %.*s%s\n",
esp_log_write(kESPLevel[lvl], "Crouton", "%s%c (%s) t%02d <%s> %.*s%s\n",
color,
kESPLevelChar[lvl],
esp_log_system_timestamp(),
tThreadID,
_name.c_str(),
int(msg.size()), msg.data(),
tty.reset);
Expand Down
2 changes: 1 addition & 1 deletion tests/ESP32/main/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
idf_component_register(
SRCS
"main.cc"
"test_esp32.cc"
PRIV_INCLUDE_DIRS
"../../../src/"
)
Expand Down
6 changes: 3 additions & 3 deletions tests/ESP32/main/main.cc → tests/ESP32/main/test_esp32.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ Task mainTask() {

printf("---------- TESTING CROUTON ----------\n\n");
esp_log_level_set("Crouton", ESP_LOG_VERBOSE);
log::logger::load_env_levels("Net=debug,BLIP=info,Coro=trace");
log::logger::load_env_levels("Net=debug,BLIP=debug");

#if 1
#if 0
Log->info("---------- Testing Generator");
{
Generator<int64_t> fib = fibonacci(100, true);
Expand Down Expand Up @@ -172,7 +172,7 @@ Task mainTask() {
#endif

Log->info("---------- Testing BLIP");
testBLIP().waitForResult();
AWAIT testBLIP();

Log->info("---------- End of tests");
postcondition(Scheduler::current().assertEmpty());
Expand Down

0 comments on commit 96e257a

Please sign in to comment.