Skip to content

Commit

Permalink
fixup: More cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
maru-ava committed Feb 24, 2025
1 parent c7df171 commit 5547800
Showing 1 changed file with 128 additions and 70 deletions.
198 changes: 128 additions & 70 deletions tests/fixture/tmpnet/monitor_processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"os/exec"
"path/filepath"
Expand All @@ -16,37 +18,66 @@ import (
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/perms"
)

// Used to generate configuration for a collector
type configGeneratorFunc func(workingDir string, serviceDiscoveryDir string, username string, password string) string

const (
collectorTickerInterval = 100 * time.Millisecond

// TODO(marun) Maybe use dynamic HTTP ports to avoid the possibility of them being already bound?

// Prometheus configuration
prometheusCmd = "prometheus"
prometheusURL = "https://prometheus-poc.avax-dev.network/api/v1/write"
prometheusScrapeInterval = 10 * time.Second
prometheusHTTPPort = 9090

prometheusCmd = "prometheus"
promtailCmd = "promtail"
// Promtail configuration
promtailCmd = "promtail"
lokiURL = "https://loki-poc.avax-dev.network/api/prom/push"
promtailHTTPPort = 3101

// Use a delay slightly longer than the scrape interval to ensure a final scrape before shutdown
NetworkShutdownDelay = prometheusScrapeInterval + 2*time.Second
)

var (
prometheusListenAddress = fmt.Sprintf("127.0.0.1:%d", prometheusHTTPPort)
prometheusReadinessURL = fmt.Sprintf("http://%s/-/ready", prometheusListenAddress)

promtailReadinessURL = fmt.Sprintf("http://127.0.0.1:%d/ready", promtailHTTPPort)
)

// StartCollectors ensures collectors are running to collect logs and metrics from local nodes.
func StartCollectors(ctx context.Context, log logging.Logger) error {
if _, ok := ctx.Deadline(); !ok {
return errors.New("unable to start collectors with a context without a deadline")
}
if err := startPrometheus(ctx, log); err != nil {
if err := startPromtail(ctx, log); err != nil {
return err
}
if err := startPromtail(ctx, log); err != nil {
if err := startPrometheus(ctx, log); err != nil {
return err
}

// Wait for readiness. These checks are performed separately from start to
// minimize time to readiness.
readinessURLs := map[string]string{
promtailCmd: promtailReadinessURL,
prometheusCmd: prometheusReadinessURL,
}
for cmdName, readinessURLs := range readinessURLs {
if err := waitForReadiness(ctx, log, cmdName, readinessURLs); err != nil {
return err
}
}

log.Info("To stop: tmpnetctl stop-collectors")

return nil
Expand All @@ -57,7 +88,7 @@ func StopCollectors(ctx context.Context, log logging.Logger) error {
if _, ok := ctx.Deadline(); !ok {
return errors.New("unable to start collectors with a context without a deadline")
}
for _, cmdName := range []string{prometheusCmd, promtailCmd} {
for _, cmdName := range []string{promtailCmd, prometheusCmd} {
// Determine if the process is running
workingDir, err := getWorkingDir(cmdName)
if err != nil {
Expand Down Expand Up @@ -87,12 +118,10 @@ func StopCollectors(ctx context.Context, log logging.Logger) error {
zap.String("cmdName", cmdName),
zap.Int("pid", proc.Pid),
)
ticker := time.NewTicker(collectorTickerInterval)
defer ticker.Stop()
for {
if err := pollUntilContextCancel(ctx, func(ctx context.Context) (bool, error) {

Check failure on line 121 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
p, err := getProcess(proc.Pid)
if err != nil {
return fmt.Errorf("failed to retrieve process: %w", err)
return false, fmt.Errorf("failed to retrieve process: %w", err)
}
if p == nil {
// Process is no longer running
Expand All @@ -105,15 +134,10 @@ func StopCollectors(ctx context.Context, log logging.Logger) error {
zap.Error(err),
)
}

break
}

select {
case <-ctx.Done():
return fmt.Errorf("failed to see %s stop before timeout: %w", cmdName, ctx.Err())
case <-ticker.C:
}
return p == nil, nil
}); err != nil {
return err
}
log.Info("collector stopped",
zap.String("cmdName", cmdName),
Expand All @@ -125,12 +149,17 @@ func StopCollectors(ctx context.Context, log logging.Logger) error {

// startPrometheus ensures an agent-mode prometheus process is running to collect metrics from local nodes.
func startPrometheus(ctx context.Context, log logging.Logger) error {
args := fmt.Sprintf(
"--config.file=prometheus.yaml --web.listen-address=%s --enable-feature=agent --storage.agent.path=./data",
prometheusListenAddress,
)
return startCollector(
ctx,
log,
prometheusCmd,
"--config.file=prometheus.yaml --storage.agent.path=./data --web.listen-address=localhost:0 --enable-feature=agent",
args,
"PROMETHEUS",
prometheusReadinessURL,
func(_ string, serviceDiscoveryDir string, username string, password string) string {
return fmt.Sprintf(`
global:
Expand All @@ -146,11 +175,11 @@ scrape_configs:
- '%s/*.json'
remote_write:
- url: "https://prometheus-poc.avax-dev.network/api/v1/write"
- url: "%s"
basic_auth:
username: "%s"
password: "%s"
`, prometheusScrapeInterval, serviceDiscoveryDir, username, password)
`, prometheusScrapeInterval, serviceDiscoveryDir, prometheusURL, username, password)
},
)
}
Expand All @@ -163,17 +192,18 @@ func startPromtail(ctx context.Context, log logging.Logger) error {
promtailCmd,
"-config.file=promtail.yaml",
"LOKI",
promtailReadinessURL,
func(workingDir string, serviceDiscoveryDir string, username string, password string) string {
return fmt.Sprintf(`
server:
http_listen_port: 0
http_listen_port: %d
grpc_listen_port: 0
positions:
filename: %s/positions.yaml
client:
url: "https://loki-poc.avax-dev.network/api/prom/push"
url: "%s"
basic_auth:
username: "%s"
password: "%s"
Expand All @@ -183,7 +213,7 @@ scrape_configs:
file_sd_configs:
- files:
- '%s/*.json'
`, workingDir, username, password, serviceDiscoveryDir)
`, promtailHTTPPort, workingDir, lokiURL, username, password, serviceDiscoveryDir)
},
)
}
Expand Down Expand Up @@ -215,6 +245,7 @@ func startCollector(
cmdName string,
args string,
baseEnvName string,
readinessURL string,
configGenerator configGeneratorFunc,
) error {
// Determine paths
Expand Down Expand Up @@ -256,7 +287,7 @@ func startCollector(
}

// Start the process
return startCollectorProcess(ctx, log, cmdName, args, workingDir, pidPath)
return startCollectorProcess(ctx, log, cmdName, args, workingDir, pidPath, readinessURL)
}

// processFromPIDFile attempts to retrieve a running process from the specified PID file.
Expand Down Expand Up @@ -358,8 +389,10 @@ func startCollectorProcess(
args string,
workingDir string,
pidPath string,
readinessURL string,

Check failure on line 392 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Lint

unused-parameter: parameter 'readinessURL' seems to be unused, consider removing or renaming it as _ (revive)

Check failure on line 392 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Lint

`startCollectorProcess` - `readinessURL` is unused (unparam)
) error {
fullCmd := "nohup " + cmdName + " " + args + " > " + cmdName + ".log 2>&1 & echo -n \"$!\" > " + pidPath
logFilename := cmdName + ".log"
fullCmd := "nohup " + cmdName + " " + args + " > " + logFilename + " 2>&1 & echo -n \"$!\" > " + pidPath
log.Info("starting "+cmdName,
zap.String("workingDir", workingDir),
zap.String("fullCmd", fullCmd),
Expand All @@ -372,63 +405,88 @@ func startCollectorProcess(
return fmt.Errorf("failed to start %s: %w", cmdName, err)
}

// Wait for PID file to be written. It's not enough to check for the PID of cmd
// because the PID we want is a child of the process that cmd represents.
if pid, err := waitForPIDFile(ctx, cmdName, pidPath); err != nil {
// Wait for PID file
var pid int
if err := pollUntilContextCancel(ctx, func(ctx context.Context) (bool, error) {

Check failure on line 410 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
var err error
pid, err = getPID(cmdName, pidPath)
if err != nil {
log.Warn("failed to read PID file",
zap.String("cmd", cmdName),
zap.String("pidPath", pidPath),
zap.Error(err),
)
}
return pid != 0, nil
}); err != nil {
return err
} else {
log.Info(cmdName+" started",
zap.String("pid", pid),
)
}
log.Info(cmdName+" started",
zap.Int("pid", pid),
)

// TODO(marun) Perform a readiness check
// TODO(marun) Check that the log is not empty
// Wait for non-empty log file. An empty log file should only occur if the command
// invocation is not correctly redirecting stderr and stdout to the expected file.
logPath := filepath.Join(workingDir, logFilename)
if err := pollUntilContextCancel(ctx, func(ctx context.Context) (bool, error) {

Check failure on line 431 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
logData, err := os.ReadFile(logPath)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return false, fmt.Errorf("failed to read log file %s for %s: %w", err)

Check failure on line 434 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Unit (macos-14)

fmt.Errorf format %s reads arg #2, but call has 1 arg

Check failure on line 434 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Unit (ubuntu-20.04)

fmt.Errorf format %s reads arg #2, but call has 1 arg

Check failure on line 434 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Unit (ubuntu-22.04)

fmt.Errorf format %s reads arg #2, but call has 1 arg

Check failure on line 434 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Unit (ubuntu-24.04)

fmt.Errorf format %s reads arg #2, but call has 1 arg

Check failure on line 434 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Unit (windows-2022)

fmt.Errorf format %s reads arg #2, but call has 1 arg

Check failure on line 434 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Unit (custom-arm64-jammy)

fmt.Errorf format %s reads arg #2, but call has 1 arg

Check failure on line 434 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Lint

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)

Check failure on line 434 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Lint

printf: fmt.Errorf format %s reads arg #2, but call has 1 arg (govet)

Check failure on line 434 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Unit (custom-arm64-noble)

fmt.Errorf format %s reads arg #2, but call has 1 arg
}
return len(logData) != 0, nil
}); err != nil {
return fmt.Errorf("empty log file %s for %s indicates misconfiguration: %w", logPath, cmdName, err)
}

return nil
}

// waitForPIDFile waits for the PID file to be written as an indication of process start.
func waitForPIDFile(ctx context.Context, cmdName string, pidPath string) (string, error) {
var (
ticker = time.NewTicker(collectorTickerInterval)
pid string
)
defer ticker.Stop()
for {
if fileExistsAndNotEmpty(pidPath) {
var err error
pid, err = readFileContents(pidPath)
if err != nil {
return "", fmt.Errorf("failed to read pid file: %w", err)
}
break
}
select {
case <-ctx.Done():
return "", fmt.Errorf("failed to wait for %s to start before timeout: %w", cmdName, ctx.Err())
case <-ticker.C:
}
// checkReadiness retrieves the provided URL and indicates whether it returned 200
func checkReadiness(readinessURL string) (bool, string, error) {
client := &http.Client{
Timeout: 5 * time.Second,
}
return pid, nil
}

func fileExistsAndNotEmpty(filename string) bool {
fileInfo, err := os.Stat(filename)
resp, err := client.Get(readinessURL)
if err != nil {
if os.IsNotExist(err) {
return false
}
fmt.Printf("Error stating file: %v\n", err)
return false
return false, "", fmt.Errorf("request failed: %v", err)

Check failure on line 452 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Lint

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}
return fileInfo.Size() > 0
}
defer resp.Body.Close()

func readFileContents(filename string) (string, error) {
content, err := os.ReadFile(filename)
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
return false, "", fmt.Errorf("failed to read response: %v", err)

Check failure on line 458 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Lint

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}
return string(content), nil

return resp.StatusCode == http.StatusOK, string(body), nil
}

// waitForReadiness waits until the given readiness URL returns 200
func waitForReadiness(ctx context.Context, log logging.Logger, cmdName string, readinessURL string) error {
log.Info("waiting for "+cmdName+" readiness",
zap.String("url", readinessURL),
)
if err := pollUntilContextCancel(ctx, func(ctx context.Context) (bool, error) {

Check failure on line 469 in tests/fixture/tmpnet/monitor_processes.go

View workflow job for this annotation

GitHub Actions / Lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
ready, body, err := checkReadiness(readinessURL)
if err == nil {
return ready, nil
}
log.Warn("failed to check readiness",
zap.String("cmd", cmdName),
zap.String("url", readinessURL),
zap.String("body", body),
zap.Error(err),
)
return false, nil
}); err != nil {
return err
}
log.Info(cmdName+" ready",
zap.String("url", readinessURL),
)
return nil
}

func pollUntilContextCancel(ctx context.Context, condition wait.ConditionWithContextFunc) error {
return wait.PollUntilContextCancel(ctx, collectorTickerInterval, true /* immediate */, condition)
}

0 comments on commit 5547800

Please sign in to comment.