Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions features/latency2.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
Feature: Geo-based latency2 estimates

Background:
Given the "signaling" backend is running
And the "testproxy" backend is running

Scenario: latency2 is computed from requester to lobby peers
Given these lobbies exist:
| code | game | peers | public |
| 0qva9vyurwbbl | 123e4567-e89b-12d3-a456-426614174000 | {peerA,peerB} | true |
And these peers exist:
| peer | game | geo |
| peerA | 123e4567-e89b-12d3-a456-426614174000 | 10, 20 |
| peerB | 123e4567-e89b-12d3-a456-426614174000 | 30, 40 |
And "blue" is connected as "1u8fw4aph5ypt" with lat,lon as 50,60 and ready for game "123e4567-e89b-12d3-a456-426614174000"

When "blue" requests lobbies with:
"""json
{}
"""

Then "blue" should have received only these lobbies:
| code | latency2 |
| 0qva9vyurwbbl | 69 |


Scenario: latency2 is undefined when requester has no geo
Given these lobbies exist:
| code | game | peers | public |
| 0qva9vyurwbbl | 223e4567-e89b-12d3-a456-426614174000 | {peerC} | true |
And these peers exist:
| peer | game | geo |
| peerC | 223e4567-e89b-12d3-a456-426614174000 | 10, 10 |
And "green" is connected as "1u8fw4aph5ypt" and ready for game "223e4567-e89b-12d3-a456-426614174000"

When "green" requests lobbies with:
"""json
{}
"""

Then "green" should have received only these lobbies:
| code | latency2 |
| 0qva9vyurwbbl | undefined |
14 changes: 12 additions & 2 deletions features/support/steps/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,23 @@ After(async function (this: World) {
this.players.clear()
})

Given('{string} is connected as {string} and ready for game {string}', async function (this: World, playerName: string, peerID: string, gameID: string) {
const player = await this.createPlayer(playerName, gameID)
async function playerIsConnectedAndReadyForGame (this: World, playerName: string, peerID: string, gameID: string, lat?: number, lon?: number): Promise<void> {
const player = await this.createPlayer(playerName, gameID, lat, lon)
const event = await player.waitForEvent('ready')
if (event == null) {
throw new Error(`unable to add player ${playerName} to network`)
}
if (player.network.id !== peerID) {
throw new Error(`expected peer ID ${peerID} but got ${player.network.id}`)
}
}

Given('{string} is connected as {string} and ready for game {string}', async function (this: World, playerName: string, peerID: string, gameID: string) {
await playerIsConnectedAndReadyForGame.call(this, playerName, peerID, gameID)
})

Given('{string} is connected as {string} with lat,lon as {float},{float} and ready for game {string}', async function (this: World, playerName: string, peerID: string, lat: number, lon: number, gameID: string) {
await playerIsConnectedAndReadyForGame.call(this, playerName, peerID, gameID, lat, lon)
})

async function areJoinedInALobby (this: World, playerNamesRaw: string, publc: boolean): Promise<void> {
Expand Down Expand Up @@ -152,6 +160,8 @@ Given('these peers exist:', async function (this: World, peers: DataTable) {
v.push('NULL')
} else if (key === 'latency_vector') {
v.push(`ARRAY[${value.substring(1, value.length - 1)}]::vector(11)`)
} else if (key === 'geo') {
v.push(`ll_to_earth(${value})`)
} else {
v.push(`'${value}'`)
}
Expand Down
13 changes: 11 additions & 2 deletions features/support/world.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class World extends CucumberWorld {
}
}

public async createPlayer (playerName: string, gameID: string): Promise<Player> {
public async createPlayer (playerName: string, gameID: string, lat?: number, lon?: number): Promise<Player> {
return await new Promise((resolve) => {
const config: PeerConfiguration = {}
if (this.useTestProxy) {
Expand All @@ -58,7 +58,16 @@ export class World extends CucumberWorld {
config.testLatency = {
vector: this.latencyVector
}
const network = new Network(gameID, config, this.signalingURL)

let signalingURL = this.signalingURL
if (lat !== undefined && lon !== undefined && signalingURL !== undefined) {
const url = new URL(signalingURL)
url.searchParams.set('lat', lat.toString())
url.searchParams.set('lon', lon.toString())
signalingURL = url.toString()
}

const network = new Network(gameID, config, signalingURL)
const player = new Player(playerName, network)
this.players.set(playerName, player)

Expand Down
48 changes: 48 additions & 0 deletions internal/signaling/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package signaling
import (
"context"
"encoding/json"
"math"
"net/http"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -66,11 +68,31 @@ func Handler(ctx context.Context, store stores.Store, cloudflare *cloudflare.Cre
wg.Add(1)
defer wg.Done()

lat := parseLatLon(r.Header.Get("X-Geo-Lat"), -90, 90)
lon := parseLatLon(r.Header.Get("X-Geo-Lon"), -180, 180)
if lat == nil || lon == nil {
// Allow lat/lon to be passed as query parameters as a fallback.
// This is mainly for testing purposes, but can also be used
// in the `signalingURL` argument to `new Network()` when deploying
// in environments that can't set the headers.
// In production on Poki, Cloudflare will set the headers.
q := r.URL.Query()
if lat == nil {
lat = parseLatLon(q.Get("lat"), -90, 90)
}
if lon == nil {
lon = parseLatLon(q.Get("lon"), -180, 180)
}
}

peer := &Peer{
store: store,
conn: conn,

retrievedIDCallback: manager.Reconnected,

Lat: lat,
Lon: lon,
}
defer func() {
logger.Debug("peer websocket closed", zap.String("peer", peer.ID), zap.String("game", peer.Game), zap.String("origin", r.Header.Get("Origin")))
Expand Down Expand Up @@ -162,6 +184,15 @@ func Handler(ctx context.Context, store stores.Store, cloudflare *cloudflare.Cre
if err := json.Unmarshal(raw, &params); err != nil {
util.ErrorAndDisconnect(ctx, conn, err)
}

// Add lat/lon to event data of the avg-latency-at-10s event.
// We want to use this data to build a latency world map.
if params.Action == "avg-latency-at-10s" && params.Data != nil && peer != nil && peer.Lat != nil && peer.Lon != nil {
// Round to 2 decimal places to reduce precision for privacy reasons.
params.Data["lat"] = strconv.FormatFloat(*peer.Lat, 'f', 2, 64)
params.Data["lon"] = strconv.FormatFloat(*peer.Lon, 'f', 2, 64)
}

go metrics.RecordEvent(ctx, params)

case "ping", "pong":
Expand All @@ -179,3 +210,20 @@ func Handler(ctx context.Context, store stores.Store, cloudflare *cloudflare.Cre
}
})
}

func parseLatLon(value string, min, max float64) *float64 {
if value == "" {
return nil
}
v, err := strconv.ParseFloat(value, 64)
if err != nil {
return nil
}
if math.IsNaN(v) || math.IsInf(v, 0) {
return nil
}
if v < min || v > max {
return nil
}
return &v
}
10 changes: 9 additions & 1 deletion internal/signaling/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Peer struct {
Game string
Lobby string
LatencyVector []float32
Lat *float64
Lon *float64
}

func (p *Peer) Send(ctx context.Context, packet any) error {
Expand Down Expand Up @@ -235,6 +237,12 @@ func (p *Peer) HandleHelloPacket(ctx context.Context, packet HelloPacket) error
}
}

if p.Lat != nil && p.Lon != nil {
if err := p.store.UpdatePeerGeo(ctx, p.ID, p.Lat, p.Lon); err != nil {
logger.Warn("failed to persist peer geolocation", zap.Error(err))
}
}

err := p.Send(ctx, WelcomePacket{
Type: "welcome",
ID: p.ID,
Expand Down Expand Up @@ -361,7 +369,7 @@ func (p *Peer) HandleListPacket(ctx context.Context, packet ListPacket) error {
if p.ID == "" {
return fmt.Errorf("peer not connected")
}
lobbies, err := p.store.ListLobbies(ctx, p.Game, p.LatencyVector, packet.Filter, packet.Sort, packet.Limit)
lobbies, err := p.store.ListLobbies(ctx, p.Game, p.LatencyVector, p.Lat, p.Lon, packet.Filter, packet.Sort, packet.Limit)
if err != nil {
return err
}
Expand Down
42 changes: 35 additions & 7 deletions internal/signaling/stores/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"regexp"
"slices"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -39,7 +40,7 @@ type PostgresStore struct {

func NewPostgresStore(ctx context.Context, db *pgxpool.Pool) (*PostgresStore, error) {
filterConverter, err := filter.NewConverter(
filter.WithNestedJSONB("custom_data", "code", "playerCount", "createdAt", "updatedAt", "latency"),
filter.WithNestedJSONB("custom_data", "code", "playerCount", "createdAt", "updatedAt", "latency", "latency2"),
filter.WithEmptyCondition("TRUE"), // No filter returns all lobbies.
)
if err != nil {
Expand Down Expand Up @@ -313,7 +314,7 @@ func (s *PostgresStore) GetLobby(ctx context.Context, game, lobbyCode string) (L
return lobby, nil
}

func (s *PostgresStore) ListLobbies(ctx context.Context, game string, latency []float32, filter, sort string, limit int) ([]Lobby, error) {
func (s *PostgresStore) ListLobbies(ctx context.Context, game string, latency []float32, lat, lon *float64, filter, sort string, limit int) ([]Lobby, error) {
// TODO: Remove this.
if filter == "" {
filter = "{}"
Expand All @@ -328,7 +329,7 @@ func (s *PostgresStore) ListLobbies(ctx context.Context, game string, latency []
latencyVector = pgvector.NewVector(latency)
}

preValues := []any{game, latencyVector, limit}
preValues := []any{game, latencyVector, lat, lon, limit}

where, values, err := s.filterConverter.Convert([]byte(filter), len(preValues)+1)
if err != nil {
Expand Down Expand Up @@ -374,9 +375,18 @@ func (s *PostgresStore) ListLobbies(ctx context.Context, game string, latency []
SELECT p.latency_vector
FROM peers p
WHERE p.peer = ANY (lobbies.peers)
AND p.latency_vector IS NOT NULL
AND p.latency_vector IS NOT NULL
)
) AS latency
) AS latency,
CASE
WHEN $3::double precision IS NULL OR $4::double precision IS NULL THEN NULL
ELSE (
SELECT ROUND(AVG(earth_distance(ll_to_earth($3::double precision, $4::double precision), p.geo) / 1000.0 * 0.015 + 5.0))
FROM peers p
WHERE p.peer = ANY (lobbies.peers)
AND p.geo IS NOT NULL
)
END AS latency2
FROM lobbies
WHERE game = $1
AND public = true
Expand All @@ -385,7 +395,7 @@ func (s *PostgresStore) ListLobbies(ctx context.Context, game string, latency []
FROM game_lobbies
WHERE `+where+`
ORDER BY `+order+`
LIMIT $3
LIMIT $`+strconv.Itoa(len(preValues))+`
`, append(preValues, values...)...)
if err != nil {
return nil, err
Expand All @@ -394,7 +404,7 @@ func (s *PostgresStore) ListLobbies(ctx context.Context, game string, latency []

for rows.Next() {
var lobby Lobby
err = rows.Scan(&lobby.Code, &lobby.PlayerCount, &lobby.Public, &lobby.CustomData, &lobby.CreatedAt, &lobby.UpdatedAt, &lobby.Leader, &lobby.Term, &lobby.CanUpdateBy, &lobby.Creator, &lobby.HasPassword, &lobby.MaxPlayers, &lobby.Latency)
err = rows.Scan(&lobby.Code, &lobby.PlayerCount, &lobby.Public, &lobby.CustomData, &lobby.CreatedAt, &lobby.UpdatedAt, &lobby.Leader, &lobby.Term, &lobby.CanUpdateBy, &lobby.Creator, &lobby.HasPassword, &lobby.MaxPlayers, &lobby.Latency, &lobby.Latency2)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -447,6 +457,24 @@ func (s *PostgresStore) UpdatePeerLatency(ctx context.Context, peerID string, la
return nil
}

func (s *PostgresStore) UpdatePeerGeo(ctx context.Context, peerID string, lat, lon *float64) error {
now := util.NowUTC(ctx)

_, err := s.DB.Exec(ctx, `
UPDATE peers
SET
lat = $1,
lon = $2,
geo = CASE
WHEN $1 IS NOT NULL AND $2 IS NOT NULL THEN ll_to_earth($1, $2)
ELSE NULL
END,
updated_at = $3
WHERE peer = $4
`, lat, lon, now, peerID)
return err
}

func (s *PostgresStore) MarkPeerAsActive(ctx context.Context, peerID string) error {
now := util.NowUTC(ctx)

Expand Down
8 changes: 5 additions & 3 deletions internal/signaling/stores/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ type Store interface {
JoinLobby(ctx context.Context, game, lobby, id, password string) error
LeaveLobby(ctx context.Context, game, lobby, id string) error
GetLobby(ctx context.Context, game, lobby string) (Lobby, error)
ListLobbies(ctx context.Context, game string, latency []float32, filter, sort string, limit int) ([]Lobby, error)
ListLobbies(ctx context.Context, game string, latency []float32, lat, lon *float64, filter, sort string, limit int) ([]Lobby, error)

Subscribe(ctx context.Context, callback SubscriptionCallback, game, lobby, peerID string)
Publish(ctx context.Context, topic string, data []byte) error

CreatePeer(ctx context.Context, peerID, secret, gameID string) error
UpdatePeerLatency(ctx context.Context, peerID string, latency []float32) error
UpdatePeerGeo(ctx context.Context, peerID string, lat, lon *float64) error
MarkPeerAsActive(ctx context.Context, peerID string) error
MarkPeerAsDisconnected(ctx context.Context, peerID string) error
MarkPeerAsReconnected(ctx context.Context, peerID, secret, gameID string) (bool, []string, error)
Expand All @@ -60,7 +61,7 @@ const (

type Lobby struct {
Code string `json:"code"`
Peers []string `json:"peers"`
Peers []string `json:"peers,omitempty"`
PlayerCount int `json:"playerCount"`
Creator string `json:"creator"`

Expand All @@ -73,7 +74,8 @@ type Lobby struct {
Leader string `json:"leader,omitempty"`
Term int `json:"term"`

Latency *float32 `json:"latency,omitempty"`
Latency *float32 `json:"latency,omitempty"`
Latency2 *float32 `json:"latency2,omitempty"`

CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
Expand Down
1 change: 1 addition & 0 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export interface LobbyListEntry {
createdAt: string
updatedAt: string
latency?: number
latency2?: number
}

interface Base {
Expand Down
6 changes: 6 additions & 0 deletions migrations/1764692636_latency2_geo.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
BEGIN;

ALTER TABLE "peers"
DROP COLUMN IF EXISTS "geo";

COMMIT;
10 changes: 10 additions & 0 deletions migrations/1764692636_latency2_geo.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
BEGIN;

-- Required for 'earth' data type and distance calculations used in lobby listing query.
CREATE EXTENSION IF NOT EXISTS cube;
CREATE EXTENSION IF NOT EXISTS earthdistance;

ALTER TABLE "peers"
ADD COLUMN IF NOT EXISTS "geo" earth;

COMMIT;
2 changes: 1 addition & 1 deletion migrations/latest.lock
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1758799200_latency_vector
1764692636_latency2_geo