Skip to content

Commit

Permalink
Implement next track and player syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepyfran committed Sep 9, 2024
1 parent a894bdf commit 72f8409
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 39 deletions.
3 changes: 3 additions & 0 deletions .eslintrc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ module.exports = {
},
},
rules: {
"@typescript-eslint/no-empty-object-type": {
allowObjectTypes: true,
},
"@typescript-eslint/no-unused-vars": [
"error",
{
Expand Down
5 changes: 5 additions & 0 deletions packages/core/types/src/services/media-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ export type MediaPlayer = FileBasedMediaPlayer & {
*/
readonly id: MediaPlayerId;

/**
* Pauses the current track.
*/
pause: Effect<void>;

/**
* Returns a stream that emits events from the media player.
*/
Expand Down
3 changes: 3 additions & 0 deletions packages/infrastructure/html-audio-media-player/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const make = Effect.succeed(
catch: () => new PlayNotFoundError(),
});
}),
pause: Effect.sync(() => {
audioElement.pause();
}),
observe: Stream.async((emit) => {
// TODO: Keep track in the state? If something, it can be done via a ref.
audioElement.onplay = () => emit.single("trackPlaying");
Expand Down
155 changes: 116 additions & 39 deletions packages/services/player/src/player.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import {
type Track,
} from "@echo/core-types";
import {
Data,
Effect,
Layer,
Match,
Option,
Queue,
Ref,
Stream,
SubscriptionRef,
Expand All @@ -22,37 +24,126 @@ import {
CurrentlyActivePlayerRef,
PlayerStateRef,
type ICurrentlyActivePlayerRef,
type IPlayerStateRef,
} from "./state";

/**
* Internal commands that can be sent to the player to trigger actions. Mostly
* used to synchronize the player state with the media player's events.
*/
type PlayerCommand =
| { _tag: "NextTrack" }
| { _tag: "SyncPlayerState"; withMediaPlayer: MediaPlayer }
| { _tag: "UpdateState"; updateFn: (state: PlayerState) => PlayerState };

const { NextTrack, UpdateState, SyncPlayerState } =
Data.taggedEnum<PlayerCommand>();

/**
* Error thrown when the player was asked to play a track but the list of tracks
* was empty.
*/
class NoMoreTracksAvailable extends Data.TaggedError(
"NoMoreTracksAvailable",
)<{}> {}

const makePlayer = Effect.gen(function* () {
const state = yield* PlayerStateRef;
const activeMediaPlayer = yield* CurrentlyActivePlayerRef;
const providerCache = yield* ActiveMediaProviderCache;

const commandQueue = yield* Queue.sliding<PlayerCommand>(10);
yield* consumeCommandsInBackground(commandQueue);

return Player.of({
playAlbum: (album) =>
Effect.gen(function* () {
const [track, ...restOfTracks] = album.tracks;
if (!track) {
yield* Effect.logError(
playTracks(album.tracks, providerCache, commandQueue).pipe(
Effect.catchTag("NoMoreTracksAvailable", () =>
Effect.logError(
`Attempted to play album ${album.name}, but it has no tracks.`,
);
return;
}

const { provider, player } = yield* resolveDependenciesForTrack(
providerCache,
track,
);
yield* syncPlayerState(player, activeMediaPlayer, state);
yield* playTrack(provider, player, track);
yield* Ref.update(state, toPlayingState(track, restOfTracks));
}),
),
),
),
observe: state,
});
});

/**
* Consumes the player commands in the background, triggering the appropriate
* actions based on the command type.
*/
const consumeCommandsInBackground = (
commandQueue: Queue.Queue<PlayerCommand>,
) =>
Effect.forkScoped(
Stream.fromQueue(commandQueue).pipe(
Stream.tap((command) =>
Match.value(command).pipe(
Match.tag("NextTrack", () =>
Effect.gen(function* () {
const state = yield* PlayerStateRef;
const providerCache = yield* ActiveMediaProviderCache;

const { comingUpTracks } = yield* Ref.get(state);
yield* playTracks(
comingUpTracks,
providerCache,
commandQueue,
).pipe(
Effect.catchTag("NoMoreTracksAvailable", () =>
Effect.logWarning("There are no more tracks to play."),
),
);
}),
),
Match.tag("UpdateState", ({ updateFn }) =>
Effect.gen(function* () {
const state = yield* PlayerStateRef;
yield* Ref.update(state, updateFn);
}),
),
Match.tag("SyncPlayerState", ({ withMediaPlayer }) =>
Effect.gen(function* () {
const activeMediaPlayer = yield* CurrentlyActivePlayerRef;
yield* syncPlayerState(
withMediaPlayer,
activeMediaPlayer,
commandQueue,
);
}),
),
Match.exhaustive,
),
),
Stream.runDrain,
),
);

/**
* Given a list of tracks, a provider cache and a command queue, attempts to play
* the first track in the list and updates the player state accordingly.
*/
const playTracks = (
tracks: Track[],
providerCache: IActiveMediaProviderCache,
commandQueue: Queue.Enqueue<PlayerCommand>,
) =>
Effect.gen(function* () {
const [nextTrack, ...restOfTracks] = tracks;
if (!nextTrack) {
return yield* Effect.fail(new NoMoreTracksAvailable());
}

const { provider, player } = yield* resolveDependenciesForTrack(
providerCache,
nextTrack,
);

yield* commandQueue.offer(SyncPlayerState({ withMediaPlayer: player }));
yield* playTrack(provider, player, nextTrack);
yield* commandQueue.offer(
UpdateState({ updateFn: toPlayingState(nextTrack, restOfTracks) }),
);
});

/**
* Attempts to retrieve the provider assigned by its resource for the given
* track. If the provider is not active, logs an error and fails the effect.
Expand Down Expand Up @@ -81,35 +172,21 @@ const resolveDependenciesForTrack = (
const syncPlayerState = (
mediaPlayer: MediaPlayer,
activeMediaPlayer: ICurrentlyActivePlayerRef,
playerState: IPlayerStateRef,
commandQueue: Queue.Enqueue<PlayerCommand>,
) =>
Effect.gen(function* () {
yield* overrideActivePlayer(mediaPlayer, activeMediaPlayer);

yield* Effect.log(`Starting to observe player ${mediaPlayer.id}.`);

// TODO: Do not use daemon, we need to scope this to the player's lifecycle.
yield* Effect.forkDaemon(
mediaPlayer.observe.pipe(
Stream.tap((event) =>
Match.value(event).pipe(
Match.when("trackPlaying", () =>
Ref.update(playerState, (currentState) => ({
...currentState,
status: "playing" as const,
})),
),
Match.when("trackEnded", () =>
Ref.update(playerState, (currentState) => ({
...currentState,
status: "stopped" as const,
})),
),
Match.when("trackPaused", () =>
Ref.update(playerState, (currentState) => ({
...currentState,
status: "paused" as const,
})),
),
Match.when("trackPlaying", () => Effect.void),
Match.when("trackEnded", () => commandQueue.offer(NextTrack())),
Match.when("trackPaused", () => Effect.void),
Match.exhaustive,
),
),
Expand Down Expand Up @@ -180,7 +257,7 @@ const playTrack = (
*/
const toPlayingState =
(currentTrack: Track, comingUpTracks: Track[]) =>
(currentState: PlayerState) =>
(currentState: PlayerState): PlayerState =>
({
...currentState,
status: "playing" as const,
Expand All @@ -194,7 +271,7 @@ const toPlayingState =
comingUpTracks,
}) satisfies PlayerState;

const PlayerLiveWithState = Layer.effect(Player, makePlayer);
const PlayerLiveWithState = Layer.scoped(Player, makePlayer);

const PlayerStateLive = Layer.effect(
PlayerStateRef,
Expand Down

0 comments on commit 72f8409

Please sign in to comment.