From ac50096d9d95a395c7ccd8fa2fa4c0b397a43273 Mon Sep 17 00:00:00 2001 From: Chengxun Lee <24319042+bclswl0827@users.noreply.github.com> Date: Thu, 8 Aug 2024 11:55:24 +0800 Subject: [PATCH] Automatically fix time jitter when using internet NTP server --- CHANGELOG.md | 15 +++++++++++++- Dockerfile | 21 ++++++++----------- VERSION | 2 +- api/v1/history/module.go | 1 + api/v1/history/sac.go | 14 ++++++++----- api/v1/socket/module.go | 42 ++++++++++++++++++-------------------- api/v1/socket/types.go | 4 ++-- drivers/explorer/impl.go | 20 +++++++++++++++--- drivers/explorer/types.go | 2 ++ frontend/src/.env | 4 ++-- go.mod | 2 +- go.sum | 4 ++-- services/miniseed/types.go | 9 ++++---- services/miniseed/write.go | 9 ++++---- utils/timesource/jitter.go | 12 +++++++++++ 15 files changed, 100 insertions(+), 61 deletions(-) create mode 100644 utils/timesource/jitter.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e8a0b7f4..a59e0cf6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,11 +2,24 @@ Starting from v2.2.5, all notable changes to this project will be documented in this file. +## v3.0.2 + +### New Features + +- Added ability to automatically fix time jitter when using internet NTP server as time source. + +### Bug Fixes + +- Fixed "insufficient arguments" error when using PostgreSQL as the database backend (see https://github.com/go-gorm/gorm/issues/6832#issuecomment-1946211186). +- Never check for sample rate consistency in MiniSEED and SAC records when in legacy mode. +- Send history buffer only if client requests in WebSocket API to avoid flooding the client. +- Lowered `MINISEED_ALLOWED_JITTER_MS` constant to 2 ms for better jitter tolerance. + ## v3.0.1 ### Bug Fixes -- Fixed the issue where MiniSEED recording in legacy mode would be interrupted due to sampling rate jitter +- Fixed the issue where MiniSEED recording in legacy mode would be interrupted due to sampling rate jitter. ## v3.0.0 diff --git a/Dockerfile b/Dockerfile index d1c03c9d..c0051a49 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,19 +1,16 @@ FROM golang:alpine AS builder -RUN apk update && apk add --no-cache git bash wget curl make npm -WORKDIR /build -RUN git clone --progress https://github.com/anyshake/observer.git ./observer && \ - export VERSION=`cat ./observer/VERSION` && \ - cd ./observer/frontend/src && \ - npm install && \ - make && \ - cd ../../docs && \ - make && \ - cd ../cmd && \ - go mod tidy && \ +RUN apk update && apk add --no-cache git bash wget curl make npm && \ + mkdir -p /build && cd /build && \ + git clone --progress https://github.com/anyshake/observer.git observer && \ + export VERSION=`cat observer/VERSION` && \ + cd observer/frontend/src && \ + npm install && make && \ + cd ../../docs && make && \ + cd ../cmd && go mod tidy && \ go build -ldflags "-s -w -X main.version=$VERSION -X main.release=docker_build" -trimpath -o /tmp/observer *.go -FROM alpine +FROM alpine:latest COPY --from=builder /tmp/observer /usr/bin/observer RUN chmod 755 /usr/bin/observer && \ diff --git a/VERSION b/VERSION index b105cea1..96506fd2 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v3.0.1 +v3.0.2 diff --git a/api/v1/history/module.go b/api/v1/history/module.go index c68e345b..b1833037 100644 --- a/api/v1/history/module.go +++ b/api/v1/history/module.go @@ -61,6 +61,7 @@ func (h *History) Register(rg *gin.RouterGroup, resolver *v1.Resolver) error { } fileName, dataBytes, err := h.getSACBytes( result, + resolver.Config.Explorer.Legacy, resolver.Config.Stream.Station, resolver.Config.Stream.Network, resolver.Config.Stream.Location, diff --git a/api/v1/history/sac.go b/api/v1/history/sac.go index b9f2744b..2c5645e2 100644 --- a/api/v1/history/sac.go +++ b/api/v1/history/sac.go @@ -2,13 +2,14 @@ package history import ( "fmt" + "math" "time" "github.com/anyshake/observer/drivers/explorer" "github.com/bclswl0827/sacio" ) -func (h *History) getSACBytes(data []explorer.ExplorerData, stationCode, networkCode, locationCode, channelPrefix, channelCode string) (string, []byte, error) { +func (h *History) getSACBytes(data []explorer.ExplorerData, legacyMode bool, stationCode, networkCode, locationCode, channelPrefix, channelCode string) (string, []byte, error) { var ( startSampleRate = data[0].SampleRate startTimestamp = data[0].Timestamp @@ -19,14 +20,17 @@ func (h *History) getSACBytes(data []explorer.ExplorerData, stationCode, network var channelBuffer []int32 for index, record := range data { + // Make sure timestamp is continuous - if record.Timestamp != startTimestamp+int64(index*1000) { + if math.Abs(float64(record.Timestamp-startTimestamp-int64(index*1000))) != 0 { return "", nil, fmt.Errorf("timestamp is not continuous") } - // Make sure sample rate is the same - if record.SampleRate != startSampleRate { - return "", nil, fmt.Errorf("sample rate is not the same") + if !legacyMode { + // Make sure sample rate is the same + if record.SampleRate != startSampleRate { + return "", nil, fmt.Errorf("sample rate is not the same") + } } switch channelCode { diff --git a/api/v1/socket/module.go b/api/v1/socket/module.go index 4033a509..2ae0c094 100644 --- a/api/v1/socket/module.go +++ b/api/v1/socket/module.go @@ -3,7 +3,6 @@ package socket import ( "encoding/json" "net/http" - "time" v1 "github.com/anyshake/observer/api/v1" "github.com/anyshake/observer/drivers/explorer" @@ -36,7 +35,7 @@ func (s *Socket) Register(rg *gin.RouterGroup, resolver *v1.Resolver) error { func(data *explorer.ExplorerData) { s.messageBus.Publish(s.GetApiName(), data) s.historyBuffer[s.historyBufferIndex] = *data - s.historyBufferIndex = (s.historyBufferIndex + 1) % EXPLORER_BUFFER_SIZE + s.historyBufferIndex = (s.historyBufferIndex + 1) % HISTORY_BUFFER_SIZE }, ) @@ -57,25 +56,6 @@ func (s *Socket) Register(rg *gin.RouterGroup, resolver *v1.Resolver) error { } defer conn.Close() - // Send history buffer to the client - for _, buffer := range s.historyBuffer { - if buffer.Timestamp == 0 { - continue - } - dataBytes, err := json.Marshal(buffer) - if err != nil { - logger.GetLogger(s.GetApiName()).Errorln(err) - return - } - err = conn.WriteMessage(websocket.TextMessage, dataBytes) - if err != nil { - logger.GetLogger(s.GetApiName()).Errorln(err) - return - } - // To prevent flooding the client - time.Sleep(time.Millisecond * 10) - } - // Subscribe to the internal message bus clienrId := conn.RemoteAddr().String() handler := func(data *explorer.ExplorerData) { @@ -99,10 +79,28 @@ func (s *Socket) Register(rg *gin.RouterGroup, resolver *v1.Resolver) error { // Listen for incoming messages for { - _, _, err := conn.ReadMessage() + _, dataBytes, err := conn.ReadMessage() if err != nil { return } + // Respond with history buffer if the client sends a "client hello" message + if string(dataBytes) == "client hello" { + for _, buffer := range s.historyBuffer { + if buffer.Timestamp == 0 { + continue + } + dataBytes, err := json.Marshal(buffer) + if err != nil { + logger.GetLogger(s.GetApiName()).Errorln(err) + return + } + err = conn.WriteMessage(websocket.TextMessage, dataBytes) + if err != nil { + logger.GetLogger(s.GetApiName()).Errorln(err) + return + } + } + } } }) diff --git a/api/v1/socket/types.go b/api/v1/socket/types.go index 6afd2ee9..5cc4d167 100644 --- a/api/v1/socket/types.go +++ b/api/v1/socket/types.go @@ -6,11 +6,11 @@ import ( messagebus "github.com/vardius/message-bus" ) -const EXPLORER_BUFFER_SIZE = 180 +const HISTORY_BUFFER_SIZE = 180 type Socket struct { messageBus messagebus.MessageBus // An independent message bus for the socket module subscribers cmap.ConcurrentMap[string, explorer.ExplorerEventHandler] - historyBuffer [EXPLORER_BUFFER_SIZE]explorer.ExplorerData + historyBuffer [HISTORY_BUFFER_SIZE]explorer.ExplorerData historyBufferIndex int } diff --git a/drivers/explorer/impl.go b/drivers/explorer/impl.go index 023ea5e4..7eef6b95 100644 --- a/drivers/explorer/impl.go +++ b/drivers/explorer/impl.go @@ -220,8 +220,9 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) { // Read data from the FIFO buffer continuously var ( - dataBuffer = []legacyPacket{} - ticker = time.NewTicker(1 * time.Second) + dataBuffer = []legacyPacket{} + prevTime, _ = deps.FallbackTime.GetTime() + ticker = time.NewTicker(1 * time.Second) ) for { select { @@ -231,7 +232,14 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) { if len(dataBuffer) > 0 { deps.Health.Received++ deps.Health.UpdatedAt = time.Now() + + // Fix jitter in the timestamp t, _ := deps.FallbackTime.GetTime() + if time.Duration(math.Abs(float64(t.Sub(prevTime).Milliseconds()))) <= EXPLORER_GENERAL_JITTER { + t = deps.FallbackTime.Fix(t, prevTime, time.Second) + } + prevTime = t + var ( z_axis_count []int32 e_axis_count []int32 @@ -276,6 +284,8 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) { } func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency) { + prevTime, _ := deps.FallbackTime.GetTime() + for { select { case <-deps.CancelToken.Done(): @@ -345,8 +355,12 @@ func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency) if e.mainlinePacketHeader.timestamp != 0 { finalPacket.Timestamp = e.mainlinePacketHeader.timestamp } else { + // Fix jitter in the timestamp t, _ := deps.FallbackTime.GetTime() - finalPacket.Timestamp = t.UTC().UnixMilli() + if time.Duration(math.Abs(float64(t.Sub(prevTime).Milliseconds()))) <= EXPLORER_GENERAL_JITTER { + t = deps.FallbackTime.Fix(t, prevTime, time.Second) + } + prevTime = t } deps.messageBus.Publish("explorer", &finalPacket) deps.Health.UpdatedAt = time.Now() diff --git a/drivers/explorer/types.go b/drivers/explorer/types.go index ae6e8dc3..be19c70a 100644 --- a/drivers/explorer/types.go +++ b/drivers/explorer/types.go @@ -10,6 +10,8 @@ import ( messagebus "github.com/vardius/message-bus" ) +const EXPLORER_GENERAL_JITTER = 2 * time.Millisecond + const ( EXPLORER_CHANNEL_CODE_Z = "Z" EXPLORER_CHANNEL_CODE_E = "E" diff --git a/frontend/src/.env b/frontend/src/.env index f1f355ac..d106c803 100644 --- a/frontend/src/.env +++ b/frontend/src/.env @@ -1,2 +1,2 @@ -REACT_APP_VERSION=v3.0.1 -REACT_APP_RELEASE=f0315abf-20240808033910 +REACT_APP_VERSION=v3.0.2 +REACT_APP_RELEASE=732d9df6-20240808110748 diff --git a/go.mod b/go.mod index 7d696640..160aa2d7 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/wille/osutil v0.0.0-20240729075835-ba7d4216ffe2 go.uber.org/dig v1.17.1 gorm.io/driver/mysql v1.5.2 - gorm.io/driver/postgres v1.5.4 + gorm.io/driver/postgres v1.5.7 gorm.io/driver/sqlserver v1.5.2 gorm.io/gorm v1.25.10 ) diff --git a/go.sum b/go.sum index 782dd1e6..e5e0c5dc 100644 --- a/go.sum +++ b/go.sum @@ -372,8 +372,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/mysql v1.5.2 h1:QC2HRskSE75wBuOxe0+iCkyJZ+RqpudsQtqkp+IMuXs= gorm.io/driver/mysql v1.5.2/go.mod h1:pQLhh1Ut/WUAySdTHwBpBv6+JKcj+ua4ZFx1QQTBzb8= -gorm.io/driver/postgres v1.5.4 h1:Iyrp9Meh3GmbSuyIAGyjkN+n9K+GHX9b9MqsTL4EJCo= -gorm.io/driver/postgres v1.5.4/go.mod h1:Bgo89+h0CRcdA33Y6frlaHHVuTdOf87pmyzwW9C/BH0= +gorm.io/driver/postgres v1.5.7 h1:8ptbNJTDbEmhdr62uReG5BGkdQyeasu/FZHxI0IMGnM= +gorm.io/driver/postgres v1.5.7/go.mod h1:3e019WlBaYI5o5LIdNV+LyxCMNtLOQETBXL2h4chKpA= gorm.io/driver/sqlserver v1.5.2 h1:+o4RQ8w1ohPbADhFqDxeeZnSWjwOcBnxBckjTbcP4wk= gorm.io/driver/sqlserver v1.5.2/go.mod h1:gaKF0MO0cfTq9Q3/XhkowSw4g6nIwHPGAs4hzKCmvBo= gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= diff --git a/services/miniseed/types.go b/services/miniseed/types.go index 76964db7..8fef9ac1 100644 --- a/services/miniseed/types.go +++ b/services/miniseed/types.go @@ -6,11 +6,10 @@ import ( ) const ( - MINISEED_BIT_ORDER = mseedio.MSBFIRST - MINISEED_ENCODE_TYPE = mseedio.STEIM2 - MINISEED_WRITE_INTERVAL = 5 - MINISEED_CLEANUP_INTERVAL = 60 - MINISEED_ALLOWED_JITTER_MS = 10 + MINISEED_BIT_ORDER = mseedio.MSBFIRST + MINISEED_ENCODE_TYPE = mseedio.STEIM2 + MINISEED_WRITE_INTERVAL = 5 + MINISEED_CLEANUP_INTERVAL = 60 ) type MiniSeedService struct { diff --git a/services/miniseed/write.go b/services/miniseed/write.go index 64a6f75b..7a9f056a 100644 --- a/services/miniseed/write.go +++ b/services/miniseed/write.go @@ -15,15 +15,14 @@ func (m *MiniSeedService) handleWrite() error { startSampleRate = m.miniseedBuffer[0].SampleRate ) - // Check if the timestamp is within the allowed jitter for i := 1; i < len(m.miniseedBuffer); i++ { - if math.Abs(float64(m.miniseedBuffer[i].Timestamp-startTimestamp-int64(i*1000))) > 1000+MINISEED_ALLOWED_JITTER_MS { + // Make sure timestamp is continuous + if math.Abs(float64(m.miniseedBuffer[i].Timestamp-startTimestamp-int64(i*1000))) != 0 { return fmt.Errorf("timestamp is not continuous, expected %d, got %d", startTimestamp+int64(i*1000), m.miniseedBuffer[i].Timestamp) } - } - if !m.legacyMode { - for i := 1; i < len(m.miniseedBuffer); i++ { + if !m.legacyMode { + // Make sure sample rate is the same if m.miniseedBuffer[i].SampleRate != startSampleRate { return fmt.Errorf("sample rate is not the same, expected %d, got %d", startSampleRate, m.miniseedBuffer[i].SampleRate) } diff --git a/utils/timesource/jitter.go b/utils/timesource/jitter.go new file mode 100644 index 00000000..4e060666 --- /dev/null +++ b/utils/timesource/jitter.go @@ -0,0 +1,12 @@ +package timesource + +import ( + "time" +) + +func (s *Source) Fix(currentTime, prevTime time.Time, span time.Duration) time.Time { + expectedTime := prevTime.Add(span) + discrepancy := expectedTime.Sub(currentTime) + + return currentTime.Add(discrepancy) +}