Skip to content

Commit f2dba00

Browse files
committed
fix(websocket): Fix race conditions, memory leak, and data loss
- Add state check in abort_connection to prevent double-close
- Fix memory leak: free errormsg_buffer on disconnect
- Reset connection state on reconnect to prevent stale data
- Implement lock ordering for separate TX lock mode
- Read buffered data immediately after connection to prevent data loss
- Add sdkconfig.ci.tx_lock config
1 parent 9e0bcd4 commit f2dba00

File tree

2 files changed

+143
-10
lines changed

2 files changed

+143
-10
lines changed

components/esp_websocket_client/esp_websocket_client.c

Lines changed: 128 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -241,9 +241,29 @@ static esp_err_t esp_websocket_client_dispatch_event(esp_websocket_client_handle
241241
return esp_event_loop_run(client->event_handle, 0);
242242
}
243243

244+
/**
245+
* @brief Abort the WebSocket connection and initiate reconnection or shutdown
246+
*
247+
* @param client WebSocket client handle
248+
* @param error_type Type of error that caused the abort
249+
*
250+
* @return ESP_OK on success, ESP_FAIL on failure
251+
*
252+
* @note PRECONDITION: client->lock MUST be held by the calling thread before calling this function.
253+
* This function does NOT acquire the lock itself. Calling without the lock will result in
254+
* race conditions and undefined behavior.
255+
*/
244256
static esp_err_t esp_websocket_client_abort_connection(esp_websocket_client_handle_t client, esp_websocket_error_type_t error_type)
245257
{
246258
ESP_WS_CLIENT_STATE_CHECK(TAG, client, return ESP_FAIL);
259+
260+
261+
if (client->state == WEBSOCKET_STATE_CLOSING || client->state == WEBSOCKET_STATE_UNKNOW ||
262+
client->state == WEBSOCKET_STATE_WAIT_TIMEOUT) {
263+
ESP_LOGW(TAG, "Connection already closing/closed, skipping abort");
264+
goto cleanup;
265+
}
266+
247267
esp_transport_close(client->transport);
248268

249269
if (!client->config->auto_reconnect) {
@@ -256,6 +276,18 @@ static esp_err_t esp_websocket_client_abort_connection(esp_websocket_client_hand
256276
}
257277
client->error_handle.error_type = error_type;
258278
esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_DISCONNECTED, NULL, 0);
279+
280+
cleanup:
281+
if (client->errormsg_buffer) {
282+
ESP_LOGD(TAG, "Freeing error buffer (%d bytes) - Free heap: %" PRIu32 " bytes",
283+
client->errormsg_size, esp_get_free_heap_size());
284+
free(client->errormsg_buffer);
285+
client->errormsg_buffer = NULL;
286+
client->errormsg_size = 0;
287+
} else {
288+
ESP_LOGD(TAG, "Disconnect - Free heap: %" PRIu32 " bytes", esp_get_free_heap_size());
289+
}
290+
259291
return ESP_OK;
260292
}
261293

@@ -453,6 +485,8 @@ static void destroy_and_free_resources(esp_websocket_client_handle_t client)
453485
esp_websocket_client_destroy_config(client);
454486
if (client->transport_list) {
455487
esp_transport_list_destroy(client->transport_list);
488+
client->transport_list = NULL;
489+
client->transport = NULL;
456490
}
457491
vSemaphoreDelete(client->lock);
458492
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
@@ -671,6 +705,11 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
671705
if (wlen < 0 || (wlen == 0 && need_write != 0)) {
672706
ret = wlen;
673707
esp_websocket_free_buf(client, true);
708+
709+
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
710+
xSemaphoreGiveRecursive(client->tx_lock);
711+
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);
712+
#endif
674713
esp_tls_error_handle_t error_handle = esp_transport_get_error_handle(client->transport);
675714
if (error_handle) {
676715
esp_websocket_client_error(client, "esp_transport_write() returned %d, transport_error=%s, tls_error_code=%i, tls_flags=%i, errno=%d",
@@ -679,8 +718,16 @@ static int esp_websocket_client_send_with_exact_opcode(esp_websocket_client_hand
679718
} else {
680719
esp_websocket_client_error(client, "esp_transport_write() returned %d, errno=%d", ret, errno);
681720
}
721+
ESP_LOGD(TAG, "Calling abort_connection due to send error");
722+
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
723+
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
724+
xSemaphoreGiveRecursive(client->lock);
725+
return ret;
726+
#else
727+
// Already holding client->lock, safe to call
682728
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
683729
goto unlock_and_return;
730+
#endif
684731
}
685732
opcode = 0;
686733
widx += wlen;
@@ -1019,7 +1066,6 @@ static esp_err_t esp_websocket_client_recv(esp_websocket_client_handle_t client)
10191066
esp_websocket_free_buf(client, false);
10201067
return ESP_OK;
10211068
}
1022-
10231069
esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_DATA, client->rx_buffer, rlen);
10241070

10251071
client->payload_offset += rlen;
@@ -1030,15 +1076,35 @@ static esp_err_t esp_websocket_client_recv(esp_websocket_client_handle_t client)
10301076
const char *data = (client->payload_len == 0) ? NULL : client->rx_buffer;
10311077
ESP_LOGD(TAG, "Sending PONG with payload len=%d", client->payload_len);
10321078
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
1079+
xSemaphoreGiveRecursive(client->lock); // Release client->lock
1080+
1081+
// Now acquire tx_lock with timeout (consistent with PING/CLOSE handling)
10331082
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
1034-
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
1035-
return ESP_FAIL;
1083+
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout for PONG", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
1084+
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY); // Re-acquire client->lock before returning
1085+
esp_websocket_free_buf(client, false); // Free rx_buffer to prevent memory leak
1086+
return ESP_OK; // Return gracefully, caller expects client->lock to be held
10361087
}
1037-
#endif
1088+
1089+
// Re-acquire client->lock to maintain consistency
1090+
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);
1091+
1092+
1093+
// Another thread may have closed it while we didn't hold client->lock
1094+
if (client->state == WEBSOCKET_STATE_CLOSING || client->state == WEBSOCKET_STATE_UNKNOW ||
1095+
client->state == WEBSOCKET_STATE_WAIT_TIMEOUT || client->transport == NULL) {
1096+
ESP_LOGW(TAG, "Transport closed while preparing PONG, skipping send");
1097+
xSemaphoreGiveRecursive(client->tx_lock);
1098+
esp_websocket_free_buf(client, false); // Free rx_buffer to prevent memory leak
1099+
return ESP_OK; // Caller expects client->lock to be held, which it is
1100+
}
1101+
10381102
esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PONG | WS_TRANSPORT_OPCODES_FIN, data, client->payload_len,
10391103
client->config->network_timeout_ms);
1040-
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
10411104
xSemaphoreGiveRecursive(client->tx_lock);
1105+
#else
1106+
esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PONG | WS_TRANSPORT_OPCODES_FIN, data, client->payload_len,
1107+
client->config->network_timeout_ms);
10421108
#endif
10431109
} else if (client->last_opcode == WS_TRANSPORT_OPCODES_PONG) {
10441110
client->wait_for_pong_resp = false;
@@ -1136,7 +1202,28 @@ static void esp_websocket_client_task(void *pv)
11361202
client->state = WEBSOCKET_STATE_CONNECTED;
11371203
client->wait_for_pong_resp = false;
11381204
client->error_handle.error_type = WEBSOCKET_ERROR_TYPE_NONE;
1205+
client->payload_len = 0;
1206+
client->payload_offset = 0;
1207+
client->last_fin = false;
1208+
client->last_opcode = WS_TRANSPORT_OPCODES_NONE;
1209+
11391210
esp_websocket_client_dispatch_event(client, WEBSOCKET_EVENT_CONNECTED, NULL, 0);
1211+
esp_err_t recv_result = esp_websocket_client_recv(client);
1212+
if (recv_result == ESP_OK) {
1213+
xSemaphoreGiveRecursive(client->lock);
1214+
esp_event_loop_run(client->event_handle, 0);
1215+
if (xSemaphoreTakeRecursive(client->lock, lock_timeout) != pdPASS) {
1216+
ESP_LOGE(TAG, "Failed to re-acquire lock after event loop within timeout, retrying with portMAX_DELAY");
1217+
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);
1218+
}
1219+
if (client->state != WEBSOCKET_STATE_CONNECTED || client->transport == NULL) {
1220+
ESP_LOGD(TAG, "Connection state changed during handshake data processing");
1221+
break;
1222+
}
1223+
} else if (recv_result == ESP_FAIL) {
1224+
ESP_LOGE(TAG, "Error receive data during initial connection");
1225+
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
1226+
}
11401227
break;
11411228
case WEBSOCKET_STATE_CONNECTED:
11421229
if ((CLOSE_FRAME_SENT_BIT & xEventGroupGetBits(client->status_bits)) == 0) { // only send and check for PING
@@ -1145,8 +1232,23 @@ static void esp_websocket_client_task(void *pv)
11451232
client->ping_tick_ms = _tick_get_ms();
11461233
ESP_LOGD(TAG, "Sending PING...");
11471234
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
1235+
// Release client->lock first to avoid deadlock with send error path
1236+
xSemaphoreGiveRecursive(client->lock);
1237+
1238+
// Now acquire tx_lock with timeout (consistent with PONG handling)
11481239
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
1149-
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
1240+
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout for PING", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
1241+
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY); // Re-acquire client->lock before break
1242+
break;
1243+
}
1244+
1245+
// Re-acquire client->lock to check state
1246+
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);
1247+
1248+
// Another thread may have closed it while we didn't hold client->lock
1249+
if (client->state != WEBSOCKET_STATE_CONNECTED || client->transport == NULL) {
1250+
ESP_LOGW(TAG, "Transport closed while preparing PING, skipping send");
1251+
xSemaphoreGiveRecursive(client->tx_lock);
11501252
break;
11511253
}
11521254
#endif
@@ -1182,8 +1284,23 @@ static void esp_websocket_client_task(void *pv)
11821284
if ((CLOSE_FRAME_SENT_BIT & xEventGroupGetBits(client->status_bits)) == 0) {
11831285
ESP_LOGD(TAG, "Closing initiated by the server, sending close frame");
11841286
#ifdef CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK
1287+
// Release client->lock first to avoid deadlock with send error path
1288+
xSemaphoreGiveRecursive(client->lock);
1289+
1290+
// Now acquire tx_lock with timeout (consistent with PONG/PING handling)
11851291
if (xSemaphoreTakeRecursive(client->tx_lock, WEBSOCKET_TX_LOCK_TIMEOUT_MS) != pdPASS) {
1186-
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
1292+
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout for CLOSE", WEBSOCKET_TX_LOCK_TIMEOUT_MS);
1293+
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY); // Re-acquire client->lock before break
1294+
break;
1295+
}
1296+
1297+
// Re-acquire client->lock to check state
1298+
xSemaphoreTakeRecursive(client->lock, portMAX_DELAY);
1299+
1300+
// Another thread may have closed it while we didn't hold client->lock
1301+
if (client->state != WEBSOCKET_STATE_CLOSING || client->transport == NULL) {
1302+
ESP_LOGW(TAG, "Transport closed while preparing CLOSE frame, skipping send");
1303+
xSemaphoreGiveRecursive(client->tx_lock);
11871304
break;
11881305
}
11891306
#endif
@@ -1202,6 +1319,7 @@ static void esp_websocket_client_task(void *pv)
12021319
if (WEBSOCKET_STATE_CONNECTED == client->state) {
12031320
read_select = esp_transport_poll_read(client->transport, 1000); //Poll every 1000ms
12041321
if (read_select < 0) {
1322+
xSemaphoreTakeRecursive(client->lock, lock_timeout);
12051323
esp_tls_error_handle_t error_handle = esp_transport_get_error_handle(client->transport);
12061324
if (error_handle) {
12071325
esp_websocket_client_error(client, "esp_transport_poll_read() returned %d, transport_error=%s, tls_error_code=%i, tls_flags=%i, errno=%d",
@@ -1210,16 +1328,16 @@ static void esp_websocket_client_task(void *pv)
12101328
} else {
12111329
esp_websocket_client_error(client, "esp_transport_poll_read() returned %d, errno=%d", read_select, errno);
12121330
}
1213-
xSemaphoreTakeRecursive(client->lock, lock_timeout);
12141331
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
12151332
xSemaphoreGiveRecursive(client->lock);
12161333
} else if (read_select > 0) {
1334+
xSemaphoreTakeRecursive(client->lock, lock_timeout);
12171335
if (esp_websocket_client_recv(client) == ESP_FAIL) {
12181336
ESP_LOGE(TAG, "Error receive data");
1219-
xSemaphoreTakeRecursive(client->lock, lock_timeout);
1337+
// Note: Already holding client->lock from line above
12201338
esp_websocket_client_abort_connection(client, WEBSOCKET_ERROR_TYPE_TCP_TRANSPORT);
1221-
xSemaphoreGiveRecursive(client->lock);
12221339
}
1340+
xSemaphoreGiveRecursive(client->lock);
12231341
} else {
12241342
ESP_LOGV(TAG, "Read poll timeout: skipping esp_transport_poll_read().");
12251343
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
CONFIG_IDF_TARGET="esp32"
2+
CONFIG_IDF_TARGET_LINUX=n
3+
CONFIG_WEBSOCKET_URI_FROM_STDIN=n
4+
CONFIG_WEBSOCKET_URI_FROM_STRING=y
5+
CONFIG_EXAMPLE_CONNECT_ETHERNET=y
6+
CONFIG_EXAMPLE_CONNECT_WIFI=n
7+
CONFIG_EXAMPLE_USE_INTERNAL_ETHERNET=y
8+
CONFIG_EXAMPLE_ETH_PHY_IP101=y
9+
CONFIG_EXAMPLE_ETH_MDC_GPIO=23
10+
CONFIG_EXAMPLE_ETH_MDIO_GPIO=18
11+
CONFIG_EXAMPLE_ETH_PHY_RST_GPIO=5
12+
CONFIG_EXAMPLE_ETH_PHY_ADDR=1
13+
CONFIG_EXAMPLE_CONNECT_IPV6=y
14+
CONFIG_ESP_WS_CLIENT_SEPARATE_TX_LOCK=y
15+
CONFIG_ESP_WS_CLIENT_TX_LOCK_TIMEOUT_MS=2000

0 commit comments

Comments
 (0)