feat: watermark otwatcher enhancement (numaproj#364)
Signed-off-by: jyu6 <[email protected]>
jy4096 authored Nov 18, 2022
1 parent f817057 commit 256e66b
Showing 14 changed files with 87 additions and 118 deletions.
8 changes: 4 additions & 4 deletions pkg/reduce/reduce_test.go
@@ -252,7 +252,7 @@ func TestReduceDataForward_Count(t *testing.T) {
// create from buffers
fromBuffer := simplebuffer.NewInMemoryBuffer(fromBufferName, fromBufferSize)

//create to buffers
// create to buffers
buffer := simplebuffer.NewInMemoryBuffer(toBufferName, toBufferSize)
toBuffer := map[string]isb.BufferWriter{
toBufferName: buffer,
@@ -325,7 +325,7 @@ func TestReduceDataForward_Sum(t *testing.T) {
// create from buffers
fromBuffer := simplebuffer.NewInMemoryBuffer(fromBufferName, fromBufferSize)

//create to buffers
// create to buffers
buffer := simplebuffer.NewInMemoryBuffer(toBufferName, toBufferSize)
toBuffer := map[string]isb.BufferWriter{
toBufferName: buffer,
@@ -398,7 +398,7 @@ func TestReduceDataForward_Max(t *testing.T) {
// create from buffers
fromBuffer := simplebuffer.NewInMemoryBuffer(fromBufferName, fromBufferSize)

//create to buffers
// create to buffers
buffer := simplebuffer.NewInMemoryBuffer(toBufferName, toBufferSize)
toBuffer := map[string]isb.BufferWriter{
toBufferName: buffer,
@@ -471,7 +471,7 @@ func TestReduceDataForward_SumWithDifferentKeys(t *testing.T) {
// create from buffers
fromBuffer := simplebuffer.NewInMemoryBuffer(fromBufferName, fromBufferSize)

//create to buffers
// create to buffers
buffer := simplebuffer.NewInMemoryBuffer(toBufferName, toBufferSize)
toBuffer := map[string]isb.BufferWriter{
toBufferName: buffer,
2 changes: 1 addition & 1 deletion pkg/watermark/fetch/edge_fetcher.go
@@ -104,7 +104,7 @@ func (e *edgeFetcher) GetWatermark(inputOffset isb.Offset) processor.Watermark {
}
if p.IsDeleted() && (offset > p.offsetTimeline.GetHeadOffset()) {
// if the pod is not active and the current offset is ahead of all offsets in Timeline
e.processorManager.DeleteProcessor(p.entity.GetID())
e.processorManager.DeleteProcessor(p.entity.GetName())
}
}
// if the offset is smaller than every offset in the timeline, set the value to be -1
6 changes: 3 additions & 3 deletions pkg/watermark/fetch/edge_fetcher_test.go
@@ -40,9 +40,9 @@ func TestBuffer_GetWatermark(t *testing.T) {
otWatcher := noop.NewKVOpWatch()
processorManager := NewProcessorManager(ctx, store.BuildWatermarkStoreWatcher(hbWatcher, otWatcher))
var (
testPod0 = NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, otWatcher)
testPod1 = NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5, otWatcher)
testPod2 = NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5, otWatcher)
testPod0 = NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5)
testPod1 = NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod2"), 5)
testPod2 = NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod3"), 5)
pod0Timeline = []OffsetWatermark{
{watermark: 11, offset: 9},
{watermark: 12, offset: 20},
57 changes: 54 additions & 3 deletions pkg/watermark/fetch/processor_manager.go
@@ -24,7 +24,9 @@ import (
"sync"
"time"

"github.com/numaproj/numaflow/pkg/watermark/ot"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/processor"
@@ -69,6 +71,7 @@ func NewProcessorManager(ctx context.Context, watermarkStoreWatcher store.Waterm
}
go v.startRefreshingProcessors()
go v.startHeatBeatWatcher()
go v.startTimeLineWatcher()
return v
}

@@ -141,7 +144,6 @@ func (v *ProcessorManager) refreshingProcessors() {
// TODO: how to delete the pod from the heartbeat store?
v.log.Infow("Processor has been inactive for 10 heartbeats, deleting...", zap.String("key", pName), zap.String(pName, p.String()))
p.setStatus(_deleted)
p.stopTimeLineWatcher()
v.heartbeat.Delete(pName)
} else if time.Now().Unix()-pTime > v.opts.podHeartbeatRate {
// if the pod's last heartbeat is greater than podHeartbeatRate
@@ -177,7 +179,7 @@ func (v *ProcessorManager) startHeatBeatWatcher() {
// The fromProcessor may have been deleted
// TODO: make capacity configurable
var entity = processor.NewProcessorEntity(value.Key())
var fromProcessor = NewProcessorToFetch(v.ctx, entity, 10, v.otWatcher)
var fromProcessor = NewProcessorToFetch(v.ctx, entity, 10)
v.addProcessor(value.Key(), fromProcessor)
v.log.Infow("v.AddProcessor successfully added a new fromProcessor", zap.String("fromProcessor", value.Key()))
} else { // else just make a note that this processor is still active
@@ -200,7 +202,6 @@
} else {
v.log.Infow("Deleting", zap.String("key", value.Key()), zap.String(value.Key(), p.String()))
p.setStatus(_deleted)
p.stopTimeLineWatcher()
v.heartbeat.Delete(value.Key())
}
case store.KVPurge:
@@ -212,3 +213,53 @@

}
}

func (v *ProcessorManager) startTimeLineWatcher() {
watchCh, stopped := v.otWatcher.Watch(v.ctx)

for {
select {
case <-stopped:
return
case value := <-watchCh:
if value == nil {
continue
}
switch value.Operation() {
case store.KVPut:
p := v.GetProcessor(value.Key())
_ = wait.ExponentialBackoffWithContext(v.ctx, wait.Backoff{
// default heartbeat rate is set to 5 seconds, so retry every "duration * factor + [0, jitter]" interval for 5 times
Duration: 1 * time.Second,
Factor: 1,
Jitter: 0.1,
Steps: 5,
}, func() (done bool, err error) {
if p = v.GetProcessor(value.Key()); p == nil {
return false, nil
}
return true, nil
})
if p == nil {
v.log.Errorw("Unable to find the processor", zap.String("processorEntity", value.Key()))
continue
}
otValue, err := ot.DecodeToOTValue(value.Value())
if err != nil {
v.log.Errorw("Unable to decode the value", zap.String("processorEntity", p.entity.GetName()), zap.Error(err))
continue
}
p.offsetTimeline.Put(OffsetWatermark{
watermark: otValue.Watermark,
offset: otValue.Offset,
})
v.log.Debugw("TimelineWatcher- Updates", zap.String("bucket", v.otWatcher.GetKVName()), zap.Int64("watermark", otValue.Watermark), zap.Int64("offset", otValue.Offset))
case store.KVDelete:
// we do not care about Delete events because the timeline bucket is meant to grow and the TTL will
// naturally trim the KV store.
case store.KVPurge:
// skip
}
}
}
}
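
The retry above deserves a note: with Duration 1s, Factor 1, Jitter 0.1 and Steps 5, wait.ExponentialBackoffWithContext re-runs the lookup roughly once per second for up to five attempts, giving the heartbeat watcher time to register a processor before its first timeline update is dropped. A minimal standalone sketch of the same pattern follows (not part of this diff; the map and helper names are illustrative stand-ins for ProcessorManager.GetProcessor, and the condition signature assumes the apimachinery version this commit builds against):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// processors stands in for the ProcessorManager's internal map.
var (
	mu         sync.RWMutex
	processors = map[string]string{}
)

func getProcessor(name string) (string, bool) {
	mu.RLock()
	defer mu.RUnlock()
	p, ok := processors[name]
	return p, ok
}

func main() {
	ctx := context.Background()

	// Simulate the heartbeat watcher registering the processor ~2s after
	// its first timeline update arrives.
	go func() {
		time.Sleep(2 * time.Second)
		mu.Lock()
		processors["p1"] = "processor p1"
		mu.Unlock()
	}()

	var found string
	// Retry every "duration * factor + [0, jitter]" (about 1s-1.1s), at most 5 times.
	err := wait.ExponentialBackoffWithContext(ctx, wait.Backoff{
		Duration: 1 * time.Second,
		Factor:   1,
		Jitter:   0.1,
		Steps:    5,
	}, func() (done bool, err error) {
		p, ok := getProcessor("p1")
		if !ok {
			return false, nil // not registered yet, keep waiting
		}
		found = p
		return true, nil
	})
	if err != nil {
		fmt.Println("processor never showed up:", err)
		return
	}
	fmt.Println("found:", found)
}
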
18 changes: 2 additions & 16 deletions pkg/watermark/fetch/processor_manager_inmem_test.go
@@ -182,27 +182,13 @@ func TestFetcherWithSameOTBucket_InMem(t *testing.T) {
assert.True(t, allProcessors["p1"].IsActive())
assert.True(t, allProcessors["p2"].IsActive())
// "p1" has been deleted from vertex.Processors
// so "p1" will be considered as a new processors and a new offsetTimeline watcher for "p1" will be created
// so "p1" will be considered as a new processors with a new default offset timeline
_ = testBuffer.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+1, 10) }))
p1 := testBuffer.processorManager.GetProcessor("p1")

assert.NotNil(t, p1)
assert.True(t, p1.IsActive())
assert.NotNil(t, p1.offsetTimeline)
// wait till the offsetTimeline has been populated
newP1head := p1.offsetTimeline.GetHeadOffset()
for newP1head == -1 {
select {
case <-ctx.Done():
t.Fatalf("expected head offset to not be equal to -1, %s", ctx.Err())
default:
time.Sleep(1 * time.Millisecond)
newP1head = p1.offsetTimeline.GetHeadOffset()
}
}
// because we keep the kv updates history in the in mem watermark
// the new watcher will read all the history data to create this new offsetTimeline
assert.Equal(t, int64(100), p1.offsetTimeline.GetHeadOffset())
assert.Equal(t, int64(-1), p1.offsetTimeline.GetHeadOffset())

// publish a new watermark 101
otValueByte, err = otValueToBytes(testOffset+1, epoch)
20 changes: 4 additions & 16 deletions pkg/watermark/fetch/processor_manager_test.go
@@ -243,25 +243,13 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
assert.True(t, allProcessors["p1"].IsActive())
assert.True(t, allProcessors["p2"].IsActive())
// "p1" has been deleted from vertex.Processors
// so "p1" will be considered as a new processors and a new offsetTimeline watcher for "p1" will be created
// so "p1" will be considered as a new processors with a new default offset timeline
_ = testBuffer.GetWatermark(isb.SimpleStringOffset(func() string { return strconv.FormatInt(testOffset+1, 10) }))
p1 := testBuffer.processorManager.GetProcessor("p1")
assert.NotNil(t, p1)
assert.True(t, p1.IsActive())
assert.NotNil(t, p1.offsetTimeline)
// wait till the offsetTimeline has been populated
newP1head := p1.offsetTimeline.GetHeadOffset()
// because the bucket hasn't been cleaned up, the new watcher will read all the history data to create this new offsetTimeline
for newP1head != 102 {
fmt.Println(p1.offsetTimeline.Dump())
select {
case <-ctx.Done():
t.Fatalf("expected head offset to be 102, %s", ctx.Err())
default:
time.Sleep(1 * time.Millisecond)
newP1head = p1.offsetTimeline.GetHeadOffset()
}
}
assert.Equal(t, int64(-1), p1.offsetTimeline.GetHeadOffset())

// publish a new watermark 103
otValueByte, err = otValueToBytes(testOffset+3, epoch)
@@ -310,11 +298,11 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
// added 103 in the previous steps for p1, so the head should be 103 after resume
assert.Equal(t, int64(103), p1.offsetTimeline.GetHeadOffset())

for allProcessors["p1"].offsetTimeline.Dump() != "[1651161660000:103] -> [1651161600300:102] -> [1651161600200:101] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
for allProcessors["p1"].offsetTimeline.Dump() != "[1651161660000:103] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1] -> [-1:-1]" {
select {
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
t.Fatalf("expected p1 has the offset timeline [1651161660000:103] -> [1651161600300:102] -> [1651161600200:101] -> [-1:-1]..., got %s: %s", allProcessors["p1"].offsetTimeline.Dump(), ctx.Err())
t.Fatalf("expected p1 has the offset timeline [1651161660000:103] -> [-1:-1] -> [-1:-1] -> [-1:-1]..., got %s: %s", allProcessors["p1"].offsetTimeline.Dump(), ctx.Err())
}
default:
time.Sleep(1 * time.Millisecond)
55 changes: 2 additions & 53 deletions pkg/watermark/fetch/processor_to_fetch.go
@@ -21,12 +21,10 @@ import (
"fmt"
"sync"

"github.com/numaproj/numaflow/pkg/watermark/ot"
"go.uber.org/zap"

"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/processor"
"github.com/numaproj/numaflow/pkg/watermark/store"
)

type status int
@@ -52,32 +50,26 @@ func (s status) String() string {
// ProcessorToFetch is the smallest unit of entity (from which we fetch data) that does inorder processing or contains inorder data.
type ProcessorToFetch struct {
ctx context.Context
cancel context.CancelFunc
entity processor.ProcessorEntitier
status status
offsetTimeline *OffsetTimeline
otWatcher store.WatermarkKVWatcher
lock sync.RWMutex
log *zap.SugaredLogger
}

func (p *ProcessorToFetch) String() string {
return fmt.Sprintf("%s status:%v, timeline: %s", p.entity.GetID(), p.getStatus(), p.offsetTimeline.Dump())
return fmt.Sprintf("%s status:%v, timeline: %s", p.entity.GetName(), p.getStatus(), p.offsetTimeline.Dump())
}

// NewProcessorToFetch creates ProcessorToFetch.
func NewProcessorToFetch(ctx context.Context, processor processor.ProcessorEntitier, capacity int, watcher store.WatermarkKVWatcher) *ProcessorToFetch {
ctx, cancel := context.WithCancel(ctx)
func NewProcessorToFetch(ctx context.Context, processor processor.ProcessorEntitier, capacity int) *ProcessorToFetch {
p := &ProcessorToFetch{
ctx: ctx,
cancel: cancel,
entity: processor,
status: _active,
offsetTimeline: NewOffsetTimeline(ctx, capacity),
otWatcher: watcher,
log: logging.FromContext(ctx),
}
go p.startTimeLineWatcher()
return p
}

@@ -113,46 +105,3 @@ func (p *ProcessorToFetch) IsDeleted() bool {
defer p.lock.RUnlock()
return p.status == _deleted
}

func (p *ProcessorToFetch) stopTimeLineWatcher() {
p.cancel()
}

func (p *ProcessorToFetch) startTimeLineWatcher() {
watchCh, stopped := p.otWatcher.Watch(p.ctx)

for {
select {
case <-stopped:
// no need to close ot watcher here because the ot watcher is shared for the given vertex
// the parent ctx will close the ot watcher
return
case value := <-watchCh:
// TODO: why will value will be nil?
if value == nil {
continue
}
switch value.Operation() {
case store.KVPut:
if value.Key() != p.entity.BuildOTWatcherKey() {
continue
}
otValue, err := ot.DecodeToOTValue(value.Value())
if err != nil {
p.log.Errorw("Unable to decode the value", zap.String("processorEntity", p.entity.GetID()), zap.Error(err))
continue
}
p.offsetTimeline.Put(OffsetWatermark{
watermark: otValue.Watermark,
offset: otValue.Offset,
})
p.log.Debugw("TimelineWatcher- Updates", zap.String("bucket", p.otWatcher.GetKVName()), zap.Int64("watermark", otValue.Watermark), zap.Int64("offset", otValue.Offset))
case store.KVDelete:
// we do not care about Delete events because the timeline bucket is meant to grow and the TTL will
// naturally trim the KV store.
case store.KVPurge:
// skip
}
}
}
}
3 changes: 1 addition & 2 deletions pkg/watermark/fetch/processor_to_fetch_test.go
@@ -22,15 +22,14 @@ import (
"context"
"testing"

"github.com/numaproj/numaflow/pkg/watermark/store/noop"
"github.com/stretchr/testify/assert"

"github.com/numaproj/numaflow/pkg/watermark/processor"
)

func TestFromProcessor_setStatus(t *testing.T) {
var ctx = context.Background()
p := NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5, noop.NewKVOpWatch())
p := NewProcessorToFetch(ctx, processor.NewProcessorEntity("testPod1"), 5)
p.setStatus(_inactive)
assert.Equal(t, _inactive, p.status)
}
12 changes: 3 additions & 9 deletions pkg/watermark/processor/entity.go
@@ -49,8 +49,7 @@ func (w Watermark) Before(t time.Time) bool {
// ProcessorEntitier defines what can be a processor. The Processor is the smallest unit where the watermark will
// monotonically increase.
type ProcessorEntitier interface {
GetID() string
BuildOTWatcherKey() string
GetName() string
}

// processorEntity implements ProcessorEntitier.
@@ -68,12 +67,7 @@ func NewProcessorEntity(name string) ProcessorEntitier {
}
}

// GetID returns the ID of the processor.
func (p *processorEntity) GetID() string {
// GetName returns the ID of the processor.
func (p *processorEntity) GetName() string {
return p.name
}

// BuildOTWatcherKey builds the offset-timeline key name
func (p *processorEntity) BuildOTWatcherKey() string {
return p.GetID()
}
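
Taken together, the entity API after this commit reduces to a single accessor. A consolidated sketch reconstructed from the hunks above (the struct body and constructor body are implied rather than shown, so treat them as an assumption; comments are not from the source):

package processor

// ProcessorEntitier defines what can be a processor. After this change it only
// exposes its name; BuildOTWatcherKey is gone because the ProcessorManager now
// watches the shared OT bucket and looks processors up by name.
type ProcessorEntitier interface {
	GetName() string
}

// processorEntity implements ProcessorEntitier.
type processorEntity struct {
	name string
}

// NewProcessorEntity returns a ProcessorEntitier for the given name.
func NewProcessorEntity(name string) ProcessorEntitier {
	return &processorEntity{name: name}
}

// GetName returns the name of the processor.
func (p *processorEntity) GetName() string {
	return p.name
}
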
2 changes: 1 addition & 1 deletion pkg/watermark/processor/entity_test.go
@@ -26,7 +26,7 @@ import (

func TestEntity(t *testing.T) {
e := NewProcessorEntity("pod0")
assert.Equal(t, "pod0", e.GetID())
assert.Equal(t, "pod0", e.GetName())
}

func ExampleWatermark_String() {