From 5424b351d679bc894d1666aac1d8778443314ffc Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Tue, 9 Jan 2024 11:36:40 +0530 Subject: [PATCH 1/2] fix: memory leak inside session reducer (#95) Signed-off-by: Yashash H L --- pkg/sessionreducer/task_manager.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/sessionreducer/task_manager.go b/pkg/sessionreducer/task_manager.go index 2abd58c6..e40df8b6 100644 --- a/pkg/sessionreducer/task_manager.go +++ b/pkg/sessionreducer/task_manager.go @@ -89,8 +89,7 @@ func (rtm *sessionReduceTaskManager) CreateTask(ctx context.Context, request *v1 } // add the task to the tasks list - key := task.uniqueKey() - rtm.tasks[key] = task + rtm.tasks[task.uniqueKey()] = task rtm.rw.Unlock() @@ -120,7 +119,7 @@ func (rtm *sessionReduceTaskManager) CreateTask(ctx context.Context, request *v1 close(task.doneCh) // delete the task from the tasks list rtm.rw.Lock() - delete(rtm.tasks, key) + delete(rtm.tasks, task.uniqueKey()) rtm.rw.Unlock() }() @@ -295,11 +294,11 @@ func (rtm *sessionReduceTaskManager) WaitAll() { close(rtm.responseCh) } -func generateKey(keyedWindows *v1.KeyedWindow) string { +func generateKey(keyedWindow *v1.KeyedWindow) string { return fmt.Sprintf("%d:%d:%s", - keyedWindows.GetStart().AsTime().UnixMilli(), - keyedWindows.GetEnd().AsTime().UnixMilli(), - strings.Join(keyedWindows.GetKeys(), delimiter)) + keyedWindow.GetStart().AsTime().UnixMilli(), + keyedWindow.GetEnd().AsTime().UnixMilli(), + strings.Join(keyedWindow.GetKeys(), delimiter)) } func buildDatum(payload *v1.SessionReduceRequest_Payload) Datum { From 08210e956ad537379a4337ea1bb36bda017b552d Mon Sep 17 00:00:00 2001 From: Abdullah Hadi Date: Mon, 29 Jan 2024 11:43:28 -0500 Subject: [PATCH 2/2] chore: write server info for sideinput sdk (#97) Signed-off-by: a3hadi --- pkg/shared/util.go | 2 -- pkg/sideinput/options.go | 19 +++++++++++++++---- pkg/sideinput/server.go | 3 +-- pkg/sideinput/server_test.go | 7 ++++++- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/pkg/shared/util.go b/pkg/shared/util.go index 1d8ce6a9..f7747971 100644 --- a/pkg/shared/util.go +++ b/pkg/shared/util.go @@ -18,8 +18,6 @@ const ( func PrepareServer(sockAddr string, infoFilePath string) (net.Listener, error) { // If infoFilePath is not empty, write the server info to the file. - // For Side input we don't write data to the info server, hence will pass path as empty here. - // Could be used later on for similar cases if infoFilePath != "" { serverInfo := &info.ServerInfo{Protocol: info.UDS, Language: info.Go, Version: info.GetSDKVersion()} if err := info.Write(serverInfo, info.WithServerInfoFilePath(infoFilePath)); err != nil { diff --git a/pkg/sideinput/options.go b/pkg/sideinput/options.go index 761ef737..5728d801 100644 --- a/pkg/sideinput/options.go +++ b/pkg/sideinput/options.go @@ -1,9 +1,12 @@ package sideinput +import "github.com/numaproj/numaflow-go/pkg/info" + // options is the struct to hold the server options. type options struct { - sockAddr string - maxMessageSize int + sockAddr string + maxMessageSize int + serverInfoFilePath string } // Option is the interface to apply options. @@ -12,8 +15,9 @@ type Option func(*options) // defaultOptions returns the default options. func defaultOptions() *options { return &options{ - sockAddr: address, - maxMessageSize: defaultMaxMessageSize, + sockAddr: address, + maxMessageSize: defaultMaxMessageSize, + serverInfoFilePath: info.ServerInfoFilePath, } } @@ -30,3 +34,10 @@ func WithSockAddr(addr string) Option { opts.sockAddr = addr } } + +// WithServerInfoFilePath sets the server info file path to the given path. +func WithServerInfoFilePath(f string) Option { + return func(opts *options) { + opts.serverInfoFilePath = f + } +} diff --git a/pkg/sideinput/server.go b/pkg/sideinput/server.go index 37cf7d02..f1ad76c1 100644 --- a/pkg/sideinput/server.go +++ b/pkg/sideinput/server.go @@ -37,8 +37,7 @@ func (s *server) Start(ctx context.Context) error { defer stop() // start listening on unix domain socket - // For Side input we don't write data to the info server, hence will pass path as empty here. - lis, err := shared.PrepareServer(s.opts.sockAddr, "") + lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/sideinput/server_test.go b/pkg/sideinput/server_test.go index f220e804..aecad514 100644 --- a/pkg/sideinput/server_test.go +++ b/pkg/sideinput/server_test.go @@ -16,12 +16,17 @@ func TestServer_Start(t *testing.T) { _ = os.RemoveAll(socketFile.Name()) }() + serverInfoFile, _ := os.CreateTemp("/tmp", "numaflow-test-info") + defer func() { + _ = os.RemoveAll(serverInfoFile.Name()) + }() + var retrieveHandler = RetrieveFunc(func(ctx context.Context) Message { return BroadcastMessage([]byte("test")) }) // note: using actual uds connection ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) defer cancel() - err := NewSideInputServer(retrieveHandler, WithSockAddr(socketFile.Name())).Start(ctx) + err := NewSideInputServer(retrieveHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) assert.NoError(t, err) }