7 changes: 7 additions & 0 deletions cmd/sql_exporter/main.go
@@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
@@ -118,6 +119,12 @@ func main() {
http.HandleFunc("/reload", reloadHandler(exporter, *configFile))
}

// Expose pprof endpoints when debug mode is enabled
if os.Getenv(cfg.EnvDebug) != "" {
slog.Info("Debug mode enabled, pprof endpoints available at /debug/pprof/")
http.Handle("/debug/", http.DefaultServeMux)
}

server := &http.Server{Addr: *listenAddress, ReadHeaderTimeout: httpReadHeaderTimeout}
if err := web.ListenAndServe(server, &web.FlagConfig{
WebListenAddresses: &([]string{*listenAddress}),
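Aside (not part of the diff): the blank import registers the pprof handlers on http.DefaultServeMux at init time, and the env check only controls whether they are routed. A minimal self-contained sketch of the same pattern; SQLEXPORTER_DEBUG and port :9399 are illustrative stand-ins, since the value of cfg.EnvDebug isn't shown in this diff:

```go
package main

import (
	"log/slog"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* on http.DefaultServeMux at init
	"os"
)

func main() {
	mux := http.NewServeMux()
	// Route /debug/ traffic to the default mux (where pprof lives) only when
	// the hypothetical debug variable is set.
	if os.Getenv("SQLEXPORTER_DEBUG") != "" {
		slog.Info("pprof endpoints available at /debug/pprof/")
		mux.Handle("/debug/", http.DefaultServeMux)
	}
	slog.Error("server exited", "err", http.ListenAndServe(":9399", mux))
}
```

Using a fresh mux makes the gate authoritative; if the server itself serves http.DefaultServeMux, the blank import alone already exposes the routes.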
170 changes: 150 additions & 20 deletions query.go
@@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"log/slog"
"strconv"
"strings"
"time"

@@ -158,7 +159,7 @@ func (q *Query) Collect(ctx context.Context, conn *sql.DB, ch chan<- Metric) {
}

// Apply lag calculations and other transformations
transformedRow := q.applyTransformations(row, mf.config)
transformedRow := q.applyTransformations(row, mf.config, ch)

mf.Collect(transformedRow, ch)
metricsGenerated++
@@ -183,8 +184,14 @@
func (q *Query) run(ctx context.Context, conn *sql.DB) (*sql.Rows, errors.WithContext) {
if slog.Default().Enabled(ctx, slog.LevelDebug) {
start := time.Now()
slog.Debug("Starting query execution", "logContext", q.logContext, "sql", q.config.Query)
defer func() {
slog.Debug("Query execution time", "logContext", q.logContext, "duration", time.Since(start))
duration := time.Since(start)
slog.Debug("Query execution time", "logContext", q.logContext, "duration", duration, "sql", q.config.Query)
// Log slow queries at WARN level for better visibility
if duration > time.Second*30 {
slog.Warn("Slow query detected", "logContext", q.logContext, "duration", duration, "sql", q.config.Query)
}
}()
}
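The same measure-once pattern, extracted as a standalone sketch (timedQuery and the package name are illustrative; the 30-second cutoff mirrors the diff):

```go
package dbutil

import (
	"context"
	"database/sql"
	"log/slog"
	"time"
)

const slowQueryThreshold = 30 * time.Second

// timedQuery runs a query and, via a single deferred closure, logs its
// duration at DEBUG and escalates to WARN past the threshold.
func timedQuery(ctx context.Context, db *sql.DB, query string) (*sql.Rows, error) {
	start := time.Now()
	defer func() {
		d := time.Since(start)
		slog.Debug("query execution time", "duration", d, "sql", query)
		if d > slowQueryThreshold {
			slog.Warn("slow query detected", "duration", d, "sql", query)
		}
	}()
	return db.QueryContext(ctx, query)
}
```

Note that in the diff the whole block sits inside the Enabled(ctx, slog.LevelDebug) guard, so the slow-query WARN fires only when debug logging is on.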

@@ -200,7 +207,7 @@
if q.stmt == nil {
stmt, err := conn.PrepareContext(ctx, q.config.Query)
if err != nil {
return nil, errors.Wrapf(q.logContext, err, "prepare query failed")
return nil, errors.Wrapf(q.logContext, err, "prepare query failed: %s", q.config.Query)
}
q.conn = conn
q.stmt = stmt
@@ -235,7 +242,17 @@ func (q *Query) scanDest(rows *sql.Rows) ([]any, errors.WithContext) {
if column == "" {
slog.Debug("Unnamed column", "logContext", q.logContext, "column", i)
} else {
slog.Debug("Extra column returned by query", "logContext", q.logContext, "column", column)
// Don't log extra columns when column_filters are configured; extra columns are expected in that case
hasColumnFilters := false
for _, mf := range q.metricFamilies {
if len(mf.config.ColumnFilters) > 0 {
hasColumnFilters = true
break
}
}
if !hasColumnFilters {
slog.Debug("Extra column returned by query", "logContext", q.logContext, "column", column)
}
}
dest = append(dest, new(any))
}
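The inline scan over metric families could be factored into a helper; a sketch that assumes the Query and MetricFamily shapes visible in this diff, shown as a fragment rather than a runnable program:

```go
// hasColumnFilters reports whether any metric family configures
// column_filters, in which case extra result columns are expected.
func (q *Query) hasColumnFilters() bool {
	for _, mf := range q.metricFamilies {
		if len(mf.config.ColumnFilters) > 0 {
			return true
		}
	}
	return false
}
```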
@@ -268,27 +285,24 @@ func (q *Query) scanRow(rows *sql.Rows, dest []any) (map[string]any, errors.With
return nil, errors.Wrapf(q.logContext, err, "scanning of query result failed")
}

// Pick all values we're interested in into a map.
// Pick all values we're interested in
result := make(map[string]any, len(q.columnTypes))
for i, column := range columns {
switch q.columnTypes[column] {
case columnTypeKey:
if !dest[i].(*sql.NullString).Valid {
slog.Debug("Key column is NULL", "logContext", q.logContext, "column", column)
}
result[column] = *dest[i].(*sql.NullString)
case columnTypeTime:
if !dest[i].(*sql.NullTime).Valid {
slog.Debug("Time column is NULL", "logContext", q.logContext, "column", column)
}
result[column] = *dest[i].(*sql.NullTime)
case columnTypeValue:
if !dest[i].(*sql.NullFloat64).Valid {
slog.Debug("Value column is NULL", "logContext", q.logContext, "column", column)
}
result[column] = *dest[i].(*sql.NullFloat64)
default:
// Skip unused extra columns - they're not needed for metrics or calculations
// Note: All columns used in lag_calculations, row_filters, key_labels, values, and timestamp_value
// are already handled by the cases above
continue
}
}

return result, nil
}
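For reference, the three scan destinations used in the switch above are database/sql null wrappers: a SQL NULL scans into Valid=false instead of raising an error. A runnable illustration:

```go
package main

import (
	"database/sql"
	"fmt"
)

func main() {
	// Zero values model SQL NULL: Valid is false, the payload is the zero value.
	var key sql.NullString  // columnTypeKey
	var ts sql.NullTime     // columnTypeTime
	var val sql.NullFloat64 // columnTypeValue
	fmt.Println(key.Valid, ts.Valid, val.Valid) // false false false

	// After rows.Scan(&key, &ts, &val), non-NULL columns flip Valid to true.
	val = sql.NullFloat64{Float64: 42, Valid: true}
	fmt.Println(val.Valid, val.Float64) // true 42
}
```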

@@ -358,10 +372,83 @@ func (q *Query) applyRowFilter(row map[string]any, filter config.RowFilter) bool
}
}

// columnIsNeeded determines if a column is actually needed for the metric configuration
func (q *Query) columnIsNeeded(column string, metric *config.MetricConfig) bool {
// column_name is used only for SHOW STATS context; it is not a required
// column, so it is never NULL-checked
if column == "column_name" {
return false
}

// Check if column is in key_labels
for _, keyLabel := range metric.KeyLabels {
if keyLabel == column {
return true
}
}

// Check if column is in values
for _, valueCol := range metric.Values {
if valueCol == column {
return true
}
}

// Check if column is a source column in lag_calculations
for _, lagCalc := range metric.LagCalculations {
if lagCalc.SourceColumn == column {
return true
}
}

// Check if column is used in row_filters
for _, filter := range metric.RowFilters {
if filter.Column == column {
return true
}
}

// Check timestamp_value
if metric.TimestampValue == column {
return true
}

return false
}
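A usage sketch of the membership check, reduced to plain slices so it runs standalone (the real config.MetricConfig field types are not shown in this diff):

```go
package main

import "fmt"

// columnIsNeeded, simplified: lag sources and filter columns are folded
// into the same slice-membership test as key labels and values.
func columnIsNeeded(column string, keyLabels, values, lagSources, filterCols []string, timestampValue string) bool {
	if column == "column_name" {
		return false // context-only column, never NULL-checked
	}
	for _, set := range [][]string{keyLabels, values, lagSources, filterCols} {
		for _, c := range set {
			if c == column {
				return true
			}
		}
	}
	return timestampValue == column
}

func main() {
	values := []string{"rows_total"}
	fmt.Println(columnIsNeeded("rows_total", nil, values, nil, nil, ""))  // true
	fmt.Println(columnIsNeeded("column_name", nil, values, nil, nil, "")) // false
	fmt.Println(columnIsNeeded("extra", nil, values, nil, nil, ""))       // false
}
```

Since this check runs per column per row, precomputing the needed set once per metric as a map[string]struct{} would avoid the repeated linear scans.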

// applyTransformations applies configured transformations like lag calculations to a row
func (q *Query) applyTransformations(row map[string]any, metric *config.MetricConfig) map[string]any {
func (q *Query) applyTransformations(row map[string]any, metric *config.MetricConfig, ch chan<- Metric) map[string]any {
result := make(map[string]any)

// Check for NULL values in columns we actually care about (after row filtering)
var nullColumns []string
var tableColumn string
for column, value := range row {
// Get the table column name for context (from column_name field in SHOW STATS)
if column == "column_name" {
if nullStr, ok := value.(sql.NullString); ok && nullStr.String != "" {
tableColumn = nullStr.String
}
}

// Check for NULL values in key columns we'll actually use
if needsColumn := q.columnIsNeeded(column, metric); needsColumn {
if nullStr, ok := value.(sql.NullString); ok {
// For SHOW STATS queries, some drivers incorrectly set Valid=false even when String has data
// Check both Valid flag and actual string content
if !nullStr.Valid && nullStr.String == "" {
nullColumns = append(nullColumns, column)
}
}
}
}

// Log NULL columns only for rows that passed filtering and columns we need
if len(nullColumns) > 0 {
slog.Warn("Key columns are NULL", "logContext", q.logContext, "columns", nullColumns, "table_column", tableColumn, "sql", q.config.Query)
// Also register this as a scrape error
ch <- NewInvalidMetric(errors.Errorf(q.logContext, "key columns are NULL: %v (table_column: %s)", nullColumns, tableColumn))
}

// Copy original row data
for k, v := range row {
result[k] = v
@@ -391,32 +478,38 @@ func (q *Query) applyTransformations(row map[string]any, metric *config.MetricCo
}

// calculateLag calculates the lag in seconds between a timestamp and current time
// SHOW STATS always returns timestamps as strings, so we primarily handle sql.NullString
func (q *Query) calculateLag(timestampValue any, format string) float64 {
if timestampValue == nil {
return 0
}

var timestampStr string

// Handle the different timestamp value types drivers may return
switch v := timestampValue.(type) {
case sql.NullString:
if !v.Valid {
return 0
}
timestampStr = v.String
case string:
timestampStr = v
case sql.NullTime:
if !v.Valid {
return 0
}
// Calculate lag directly from time.Time
return time.Since(v.Time).Seconds()
case string:
timestampStr = v
default:
timestampStr = fmt.Sprintf("%v", timestampValue)
}

if format == "unix" {
return q.parseUnixTimestamp(timestampStr)
}
return q.parseStandardTimestamp(timestampStr, format)
}
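Once a time.Time is recovered, the lag is simply time.Since(...).Seconds(); a worked standalone example:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A timestamp observed 90 seconds ago yields a lag of roughly 90.
	observed := time.Now().Add(-90 * time.Second)
	fmt.Printf("lag: %.0f seconds\n", time.Since(observed).Seconds()) // lag: 90 seconds
}
```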

// parseStandardTimestamp parses non-Unix timestamp formats
func (q *Query) parseStandardTimestamp(timestampStr, format string) float64 {
if timestampStr == "" {
return 0
}
@@ -437,3 +530,40 @@ func (q *Query) calculateLag(timestampValue any, format string) float64 {
lag := time.Since(parsedTime).Seconds()
return lag
}

// parseUnixTimestamp parses Unix timestamps and auto-detects the format based on string length.
// Supports seconds, milliseconds, microseconds, and nanoseconds.
// Returns 0 for unsupported formats.
func (q *Query) parseUnixTimestamp(timestampStr string) float64 {
if timestampStr == "" {
return 0
}

timestamp, err := strconv.ParseInt(timestampStr, 10, 64)
if err != nil {
slog.Warn("Failed to parse Unix timestamp string", "timestamp", timestampStr, "error", err)
return 0
}

var parsedTime time.Time

switch len(timestampStr) {
case 10:
parsedTime = time.Unix(timestamp, 0)
case 13:
parsedTime = time.UnixMilli(timestamp)
case 16:
parsedTime = time.UnixMicro(timestamp)
case 19:
parsedTime = time.Unix(timestamp/1e9, timestamp%1e9)
default:
slog.Warn("Unsupported Unix timestamp format",
"timestamp", timestampStr,
"length", len(timestampStr),
"supported_formats", "10 digits (seconds), 13 digits (milliseconds), 16 digits (microseconds), 19 digits (nanoseconds)")
return 0
}

lag := time.Since(parsedTime).Seconds()
return lag
}
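The digit-length dispatch maps widths to units; in this standalone check (illustrative values), all four supported widths decode the same instant:

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// fromUnixString mirrors the digit-length dispatch of parseUnixTimestamp.
func fromUnixString(s string) (time.Time, bool) {
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return time.Time{}, false
	}
	switch len(s) {
	case 10: // seconds
		return time.Unix(n, 0), true
	case 13: // milliseconds
		return time.UnixMilli(n), true
	case 16: // microseconds
		return time.UnixMicro(n), true
	case 19: // nanoseconds, equivalent to time.Unix(n/1e9, n%1e9)
		return time.Unix(0, n), true
	}
	return time.Time{}, false
}

func main() {
	for _, s := range []string{"1700000000", "1700000000000", "1700000000000000", "1700000000000000000"} {
		if t, ok := fromUnixString(s); ok {
			fmt.Printf("%2d digits -> %s\n", len(s), t.UTC().Format(time.RFC3339))
		}
	}
	// Each line prints 2023-11-14T22:13:20Z.
}
```

The heuristic assumes contemporary epochs: a seconds timestamp from before September 2001 has nine or fewer digits and falls through to the unsupported-format warning.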