Skip to content

Commit

Permalink
Automatically fix time jitter when using internet NTP server
Browse files Browse the repository at this point in the history
  • Loading branch information
bclswl0827 committed Aug 8, 2024
1 parent 732d9df commit ac50096
Show file tree
Hide file tree
Showing 15 changed files with 100 additions and 61 deletions.
15 changes: 14 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,24 @@

Starting from v2.2.5, all notable changes to this project will be documented in this file.

## v3.0.2

### New Features

- Added ability to automatically fix time jitter when using internet NTP server as time source.

### Bug Fixes

- Fixed "insufficient arguments" error when using PostgreSQL as the database backend (see https://github.com/go-gorm/gorm/issues/6832#issuecomment-1946211186).
- Never check for sample rate consistency in MiniSEED and SAC records when in legacy mode.
- Send history buffer only if client requests in WebSocket API to avoid flooding the client.
- Lowered `MINISEED_ALLOWED_JITTER_MS` constant to 2 ms for better jitter tolerance.

## v3.0.1

### Bug Fixes

- Fixed the issue where MiniSEED recording in legacy mode would be interrupted due to sampling rate jitter
- Fixed the issue where MiniSEED recording in legacy mode would be interrupted due to sampling rate jitter.

## v3.0.0

Expand Down
21 changes: 9 additions & 12 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
FROM golang:alpine AS builder

RUN apk update && apk add --no-cache git bash wget curl make npm
WORKDIR /build
RUN git clone --progress https://github.com/anyshake/observer.git ./observer && \
export VERSION=`cat ./observer/VERSION` && \
cd ./observer/frontend/src && \
npm install && \
make && \
cd ../../docs && \
make && \
cd ../cmd && \
go mod tidy && \
RUN apk update && apk add --no-cache git bash wget curl make npm && \
mkdir -p /build && cd /build && \
git clone --progress https://github.com/anyshake/observer.git observer && \
export VERSION=`cat observer/VERSION` && \
cd observer/frontend/src && \
npm install && make && \
cd ../../docs && make && \
cd ../cmd && go mod tidy && \
go build -ldflags "-s -w -X main.version=$VERSION -X main.release=docker_build" -trimpath -o /tmp/observer *.go

FROM alpine
FROM alpine:latest

COPY --from=builder /tmp/observer /usr/bin/observer
RUN chmod 755 /usr/bin/observer && \
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v3.0.1
v3.0.2
1 change: 1 addition & 0 deletions api/v1/history/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (h *History) Register(rg *gin.RouterGroup, resolver *v1.Resolver) error {
}
fileName, dataBytes, err := h.getSACBytes(
result,
resolver.Config.Explorer.Legacy,
resolver.Config.Stream.Station,
resolver.Config.Stream.Network,
resolver.Config.Stream.Location,
Expand Down
14 changes: 9 additions & 5 deletions api/v1/history/sac.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package history

import (
"fmt"
"math"
"time"

"github.com/anyshake/observer/drivers/explorer"
"github.com/bclswl0827/sacio"
)

func (h *History) getSACBytes(data []explorer.ExplorerData, stationCode, networkCode, locationCode, channelPrefix, channelCode string) (string, []byte, error) {
func (h *History) getSACBytes(data []explorer.ExplorerData, legacyMode bool, stationCode, networkCode, locationCode, channelPrefix, channelCode string) (string, []byte, error) {
var (
startSampleRate = data[0].SampleRate
startTimestamp = data[0].Timestamp
Expand All @@ -19,14 +20,17 @@ func (h *History) getSACBytes(data []explorer.ExplorerData, stationCode, network

var channelBuffer []int32
for index, record := range data {

// Make sure timestamp is continuous
if record.Timestamp != startTimestamp+int64(index*1000) {
if math.Abs(float64(record.Timestamp-startTimestamp-int64(index*1000))) != 0 {
return "", nil, fmt.Errorf("timestamp is not continuous")
}

// Make sure sample rate is the same
if record.SampleRate != startSampleRate {
return "", nil, fmt.Errorf("sample rate is not the same")
if !legacyMode {
// Make sure sample rate is the same
if record.SampleRate != startSampleRate {
return "", nil, fmt.Errorf("sample rate is not the same")
}
}

switch channelCode {
Expand Down
42 changes: 20 additions & 22 deletions api/v1/socket/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package socket
import (
"encoding/json"
"net/http"
"time"

v1 "github.com/anyshake/observer/api/v1"
"github.com/anyshake/observer/drivers/explorer"
Expand Down Expand Up @@ -36,7 +35,7 @@ func (s *Socket) Register(rg *gin.RouterGroup, resolver *v1.Resolver) error {
func(data *explorer.ExplorerData) {
s.messageBus.Publish(s.GetApiName(), data)
s.historyBuffer[s.historyBufferIndex] = *data
s.historyBufferIndex = (s.historyBufferIndex + 1) % EXPLORER_BUFFER_SIZE
s.historyBufferIndex = (s.historyBufferIndex + 1) % HISTORY_BUFFER_SIZE
},
)

Expand All @@ -57,25 +56,6 @@ func (s *Socket) Register(rg *gin.RouterGroup, resolver *v1.Resolver) error {
}
defer conn.Close()

// Send history buffer to the client
for _, buffer := range s.historyBuffer {
if buffer.Timestamp == 0 {
continue
}
dataBytes, err := json.Marshal(buffer)
if err != nil {
logger.GetLogger(s.GetApiName()).Errorln(err)
return
}
err = conn.WriteMessage(websocket.TextMessage, dataBytes)
if err != nil {
logger.GetLogger(s.GetApiName()).Errorln(err)
return
}
// To prevent flooding the client
time.Sleep(time.Millisecond * 10)
}

// Subscribe to the internal message bus
clienrId := conn.RemoteAddr().String()
handler := func(data *explorer.ExplorerData) {
Expand All @@ -99,10 +79,28 @@ func (s *Socket) Register(rg *gin.RouterGroup, resolver *v1.Resolver) error {

// Listen for incoming messages
for {
_, _, err := conn.ReadMessage()
_, dataBytes, err := conn.ReadMessage()
if err != nil {
return
}
// Respond with history buffer if the client sends a "client hello" message
if string(dataBytes) == "client hello" {
for _, buffer := range s.historyBuffer {
if buffer.Timestamp == 0 {
continue
}
dataBytes, err := json.Marshal(buffer)
if err != nil {
logger.GetLogger(s.GetApiName()).Errorln(err)
return
}
err = conn.WriteMessage(websocket.TextMessage, dataBytes)
if err != nil {
logger.GetLogger(s.GetApiName()).Errorln(err)
return
}
}
}
}
})

Expand Down
4 changes: 2 additions & 2 deletions api/v1/socket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
messagebus "github.com/vardius/message-bus"
)

const EXPLORER_BUFFER_SIZE = 180
const HISTORY_BUFFER_SIZE = 180

type Socket struct {
messageBus messagebus.MessageBus // An independent message bus for the socket module
subscribers cmap.ConcurrentMap[string, explorer.ExplorerEventHandler]
historyBuffer [EXPLORER_BUFFER_SIZE]explorer.ExplorerData
historyBuffer [HISTORY_BUFFER_SIZE]explorer.ExplorerData
historyBufferIndex int
}
20 changes: 17 additions & 3 deletions drivers/explorer/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,9 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) {

// Read data from the FIFO buffer continuously
var (
dataBuffer = []legacyPacket{}
ticker = time.NewTicker(1 * time.Second)
dataBuffer = []legacyPacket{}
prevTime, _ = deps.FallbackTime.GetTime()
ticker = time.NewTicker(1 * time.Second)
)
for {
select {
Expand All @@ -231,7 +232,14 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) {
if len(dataBuffer) > 0 {
deps.Health.Received++
deps.Health.UpdatedAt = time.Now()

// Fix jitter in the timestamp
t, _ := deps.FallbackTime.GetTime()
if time.Duration(math.Abs(float64(t.Sub(prevTime).Milliseconds()))) <= EXPLORER_GENERAL_JITTER {
t = deps.FallbackTime.Fix(t, prevTime, time.Second)
}
prevTime = t

var (
z_axis_count []int32
e_axis_count []int32
Expand Down Expand Up @@ -276,6 +284,8 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) {
}

func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency) {
prevTime, _ := deps.FallbackTime.GetTime()

for {
select {
case <-deps.CancelToken.Done():
Expand Down Expand Up @@ -345,8 +355,12 @@ func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency)
if e.mainlinePacketHeader.timestamp != 0 {
finalPacket.Timestamp = e.mainlinePacketHeader.timestamp
} else {
// Fix jitter in the timestamp
t, _ := deps.FallbackTime.GetTime()
finalPacket.Timestamp = t.UTC().UnixMilli()
if time.Duration(math.Abs(float64(t.Sub(prevTime).Milliseconds()))) <= EXPLORER_GENERAL_JITTER {
t = deps.FallbackTime.Fix(t, prevTime, time.Second)
}
prevTime = t
}
deps.messageBus.Publish("explorer", &finalPacket)
deps.Health.UpdatedAt = time.Now()
Expand Down
2 changes: 2 additions & 0 deletions drivers/explorer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
messagebus "github.com/vardius/message-bus"
)

const EXPLORER_GENERAL_JITTER = 2 * time.Millisecond

const (
EXPLORER_CHANNEL_CODE_Z = "Z"
EXPLORER_CHANNEL_CODE_E = "E"
Expand Down
4 changes: 2 additions & 2 deletions frontend/src/.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
REACT_APP_VERSION=v3.0.1
REACT_APP_RELEASE=f0315abf-20240808033910
REACT_APP_VERSION=v3.0.2
REACT_APP_RELEASE=732d9df6-20240808110748
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/wille/osutil v0.0.0-20240729075835-ba7d4216ffe2
go.uber.org/dig v1.17.1
gorm.io/driver/mysql v1.5.2
gorm.io/driver/postgres v1.5.4
gorm.io/driver/postgres v1.5.7
gorm.io/driver/sqlserver v1.5.2
gorm.io/gorm v1.25.10
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/mysql v1.5.2 h1:QC2HRskSE75wBuOxe0+iCkyJZ+RqpudsQtqkp+IMuXs=
gorm.io/driver/mysql v1.5.2/go.mod h1:pQLhh1Ut/WUAySdTHwBpBv6+JKcj+ua4ZFx1QQTBzb8=
gorm.io/driver/postgres v1.5.4 h1:Iyrp9Meh3GmbSuyIAGyjkN+n9K+GHX9b9MqsTL4EJCo=
gorm.io/driver/postgres v1.5.4/go.mod h1:Bgo89+h0CRcdA33Y6frlaHHVuTdOf87pmyzwW9C/BH0=
gorm.io/driver/postgres v1.5.7 h1:8ptbNJTDbEmhdr62uReG5BGkdQyeasu/FZHxI0IMGnM=
gorm.io/driver/postgres v1.5.7/go.mod h1:3e019WlBaYI5o5LIdNV+LyxCMNtLOQETBXL2h4chKpA=
gorm.io/driver/sqlserver v1.5.2 h1:+o4RQ8w1ohPbADhFqDxeeZnSWjwOcBnxBckjTbcP4wk=
gorm.io/driver/sqlserver v1.5.2/go.mod h1:gaKF0MO0cfTq9Q3/XhkowSw4g6nIwHPGAs4hzKCmvBo=
gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
Expand Down
9 changes: 4 additions & 5 deletions services/miniseed/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import (
)

const (
MINISEED_BIT_ORDER = mseedio.MSBFIRST
MINISEED_ENCODE_TYPE = mseedio.STEIM2
MINISEED_WRITE_INTERVAL = 5
MINISEED_CLEANUP_INTERVAL = 60
MINISEED_ALLOWED_JITTER_MS = 10
MINISEED_BIT_ORDER = mseedio.MSBFIRST
MINISEED_ENCODE_TYPE = mseedio.STEIM2
MINISEED_WRITE_INTERVAL = 5
MINISEED_CLEANUP_INTERVAL = 60
)

type MiniSeedService struct {
Expand Down
9 changes: 4 additions & 5 deletions services/miniseed/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ func (m *MiniSeedService) handleWrite() error {
startSampleRate = m.miniseedBuffer[0].SampleRate
)

// Check if the timestamp is within the allowed jitter
for i := 1; i < len(m.miniseedBuffer); i++ {
if math.Abs(float64(m.miniseedBuffer[i].Timestamp-startTimestamp-int64(i*1000))) > 1000+MINISEED_ALLOWED_JITTER_MS {
// Make sure timestamp is continuous
if math.Abs(float64(m.miniseedBuffer[i].Timestamp-startTimestamp-int64(i*1000))) != 0 {
return fmt.Errorf("timestamp is not continuous, expected %d, got %d", startTimestamp+int64(i*1000), m.miniseedBuffer[i].Timestamp)
}
}

if !m.legacyMode {
for i := 1; i < len(m.miniseedBuffer); i++ {
if !m.legacyMode {
// Make sure sample rate is the same
if m.miniseedBuffer[i].SampleRate != startSampleRate {
return fmt.Errorf("sample rate is not the same, expected %d, got %d", startSampleRate, m.miniseedBuffer[i].SampleRate)
}
Expand Down
12 changes: 12 additions & 0 deletions utils/timesource/jitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package timesource

import (
"time"
)

func (s *Source) Fix(currentTime, prevTime time.Time, span time.Duration) time.Time {
expectedTime := prevTime.Add(span)
discrepancy := expectedTime.Sub(currentTime)

return currentTime.Add(discrepancy)
}

0 comments on commit ac50096

Please sign in to comment.