Add batching support for high-volume runtime monitoring #12168
Changes from all commits
@@ -17,10 +17,12 @@
 package remote

 import (
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"io"
 	"os"
+	"sync"
 	"time"

 	"golang.org/x/sys/unix"
@@ -45,6 +47,12 @@ func init() {
 	})
 }

+// batchedMessage represents a message to be sent in batch.
+type batchedMessage struct {
+	msg     proto.Message
+	msgType pb.MessageType
+}
+
 // remote sends a serialized point to a remote process asynchronously over a
 // SOCK_SEQPACKET Unix-domain socket. Each message corresponds to a single
 // serialized point proto, preceded by a standard header. If the point cannot
@@ -58,6 +66,15 @@ type remote struct {
 	retries        int
 	initialBackoff time.Duration
 	maxBackoff     time.Duration

+	// Batching fields
+	batchInterval time.Duration
+	remoteVersion uint32
+
+	batchMu     sync.Mutex
+	batch       []batchedMessage
+	batchTicker *time.Ticker
+	stopBatch   chan struct{}
+	wg          sync.WaitGroup
 }

 var _ seccheck.Sink = (*remote)(nil)
@@ -74,14 +91,20 @@ func setupSink(config map[string]any) (*os.File, error) {
 	if !ok {
 		return nil, fmt.Errorf("endpoint %q is not a string", addrOpaque)
 	}
-	return setup(addr)
+	file, _, err := setupWithVersion(addr)

[Review comment] See comment below, if …

+	return file, err
 }

+// setupWithVersion returns the file and the remote version.
+func setupWithVersion(path string) (*os.File, uint32, error) {
+	return setup(path)
+}

[Review comment] Empty implementation. I guess this is where you would want to process the remote's version and disable batching if the remote doesn't support it. Please add a unit test to check the fallback behavior.

-func setup(path string) (*os.File, error) {
+func setup(path string) (*os.File, uint32, error) {
 	log.Debugf("Remote sink connecting to %q", path)
 	socket, err := unix.Socket(unix.AF_UNIX, unix.SOCK_SEQPACKET, 0)
 	if err != nil {
-		return nil, fmt.Errorf("socket(AF_UNIX, SOCK_SEQPACKET, 0): %w", err)
+		return nil, 0, fmt.Errorf("socket(AF_UNIX, SOCK_SEQPACKET, 0): %w", err)
 	}
 	f := os.NewFile(uintptr(socket), path)
 	cu := cleanup.Make(func() {
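One way to act on the reviewer's comment above, sketched only and not part of this PR: record the handshake version in the `remoteVersion` field this change adds, then clear `batchInterval` when the peer predates batching. The method name `applyVersionGating` and its call site in `new()` are hypothetical; the constant mirrors `batchingSupportedVersion` introduced in `setup()` below.

```go
// applyVersionGating is a hypothetical helper (package remote) that new()
// could call after the handshake: if the peer's protocol version predates
// batching, ignore batch_interval instead of failing.
func (r *remote) applyVersionGating() {
	const batchingSupportedVersion = 2 // duplicates the constant declared in setup()
	if r.batchInterval > 0 && r.remoteVersion < batchingSupportedVersion {
		log.Infof("Remote version %d does not support batching (need >= %d); falling back to per-point writes", r.remoteVersion, batchingSupportedVersion)
		r.batchInterval = 0
	}
}
```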
@@ -91,46 +114,49 @@ func setup(path string) (*os.File, error) {
 	addr := unix.SockaddrUnix{Name: path}
 	if err := unix.Connect(int(f.Fd()), &addr); err != nil {
-		return nil, fmt.Errorf("connect(%q): %w", path, err)
+		return nil, 0, fmt.Errorf("connect(%q): %w", path, err)
 	}

 	// Perform handshake. See common.proto for details about the protocol.
 	hsOut := pb.Handshake{Version: wire.CurrentVersion}
 	out, err := proto.Marshal(&hsOut)
 	if err != nil {
-		return nil, fmt.Errorf("marshalling handshake message: %w", err)
+		return nil, 0, fmt.Errorf("marshalling handshake message: %w", err)
 	}
 	if _, err := f.Write(out); err != nil {
-		return nil, fmt.Errorf("sending handshake message: %w", err)
+		return nil, 0, fmt.Errorf("sending handshake message: %w", err)
 	}

 	in := make([]byte, 10240)
 	read, err := f.Read(in)
 	if err != nil && !errors.Is(err, io.EOF) {
-		return nil, fmt.Errorf("reading handshake message: %w", err)
+		return nil, 0, fmt.Errorf("reading handshake message: %w", err)
 	}
 	// Protect against the handshake becoming larger than the buffer allocated
 	// for it.
 	if read == len(in) {
-		return nil, fmt.Errorf("handshake message too big")
+		return nil, 0, fmt.Errorf("handshake message too big")
 	}
 	hsIn := pb.Handshake{}
 	if err := proto.Unmarshal(in[:read], &hsIn); err != nil {
-		return nil, fmt.Errorf("unmarshalling handshake message: %w", err)
+		return nil, 0, fmt.Errorf("unmarshalling handshake message: %w", err)
 	}

 	// Check that remote version can be supported.
 	const minSupportedVersion = 1
 	if hsIn.Version < minSupportedVersion {
-		return nil, fmt.Errorf("remote version (%d) is smaller than minimum supported (%d)", hsIn.Version, minSupportedVersion)
+		return nil, 0, fmt.Errorf("remote version (%d) is smaller than minimum supported (%d)", hsIn.Version, minSupportedVersion)
 	}

+	// Version 2+ supports batching
+	const batchingSupportedVersion = 2

[Review comment] Not used?

 	if err := unix.SetNonblock(int(f.Fd()), true); err != nil {
-		return nil, err
+		return nil, 0, err
 	}

 	cu.Release()
-	return f, nil
+	return f, hsIn.Version, nil
 }

 func parseDuration(config map[string]any, name string) (bool, time.Duration, error) {
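The fallback behavior the reviewer asks to test could be covered along these lines in package remote's test file (needs `testing` and `time`). This is a sketch only: `TestBatchingDisabledForOldRemote` is a hypothetical test name and `applyVersionGating` is the hypothetical helper sketched earlier, not code from this PR.

```go
// TestBatchingDisabledForOldRemote checks that a configured batch_interval is
// dropped when the remote only speaks a pre-batching protocol version.
func TestBatchingDisabledForOldRemote(t *testing.T) {
	r := &remote{
		remoteVersion: 1,                      // peer predates batching support
		batchInterval: 100 * time.Millisecond, // requested via batch_interval
	}
	r.applyVersionGating() // hypothetical helper sketched above
	if r.batchInterval != 0 {
		t.Errorf("batchInterval = %v, want 0 for remote version %d", r.batchInterval, r.remoteVersion)
	}
}
```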
@@ -183,6 +209,22 @@ func new(config map[string]any, endpoint *fd.FD) (seccheck.Sink, error) {
 		return nil, fmt.Errorf("initial backoff (%v) cannot be larger than max backoff (%v)", r.initialBackoff, r.maxBackoff)
 	}

+	// Parse batch interval
+	if ok, batchInterval, err := parseDuration(config, "batch_interval"); err != nil {
+		return nil, err
+	} else if ok {
+		r.batchInterval = batchInterval
+	}
+
+	// Initialize batching if batch_interval is set
+	if r.batchInterval > 0 {
+		r.stopBatch = make(chan struct{})
+		r.batchTicker = time.NewTicker(r.batchInterval)
+		r.wg.Add(1)
+		go r.batchFlushLoop()
+		log.Debugf("Remote sink batching enabled with interval %v", r.batchInterval)
+	}
+
 	log.Debugf("Remote sink created, endpoint FD: %d, %+v", r.endpoint.FD(), r)
 	return r, nil
 }
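For reference, the sink config map that reaches `new()` with batching enabled might look like the fragment below. Only `endpoint` and `batch_interval` are keys used in this file; the socket path is a placeholder, and representing the interval as a Go duration string assumes `parseDuration` accepts that format.

```go
// Hypothetical config for the remote sink with batching turned on.
config := map[string]any{
	"endpoint":       "/tmp/gvisor_events.sock", // consumed by setupSink
	"batch_interval": "100ms",                   // flush queued points every 100ms
}
```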
@@ -199,46 +241,167 @@ func (r *remote) Status() seccheck.SinkStatus {
 // Stop implements seccheck.Sink.
 func (r *remote) Stop() {
+	// Stop batching if enabled
+	if r.batchInterval > 0 {
+		if r.batchTicker != nil {
+			r.batchTicker.Stop()
+		}
+		close(r.stopBatch)
+		r.wg.Wait() // Wait for flush loop to finish
+
+		// Flush any remaining messages
+		r.batchMu.Lock()
+		if len(r.batch) > 0 {
+			r.flushBatchLocked()
+		}
+		r.batchMu.Unlock()
+	}
+
 	if r.endpoint != nil {
 		// It's possible to race with Point firing, but in the worst case they will
 		// simply fail to be delivered.
 		r.endpoint.Close()
 	}
 }

-func (r *remote) write(msg proto.Message, msgType pb.MessageType) {
-	out, err := proto.Marshal(msg)
-	if err != nil {
-		log.Debugf("Marshal(%+v): %v", msg, err)
+// batchFlushLoop runs in a goroutine and flushes batched messages periodically.
+func (r *remote) batchFlushLoop() {
+	defer r.wg.Done()
+	for {
+		select {
+		case <-r.batchTicker.C:
+			r.batchMu.Lock()
+			if len(r.batch) > 0 {
+				r.flushBatchLocked()
+			}
+			r.batchMu.Unlock()
+		case <-r.stopBatch:
+			return
+		}
+	}
+}

+// flushBatchLocked sends all batched messages individually to maintain SEQPACKET boundaries.
+// Must be called with batchMu held.
+func (r *remote) flushBatchLocked() {
+	if len(r.batch) == 0 {
 		return
 	}
-	hdr := wire.Header{
-		HeaderSize:   uint16(wire.HeaderStructSize),
-		DroppedCount: r.droppedCount.Load(),
-		MessageType:  uint16(msgType),

+	log.Debugf("Flushing batch of %d messages using individual writes", len(r.batch))

+	// SOCK_SEQPACKET requires separate writes to maintain message boundaries
+	// Send each message individually with brief spacing to avoid overwhelming the socket
+	currentDroppedCount := r.droppedCount.Load()
+	var failedCount uint32

+	for i, bm := range r.batch {
+		// Marshal the message
+		out, err := proto.Marshal(bm.msg)
+		if err != nil {
+			log.Debugf("Marshal(%+v): %v", bm.msg, err)
+			failedCount++
+			continue
+		}

+		// Create header for this message - use snapshot of dropped count
+		hdr := wire.Header{
+			HeaderSize:   uint16(wire.HeaderStructSize),
+			DroppedCount: currentDroppedCount,
+			MessageType:  uint16(bm.msgType),
+		}
+		var hdrOut [wire.HeaderStructSize]byte
+		binary.LittleEndian.PutUint16(hdrOut[0:2], hdr.HeaderSize)
+		binary.LittleEndian.PutUint16(hdrOut[2:4], hdr.MessageType)
+		binary.LittleEndian.PutUint32(hdrOut[4:8], hdr.DroppedCount)

+		// Send this message with retry logic for EAGAIN
+		if err := r.writeSingleMessage(hdrOut[:], out); err != nil {
+			failedCount++
+			if failedCount == 1 { // Log only first error to avoid spam
+				log.Debugf("Batch message write failed: %v", err)
+			}
+		}

+		// Add brief spacing to avoid overwhelming the socket buffer
+		// For large batches (>100 messages), add microsecond delays
+		if i > 0 && i%100 == 0 && len(r.batch) > 100 {
+			time.Sleep(10 * time.Microsecond)
+		}
+	}
-	var hdrOut [wire.HeaderStructSize]byte
-	hdr.MarshalUnsafe(hdrOut[:])

+	if failedCount > 0 {
+		log.Debugf("Batch flush completed: %d failed out of %d messages", failedCount, len(r.batch))
+		r.droppedCount.Add(failedCount)
+	}

+	// Clear the batch
+	r.batch = r.batch[:0]
+}

+// writeSingleMessage sends a single message with retry logic for EAGAIN
+func (r *remote) writeSingleMessage(header, payload []byte) error {
 	backoff := r.initialBackoff
-	for i := 0; ; i++ {
-		_, err := unix.Writev(r.endpoint.FD(), [][]byte{hdrOut[:], out})
+	for i := 0; i <= r.retries; i++ {
+		_, err := unix.Writev(r.endpoint.FD(), [][]byte{header, payload})
 		if err == nil {
 			// Write succeeded, we're done!
-			return
+			return nil
 		}
-		if !errors.Is(err, unix.EAGAIN) || i >= r.retries {
-			log.Debugf("Write failed, dropping point: %v", err)
-			r.droppedCount.Add(1)
-			return
+		if !errors.Is(err, unix.EAGAIN) {
+			return err // Non-retryable error
 		}
 		log.Debugf("Write failed, retrying (%d/%d) in %v: %v", i+1, r.retries, backoff, err)
+		if i >= r.retries {
+			return err // Max retries exceeded
+		}

+		// Brief backoff for EAGAIN
 		time.Sleep(backoff)
 		backoff *= 2
 		if r.maxBackoff > 0 && backoff > r.maxBackoff {
 			backoff = r.maxBackoff
 		}
 	}
+	return unix.EAGAIN
 }

+// writeSingle sends a single message immediately (internal method).
+func (r *remote) writeSingle(msg proto.Message, msgType pb.MessageType) {
+	out, err := proto.Marshal(msg)
+	if err != nil {
+		log.Debugf("Marshal(%+v): %v", msg, err)
+		return
+	}
+	hdr := wire.Header{
+		HeaderSize:   uint16(wire.HeaderStructSize),
+		DroppedCount: r.droppedCount.Load(),
+		MessageType:  uint16(msgType),
+	}
+	var hdrOut [wire.HeaderStructSize]byte
+	binary.LittleEndian.PutUint16(hdrOut[0:2], hdr.HeaderSize)
+	binary.LittleEndian.PutUint16(hdrOut[2:4], hdr.MessageType)
+	binary.LittleEndian.PutUint32(hdrOut[4:8], hdr.DroppedCount)

+	if err := r.writeSingleMessage(hdrOut[:], out); err != nil {
+		log.Debugf("Write failed, dropping point: %v", err)
+		r.droppedCount.Add(1)
+	}
+}

+func (r *remote) write(msg proto.Message, msgType pb.MessageType) {
+	// If batching is not enabled, send immediately
+	if r.batchInterval <= 0 {
+		r.writeSingle(msg, msgType)
+		return
+	}

+	// Add to batch
+	r.batchMu.Lock()
+	defer r.batchMu.Unlock()

+	r.batch = append(r.batch, batchedMessage{
+		msg:     msg,
+		msgType: msgType,
+	})
+}

 // Clone implements seccheck.Sink.
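Because flushBatchLocked still emits one point per SOCK_SEQPACKET datagram, an existing consumer does not need to change. A minimal receiver-side sketch of parsing one datagram follows, mirroring the little-endian header layout written above (needs `encoding/binary` and `fmt`; the function, its name, and the hard-coded 8-byte header size are illustrative, not part of this PR).

```go
// parseDatagram splits one received datagram into header fields and the
// serialized point: bytes 0-1 header size, 2-3 message type, 4-7 dropped count.
func parseDatagram(buf []byte) (msgType uint16, dropped uint32, payload []byte, err error) {
	const headerStructSize = 8 // assumed size of the fixed header
	if len(buf) < headerStructSize {
		return 0, 0, nil, fmt.Errorf("short message: %d bytes", len(buf))
	}
	hdrSize := binary.LittleEndian.Uint16(buf[0:2])
	msgType = binary.LittleEndian.Uint16(buf[2:4])
	dropped = binary.LittleEndian.Uint32(buf[4:8])
	if int(hdrSize) < headerStructSize || int(hdrSize) > len(buf) {
		return 0, 0, nil, fmt.Errorf("bad header size %d for %d-byte message", hdrSize, len(buf))
	}
	return msgType, dropped, buf[hdrSize:], nil
}
```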
[Review comment, on the new batchedMessage type] Could you use BatchEntry here instead of defining a new type?