Skip to content

Commit

Permalink
Add longpoll fallback and make LP enabled by default
Browse files Browse the repository at this point in the history
Closes #5672
  • Loading branch information
chrismccord committed Jan 5, 2024
1 parent dec89f2 commit 29a67e8
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 17 deletions.
3 changes: 2 additions & 1 deletion assets/js/phoenix/longpoll.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ export default class LongPoll {
this.onclose = function (){ } // noop
this.pollEndpoint = this.normalizeEndpoint(endPoint)
this.readyState = SOCKET_STATES.connecting
this.poll()
// we must wait for the caller to finish setting up our callbacks and timeout properties
setTimeout(() => this.poll(), 0)
}

normalizeEndpoint(endPoint){
Expand Down
106 changes: 92 additions & 14 deletions assets/js/phoenix/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,14 @@ import Timer from "./timer"
* @param {Object} [opts] - Optional configuration
* @param {Function} [opts.transport] - The Websocket Transport, for example WebSocket or Phoenix.LongPoll.
*
* Defaults to WebSocket with automatic LongPoll fallback.
* Defaults to WebSocket with automatic LongPoll fallback if WebSocket is not defined.
* To fallback to LongPoll when WebSocket attempts fail, use `longPollFallbackMs: 2500`.
*
* @param {Function} [opts.longPollFallbackMs] - The millisecond time to attempt the primary transport
* before falling back to the LongPoll transport. Disabled by default.
*
* @param {Function} [opts.debug] - When true, enables debug logging. Default false.
*
* @param {Function} [opts.encode] - The function to encode outgoing messages.
*
* Defaults to JSON encoder.
Expand Down Expand Up @@ -86,6 +93,19 @@ import Timer from "./timer"
* @param {vsn} [opts.vsn] - The serializer's protocol version to send on connect.
*
* Defaults to DEFAULT_VSN.
*
* @param {Object} [opts.sessionStorage] - An optional Storage compatible object
* Phoenix uses sessionStorage for longpoll fallback history. Overriding the store is
* useful when Phoenix won't have access to `sessionStorage`. For example, This could
* happen if a site loads a cross-domain channel in an iframe. Example usage:
*
* class InMemoryStorage {
* constructor() { this.storage = {} }
* getItem(keyName) { return this.storage[keyName] || null }
* removeItem(keyName) { delete this.storage[keyName] }
* setItem(keyName, keyValue) { this.storage[keyName] = keyValue }
* }
*
*/
export default class Socket {
constructor(endPoint, opts = {}){
Expand All @@ -95,6 +115,9 @@ export default class Socket {
this.ref = 0
this.timeout = opts.timeout || DEFAULT_TIMEOUT
this.transport = opts.transport || global.WebSocket || LongPoll
this.longPollFallbackMs = opts.longPollFallbackMs
this.fallbackTimer = null
this.sessionStore = opts.sessionStorage || global.sessionStorage
this.establishedConnections = 0
this.defaultEncoder = Serializer.encode.bind(Serializer)
this.defaultDecoder = Serializer.decode.bind(Serializer)
Expand Down Expand Up @@ -139,6 +162,9 @@ export default class Socket {
}
}
this.logger = opts.logger || null
if(!this.logger && opts.debug){
this.logger = (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) }
}
this.longpollerTimeout = opts.longpollerTimeout || 20000
this.params = closure(opts.params || {})
this.endPoint = `${endPoint}/${TRANSPORTS.websocket}`
Expand All @@ -165,8 +191,8 @@ export default class Socket {
replaceTransport(newTransport){
this.connectClock++
this.closeWasClean = true
clearTimeout(this.fallbackTimer)
this.reconnectTimer.reset()
this.sendBuffer = []
if(this.conn){
this.conn.close()
this.conn = null
Expand Down Expand Up @@ -207,6 +233,7 @@ export default class Socket {
disconnect(callback, code, reason){
this.connectClock++
this.closeWasClean = true
clearTimeout(this.fallbackTimer)
this.reconnectTimer.reset()
this.teardown(callback, code, reason)
}
Expand All @@ -224,16 +251,11 @@ export default class Socket {
this.params = closure(params)
}
if(this.conn){ return }

this.connectClock++
this.closeWasClean = false
this.conn = new this.transport(this.endPointURL())
this.conn.binaryType = this.binaryType
this.conn.timeout = this.longpollerTimeout
this.conn.onopen = () => this.onConnOpen()
this.conn.onerror = error => this.onConnError(error)
this.conn.onmessage = event => this.onConnMessage(event)
this.conn.onclose = event => this.onConnClose(event)
if(this.longPollFallbackMs && this.transport !== LongPoll){
this.connectWithFallback(LongPoll, this.longPollFallbackMs)
} else {
this.transportConnect()
}
}

/**
Expand All @@ -242,7 +264,7 @@ export default class Socket {
* @param {string} msg
* @param {Object} data
*/
log(kind, msg, data){ this.logger(kind, msg, data) }
log(kind, msg, data){ this.logger && this.logger(kind, msg, data) }

/**
* Returns true if a logger has been set on this socket.
Expand Down Expand Up @@ -319,13 +341,69 @@ export default class Socket {
* @private
*/

transportConnect(){
this.connectClock++
this.closeWasClean = false
this.conn = new this.transport(this.endPointURL())
this.conn.binaryType = this.binaryType
this.conn.timeout = this.longpollerTimeout
this.conn.onopen = () => this.onConnOpen()
this.conn.onerror = error => this.onConnError(error)
this.conn.onmessage = event => this.onConnMessage(event)
this.conn.onclose = event => this.onConnClose(event)
}

getSession(key){ return this.sessionStore && this.sessionStore.getItem(key) }

storeSession(key, val){ this.sessionStore && this.sessionStore.setItem(key, val) }

connectWithFallback(fallbackTransport, fallbackThreshold = 2500){
clearTimeout(this.fallbackTimer)
let established = false
let primaryTransport = true
let openRef, errorRef
let fallback = (reason) => {
this.log("transport", `falling back to ${fallbackTransport.name}...`, reason)
this.off([openRef, errorRef])
primaryTransport = false
this.storeSession("phx:longpoll", "true")
this.replaceTransport(fallbackTransport)
this.transportConnect()
}
if(this.getSession("phx:longpoll")){ return fallback("memorized") }

this.fallbackTimer = setTimeout(fallback, fallbackThreshold)

errorRef = this.onError(reason => {
this.log("transport", "error", reason)
if(primaryTransport && !established) {
clearTimeout(this.fallbackTimer)
fallback(reason)
}
})
this.onOpen(() => {
established = true
if(!primaryTransport){
return console.log("transport", `established ${fallbackTransport.name} fallback`)
}
// if we've established primary, give the fallback a new period to attempt ping
clearTimeout(this.fallbackTimer)
this.fallbackTimer = setTimeout(fallback, fallbackThreshold)
this.ping(rtt => {
this.log("transport", "connected to primary after", rtt)
clearTimeout(this.fallbackTimer)
})
})
this.transportConnect()
}

clearHeartbeats(){
clearTimeout(this.heartbeatTimer)
clearTimeout(this.heartbeatTimeoutTimer)
}

onConnOpen(){
if(this.hasLogger()) this.log("transport", `connected to ${this.endPointURL()}`)
if(this.hasLogger()) this.log("transport", `${this.transport.name} connected to ${this.endPointURL()}`)
this.closeWasClean = false
this.establishedConnections++
this.flushSendBuffer()
Expand Down
22 changes: 22 additions & 0 deletions assets/test/socket_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import sinon from "sinon"
import {WebSocket, Server as WebSocketServer} from "mock-socket"
import {encode} from "./serializer"
import {Socket, LongPoll} from "../js/phoenix"
import {
SOCKET_STATES
} from "../js/phoenix/constants"

let socket

Expand Down Expand Up @@ -72,6 +75,25 @@ describe("with transports", function(){
mockServer.stop(() => done())
})
})

describe("longPollFallbackMs", function(){
it("falls back to longpoll when set after primary transport failure", function(done){
let mockServer
socket = new Socket("/socket", {longPollFallbackMs: 20})
let replaceSpy = sinon.spy(socket, "replaceTransport")
mockServer = new WebSocketServer("wss://example.test/")
mockServer.stop(() => {
assert.equal(socket.transport, WebSocket)
socket.onError((reason) => {
setTimeout(() => {
assert(replaceSpy.calledWith(LongPoll))
done()
}, 100)
})
socket.connect()
})
})
})
})

describe("protocol", function(){
Expand Down
5 changes: 4 additions & 1 deletion installer/templates/phx_assets/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import "phoenix_html"
<%= @live_comment %>import topbar from "../vendor/topbar"

<%= @live_comment %>let csrfToken = document.querySelector("meta[name='csrf-token']").getAttribute("content")
<%= @live_comment %>let liveSocket = new LiveSocket("/live", Socket, {params: {_csrf_token: csrfToken}})
<%= @live_comment %>let liveSocket = new LiveSocket("/live", Socket, {
<%= @live_comment %> longPollFallbackMs: 2500,
<%= @live_comment %> params: {_csrf_token: csrfToken}
<%= @live_comment %>})

// Show progress bar on live navigation and form submits
<%= @live_comment %>topbar.config({barColors: {0: "#29d"}, shadowColor: "rgba(0, 0, 0, .3)"})
Expand Down
4 changes: 3 additions & 1 deletion installer/templates/phx_web/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ defmodule <%= @endpoint_module %> do
same_site: "Lax"
]

<%= if !(@dashboard || @live) do %><%= "# " %><% end %>socket "/live", Phoenix.LiveView.Socket, websocket: [connect_info: [session: @session_options]]
<%= if !(@dashboard || @live) do %><%= "# " %><% end %>socket "/live", Phoenix.LiveView.Socket,
<%= if !(@dashboard || @live) do %><%= "# " %><% end %> websocket: [connect_info: [session: @session_options]],
<%= if !(@dashboard || @live) do %><%= "# " %><% end %> longpoll: [connect_info: [session: @session_options]]

# Serve at "/" the static files from "priv/static" directory.
#
Expand Down

0 comments on commit 29a67e8

Please sign in to comment.