Skip to content

Commit

Permalink
Optimized for better SeedLink buffer performance
Browse files Browse the repository at this point in the history
  • Loading branch information
bclswl0827 committed Mar 6, 2024
1 parent 35beebc commit 440b472
Show file tree
Hide file tree
Showing 45 changed files with 1,364 additions and 241 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

Starting from v2.2.5, all notable changes to this project will be documented in this file.

## v2.11.0

- Using NoSQL database as SeedLink ring buffer backend
- Fix SeedLink command parsing issue which causes some clients to be unable to connect
- Fragmenting SeedLink packets to accommodate higher sampling rates
- Remove redundant `status` fields in `/api/v1/station` response
- Fix timestamp issue in geophone data collecting module

## v2.10.2

- Input component optimization
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v2.10.2
v2.11.0
2 changes: 1 addition & 1 deletion app/v1/inventory/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func getInventoryString(config *config.Conf, status *publisher.Status) string {
</responsePAZ>
<network publicID="Network@AnyShake" code="%s">
<start>%s</start>
<description>AnyShake_Project_Seismic_Network</description>
<description>AnyShake seismic network</description>
<institutions>AnyShake Project</institutions>
<region>%s</region>
<shared>true</shared>
Expand Down
3 changes: 1 addition & 2 deletions build/assets/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
"enable": false,
"host": "0.0.0.0",
"port": 18000,
"size": 43200,
"buffer": "/data/sl_buffer.dat"
"size": 86400
}
}
9 changes: 4 additions & 5 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,10 @@ type miniseed struct {
}

type seedlink struct {
Enable bool `json:"enable"`
Host string `json:"host"`
Buffer string `json:"buffer"`
Port int `json:"port"`
Size int `json:"size"`
Enable bool `json:"enable"`
Host string `json:"host"`
Port int `json:"port"`
Duration int `json:"duration"`
}

type Conf struct {
Expand Down
9 changes: 0 additions & 9 deletions docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,20 +341,11 @@ const docTemplate = `{
"errors": {
"type": "integer"
},
"failures": {
"type": "integer"
},
"messages": {
"type": "integer"
},
"offset": {
"type": "number"
},
"pushed": {
"type": "integer"
},
"queued": {
"type": "integer"
}
}
},
Expand Down
9 changes: 0 additions & 9 deletions docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -333,20 +333,11 @@
"errors": {
"type": "integer"
},
"failures": {
"type": "integer"
},
"messages": {
"type": "integer"
},
"offset": {
"type": "number"
},
"pushed": {
"type": "integer"
},
"queued": {
"type": "integer"
}
}
},
Expand Down
6 changes: 0 additions & 6 deletions docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,10 @@ definitions:
properties:
errors:
type: integer
failures:
type: integer
messages:
type: integer
offset:
type: number
pushed:
type: integer
queued:
type: integer
type: object
response.HttpResponse:
properties:
Expand Down
2 changes: 1 addition & 1 deletion driver/seedlink/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type DATA struct{}

// Callback of "DATA" command, implements SeedLinkCommandCallback interface
func (*DATA) Callback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.FeatureOptions, streamer SeedLinkStreamer, conn net.Conn, args ...string) error {
_, err := conn.Write([]byte(RES_OK))
_, err := conn.Write([]byte(RES_ERR))
return err
}

Expand Down
74 changes: 50 additions & 24 deletions driver/seedlink/end.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package seedlink

import (
"net"
"strconv"
"strings"
"time"

"github.com/anyshake/observer/feature"
"github.com/anyshake/observer/publisher"
"github.com/anyshake/observer/utils/text"
"github.com/ostafen/clover/v2/query"
)

type END struct{}
Expand All @@ -15,42 +18,65 @@ type END struct{}
func (*END) Callback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.FeatureOptions, streamer SeedLinkStreamer, conn net.Conn, args ...string) error {
cl.StreamMode = true // Enter stream mode
var (
seqNum int64 = 0
channels = cl.Channels
location = cl.Location
endTime = cl.EndTime
startTime = cl.StartTime
station = text.TruncateString(cl.Station, 5)
network = text.TruncateString(cl.Network, 2)
seqNum int64 = 0
channels = cl.Channels
location = cl.Location
endTime = cl.EndTime
startTime = cl.StartTime
database = sl.SeedLinkBuffer.Database
collection = sl.SeedLinkBuffer.Collection
station = text.TruncateString(cl.Station, 5)
network = text.TruncateString(cl.Network, 2)
)

if startTime.IsZero() {
_, err := conn.Write([]byte(RES_ERR))
return err
}

// Send data in buffer
for _, buffer := range sl.SeedLinkBuffer.Data {
chMap := map[string]publisher.Int32Array{
"EHZ": buffer.EHZ, "EHE": buffer.EHE, "EHN": buffer.EHN,
records, err := database.FindAll(
query.NewQuery(collection).
Where(query.Field("ts").
Gt(startTime.UnixMilli()).
And(query.Field("ts").
Lt(endTime.UnixMilli()),
),
),
)
if err != nil {
conn.Write([]byte(RES_ERR))
return err
}

for _, record := range records {
var recordMap map[string]any
record.Unmarshal(&recordMap)
channelMap := map[string]string{
"EHZ": recordMap["ehz"].(string),
"EHE": recordMap["ehe"].(string),
"EHN": recordMap["ehn"].(string),
}
for _, channel := range channels {
if data, ok := chMap[channel]; ok {
bufTime := time.UnixMilli(buffer.TS)
if bufTime.After(startTime) && bufTime.Before(endTime) {
dataBytes, err := CreateSLPacket(data, buffer.TS, seqNum, network, station, channel, location)
data, ok := channelMap[channel]
if !ok {
continue
}
var (
timestamp = int64(recordMap["ts"].(float64))
bufTime = time.UnixMilli(timestamp)
)
if bufTime.After(startTime) && bufTime.Before(endTime) {
var countData []int32
for _, v := range strings.Split(data, "|") {
intData, err := strconv.Atoi(v)
if err != nil {
return err
}

if len(dataBytes) > 0 {
_, err = conn.Write(dataBytes)
if err != nil {
return err
}

seqNum++
}
countData = append(countData, int32(intData))
}
err := SendSLPacket(conn, countData, timestamp, &seqNum, network, station, channel, location)
if err != nil {
return err
}
}
}
Expand Down
81 changes: 54 additions & 27 deletions driver/seedlink/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,67 @@ package seedlink

import (
"fmt"
"net"
"time"

"github.com/bclswl0827/mseedio"
)

func CreateSLPacket(count []int32, ts, seq int64, network, station, channel, location string) ([]byte, error) {
// Generate MiniSEED, send to client
var miniseed mseedio.MiniSeedData
// Init header fields
miniseed.Init(mseedio.STEIM2, mseedio.MSBFIRST)

// Append MiniSEED data
err := miniseed.Append(count, &mseedio.AppendOptions{
StationCode: station,
LocationCode: location,
ChannelCode: channel,
NetworkCode: network,
SampleRate: float64(len(count)),
StartTime: time.UnixMilli(ts).UTC(),
SequenceNumber: fmt.Sprintf("%06d", seq),
})
if err != nil {
return nil, err
func SendSLPacket(conn net.Conn, count []int32, ts int64, seq *int64, network, station, channel, location string) error {
// Create data chunks to adapt to SeedLink packet size
var countGroup [][]int32
if len(count) > CHUNK_SIZE {
for i := 0; i < len(count); i += CHUNK_SIZE {
if i+CHUNK_SIZE > len(count) {
countGroup = append(countGroup, count[i:])
} else {
countGroup = append(countGroup, count[i:i+CHUNK_SIZE])
}
}
} else {
countGroup = append(countGroup, count)
}

// Get MiniSEED bytes
dataBytes, err := miniseed.Encode(mseedio.OVERWRITE, mseedio.MSBFIRST)
if err != nil {
return nil, err
} else if len(dataBytes) != PACKET_SIZE {
return nil, nil
dataSpanMs := 1000 / float64(len(count))
for i, c := range countGroup {
// Generate MiniSEED record
var miniseed mseedio.MiniSeedData
miniseed.Init(mseedio.STEIM2, mseedio.MSBFIRST)
err := miniseed.Append(c, &mseedio.AppendOptions{
StationCode: station,
LocationCode: location,
ChannelCode: channel,
NetworkCode: network,
SampleRate: float64(len(count)),
SequenceNumber: fmt.Sprintf("%06d", *seq),
StartTime: time.UnixMilli(ts + int64(float64(i*CHUNK_SIZE)*dataSpanMs)).UTC(),
})
if err != nil {
return err
}

// Get MiniSEED data bytes always in 512 bytes
miniseed.Series[0].BlocketteSection.RecordLength = 9
slData, err := miniseed.Encode(mseedio.OVERWRITE, mseedio.MSBFIRST)
if err != nil {
return err
}

// Prepend and send SeedLink sequence number
slSeq := []byte(fmt.Sprintf("SL%06X", *seq))
_, err = conn.Write(slSeq)
if err != nil {
return err
}

// Send SeedLink packet data
_, err = conn.Write(slData)
if err != nil {
return err
}

*seq++
}

// Return SeedLink packet
slSeq := []byte(fmt.Sprintf("SL%06X", seq))
return append(slSeq, dataBytes...), nil
return nil
}
10 changes: 2 additions & 8 deletions driver/seedlink/station.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,8 @@ type STATION struct{}

// Callback of "STATION <...> <...>" command, implements SeedLinkCommandCallback interface
func (*STATION) Callback(sl *SeedLinkGlobal, cl *SeedLinkClient, options *feature.FeatureOptions, streamer SeedLinkStreamer, conn net.Conn, args ...string) error {
if len(args) < 3 {
_, err := conn.Write([]byte(RES_ERR))
return err
} else {
cl.Station = args[1]
cl.Network = args[2]
}

cl.Station = args[0]
cl.Network = args[1]
_, err := conn.Write([]byte(RES_OK))
return err
}
Expand Down
9 changes: 5 additions & 4 deletions driver/seedlink/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (

"github.com/anyshake/observer/feature"
"github.com/anyshake/observer/publisher"
c "github.com/ostafen/clover/v2"
)

const (
PACKET_SIZE int = 512
CHUNK_SIZE int = 100
ORGANIZATION string = "anyshake.org"
RELEASE string = "SeedLink v3.1 AnyShake Edition (Very basic implementation in Go) :: SLPROTO:3.1 CAP EXTREPLY NSWILDCARD BATCH WS:13 :: Constructing Realtime Seismic Network Ambitiously."
)
Expand Down Expand Up @@ -38,9 +39,9 @@ type SeedLinkGlobal struct {

// SeedLink data buffer
type SeedLinkBuffer struct {
Size int
File string
Data []publisher.Geophone
Collection string
Duration time.Duration
Database *c.DB
}

// SeedLink basic state
Expand Down
11 changes: 5 additions & 6 deletions feature/geophone/callbacks.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package geophone

import (
"time"

"github.com/anyshake/observer/feature"
"github.com/anyshake/observer/utils/duration"
"github.com/anyshake/observer/utils/logger"
Expand All @@ -22,7 +20,6 @@ func (g *Geophone) OnReady(options *feature.FeatureOptions, v ...any) {
if !options.Status.ReadyTime.IsZero() {
var (
packet = v[0].(Packet)
lastTime = time.UnixMilli(options.Status.Buffer.TS).UTC()
currentTime, _ = duration.Timestamp(options.Status.System.Offset)
)

Expand All @@ -34,11 +31,13 @@ func (g *Geophone) OnReady(options *feature.FeatureOptions, v ...any) {
}

// Archive approximately 1 second has passed
timeDiff := duration.Difference(currentTime, lastTime)
timeDiff := duration.Difference(currentTime, options.Status.LastRecvTime)
if timeDiff >= READY_THRESHOLD {
// Set packet timestamp
// Set packet timestamp, note that the timestamp in buffer is the start of the packet
options.Status.Buffer.TS = currentTime.UnixMilli() - timeDiff.Milliseconds()
// Set last received time is the current timestamp
options.Status.LastRecvTime = currentTime
options.Status.System.Messages++
options.Status.Buffer.TS = currentTime.UnixMilli()
// Copy buffer and reset
options.Status.Geophone = *options.Status.Buffer
options.Status.Buffer.EHZ = []int32{}
Expand Down
Loading

0 comments on commit 440b472

Please sign in to comment.