
poc: SWITCH relay implementation (moq-transport PR #1378)#287

Draft
gwendalsimon wants to merge 21 commits into
openmoq:mainfrom
gwendalsimon:feat/switch-poc


@gwendalsimon gwendalsimon commented May 5, 2026

  • Add MoqxSession: thin MoQRelaySession subclass that dispatches onSwitch() to MoqxRelay and exposes publishForSwitch() + writeCatchupToHandle() — both require protected MoQSession access
  • Wire MoqxRelayServer::createSession() to produce MoqxSession instances and call setRelay() immediately after construction
  • Add SwitchTypes.h: SwitchTransition struct + kSwitchTransitionParamKey
  • Add GroupStartObserver.h: lightweight TrackConsumer that fires a callback on each beginSubgroup() for G_switch detection
  • Add SwitchHandler: full relay-side state machine (VALIDATE → SUBSCRIBE_TARGET → OBSERVE_GSWITCH → CUT_OLD → CATCHUP+LIVE) including pre-population of availableTarget from forwarder.largest(), inline FETCH catch-up via writeCatchupToHandle(), and Phase 2 drain loop
  • Add MoqxRelay::handleSwitch(), getForwarder(), getOrSubscribeForwarder(), getSubscriptionHandle(), cache() to complete the relay glue
  • Add moqx_switch_handler_test placeholder target to test/CMakeLists.txt
  • Build not yet verified; it requires cmake 3.25+ and system deps, which can be installed with: sudo deps/moxygen/standalone/install-system-deps.sh
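
The phase sequence listed for SwitchHandler can be read as a linear state machine. The following is a minimal C++17 sketch of that ordering only; `SwitchPhase`, `nextPhase()`, `phaseName()` and `transitionsRemaining()` are hypothetical names introduced for illustration and are not taken from the PR, where the handler is a coroutine rather than an explicit enum.

```cpp
#include <cassert>
#include <optional>
#include <string_view>

// Hypothetical phase names mirroring the sequence in the PR description.
enum class SwitchPhase {
  Validate,
  SubscribeTarget,
  ObserveGswitch,
  CutOld,
  CatchupAndLive,
};

// Returns the phase that follows `p`, or std::nullopt once the terminal
// CATCHUP+LIVE phase has been reached.
constexpr std::optional<SwitchPhase> nextPhase(SwitchPhase p) {
  switch (p) {
    case SwitchPhase::Validate:        return SwitchPhase::SubscribeTarget;
    case SwitchPhase::SubscribeTarget: return SwitchPhase::ObserveGswitch;
    case SwitchPhase::ObserveGswitch:  return SwitchPhase::CutOld;
    case SwitchPhase::CutOld:          return SwitchPhase::CatchupAndLive;
    case SwitchPhase::CatchupAndLive:  return std::nullopt;
  }
  return std::nullopt;
}

// Human-readable label matching the spelling used in the PR description.
constexpr std::string_view phaseName(SwitchPhase p) {
  switch (p) {
    case SwitchPhase::Validate:        return "VALIDATE";
    case SwitchPhase::SubscribeTarget: return "SUBSCRIBE_TARGET";
    case SwitchPhase::ObserveGswitch:  return "OBSERVE_GSWITCH";
    case SwitchPhase::CutOld:          return "CUT_OLD";
    case SwitchPhase::CatchupAndLive:  return "CATCHUP+LIVE";
  }
  return "UNKNOWN";
}

// Counts the transitions needed to reach the terminal phase from `p`.
inline int transitionsRemaining(SwitchPhase p) {
  int n = 0;
  while (auto next = nextPhase(p)) {
    p = *next;
    ++n;
    assert(n <= 4);  // the chain has five phases, so at most four hops
  }
  return n;
}
```

Keeping the order in one function makes it easy to log or assert which phase a SWITCH request failed in; error exits (for example a failed VALIDATE) are deliberately left out of this sketch.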



Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

gwendalsimon commented May 5, 2026

The moxygen changes are now browsable at: https://github.com/gwendalsimon/moxygen/tree/feat/switch-poc

The moqx implementation is self-contained in this branch. The moxygen submodule carries local-only changes that would need to land in moxygen before this can be merged. Here is what those changes are:

  1. MoQTypes.h: SWITCH frame type (0x1B per moq-transport PR #1378), Switch struct, and 4 new PublishDoneStatusCode values (DOES_NOT_EXIST, TIMEOUT, NOT_SUPPORTED, EXCESSIVE_LOAD)
  2. MoQFramer.h/cpp: parseSwitch() to decode the SWITCH wire format
  3. MoQCodec.h/cpp: onSwitch() virtual callback in ControlCallback, SWITCH allowed on the control stream, dispatch in parseFrame()
  4. MoQSession.h/cpp: two additions:
    - getBidiStreamConfig() made virtual + a PUBLISH case so relay-initiated PUBLISH bidi streams are accepted by moxygen-based subscribers without PROTOCOL_VIOLATION
    - registerPublishConsumerForSwitch() protected factory: registers TrackPublisherImpl in pubTracks_ and pendingRequests_ before the PUBLISH frame hits the wire (needed to avoid a PUBLISH_OK race)
  5. relay/MoQCache.h: getObject() public accessor for catch-up delivery from the cache
@gwendalsimon gwendalsimon marked this pull request as draft May 5, 2026 08:28
gwendalsimon and others added 18 commits May 5, 2026 12:01
Picks up fix(switch): assign SWITCH = 0x1F per moq-transport PR #1378

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* test_ports.sh: reserve ports 19680 (relay) and 19681 (admin) for
  the switch integration test
* test_switch.sh: run two scenarios against a live relay instance —
  clean SWITCH (no lag) and SWITCH under 2s artificial subscriber lag;
  verify pass:true and catchup_groups>=15 for the lag scenario

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Follows the same pattern as relay_chain: bash + TARGET_FILE:moqx +
TIMEOUT 120 + MOQBIN environment variable pointing at the moxygen
install tree.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* Add writeSwitch()/sendSwitch() for subscriber-side SWITCH framing
* Add moqswitchpub and moqswitchsub integration test binaries
* Fix decodeSwitchTransition optional guards and publishPromise safety

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
createSession() runs before service routing, so context_ has no single
relay field. Move setRelay() into MoqxRelayContext::validateAuthority()
where the matched service entry is known; cast to MoqxSession there.

The folly experimental/coro namespace was promoted; the old include
paths no longer exist in the installed headers.

* encodeQuicInteger callback: push(b) -> writeBE(b) (QueueAppender API)
* writeStreamData: add missing nullptr third arg (callback parameter)
* ObjectHeader: replace designated initializers with positional ctor
…tObserver tests

* SwitchTypesTest: verify kSwitchTransitionParamKey value and
  SwitchTransition field layout.
* GroupStartObserverTest: callback fires for each beginSubgroup call,
  all no-op overrides return expected values without crashing.
* Update relay config to current moqx YAML schema (name, udp socket,
  tls.insecure, endpoint, services with cache, admin.plaintext).

* Set max_groups_per_track: 25 so the relay retains enough "low" groups
  during the 2-second lag window (~20 groups at 100ms/group).

* Replace fixed sleep(1) publisher wait with metrics-poll loop checking
  moqActiveSessions ≥ 1; add full error diagnostics (stderr, relay log)
  when subscriber produces no result line.

* Update .gitmodules to HTTPS remote URL for submodule.

* Bump deps/moxygen to 7c622cae (low-track priming + forward-guard fix).
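
The sizing behind max_groups_per_track: 25 can be checked with a short calculation. This is a sketch of the arithmetic only, using the figures quoted above (2-second lag, 100 ms per group); `groupsDuringLag()` and `cacheCoversLag()` are hypothetical helper names, not part of moqx.

```cpp
#include <cassert>
#include <cstdint>

// Number of groups published while a subscriber lags by `lagMs`,
// rounded up. Assumes groupDurationMs > 0.
constexpr std::uint64_t groupsDuringLag(std::uint64_t lagMs,
                                        std::uint64_t groupDurationMs) {
  assert(groupDurationMs > 0);
  return (lagMs + groupDurationMs - 1) / groupDurationMs;
}

// True when a per-track retention limit of `maxGroups` is large enough
// to still hold every group produced during the lag window.
constexpr bool cacheCoversLag(std::uint64_t maxGroups,
                              std::uint64_t lagMs,
                              std::uint64_t groupDurationMs) {
  return maxGroups >= groupsDuringLag(lagMs, groupDurationMs);
}

// 2000 ms of lag at 100 ms per group is 20 groups; 25 leaves headroom.
static_assert(groupsDuringLag(2000, 100) == 20);
static_assert(cacheCoversLag(25, 2000, 100));
```

With these numbers a limit of 25 leaves five groups of slack, which is also consistent with the integration test accepting catchup_groups>=15 rather than exactly 20.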

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* writeCatchup() (renamed from writeCatchupToHandle) opens a relay-
  initiated unidirectional stream via createUniStream(), sets urgency=1,
  writes FETCH_HEADER + objects [G_switch, Live Edge), then FINs.
  The PUBLISH bidi write side is now reserved for PUBLISH_DONE only.
* SwitchPublishResult drops writeHandle — relay opens the FETCH stream
  itself inside writeCatchup(), no handle needs to pass through.
* SwitchHandler updated to call writeCatchup() without a write handle.
* Bump deps/moxygen to f2ca8196 (SwitchFetchConsumerAdapter + session
  layer routing for relay-initiated FETCH streams).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ppress nodiscard

The catch-up FETCH stream is now written by writeCatchup() directly
(on a dedicated uni stream), so publishForSwitch() no longer needs the
currentSubscribeRequestID to embed a FETCH_HEADER.  Remove the unused
parameter from the declaration, definition, and both call-sites in
SwitchHandler.

Also suppress [[nodiscard]] warnings on the two encodeQuicInteger calls
with (void) casts.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
CLAUDE.md holds project-specific Claude Code instructions that are
local to the developer's machine and must not be checked in.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* SwitchTransitionParamEncoding: verifies (42, 63) encodes to exact
  bytes 0x2A 0x3F (single-byte QUIC VARINTs).
* SwitchTransitionParamRoundTrip: round-trips (1000, 200000) across
  different VARINT widths (2-byte and 4-byte), verifying values survive.

Helper functions in anonymous namespace mirror the encoding used by
publishForSwitch() so the test logic is not coupled to implementation.
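
The byte patterns these tests expect follow from QUIC variable-length integer encoding (RFC 9000, Section 16). Below is a self-contained sketch of that encoding applied to a pair of values; `appendVarint()`, `readVarint()` and `encodePair()` are hypothetical helpers written for illustration, and they do not reproduce the helpers in the test file or the encodeQuicInteger API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Appends `v` as a QUIC varint. The two most significant bits of the
// first byte select the length: 00 = 1, 01 = 2, 10 = 4, 11 = 8 bytes.
// Returns false when `v` exceeds 2^62 - 1 and cannot be encoded.
inline bool appendVarint(std::vector<std::uint8_t>& out, std::uint64_t v) {
  int len = 0;
  std::uint8_t prefix = 0;
  if (v <= 63ULL) { len = 1; prefix = 0x00; }
  else if (v <= 16383ULL) { len = 2; prefix = 0x40; }
  else if (v <= 1073741823ULL) { len = 4; prefix = 0x80; }
  else if (v <= 4611686018427387903ULL) { len = 8; prefix = 0xC0; }
  else { return false; }
  for (int i = len - 1; i >= 0; --i) {
    auto b = static_cast<std::uint8_t>((v >> (8 * i)) & 0xFF);
    if (i == len - 1) b |= prefix;  // length prefix goes on the first byte
    out.push_back(b);
  }
  return true;
}

// Reads one varint at `pos` and advances `pos` past it.
// Returns std::nullopt when the input is truncated.
inline std::optional<std::uint64_t> readVarint(
    const std::vector<std::uint8_t>& in, std::size_t& pos) {
  if (pos >= in.size()) return std::nullopt;
  const std::size_t len = std::size_t{1} << (in[pos] >> 6);
  if (in.size() - pos < len) return std::nullopt;
  std::uint64_t v = in[pos] & 0x3F;
  for (std::size_t i = 1; i < len; ++i) v = (v << 8) | in[pos + i];
  pos += len;
  return v;
}

// Encodes two values back to back, as the parameter tests above do.
// Precondition: both values fit in 62 bits.
inline std::vector<std::uint8_t> encodePair(std::uint64_t a, std::uint64_t b) {
  std::vector<std::uint8_t> out;
  const bool ok = appendVarint(out, a) && appendVarint(out, b);
  assert(ok);
  (void)ok;
  return out;
}
```

Under this encoding, `encodePair(42, 63)` yields the two bytes 0x2A 0x3F, and `encodePair(1000, 200000)` yields six bytes (0x43 0xE8 followed by 0x80 0x03 0x0D 0x40), matching the 2-byte and 4-byte widths named in the round-trip test.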

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ispatch for testability

* SwitchAlgorithm.h: pure free function findGswitch() extracted from
  SwitchHandler::run() for isolated unit testing.
* SwitchHandler.cpp: replace inline 14-line loop with findGswitch() call.
* MoqxSession.h: make publishForSwitch() and writeCatchup() virtual so
  FakeMoqxSession can override them without touching the network.
* MoqxRelay.h: add getForwarderByName() test accessor (same pattern as
  existing findPublishState()).
* test/CMakeLists.txt: wire moqx_switch_handler_test to moqx_test_main,
  moqx_test_utils and moxygen_events_moq_folly_executor_impl.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
CI environments without SSH agent configured cannot clone the submodule
via git@github.com. HTTPS works without credentials for public repos.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
gwendalsimon and others added 2 commits May 18, 2026 15:20
GswitchFoundDeferred_PublishForSwitchCalledCorrectly exercises the async
branch of SwitchHandler::run() where the GroupStartObserver registers
and the coroutine suspends on co_await gswitchFound:

* Target forwarder starts with no largest → tryFindGswitch() returns early.
* collectAll runs handlerTask first (suspends at baton), then injectTask
  which delivers group 5 via beginSubgroup().
* Forwarder updates largest_ then calls observer→tryFindGswitch()→post().
* Handler resumes; verifies publishForSwitch(gswitch=5, liveEdge=5).
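
The deferred path described in these steps can be approximated without coroutines. The sketch below is synchronous and heavily simplified: `GroupStartObserverSketch` and `GswitchWaiter` are hypothetical stand-ins, the real handler suspends on a baton instead of returning, and treating the first observed group start as G_switch is an assumption made here for illustration (the actual selection lives in findGswitch()).

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <utility>

// Hypothetical stand-in for GroupStartObserver: invokes a callback with
// the group id each time a subgroup begins.
class GroupStartObserverSketch {
 public:
  explicit GroupStartObserverSketch(std::function<void(std::uint64_t)> cb)
      : onGroupStart_(std::move(cb)) {}

  void beginSubgroup(std::uint64_t groupId) {
    if (onGroupStart_) onGroupStart_(groupId);
  }

 private:
  std::function<void(std::uint64_t)> onGroupStart_;
};

// Hypothetical stand-in for the handler state that waits for G_switch.
struct GswitchWaiter {
  std::optional<std::uint64_t> largest;  // largest group seen on the target
  std::optional<std::uint64_t> gswitch;  // chosen switch group, once known

  // Returns false (the "returns early" case) while no group is known.
  // ASSUMPTION: the first known largest group is taken as G_switch.
  bool tryFind() {
    if (!largest) return false;
    if (!gswitch) gswitch = *largest;
    return true;
  }

  // Mirrors the order in the test: update largest first, then retry.
  void onGroupStart(std::uint64_t groupId) {
    if (!largest || groupId > *largest) largest = groupId;
    tryFind();
    assert(gswitch.has_value());  // a group start always resolves the wait
  }
};
```

Wiring the two together with a lambda, a first `tryFind()` call returns false, and a later `beginSubgroup(5)` on the observer sets `gswitch` to 5, which is the value the test expects to reach publishForSwitch().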

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* CutOld_ResetsOpenSubgroupsAndDrainsSubscription: verifies subgroup
  reset(CANCELLED) for groups >= gswitch and publishDone(SUBSCRIPTION_ENDED)
  on current consumer after successful publishForSwitch.
* Phase2DrainLoop_DeliversCachedObjectsToNewConsumer: verifies cached
  objects in [liveEdge, currentEdge) are delivered via beginSubgroup +
  object on the new consumer returned by publishForSwitch.
* Add afterPublishForSwitchHook to FakeMoqxSession so tests can advance
  forwarder state between publishForSwitch and Phase 2 execution.
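
The Phase 2 behaviour these tests check, delivering cached groups in the half-open range [liveEdge, currentEdge) to the new consumer, can be sketched as follows. `GroupCache`, `RecordingConsumer` and `drainRange()` are hypothetical illustrations; the real code goes through MoQCache::getObject() and a TrackConsumer, whose signatures are not shown in this PR text.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical cache layout: group id -> object payloads in order.
using GroupCache = std::map<std::uint64_t, std::vector<std::string>>;

// Hypothetical consumer that records what it was given.
struct RecordingConsumer {
  std::vector<std::uint64_t> begunGroups;
  std::vector<std::string> objects;

  void beginSubgroup(std::uint64_t groupId) { begunGroups.push_back(groupId); }
  void object(const std::string& payload) { objects.push_back(payload); }
};

// Delivers every cached group g with liveEdge <= g < currentEdge, in
// ascending order, and returns how many groups were delivered.
// Groups missing from the cache are skipped rather than treated as errors.
inline std::size_t drainRange(const GroupCache& cache,
                              std::uint64_t liveEdge,
                              std::uint64_t currentEdge,
                              RecordingConsumer& out) {
  assert(liveEdge <= currentEdge);
  std::size_t delivered = 0;
  for (auto it = cache.lower_bound(liveEdge);
       it != cache.end() && it->first < currentEdge; ++it) {
    out.beginSubgroup(it->first);
    for (const auto& payload : it->second) out.object(payload);
    ++delivered;
  }
  return delivered;
}
```

Because the upper bound is exclusive, the group at currentEdge is left to the live path, so an object is not delivered twice when the drain loop hands over to live forwarding.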

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>