Skip to content

Commit

Permalink
[POA-2672] Handle race condition while flushing witness (#61)
Browse files: browse the repository at this point in the history
  • Loading branch information
mudit-postman authored Jan 21, 2025
1 parent ba2270b commit 647d928
Showing 1 changed file with 40 additions and 15 deletions.
55 changes: 40 additions & 15 deletions trace/backend_collector.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -55,6 +55,12 @@ type witnessWithInfo struct {
requestEnd time.Time
responseStart time.Time

// Mutex protecting witness while it is being processed and/or flushed.
witnessMutex sync.Mutex

// Whether the witness has been flushed to the backend.
witnessFlushed bool

witness *pb.Witness
}

Expand Down Expand Up @@ -210,22 +216,28 @@ func (c *BackendCollector) Process(t akinet.ParsedNetworkTraffic) error {
if val, ok := c.pairCache.LoadAndDelete(partial.PairKey); ok {
pair := val.(*witnessWithInfo)

// Combine the pair, merging the result into the existing item
// rather than the new partial.
learn.MergeWitness(pair.witness, partial.Witness)
pair.computeProcessingLatency(isRequest, t)

// If partial is the request, flip the src/dst in the pair before
// reporting.
if isRequest {
pair.srcIP, pair.dstIP = pair.dstIP, pair.srcIP
pair.srcPort, pair.dstPort = pair.dstPort, pair.srcPort
}

c.queueUpload(pair)
printer.Debugf("Completed witness %v at %v -- %v\n",
partial.PairKey, t.ObservationTime, t.FinalPacketTime)
func() {
// Hold the witness mutex across the merge, the src/dst flip and the
// upload; the deferred Unlock releases it when this closure returns.
pair.witnessMutex.Lock()
defer pair.witnessMutex.Unlock()

// Combine the pair, merging the result into the existing item
// rather than the new partial.
learn.MergeWitness(pair.witness, partial.Witness)
pair.computeProcessingLatency(isRequest, t)

// If partial is the request, flip the src/dst in the pair before
// reporting.
if isRequest {
pair.srcIP, pair.dstIP = pair.dstIP, pair.srcIP
pair.srcPort, pair.dstPort = pair.dstPort, pair.srcPort
}

c.queueUpload(pair)
printer.Debugf("Completed witness %v at %v -- %v\n",
partial.PairKey, t.ObservationTime, t.FinalPacketTime)
}()
} else {
// Store the partial witness for now, waiting for its pair or a
// flush timeout.
Expand Down Expand Up @@ -335,6 +347,14 @@ func excludeWitnessFromReproMode(w *pb.Witness) bool {
}

func (c *BackendCollector) queueUpload(w *witnessWithInfo) {
if w.witnessFlushed {
printer.Debugf("Witness %v already flushed.\n", w.id)
return
}
defer func() {
w.witnessFlushed = true
}()

// Mark the method as not obfuscated.
w.witness.GetMethod().GetMeta().GetHttp().Obfuscation = pb.HTTPMethodMeta_NONE

Expand Down Expand Up @@ -398,6 +418,11 @@ func (c *BackendCollector) flushPairCache(cutoffTime time.Time) {
c.pairCache.Range(func(k, v interface{}) bool {
e := v.(*witnessWithInfo)
if e.observationTime.Before(cutoffTime) {
// Hold the witness mutex while the entry is uploaded and removed from
// pairCache; the deferred Unlock runs when this Range callback returns.
e.witnessMutex.Lock()
defer e.witnessMutex.Unlock()

c.queueUpload(e)
c.pairCache.Delete(k)
}
Expand Down

0 comments on commit 647d928

Please sign in to comment.