Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 131 additions & 10 deletions components/esp_websocket_client/esp_websocket_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,29 @@
return esp_event_loop_run(client->event_handle, 0);
}

/**
* @brief Abort the WebSocket connection and initiate reconnection or shutdown
*
* @param client WebSocket client handle
* @param error_type Type of error that caused the abort
*
* @return ESP_OK on success, ESP_FAIL on failure
*
* @note PRECONDITION: client->lock MUST be held by the calling thread before calling this function.
* This function does NOT acquire the lock itself. Calling without the lock will result in
* race conditions and undefined behavior.
*/
static esp_err_t esp_websocket_client_abort_connection(esp_websocket_client_handle_t client, esp_websocket_error_type_t error_type)
{
ESP_WS_CLIENT_STATE_CHECK(TAG, client, return ESP_FAIL);


if (client->state == WEBSOCKET_STATE_CLOSING || client->state == WEBSOCKET_STATE_UNKNOW ||
client->state == WEBSOCKET_STATE_WAIT_TIMEOUT) {
ESP_LOGW(TAG, "Connection already closing/closed, skipping abort");
goto cleanup;
}

esp_transport_close(client->transport);

if (!client->config->auto_reconnect) {
Expand All @@ -256,6 +276,18 @@
}
client->error_handle.error_type = error_type;
esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_DISCONNECTED, NULL, 0);

cleanup:
if (client->errormsg_buffer) {
ESP_LOGD(TAG, "Freeing error buffer (%d bytes) - Free heap: %" PRIu32 " bytes",
client->errormsg_size, esp_get_free_heap_size());
free(client->errormsg_buffer);
client->errormsg_buffer = NULL;
client->errormsg_size = 0;
} else {
ESP_LOGD(TAG, "Disconnect - Free heap: %" PRIu32 " bytes", esp_get_free_heap_size());
}

return ESP_OK;
}

Expand Down Expand Up @@ -453,6 +485,8 @@
esp_websocket_client_destroy_config(client);
if (client->transport_list) {
esp_transport_list_destroy(client->transport_list);
client->transport_list = NULL;
client->transport = NULL;
}
vSemaphoreDelete(client->lock);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
Expand Down Expand Up @@ -671,6 +705,11 @@
if (wlen < 0 || (wlen == 0 && need_write != 0)) {
ret = wlen;
esp_websocket_free_buf(client, true);

#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->tx_lock);
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);
#endif
esp_tls_error_handle_t error_handle = esp_transport_get_error_handle(client->transport);
if (error_handle) {
esp_websocket_client_error(client, "esp_transport_write() returned %d, transport_error=%s, tls_error_code=%i, tls_flags=%i, errno=%d",
Expand All @@ -679,8 +718,16 @@
} else {
esp_websocket_client_error(client, "esp_transport_write() returned %d, errno=%d", ret, errno);
}
ESP_LOGD(TAG, "Calling abort_connection due to send error");
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
xSemaphoreGiveRecursive(client->lock);
return ret;
#else
// Already holding client->lock, safe to call
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
goto unlock_and_return;
#endif
}
opcode = 0;
widx += wlen;
Expand Down Expand Up @@ -1019,7 +1066,6 @@
esp_websocket_free_buf(client, false);
return ESP_OK;
}

esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_DATA, client->rx_buffer, rlen);

client->payload_offset += rlen;
Expand All @@ -1030,15 +1076,35 @@
const char *data = (client->payload_len == 0) ? NULL : client->rx_buffer;
ESP_LOGD(TAG, "Sending PONG with payload len=%d", client->payload_len);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->lock); // Release client->lock

// Now acquire tx_lock with timeout (consistent with PING/CLOSE handling)
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
return ESP_FAIL;
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout for PONG", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY); // Re-acquire client->lock before returning
esp_websocket_free_buf(client, false); // Free rx_buffer to prevent memory leak
return ESP_OK; // Return gracefully, caller expects client->lock to be held
}
#endif

// Re-acquire client->lock to maintain consistency
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);


// Another thread may have closed it while we didn't hold client->lock
if (client->state == WEBSOCKET_STATE_CLOSING || client->state == WEBSOCKET_STATE_UNKNOW ||
client->state == WEBSOCKET_STATE_WAIT_TIMEOUT || client->transport == NULL) {
ESP_LOGW(TAG, "Transport closed while preparing PONG, skipping send");
xSemaphoreGiveRecursive(client->tx_lock);
esp_websocket_free_buf(client, false); // Free rx_buffer to prevent memory leak
return ESP_OK; // Caller expects client->lock to be held, which it is
}

esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PONG | WS_TRANSPORT_OPCODES_FIN, data, client->payload_len,
client->config->network_timeout_ms);
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
xSemaphoreGiveRecursive(client->tx_lock);
#else
esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PONG | WS_TRANSPORT_OPCODES_FIN, data, client->payload_len,

Check warning

Code scanning / clang-tidy

The value '138' provided to the cast expression is not in the valid range of values for 'ws_transport_opcodes' [clang-analyzer-optin.core.EnumCastOutOfRange] Warning

The value '138' provided to the cast expression is not in the valid range of values for 'ws_transport_opcodes' [clang-analyzer-optin.core.EnumCastOutOfRange]
client->config->network_timeout_ms);
#endif
} else if (client->last_opcode == WS_TRANSPORT_OPCODES_PONG) {
client->wait_for_pong_resp = false;
Expand Down Expand Up @@ -1136,7 +1202,32 @@
client->state = WEBSOCKET_STATE_CONNECTED;
client->wait_for_pong_resp = false;
client->error_handle.error_type = WEBSOCKET_ERROR_TYPE_NONE;
client->payload_len = 0;
client->payload_offset = 0;
client->last_fin = false;
client->last_opcode = WS_TRANSPORT_OPCODES_NONE;

esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_CONNECTED, NULL, 0);

// Check if there is data pending to be read (e.g. piggybacked with handshake)
if (esp_transport_poll_read(client->transport, 0) > 0) {
esp_err_t recv_result = esp_websocket_client_recv(client);
if (recv_result == ESP_OK) {
xSemaphoreGiveRecursive(client->lock);
esp_event_loop_run(client->event_handle, 0);
if (xSemaphoreTakeRecursive(client->lock, lock_timeout) != pdPASS) {
ESP_LOGE(TAG, "Failed to re-acquire lock after event loop within timeout, retrying with portMAX_DELAY");
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);
}
if (client->state != WEBSOCKET_STATE_CONNECTED || client->transport == NULL) {
ESP_LOGD(TAG, "Connection state changed during handshake data processing");
break;
}
} else if (recv_result == ESP_FAIL) {
ESP_LOGE(TAG, "Error receive data during initial connection");
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
}
}
break;
case WEBSOCKET_STATE_CONNECTED:
if ((CLOSE_FRAME_SENT_BIT & xEventGroupGetBits(client->status_bits)) == 0) { // only send and check for PING
Expand All @@ -1145,8 +1236,23 @@
client->ping_tick_ms = _tick_get_ms();
ESP_LOGD(TAG, "Sending PING...");
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
// Release client->lock first to avoid deadlock with send error path
xSemaphoreGiveRecursive(client->lock);

// Now acquire tx_lock with timeout (consistent with PONG handling)
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout for PING", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY); // Re-acquire client->lock before break
break;
}

// Re-acquire client->lock to check state
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);

// Another thread may have closed it while we didn't hold client->lock
if (client->state != WEBSOCKET_STATE_CONNECTED || client->transport == NULL) {
ESP_LOGW(TAG, "Transport closed while preparing PING, skipping send");
xSemaphoreGiveRecursive(client->tx_lock);
break;
}
#endif
Expand Down Expand Up @@ -1182,8 +1288,23 @@
if ((CLOSE_FRAME_SENT_BIT & xEventGroupGetBits(client->status_bits)) == 0) {
ESP_LOGD(TAG, "Closing initiated by the server, sending close frame");
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
// Release client->lock first to avoid deadlock with send error path
xSemaphoreGiveRecursive(client->lock);

// Now acquire tx_lock with timeout (consistent with PONG/PING handling)
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout for CLOSE", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY); // Re-acquire client->lock before break
break;
}

// Re-acquire client->lock to check state
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);

// Another thread may have closed it while we didn't hold client->lock
if (client->state != WEBSOCKET_STATE_CLOSING || client->transport == NULL) {
ESP_LOGW(TAG, "Transport closed while preparing CLOSE frame, skipping send");
xSemaphoreGiveRecursive(client->tx_lock);
break;
}
#endif
Expand All @@ -1202,6 +1323,7 @@
if (WEBSOCKET_STATE_CONNECTED == client->state) {
read_select = esp_transport_poll_read(client->transport, 1000); //Poll every 1000ms
if (read_select < 0) {
xSemaphoreTakeRecursive(client->lock, lock_timeout);
esp_tls_error_handle_t error_handle = esp_transport_get_error_handle(client->transport);
if (error_handle) {
esp_websocket_client_error(client, "esp_transport_poll_read() returned %d, transport_error=%s, tls_error_code=%i, tls_flags=%i, errno=%d",
Expand All @@ -1210,16 +1332,15 @@
} else {
esp_websocket_client_error(client, "esp_transport_poll_read() returned %d, errno=%d", read_select, errno);
}
xSemaphoreTakeRecursive(client->lock, lock_timeout);
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
xSemaphoreGiveRecursive(client->lock);
} else if (read_select > 0) {
xSemaphoreTakeRecursive(client->lock, lock_timeout);
if (esp_websocket_client_recv(client) == ESP_FAIL) {
ESP_LOGE(TAG, "Error receive data");
xSemaphoreTakeRecursive(client->lock, lock_timeout);
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
xSemaphoreGiveRecursive(client->lock);
}
xSemaphoreGiveRecursive(client->lock);
} else {
ESP_LOGV(TAG, "Read poll timeout: skipping esp_transport_poll_read().");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CONFIG_IDF_TARGET="esp32"
CONFIG_IDF_TARGET_LINUX=n
CONFIG_WEBSOCKET_URI_FROM_STDIN=n
CONFIG_WEBSOCKET_URI_FROM_STRING=y
CONFIG_EXAMPLE_CONNECT_ETHERNET=y
CONFIG_EXAMPLE_CONNECT_WIFI=n
CONFIG_EXAMPLE_USE_INTERNAL_ETHERNET=y
CONFIG_EXAMPLE_ETH_PHY_IP101=y
CONFIG_EXAMPLE_ETH_MDC_GPIO=23
CONFIG_EXAMPLE_ETH_MDIO_GPIO=18
CONFIG_EXAMPLE_ETH_PHY_RST_GPIO=5
CONFIG_EXAMPLE_ETH_PHY_ADDR=1
CONFIG_EXAMPLE_CONNECT_IPV6=y
CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK=y
CONFIG_ESP_WS_CLIENT_TX_LOCK_TIMEOUT_MS=2000
Loading