Skip to content
Open
67 changes: 67 additions & 0 deletions docs/docs/api/DiagnosticsChannel.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,32 @@ The `handshakeResponse` object contains the HTTP response that upgraded the conn

This information is particularly useful for debugging and monitoring WebSocket connections, as it provides access to the initial HTTP handshake response that established the WebSocket connection.

## `undici:websocket:created`

This message is published when a `WebSocket` instance is created, before the opening handshake is sent.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:created').subscribe(({ websocket, url }) => {
console.log(websocket) // the WebSocket instance
console.log(url) // serialized websocket URL
})
```

## `undici:websocket:handshakeRequest`

This message is published when the HTTP upgrade request is about to be sent.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:handshakeRequest').subscribe(({ websocket, request }) => {
console.log(websocket) // the WebSocket instance
console.log(request.headers) // handshake request headers assembled so far
})
```

## `undici:websocket:close`

This message is published after the connection has closed.
Expand All @@ -227,6 +253,47 @@ diagnosticsChannel.channel('undici:websocket:socket_error').subscribe((error) =>
})
```

## `undici:websocket:frameSent`

This message is published after a WebSocket frame is written to the socket.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:frameSent').subscribe(({ websocket, opcode, payloadData }) => {
console.log(websocket) // the WebSocket instance
console.log(opcode) // RFC 6455 opcode
console.log(payloadData) // unmasked payload bytes
})
```

## `undici:websocket:frameReceived`

This message is published after a WebSocket frame is parsed from the socket.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:frameReceived').subscribe(({ websocket, opcode, payloadData }) => {
console.log(websocket) // the WebSocket instance
console.log(opcode) // RFC 6455 opcode
console.log(payloadData) // payload bytes as received
})
```

## `undici:websocket:frameError`

This message is published when Undici rejects an invalid incoming WebSocket frame.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:frameError').subscribe(({ websocket, error }) => {
console.log(websocket) // the WebSocket instance
console.log(error.message)
})
```

## `undici:websocket:ping`

This message is published after the client receives a ping frame, if the connection is not closing.
Expand Down
38 changes: 35 additions & 3 deletions lib/core/diagnostics.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ const channels = {
trailers: diagnosticsChannel.channel('undici:request:trailers'),
error: diagnosticsChannel.channel('undici:request:error'),
// WebSocket
created: diagnosticsChannel.channel('undici:websocket:created'),
handshakeRequest: diagnosticsChannel.channel('undici:websocket:handshakeRequest'),
open: diagnosticsChannel.channel('undici:websocket:open'),
close: diagnosticsChannel.channel('undici:websocket:close'),
frameSent: diagnosticsChannel.channel('undici:websocket:frameSent'),
frameReceived: diagnosticsChannel.channel('undici:websocket:frameReceived'),
frameError: diagnosticsChannel.channel('undici:websocket:frameError'),
socketError: diagnosticsChannel.channel('undici:websocket:socket_error'),
ping: diagnosticsChannel.channel('undici:websocket:ping'),
pong: diagnosticsChannel.channel('undici:websocket:pong'),
Expand Down Expand Up @@ -166,15 +171,27 @@ function trackWebSocketEvents (debugLog = websocketDebuglog) {

// Check if any of the channels already have subscribers to prevent duplicate subscriptions
// This can happen when both Node.js built-in undici and undici as a dependency are present
if (channels.open.hasSubscribers || channels.close.hasSubscribers ||
channels.socketError.hasSubscribers || channels.ping.hasSubscribers ||
channels.pong.hasSubscribers) {
if (channels.created.hasSubscribers || channels.handshakeRequest.hasSubscribers ||
channels.open.hasSubscribers || channels.close.hasSubscribers ||
channels.frameSent.hasSubscribers || channels.frameReceived.hasSubscribers ||
channels.frameError.hasSubscribers || channels.socketError.hasSubscribers ||
channels.ping.hasSubscribers || channels.pong.hasSubscribers) {
isTrackingWebSocketEvents = true
return
}

isTrackingWebSocketEvents = true

diagnosticsChannel.subscribe('undici:websocket:created',
evt => {
debugLog('created websocket for %s', evt.url)
})

diagnosticsChannel.subscribe('undici:websocket:handshakeRequest',
evt => {
debugLog('sending opening handshake for %s', evt.websocket?.url ?? '<unknown>')
})

diagnosticsChannel.subscribe('undici:websocket:open',
evt => {
const {
Expand All @@ -194,6 +211,21 @@ function trackWebSocketEvents (debugLog = websocketDebuglog) {
)
})

diagnosticsChannel.subscribe('undici:websocket:frameSent',
evt => {
debugLog('frame sent opcode=%d bytes=%d', evt.opcode, evt.payloadData.length)
})

diagnosticsChannel.subscribe('undici:websocket:frameReceived',
evt => {
debugLog('frame received opcode=%d bytes=%d', evt.opcode, evt.payloadData.length)
})

diagnosticsChannel.subscribe('undici:websocket:frameError',
evt => {
debugLog('frame errored for %s - %s', evt.websocket?.url ?? '<unknown>', evt.error.message)
})

diagnosticsChannel.subscribe('undici:websocket:socket_error',
err => {
debugLog('connection errored - %s', err.message)
Expand Down
10 changes: 10 additions & 0 deletions lib/web/websocket/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const { getDecodeSplit } = require('../fetch/util')
const { WebsocketFrameSend } = require('./frame')
const assert = require('node:assert')
const { runtimeFeatures } = require('../../util/runtime-features')
const { channels } = require('../../core/diagnostics')

const crypto = runtimeFeatures.has('crypto')
? require('node:crypto')
Expand Down Expand Up @@ -89,6 +90,15 @@ function establishWebSocketConnection (url, protocols, client, handler, options)

// 11. Fetch request with useParallelQueue set to true, and
// processResponse given response being these steps:
if (channels.handshakeRequest.hasSubscribers) {
channels.handshakeRequest.publish({
websocket: handler.websocket,
request: {
headers: request.headersList.entries
}
})
}

const controller = fetching({
request,
useParallelQueue: true,
Expand Down
52 changes: 38 additions & 14 deletions lib/web/websocket/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const { failWebsocketConnection } = require('./connection')
const { WebsocketFrameSend } = require('./frame')
const { PerMessageDeflate } = require('./permessage-deflate')
const { MessageSizeExceededError } = require('../../core/errors')
const { channels } = require('../../core/diagnostics')

// This code was influenced by ws released under the MIT license.
// Copyright (c) 2011 Einar Otto Stangvik <einaros@gmail.com>
Expand Down Expand Up @@ -97,12 +98,12 @@ class ByteParser extends Writable {
const rsv3 = buffer[0] & 0x10

if (!isValidOpcode(opcode)) {
failWebsocketConnection(this.#handler, 1002, 'Invalid opcode received')
this.failWebsocketConnection(1002, 'Invalid opcode received')
return callback()
}

if (masked) {
failWebsocketConnection(this.#handler, 1002, 'Frame cannot be masked')
this.failWebsocketConnection(1002, 'Frame cannot be masked')
return callback()
}

Expand All @@ -116,43 +117,43 @@ class ByteParser extends Writable {
// WebSocket connection where a PMCE is in use, this bit indicates
// whether a message is compressed or not.
if (rsv1 !== 0 && !this.#extensions.has('permessage-deflate')) {
failWebsocketConnection(this.#handler, 1002, 'Expected RSV1 to be clear.')
this.failWebsocketConnection(1002, 'Expected RSV1 to be clear.')
return
}

if (rsv2 !== 0 || rsv3 !== 0) {
failWebsocketConnection(this.#handler, 1002, 'RSV1, RSV2, RSV3 must be clear')
this.failWebsocketConnection(1002, 'RSV1, RSV2, RSV3 must be clear')
return
}

if (fragmented && !isTextBinaryFrame(opcode)) {
// Only text and binary frames can be fragmented
failWebsocketConnection(this.#handler, 1002, 'Invalid frame type was fragmented.')
this.failWebsocketConnection(1002, 'Invalid frame type was fragmented.')
return
}

// If we are already parsing a text/binary frame and do not receive either
// a continuation frame or close frame, fail the connection.
if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) {
failWebsocketConnection(this.#handler, 1002, 'Expected continuation frame')
this.failWebsocketConnection(1002, 'Expected continuation frame')
return
}

if (this.#info.fragmented && fragmented) {
// A fragmented frame can't be fragmented itself
failWebsocketConnection(this.#handler, 1002, 'Fragmented frame exceeded 125 bytes.')
this.failWebsocketConnection(1002, 'Fragmented frame exceeded 125 bytes.')
return
}

// "All control frames MUST have a payload length of 125 bytes or less
// and MUST NOT be fragmented."
if ((payloadLength > 125 || fragmented) && isControlFrame(opcode)) {
failWebsocketConnection(this.#handler, 1002, 'Control frame either too large or fragmented')
this.failWebsocketConnection(1002, 'Control frame either too large or fragmented')
return
}

if (isContinuationFrame(opcode) && this.#fragments.length === 0 && !this.#info.compressed) {
failWebsocketConnection(this.#handler, 1002, 'Unexpected continuation frame')
this.failWebsocketConnection(1002, 'Unexpected continuation frame')
return
}

Expand Down Expand Up @@ -199,7 +200,7 @@ class ByteParser extends Writable {
// https://source.chromium.org/chromium/chromium/src/+/main:v8/src/common/globals.h;drc=1946212ac0100668f14eb9e2843bdd846e510a1e;bpv=1;bpt=1;l=1275
// https://source.chromium.org/chromium/chromium/src/+/main:v8/src/objects/js-array-buffer.h;l=34;drc=1946212ac0100668f14eb9e2843bdd846e510a1e
if (upper !== 0 || lower > 2 ** 31 - 1) {
failWebsocketConnection(this.#handler, 1009, 'Received payload length > 2^31 bytes.')
this.failWebsocketConnection(1009, 'Received payload length > 2^31 bytes.')
return
}

Expand All @@ -212,6 +213,14 @@ class ByteParser extends Writable {

const body = this.consume(this.#info.payloadLength)

if (channels.frameReceived.hasSubscribers) {
channels.frameReceived.publish({
websocket: this.#handler.websocket,
opcode: this.#info.opcode,
payloadData: Buffer.from(body)
})
}

if (isControlFrame(this.#info.opcode)) {
this.#loop = this.parseControlFrame(body)
this.#state = parserStates.INFO
Expand All @@ -231,9 +240,8 @@ class ByteParser extends Writable {
} else {
this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (error, data) => {
if (error) {
// Use 1009 (Message Too Big) for decompression size limit errors
const code = error instanceof MessageSizeExceededError ? 1009 : 1007
failWebsocketConnection(this.#handler, code, error.message)
this.failWebsocketConnection(code, error.message, error)
return
}

Expand Down Expand Up @@ -384,7 +392,7 @@ class ByteParser extends Writable {

if (opcode === opcodes.CLOSE) {
if (payloadLength === 1) {
failWebsocketConnection(this.#handler, 1002, 'Received close frame with a 1-byte body.')
this.failWebsocketConnection(1002, 'Received close frame with a 1-byte body.')
return false
}

Expand All @@ -393,7 +401,7 @@ class ByteParser extends Writable {
if (this.#info.closeInfo.error) {
const { code, reason } = this.#info.closeInfo

failWebsocketConnection(this.#handler, code, reason)
this.failWebsocketConnection(code, reason)
return false
}

Expand Down Expand Up @@ -448,6 +456,22 @@ class ByteParser extends Writable {
get closingInfo () {
return this.#info.closeInfo
}

publishFrameError (error) {
if (!channels.frameError.hasSubscribers) {
return
}

channels.frameError.publish({
websocket: this.#handler.websocket,
error
})
}

failWebsocketConnection (code, reason, error = new Error(reason)) {
this.publishFrameError(error)
failWebsocketConnection(this.#handler, code, reason)
}
}

module.exports = {
Expand Down
Loading
Loading