Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/root_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func Prepare() *cobra.Command {
// root cmd
rootCmd.PersistentFlags().StringP("config", "c", "", ".env or .yaml config file to use with pgstream if any")
rootCmd.PersistentFlags().String("log-level", "debug", "log level for the application. One of trace, debug, info, warn, error, fatal, panic")
rootCmd.PersistentFlags().Bool("periodic-goroutine-dump", false, "Enable periodic goroutine stacktrace dumps every 15 minutes")

// init cmd
initCmd.Flags().String("postgres-url", "", "Source postgres URL where pgstream setup will be run")
Expand Down Expand Up @@ -152,6 +153,7 @@ func withProfiling(fn func(cmd *cobra.Command, args []string) error) func(cmd *c
func rootFlagBinding(cmd *cobra.Command) {
viper.BindPFlag("config", cmd.PersistentFlags().Lookup("config"))
viper.BindPFlag("PGSTREAM_LOG_LEVEL", cmd.PersistentFlags().Lookup("log-level"))
viper.BindPFlag("PGSTREAM_PERIODIC_GOROUTINE_DUMP", cmd.PersistentFlags().Lookup("periodic-goroutine-dump"))
}

func version() string {
Expand Down
12 changes: 11 additions & 1 deletion cmd/run_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/xataio/pgstream/cmd/config"
"github.com/xataio/pgstream/internal/log/zerolog"
"github.com/xataio/pgstream/internal/profiling"
"github.com/xataio/pgstream/pkg/stream"
)

Expand Down Expand Up @@ -45,6 +47,14 @@ func run(ctx context.Context) error {
})
zerolog.SetGlobalLogger(logger)

stdLogger := zerolog.NewStdLogger(logger)

// Start periodic goroutine dump if enabled
if viper.GetBool("PGSTREAM_PERIODIC_GOROUTINE_DUMP") {
profiling.StartPeriodicGoroutineDump(ctx, 15*time.Minute, stdLogger)
stdLogger.Info("periodic goroutine dump enabled (every 15 minutes)")
}

streamConfig, err := config.ParseStreamConfig()
if err != nil {
return fmt.Errorf("parsing stream config: %w", err)
Expand All @@ -56,7 +66,7 @@ func run(ctx context.Context) error {
}
defer provider.Close()

return stream.Run(ctx, zerolog.NewStdLogger(logger), streamConfig, initFlag, provider.NewInstrumentation("run"))
return stream.Run(ctx, stdLogger, streamConfig, initFlag, provider.NewInstrumentation("run"))
}

func runFlagBinding(cmd *cobra.Command, args []string) error {
Expand Down
12 changes: 11 additions & 1 deletion cmd/snapshot_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ package cmd
import (
"context"
"fmt"
"time"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/xataio/pgstream/cmd/config"
"github.com/xataio/pgstream/internal/log/zerolog"
"github.com/xataio/pgstream/internal/profiling"
"github.com/xataio/pgstream/pkg/stream"
)

Expand All @@ -30,6 +32,14 @@ func snapshot(ctx context.Context) error {
})
zerolog.SetGlobalLogger(logger)

stdLogger := zerolog.NewStdLogger(logger)

// Start periodic goroutine dump if enabled
if viper.GetBool("PGSTREAM_PERIODIC_GOROUTINE_DUMP") {
profiling.StartPeriodicGoroutineDump(ctx, 15*time.Minute, stdLogger)
stdLogger.Info("periodic goroutine dump enabled (every 15 minutes)")
}

streamConfig, err := config.ParseStreamConfig()
if err != nil {
return fmt.Errorf("parsing stream config: %w", err)
Expand All @@ -41,7 +51,7 @@ func snapshot(ctx context.Context) error {
}
defer provider.Close()

return stream.Snapshot(ctx, zerolog.NewStdLogger(logger), streamConfig, provider.NewInstrumentation("snapshot"))
return stream.Snapshot(ctx, stdLogger, streamConfig, provider.NewInstrumentation("snapshot"))
}

func snapshotFlagBinding(cmd *cobra.Command, args []string) error {
Expand Down
45 changes: 45 additions & 0 deletions internal/profiling/profiling.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
package profiling

import (
"bytes"
"context"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"runtime"
"runtime/pprof"
"time"

loglib "github.com/xataio/pgstream/pkg/log"
)

// StartProfilingServer starts an http server exposing /debug/pprof endpoint
Expand Down Expand Up @@ -60,3 +65,43 @@ func CreateMemoryProfile(fileName string) error {

return nil
}

// StartPeriodicGoroutineDump starts a goroutine that periodically dumps
// the full stacktrace of all goroutines to the logger. It respects context
// cancellation for clean shutdown.
func StartPeriodicGoroutineDump(ctx context.Context, interval time.Duration, logger loglib.Logger) {
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
logger.Info("stopping periodic goroutine dump")
return
case <-ticker.C:
dumpGoroutineStacktrace(logger)
}
}
}()
}

func dumpGoroutineStacktrace(logger loglib.Logger) {
var buf bytes.Buffer
profile := pprof.Lookup("goroutine")
if profile == nil {
logger.Error(nil, "failed to lookup goroutine profile")
return
}

// Write the full stacktrace (debug=2 gives full stack traces)
if err := profile.WriteTo(&buf, 2); err != nil {
logger.Error(err, "failed to write goroutine stacktrace")
return
}

logger.Info("periodic goroutine stacktrace dump", loglib.Fields{
"goroutine_count": runtime.NumGoroutine(),
"stacktrace": buf.String(),
})
}
37 changes: 34 additions & 3 deletions pkg/wal/processor/postgres/postgres_query_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,41 @@ type query struct {
isDDL bool
}

// size returns the size of the message sql query (does not include the
// parameters)
// size returns the approximate size of the message including the SQL query and
// the parameters
func (m *query) Size() int {
return len(m.sql)
size := len(m.sql) + len(m.schema) + len(m.table)

// Add size of column names
for _, col := range m.columnNames {
size += len(col) + 16 // string data + string header overhead
}

// Add approximate size of args to prevent memory accounting bugs.
// Each arg includes both the value size and Go's interface/pointer overhead.
for _, arg := range m.args {
switch v := arg.(type) {
case string:
size += len(v) + 16 // string data + string header overhead
case []byte:
size += len(v) + 24 // byte slice data + slice header overhead
case nil:
size += 8 // nil interface overhead
case int, int8, int16, int32, int64:
size += 16 // int value + interface overhead
case uint, uint8, uint16, uint32, uint64:
size += 16 // uint value + interface overhead
case float32, float64:
size += 16 // float value + interface overhead
case bool:
size += 16 // bool value + interface overhead
default:
// Conservative estimate for other types (e.g. time.Time, custom types)
size += 64
}
}

return size
}

func (m *query) IsEmpty() bool {
Expand Down
Loading