From c3739ccaf29d17f881c059e2a2a210bf6d33bf4c Mon Sep 17 00:00:00 2001
From: Derek Wang
-StoreType (
string
alias)
-
-
-StoreType is the PBQ store’s backend type. -
-string
alias)
-
--
Generated with gen-crd-api-reference-docs
.
diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go
index d4a08194ae..e71c871073 100644
--- a/pkg/apis/numaflow/v1alpha1/const.go
+++ b/pkg/apis/numaflow/v1alpha1/const.go
@@ -130,64 +130,15 @@ const (
DefaultPBQReadTimeout = 1 * time.Second // Default read timeout for pbq
DefaultPBQReadBatchSize = 100 // Default read batch size for pbq
+ // PVC mount path for PBQ
+ PathPBQMount = "/var/numaflow/pbq"
+
// Default persistent store options
DefaultStoreSyncDuration = 2 * time.Second // Default sync duration for pbq
- DefaultStoreType = NoOpType // Default store type
- DefaultStoreSize = 1000000 // Default persistent store size
DefaultStoreMaxBufferSize = 100000 // Default buffer size for pbq in bytes
DefaultStorePath = PathPBQMount + "/wals" // Default store path
-
- // Default window options
- DefaultWindowType = FixedType
- DefaultWindowDuration = 0
-
- // PVC mount path for PBQ
- PathPBQMount = "/var/numaflow/pbq"
)
-// StoreType is the PBQ store's backend type.
-type StoreType string
-
-const (
- InMemoryType StoreType = "in-memory"
- FileSystemType StoreType = "file-system"
- NoOpType StoreType = "no-op"
-)
-
-func (st StoreType) String() string {
- switch st {
- case InMemoryType:
- return string(InMemoryType)
- case FileSystemType:
- return string(FileSystemType)
- case NoOpType:
- return string(NoOpType)
- default:
- return "unknownStoreType"
- }
-}
-
-type WindowType string
-
-const (
- FixedType WindowType = "fixed"
- SlidingType WindowType = "sliding"
- SessionType WindowType = "session"
-)
-
-func (wt WindowType) String() string {
- switch wt {
- case FixedType:
- return string(FixedType)
- case SlidingType:
- return string(SlidingType)
- case SessionType:
- return string(SessionType)
- default:
- return "unknownWindowType"
- }
-}
-
var (
MessageKeyDrop = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
MessageKeyAll = fmt.Sprintf("%U__ALL__", '\\') // U+005C__ALL__
diff --git a/pkg/reduce/options.go b/pkg/reduce/options.go
index 7ca2ae85a1..bdb7f0fe2e 100644
--- a/pkg/reduce/options.go
+++ b/pkg/reduce/options.go
@@ -18,15 +18,12 @@ package reduce
import (
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
- "github.com/numaproj/numaflow/pkg/window"
)
// Options for forwarding the message
type Options struct {
// readBatchSize is the default batch size
readBatchSize int64
- // windowOpts Options for window
- windowOpts *window.Options
}
type Option func(*Options) error
@@ -34,7 +31,6 @@ type Option func(*Options) error
func DefaultOptions() *Options {
return &Options{
readBatchSize: dfv1.DefaultReadBatchSize,
- windowOpts: window.DefaultOptions(),
}
}
@@ -45,16 +41,3 @@ func WithReadBatchSize(f int64) Option {
return nil
}
}
-
-// WithWindowOptions sets different window options
-func WithWindowOptions(opts ...window.Option) Option {
- return func(options *Options) error {
- for _, opt := range opts {
- err := opt(options.windowOpts)
- if err != nil {
- return err
- }
- }
- return nil
- }
-}
diff --git a/pkg/reduce/readloop/readloop.go b/pkg/reduce/readloop/readloop.go
index 78aedf65fe..182feda925 100644
--- a/pkg/reduce/readloop/readloop.go
+++ b/pkg/reduce/readloop/readloop.go
@@ -67,7 +67,7 @@ func NewReadLoop(ctx context.Context,
toBuffers map[string]isb.BufferWriter,
whereToDecider forward.ToWhichStepDecider,
pw map[string]publish.Publisher,
- _ *window.Options) *ReadLoop {
+) *ReadLoop {
op := newOrderedForwarder(ctx)
diff --git a/pkg/reduce/reduce.go b/pkg/reduce/reduce.go
index 650fa963ed..dc9177b1d7 100644
--- a/pkg/reduce/reduce.go
+++ b/pkg/reduce/reduce.go
@@ -67,7 +67,7 @@ func NewDataForward(ctx context.Context,
}
}
- rl := readloop.NewReadLoop(ctx, udf, pbqManager, windowingStrategy, toBuffers, whereToDecider, watermarkPublishers, options.windowOpts)
+ rl := readloop.NewReadLoop(ctx, udf, pbqManager, windowingStrategy, toBuffers, whereToDecider, watermarkPublishers)
return &DataForward{
fromBuffer: fromBuffer,
toBuffers: toBuffers,
diff --git a/pkg/window/options.go b/pkg/window/options.go
deleted file mode 100644
index f34fe77b48..0000000000
--- a/pkg/window/options.go
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-Copyright 2022 The Numaproj Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package window
-
-import (
- dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
- "time"
-)
-
-type Options struct {
- // windowType to specify the window type(fixed, sliding or session)
- windowType dfv1.WindowType
- // windowDuration to specify the duration of the window
- windowDuration time.Duration
-}
-
-func DefaultOptions() *Options {
- return &Options{
- windowType: dfv1.DefaultWindowType,
- windowDuration: dfv1.DefaultWindowDuration,
- }
-}
-
-type Option func(options *Options) error
-
-// WithWindowType sets the window type
-func WithWindowType(wt dfv1.WindowType) Option {
- return func(o *Options) error {
- o.windowType = wt
- return nil
- }
-}
-
-// WithWindowDuration sets the window duration
-func WithWindowDuration(wd time.Duration) Option {
- return func(o *Options) error {
- o.windowDuration = wd
- return nil
- }
-}