Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/OCAP2/web
go 1.26.0

require (
github.com/OCAP2/extension/v5 v5.0.0-alpha.1.0.20260216221044-4932fc4f0a04
github.com/gorilla/websocket v1.5.3
github.com/labstack/echo/v4 v4.15.0
github.com/mattn/go-sqlite3 v1.14.34
Expand Down Expand Up @@ -31,9 +32,9 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.46.0 // indirect
golang.org/x/net v0.48.0 // indirect
golang.org/x/sys v0.39.0 // indirect
golang.org/x/crypto v0.47.0 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.34.0 // indirect
golang.org/x/time v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
14 changes: 8 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/OCAP2/extension/v5 v5.0.0-alpha.1.0.20260216221044-4932fc4f0a04 h1:D+cP4mCigM7dCgPwcS+YcJPbE/tH2BqB6+85N9WbcIs=
github.com/OCAP2/extension/v5 v5.0.0-alpha.1.0.20260216221044-4932fc4f0a04/go.mod h1:vhxiM92vYBjJ/J6zrwbYDiaPrUNwMRyWZ05duCGjHXg=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
Expand Down Expand Up @@ -52,17 +54,17 @@ github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQ
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU=
golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0=
golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
golang.org/x/image v0.36.0 h1:Iknbfm1afbgtwPTmHnS2gTM/6PPZfH+z2EFuOkSbqwc=
golang.org/x/image v0.36.0/go.mod h1:YsWD2TyyGKiIX1kZlu9QfKIsQ4nAAK9bdgdrIsE7xy4=
golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU=
golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk=
golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
Expand Down
173 changes: 173 additions & 0 deletions internal/ingestion/chunk_flusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package ingestion

import (
"fmt"
"os"
"path/filepath"
"sort"

"google.golang.org/protobuf/proto"

pbv2 "github.com/OCAP2/web/pkg/schemas/protobuf/v2"
)

// ChunkFlusher writes v2 protobuf chunks incrementally during streaming.
// Every chunkSize frames, accumulated states are flushed to disk.
// All methods are called from a single goroutine (the WebSocket read loop).
type ChunkFlusher struct {
chunksDir string
chunkSize uint32
currentChunkIdx uint32
chunkStartFrame uint32

// Buffered states keyed by frame number.
soldierStates map[uint32][]*pbv2.SoldierState
vehicleStates map[uint32][]*pbv2.VehicleState

flushedChunks uint32
}

// NewChunkFlusher creates a flusher that writes chunks of chunkSize frames
// to the given output directory.
func NewChunkFlusher(outputDir string, chunkSize uint32) (*ChunkFlusher, error) {
chunksDir := filepath.Join(outputDir, "chunks")
if err := os.MkdirAll(chunksDir, 0755); err != nil {
return nil, fmt.Errorf("create chunks dir: %w", err)
}
return &ChunkFlusher{
chunksDir: chunksDir,
chunkSize: chunkSize,
soldierStates: make(map[uint32][]*pbv2.SoldierState),
vehicleStates: make(map[uint32][]*pbv2.VehicleState),
}, nil
}

// AddSoldierState buffers a soldier state and flushes if a chunk boundary is crossed.
func (cf *ChunkFlusher) AddSoldierState(frameNum uint32, state *pbv2.SoldierState) error {
cf.soldierStates[frameNum] = append(cf.soldierStates[frameNum], state)
return cf.maybeFlush(frameNum)
}

// AddVehicleState buffers a vehicle state and flushes if a chunk boundary is crossed.
func (cf *ChunkFlusher) AddVehicleState(frameNum uint32, state *pbv2.VehicleState) error {
cf.vehicleStates[frameNum] = append(cf.vehicleStates[frameNum], state)
return cf.maybeFlush(frameNum)
}

// Flush writes any remaining buffered frames as the final chunk.
func (cf *ChunkFlusher) Flush() error {
if len(cf.soldierStates) == 0 && len(cf.vehicleStates) == 0 {
return nil
}
return cf.writeCurrentChunk()
}

// ChunkCount returns the total number of chunks written (including pending).
func (cf *ChunkFlusher) ChunkCount() uint32 {
return cf.flushedChunks
}

func (cf *ChunkFlusher) maybeFlush(frameNum uint32) error {
// Check if this frame crosses the next chunk boundary.
chunkEnd := cf.chunkStartFrame + cf.chunkSize
if frameNum >= chunkEnd {
return cf.writeCurrentChunk()
}
return nil
}

func (cf *ChunkFlusher) writeCurrentChunk() error {
// Collect all frame numbers in this chunk.
frameSet := make(map[uint32]bool)
for f := range cf.soldierStates {
frameSet[f] = true
}
for f := range cf.vehicleStates {
frameSet[f] = true
}

if len(frameSet) == 0 {
return nil
}

// Sort frame numbers.
frames := make([]uint32, 0, len(frameSet))
for f := range frameSet {
frames = append(frames, f)
}
sort.Slice(frames, func(i, j int) bool { return frames[i] < frames[j] })

// Determine which frames belong to the current chunk vs next.
chunkEnd := cf.chunkStartFrame + cf.chunkSize
var currentFrames, nextFrames []uint32
for _, f := range frames {
if f < chunkEnd {
currentFrames = append(currentFrames, f)
} else {
nextFrames = append(nextFrames, f)
}
}

// Build and write the current chunk from currentFrames.
if len(currentFrames) > 0 {
chunk := cf.buildChunk(cf.currentChunkIdx, cf.chunkStartFrame, currentFrames)
if err := cf.writeChunkFile(chunk); err != nil {
return err
}

// Remove flushed frames from buffers.
for _, f := range currentFrames {
delete(cf.soldierStates, f)
delete(cf.vehicleStates, f)
}

cf.flushedChunks++
cf.currentChunkIdx++
cf.chunkStartFrame = chunkEnd
}

// If we have frames that spill into the next chunk, check again.
// (Typically only one chunk boundary is crossed per state message.)
if len(nextFrames) > 0 {
// The next frames are already in the buffer, check if they cross another boundary.
maxNext := nextFrames[len(nextFrames)-1]
if maxNext >= cf.chunkStartFrame+cf.chunkSize {
return cf.writeCurrentChunk()
}
}

return nil
}

func (cf *ChunkFlusher) buildChunk(idx, startFrame uint32, frameNums []uint32) *pbv2.Chunk {
chunk := &pbv2.Chunk{
Index: idx,
StartFrame: startFrame,
FrameCount: cf.chunkSize,
}

for _, fn := range frameNums {
frame := &pbv2.Frame{
FrameNum: fn,
Soldiers: cf.soldierStates[fn],
Vehicles: cf.vehicleStates[fn],
}
chunk.Frames = append(chunk.Frames, frame)
}

return chunk
}

func (cf *ChunkFlusher) writeChunkFile(chunk *pbv2.Chunk) error {
data, err := proto.Marshal(chunk)
if err != nil {
return fmt.Errorf("marshal chunk %d: %w", chunk.Index, err)
}

path := filepath.Join(cf.chunksDir, fmt.Sprintf("%04d.pb", chunk.Index))
if err := os.WriteFile(path, data, 0644); err != nil {
return fmt.Errorf("write chunk %d: %w", chunk.Index, err)
}

return nil
}
120 changes: 120 additions & 0 deletions internal/ingestion/chunk_flusher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package ingestion

import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

pbv2 "github.com/OCAP2/web/pkg/schemas/protobuf/v2"
)

func TestChunkFlusher_FlushesAtBoundary(t *testing.T) {
dir := t.TempDir()
cf, err := NewChunkFlusher(dir, 3) // chunk size = 3 frames
require.NoError(t, err)

// Add states for frames 0, 1, 2 (fills chunk 0).
for frame := uint32(0); frame < 3; frame++ {
err := cf.AddSoldierState(frame, &pbv2.SoldierState{
Id: 1,
Bearing: frame * 10,
Position: &pbv2.Position3D{X: float32(frame), Y: 0, Z: 0},
})
require.NoError(t, err)
}

// Frame 3 crosses boundary → chunk 0 should be flushed.
err = cf.AddSoldierState(3, &pbv2.SoldierState{Id: 1, Bearing: 30})
require.NoError(t, err)
assert.Equal(t, uint32(1), cf.ChunkCount())

// Verify chunk 0 file exists and is valid.
chunkPath := filepath.Join(dir, "chunks", "0000.pb")
data, err := os.ReadFile(chunkPath)
require.NoError(t, err)

var chunk pbv2.Chunk
require.NoError(t, proto.Unmarshal(data, &chunk))
assert.Equal(t, uint32(0), chunk.Index)
assert.Equal(t, uint32(0), chunk.StartFrame)
assert.Equal(t, uint32(3), chunk.FrameCount)
assert.Len(t, chunk.Frames, 3)

// Flush remaining (frame 3).
require.NoError(t, cf.Flush())
assert.Equal(t, uint32(2), cf.ChunkCount())

chunkPath1 := filepath.Join(dir, "chunks", "0001.pb")
data1, err := os.ReadFile(chunkPath1)
require.NoError(t, err)

var chunk1 pbv2.Chunk
require.NoError(t, proto.Unmarshal(data1, &chunk1))
assert.Equal(t, uint32(1), chunk1.Index)
assert.Len(t, chunk1.Frames, 1)
}

func TestChunkFlusher_VehicleStates(t *testing.T) {
dir := t.TempDir()
cf, err := NewChunkFlusher(dir, 5)
require.NoError(t, err)

for frame := uint32(0); frame < 4; frame++ {
err := cf.AddVehicleState(frame, &pbv2.VehicleState{
Id: 10,
Fuel: 0.8,
})
require.NoError(t, err)
}
assert.Equal(t, uint32(0), cf.ChunkCount()) // Not yet flushed.

require.NoError(t, cf.Flush())
assert.Equal(t, uint32(1), cf.ChunkCount())
}

func TestChunkFlusher_EmptyFlush(t *testing.T) {
dir := t.TempDir()
cf, err := NewChunkFlusher(dir, 10)
require.NoError(t, err)

// Flushing with no data should succeed silently.
require.NoError(t, cf.Flush())
assert.Equal(t, uint32(0), cf.ChunkCount())
}

func TestChunkFlusher_MixedSoldierVehicle(t *testing.T) {
dir := t.TempDir()
cf, err := NewChunkFlusher(dir, 2)
require.NoError(t, err)

// Frame 0: soldier + vehicle.
require.NoError(t, cf.AddSoldierState(0, &pbv2.SoldierState{Id: 1}))
require.NoError(t, cf.AddVehicleState(0, &pbv2.VehicleState{Id: 10}))

// Frame 1: only soldier.
require.NoError(t, cf.AddSoldierState(1, &pbv2.SoldierState{Id: 1}))

// Frame 2 crosses boundary.
require.NoError(t, cf.AddSoldierState(2, &pbv2.SoldierState{Id: 1}))
assert.Equal(t, uint32(1), cf.ChunkCount())

// Verify chunk 0 has both soldiers and vehicles.
data, err := os.ReadFile(filepath.Join(dir, "chunks", "0000.pb"))
require.NoError(t, err)

var chunk pbv2.Chunk
require.NoError(t, proto.Unmarshal(data, &chunk))
assert.Len(t, chunk.Frames, 2) // frames 0 and 1

// Frame 0 has both soldier and vehicle.
assert.Len(t, chunk.Frames[0].Soldiers, 1)
assert.Len(t, chunk.Frames[0].Vehicles, 1)

// Frame 1 has only soldier.
assert.Len(t, chunk.Frames[1].Soldiers, 1)
assert.Len(t, chunk.Frames[1].Vehicles, 0)
}
Loading
Loading