Skip to content

Commit

Permalink
chore: clean up unused code (numaproj#370)
Browse files Browse the repository at this point in the history
  • Loading branch information
whynowy authored Nov 19, 2022
1 parent 7e041d8 commit c3739cc
Show file tree
Hide file tree
Showing 6 changed files with 5 additions and 140 deletions.
15 changes: 0 additions & 15 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -3448,15 +3448,6 @@ state.
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.StoreType">
StoreType (<code>string</code> alias)
</p>
</h3>
<p>
<p>
StoreType is the PBQ store’s backend type.
</p>
</p>
<h3 id="numaflow.numaproj.io/v1alpha1.TLS">
TLS
</h3>
Expand Down Expand Up @@ -4134,12 +4125,6 @@ Description
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.WindowType">
WindowType (<code>string</code> alias)
</p>
</h3>
<p>
</p>
<hr/>
<p>
<em> Generated with <code>gen-crd-api-reference-docs</code>. </em>
Expand Down
55 changes: 3 additions & 52 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,64 +130,15 @@ const (
DefaultPBQReadTimeout = 1 * time.Second // Default read timeout for pbq
DefaultPBQReadBatchSize = 100 // Default read batch size for pbq

// PVC mount path for PBQ
PathPBQMount = "/var/numaflow/pbq"

// Default persistent store options
DefaultStoreSyncDuration = 2 * time.Second // Default sync duration for pbq
DefaultStoreType = NoOpType // Default store type
DefaultStoreSize = 1000000 // Default persistent store size
DefaultStoreMaxBufferSize = 100000 // Default buffer size for pbq in bytes
DefaultStorePath = PathPBQMount + "/wals" // Default store path

// Default window options
DefaultWindowType = FixedType
DefaultWindowDuration = 0

// PVC mount path for PBQ
PathPBQMount = "/var/numaflow/pbq"
)

// StoreType is the PBQ store's backend type.
type StoreType string

const (
InMemoryType StoreType = "in-memory"
FileSystemType StoreType = "file-system"
NoOpType StoreType = "no-op"
)

func (st StoreType) String() string {
switch st {
case InMemoryType:
return string(InMemoryType)
case FileSystemType:
return string(FileSystemType)
case NoOpType:
return string(NoOpType)
default:
return "unknownStoreType"
}
}

type WindowType string

const (
FixedType WindowType = "fixed"
SlidingType WindowType = "sliding"
SessionType WindowType = "session"
)

func (wt WindowType) String() string {
switch wt {
case FixedType:
return string(FixedType)
case SlidingType:
return string(SlidingType)
case SessionType:
return string(SessionType)
default:
return "unknownWindowType"
}
}

var (
MessageKeyDrop = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
MessageKeyAll = fmt.Sprintf("%U__ALL__", '\\') // U+005C__ALL__
Expand Down
17 changes: 0 additions & 17 deletions pkg/reduce/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@ package reduce

import (
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/window"
)

// Options for forwarding the message
type Options struct {
// readBatchSize is the default batch size
readBatchSize int64
// windowOpts Options for window
windowOpts *window.Options
}

type Option func(*Options) error

func DefaultOptions() *Options {
return &Options{
readBatchSize: dfv1.DefaultReadBatchSize,
windowOpts: window.DefaultOptions(),
}
}

Expand All @@ -45,16 +41,3 @@ func WithReadBatchSize(f int64) Option {
return nil
}
}

// WithWindowOptions sets different window options
func WithWindowOptions(opts ...window.Option) Option {
return func(options *Options) error {
for _, opt := range opts {
err := opt(options.windowOpts)
if err != nil {
return err
}
}
return nil
}
}
2 changes: 1 addition & 1 deletion pkg/reduce/readloop/readloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewReadLoop(ctx context.Context,
toBuffers map[string]isb.BufferWriter,
whereToDecider forward.ToWhichStepDecider,
pw map[string]publish.Publisher,
_ *window.Options) *ReadLoop {
) *ReadLoop {

op := newOrderedForwarder(ctx)

Expand Down
2 changes: 1 addition & 1 deletion pkg/reduce/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewDataForward(ctx context.Context,
}
}

rl := readloop.NewReadLoop(ctx, udf, pbqManager, windowingStrategy, toBuffers, whereToDecider, watermarkPublishers, options.windowOpts)
rl := readloop.NewReadLoop(ctx, udf, pbqManager, windowingStrategy, toBuffers, whereToDecider, watermarkPublishers)
return &DataForward{
fromBuffer: fromBuffer,
toBuffers: toBuffers,
Expand Down
54 changes: 0 additions & 54 deletions pkg/window/options.go

This file was deleted.

0 comments on commit c3739cc

Please sign in to comment.