
Commit d17d3dd

craig[bot], spilchen, and yuzefovich committed
155766: sql/inspect: clarify error message for internal failures r=spilchen a=spilchen

Previously, when `INSPECT` encountered internal errors, it always reported "INSPECT found inconsistencies," even if the issue was caused by something other than actual data corruption. This made it hard to tell whether the problem was in user data or within INSPECT itself (for example, a bad query generated internally).

This commit refines that behavior. If all observed errors are internal, `INSPECT` now reports "INSPECT encountered internal errors." This makes it clear that the problem might stem from an internal failure in INSPECT, or from data corruption detected during internal queries, rather than always implying user data inconsistencies. Additionally, a hint is now included in the error message, regardless of error type, guiding users to run SHOW INSPECT ERRORS to retrieve more information.

Informs: #155676
Epic: CRDB-55075
Release note: none

155824: sql: fix top-level query stats when "inner" plans are present r=yuzefovich a=yuzefovich

Previously, whenever we ran an "inner" plan (via the `runPlanInsidePlan` helper), we ignored the top-level query stats for the "inner" plan. As a result, reads and writes performed by the inner plan weren't reflected in the "outer" top-level query stats. This is now fixed by adjusting routines, apply joins, and recursive CTEs to propagate their metrics as ProducerMetadata objects.

Note that for routines, access to the DistSQL infrastructure is rather difficult, so we plumbed the access via the planner straight into the DistSQLReceiver; even though it's ugly, it should work in practice. The only alternative I see is adding the necessary reference into the `eval.Context`, but then it gets tricky to actually set that and ensure the right copy of the eval context is observed by all routines (plus we'd need to make the copy or restore the original state somehow), so I chose not to pursue it.

Epic: None

Release note (bug fix): Previously, CockroachDB didn't include reads and writes performed by routines (user-defined functions and stored procedures), as well as apply joins, in the `bytes read`, `rows read`, and `rows written` statement execution statistics. This is now fixed. The bug has been present since before version 23.2.

Co-authored-by: Matt Spilchen <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
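The hint added by 155766 is delivered as a standard PostgreSQL error hint, so clients can read it straight off the driver error. Below is a minimal, illustrative Go sketch (not part of this commit) that assumes a locally running cluster and the lib/pq driver; the connection string and the INSPECT statement are placeholders, and only the pq.Error/Hint handling is the point.

// Illustrative only: surfacing the new hint after a failed INSPECT run.
package main

import (
    "database/sql"
    "errors"
    "fmt"
    "log"

    "github.com/lib/pq"
)

func main() {
    db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Hypothetical INSPECT invocation; adjust to the actual syntax in use.
    if _, err := db.Exec("INSPECT TABLE defaultdb.public.t"); err != nil {
        var pqErr *pq.Error
        if errors.As(err, &pqErr) && pqErr.Hint != "" {
            // Expected to point at something like:
            //   SHOW INSPECT ERRORS FOR JOB <id> WITH DETAILS
            fmt.Println("hint:", pqErr.Hint)
        }
        log.Fatal(err)
    }
}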
3 parents b59054b + 84ea39c + 877884b commit d17d3dd

17 files changed, +327 -47 lines changed

pkg/sql/apply_join.go

Lines changed: 22 additions & 10 deletions
@@ -35,6 +35,10 @@ type applyJoinNode struct {
 	// The data source with no outer columns.
 	singleInputPlanNode

+	// forwarder allows propagating the ProducerMetadata towards the
+	// DistSQLReceiver.
+	forwarder metadataForwarder
+
 	// pred represents the join predicate.
 	pred *joinPredicate

@@ -248,12 +252,13 @@ func (a *applyJoinNode) runNextRightSideIteration(params runParams, leftRow tree
 	}
 	plan := p.(*planComponents)
 	rowResultWriter := NewRowResultWriter(&a.run.rightRows)
-	if err := runPlanInsidePlan(
-		ctx, params, plan, rowResultWriter,
-		nil /* deferredRoutineSender */, "", /* stmtForDistSQLDiagram */
-	); err != nil {
+	queryStats, err := runPlanInsidePlan(
+		ctx, params, plan, rowResultWriter, nil /* deferredRoutineSender */, "", /* stmtForDistSQLDiagram */
+	)
+	if err != nil {
 		return err
 	}
+	forwardInnerQueryStats(a.forwarder, queryStats)
 	a.run.rightRowsIterator = newRowContainerIterator(ctx, a.run.rightRows)
 	return nil
 }
@@ -267,7 +272,7 @@ func runPlanInsidePlan(
 	resultWriter rowResultWriter,
 	deferredRoutineSender eval.DeferredRoutineSender,
 	stmtForDistSQLDiagram string,
-) error {
+) (topLevelQueryStats, error) {
 	defer plan.close(ctx)
 	execCfg := params.ExecCfg()
 	recv := MakeDistSQLReceiver(
@@ -286,6 +291,11 @@
 	// before we can produce any "outer" rows to be returned to the client, so
 	// we make sure to unset pausablePortal field on the planner.
 	plannerCopy.pausablePortal = nil
+	// Avoid any possible metadata confusion by unsetting the
+	// routineMetadataForwarder (if there is a routine in the inner plan that
+	// needs it, then the plannerCopy will be updated during the inner plan
+	// setup).
+	plannerCopy.routineMetadataForwarder = nil

 	// planner object embeds the extended eval context, so we will modify that
 	// (which won't affect the outer planner's extended eval context), and we'll
@@ -301,6 +311,8 @@
 		// return from this method (after the main query is executed).
 		subqueryResultMemAcc := params.p.Mon().MakeBoundAccount()
 		defer subqueryResultMemAcc.Close(ctx)
+		// Note that planAndRunSubquery updates recv.stats with top-level
+		// subquery stats.
 		if !execCfg.DistSQLPlanner.PlanAndRunSubqueries(
 			ctx,
 			&plannerCopy,
@@ -311,7 +323,7 @@
 			false, /* skipDistSQLDiagramGeneration */
 			params.p.mustUseLeafTxn(),
 		) {
-			return resultWriter.Err()
+			return recv.stats, resultWriter.Err()
 		}
 	}

@@ -338,10 +350,10 @@
 	// Check if there was an error interacting with the resultWriter.
 	if recv.commErr != nil {
-		return recv.commErr
+		return recv.stats, recv.commErr
 	}
 	if resultWriter.Err() != nil {
-		return resultWriter.Err()
+		return recv.stats, resultWriter.Err()
 	}

 	plannerCopy.autoCommit = false
@@ -358,10 +370,10 @@
 	// need to update the plan for cleanup purposes before proceeding.
 	*plan = plannerCopy.curPlan.planComponents
 	if recv.commErr != nil {
-		return recv.commErr
+		return recv.stats, recv.commErr
 	}

-	return resultWriter.Err()
+	return recv.stats, resultWriter.Err()
 }

 func (a *applyJoinNode) Values() tree.Datums {
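For orientation, here is a self-contained toy sketch (simplified stand-in names, not CockroachDB code) of the pattern this diff applies: the inner-plan runner now returns its accumulated stats alongside the error, and on success the caller hands those stats to a forwarder so the outer query's totals include them.

package main

import "fmt"

type queryStats struct {
    rowsRead, rowsWritten int64
}

// statsForwarder stands in for the metadataForwarder interface.
type statsForwarder interface {
    forward(stats queryStats)
}

type printForwarder struct{}

func (printForwarder) forward(stats queryStats) {
    fmt.Printf("inner plan: read=%d written=%d\n", stats.rowsRead, stats.rowsWritten)
}

// runInnerPlan mimics runPlanInsidePlan's new shape: stats are returned next
// to the error instead of being dropped.
func runInnerPlan() (queryStats, error) {
    return queryStats{rowsRead: 10}, nil
}

func runOuterIteration(f statsForwarder) error {
    stats, err := runInnerPlan()
    if err != nil {
        return err
    }
    // Forward the inner stats so the outer query's totals include them.
    f.forward(stats)
    return nil
}

func main() {
    if err := runOuterIteration(printForwarder{}); err != nil {
        fmt.Println("error:", err)
    }
}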

pkg/sql/conn_executor.go

Lines changed: 1 addition & 0 deletions
@@ -3964,6 +3964,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {
 	p.noticeSender = nil
 	p.preparedStatements = ex.getPrepStmtsAccessor()
 	p.sqlCursors = ex.getCursorAccessor()
+	p.routineMetadataForwarder = nil
 	p.storedProcTxnState = ex.getStoredProcTxnStateAccessor()
 	p.createdSequences = ex.getCreatedSequencesAccessor()

pkg/sql/conn_executor_exec.go

Lines changed: 2 additions & 0 deletions
@@ -3288,6 +3288,8 @@ type topLevelQueryStats struct {
 	// client receiving the PGWire protocol messages (as well as construcing
 	// those messages).
 	clientTime time.Duration
+	// NB: when adding another field here, consider whether
+	// forwardInnerQueryStats method needs an adjustment.
 }

 func (s *topLevelQueryStats) add(other *topLevelQueryStats) {
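The add method itself is only referenced here, not shown in the diff. As a hedged sketch, assuming it performs a plain field-wise sum over the fields visible elsewhere in this commit (bytesRead, rowsRead, rowsWritten, networkEgressEstimate, clientTime), a toy version would behave roughly like this; the real struct may carry more fields.

package main

import (
    "fmt"
    "time"
)

// Toy stand-in for topLevelQueryStats; field set is an assumption drawn from
// names mentioned elsewhere in the commit.
type topLevelQueryStats struct {
    bytesRead             int64
    rowsRead              int64
    rowsWritten           int64
    networkEgressEstimate int64
    clientTime            time.Duration
}

func (s *topLevelQueryStats) add(other *topLevelQueryStats) {
    s.bytesRead += other.bytesRead
    s.rowsRead += other.rowsRead
    s.rowsWritten += other.rowsWritten
    s.networkEgressEstimate += other.networkEgressEstimate
    s.clientTime += other.clientTime
}

func main() {
    outer := topLevelQueryStats{rowsRead: 10}
    inner := topLevelQueryStats{rowsRead: 42, rowsWritten: 1}
    outer.add(&inner)
    fmt.Printf("%+v\n", outer) // rowsRead:52 rowsWritten:1
}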

pkg/sql/distsql_physical_planner.go

Lines changed: 4 additions & 1 deletion
@@ -4343,14 +4343,17 @@ func (dsp *DistSQLPlanner) wrapPlan(
 	// Copy the evalCtx.
 	evalCtx := *planCtx.ExtendedEvalCtx
-	wrapper := newPlanNodeToRowSource(
+	wrapper, err := newPlanNodeToRowSource(
 		n,
 		runParams{
 			extendedEvalCtx: &evalCtx,
 			p:               planCtx.planner,
 		},
 		firstNotWrapped,
 	)
+	if err != nil {
+		return nil, err
+	}
 	if !wrapper.rowsAffected && planCtx.planDepth == 1 && planCtx.stmtType == tree.RowsAffected {
 		// Return an error if the receiver expects to get the number of rows
 		// affected, but the planNode returns something else.

pkg/sql/distsql_running.go

Lines changed: 40 additions & 0 deletions
@@ -1014,6 +1014,15 @@ func (dsp *DistSQLPlanner) Run(
 		return
 	}

+	if len(flows) == 1 && evalCtx.Txn != nil && evalCtx.Txn.Type() == kv.RootTxn {
+		// If we have a fully local plan and a RootTxn, we don't expect any
+		// concurrency, so it's safe to use the DistSQLReceiver to push the
+		// metadata into directly from routines.
+		if planCtx.planner != nil {
+			planCtx.planner.routineMetadataForwarder = recv
+		}
+	}
+
 	if finishedSetupFn != nil {
 		finishedSetupFn(flow)
 	}
@@ -1144,6 +1153,8 @@ type DistSQLReceiver struct {
 	}
 }

+var _ metadataForwarder = &DistSQLReceiver{}
+
 // rowResultWriter is a subset of CommandResult to be used with the
 // DistSQLReceiver. It's implemented by RowResultWriter.
 type rowResultWriter interface {
@@ -1570,6 +1581,35 @@ func (r *DistSQLReceiver) checkConcurrentError() {
 	}
 }

+type metadataForwarder interface {
+	forwardMetadata(metadata *execinfrapb.ProducerMetadata)
+}
+
+// forwardInnerQueryStats propagates the query stats of "inner" plans as
+// metadata via the forwarder.
+func forwardInnerQueryStats(f metadataForwarder, stats topLevelQueryStats) {
+	if !buildutil.CrdbTestBuild && f == nil {
+		// Safety measure in production builds in case the forwarder is nil for
+		// some reason.
+		return
+	}
+	meta := execinfrapb.GetProducerMeta()
+	meta.Metrics = execinfrapb.GetMetricsMeta()
+	meta.Metrics.BytesRead = stats.bytesRead
+	meta.Metrics.RowsRead = stats.rowsRead
+	meta.Metrics.RowsWritten = stats.rowsWritten
+	// stats.networkEgressEstimate and stats.clientTime are ignored since they
+	// only matter at the "true" top-level query (and actually should be zero
+	// here anyway).
+	f.forwardMetadata(meta)
+}
+
+func (r *DistSQLReceiver) forwardMetadata(metadata *execinfrapb.ProducerMetadata) {
+	// Note that we don't use pushMeta method directly in order to go through
+	// the testing callback path.
+	r.Push(nil /* row */, metadata)
+}
+
 // pushMeta takes in non-empty metadata object and pushes it to the result
 // writer. Possibly updated status is returned.
 func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra.ConsumerStatus {
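The `var _ metadataForwarder = &DistSQLReceiver{}` line added above is the usual Go compile-time assertion idiom: assigning to the blank identifier of the interface type makes the build fail if the type ever stops satisfying the interface, instead of surfacing the problem at runtime. A standalone illustration with toy names:

package main

import "fmt"

type forwarder interface {
    forwardMetadata(msg string)
}

type receiver struct{}

func (r *receiver) forwardMetadata(msg string) {
    fmt.Println("forwarded:", msg)
}

// Compile-time check: *receiver must implement forwarder.
var _ forwarder = &receiver{}

func main() {
    var f forwarder = &receiver{}
    f.forwardMetadata("rows_read=42")
}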

pkg/sql/distsql_running_test.go

Lines changed: 124 additions & 0 deletions
@@ -1214,3 +1214,127 @@ SELECT id, details FROM jobs AS j INNER JOIN cte1 ON id = job_id WHERE id = 1;
 	require.Equal(t, 1, id)
 	require.Equal(t, 1, details)
 }
+
+// TestTopLevelQueryStats verifies that top-level query stats are collected
+// correctly, including when the query executes "plans inside plans".
+func TestTopLevelQueryStats(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// testQuery will be updated throughout the test to the current target.
+	var testQuery atomic.Value
+	// The callback will send number of rows read and rows written (for each
+	// ProducerMetadata.Metrics object) on these channels, respectively.
+	rowsReadCh, rowsWrittenCh := make(chan int64), make(chan int64)
+	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			SQLExecutor: &ExecutorTestingKnobs{
+				DistSQLReceiverPushCallbackFactory: func(_ context.Context, query string) func(rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) (rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
+					if target := testQuery.Load(); target == nil || target.(string) != query {
+						return nil
+					}
+					return func(row rowenc.EncDatumRow, batch coldata.Batch, meta *execinfrapb.ProducerMetadata) (rowenc.EncDatumRow, coldata.Batch, *execinfrapb.ProducerMetadata) {
+						if meta != nil && meta.Metrics != nil {
+							rowsReadCh <- meta.Metrics.RowsRead
+							rowsWrittenCh <- meta.Metrics.RowsWritten
+						}
+						return row, batch, meta
+					}
+				},
+			},
+		},
+	})
+	defer s.Stopper().Stop(context.Background())
+
+	if _, err := sqlDB.Exec(`
+CREATE TABLE t (k INT PRIMARY KEY);
+INSERT INTO t SELECT generate_series(1, 10);
+CREATE FUNCTION no_reads() RETURNS INT AS 'SELECT 1' LANGUAGE SQL;
+CREATE FUNCTION reads() RETURNS INT AS 'SELECT count(*) FROM t' LANGUAGE SQL;
+CREATE FUNCTION write(x INT) RETURNS INT AS 'INSERT INTO t VALUES (x); SELECT x' LANGUAGE SQL;
+`); err != nil {
+		t.Fatal(err)
+	}
+
+	for _, tc := range []struct {
+		name           string
+		query          string
+		expRowsRead    int64
+		expRowsWritten int64
+	}{
+		{
+			name:           "simple read",
+			query:          "SELECT k FROM t",
+			expRowsRead:    10,
+			expRowsWritten: 0,
+		},
+		{
+			name:           "simple write",
+			query:          "INSERT INTO t SELECT generate_series(11, 42)",
+			expRowsRead:    0,
+			expRowsWritten: 32,
+		},
+		{
+			name: "read with apply join",
+			query: `SELECT (
+				WITH foo AS MATERIALIZED (SELECT k FROM t AS x WHERE x.k = y.k)
+				SELECT * FROM foo
+			) FROM t AS y`,
+			expRowsRead:    84, // scanning the table twice
+			expRowsWritten: 0,
+		},
+		{
+			name:           "routine, no reads",
+			query:          "SELECT no_reads()",
+			expRowsRead:    0,
+			expRowsWritten: 0,
+		},
+		{
+			name:           "routine, reads",
+			query:          "SELECT reads()",
+			expRowsRead:    42,
+			expRowsWritten: 0,
+		},
+		{
+			name:           "routine, write",
+			query:          "SELECT write(43)",
+			expRowsRead:    0,
+			expRowsWritten: 1,
+		},
+		{
+			name:           "routine, multiple reads and writes",
+			query:          "SELECT reads(), write(44), reads(), write(45), write(46), reads()",
+			expRowsRead:    133, // first read is 43 rows, second is 44, third is 46
+			expRowsWritten: 3,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			testQuery.Store(tc.query)
+			errCh := make(chan error)
+			// Spin up the worker goroutine which will actually execute the
+			// query.
+			go func() {
+				defer close(errCh)
+				_, err := sqlDB.Exec(tc.query)
+				errCh <- err
+			}()
+			// In the main goroutine, loop until the query is completed while
+			// accumulating the top-level query stats.
+			var rowsRead, rowsWritten int64
+		LOOP:
+			for {
+				select {
+				case read := <-rowsReadCh:
+					rowsRead += read
+				case written := <-rowsWrittenCh:
+					rowsWritten += written
+				case err := <-errCh:
+					require.NoError(t, err)
+					break LOOP
+				}
+			}
+			require.Equal(t, tc.expRowsRead, rowsRead)
+			require.Equal(t, tc.expRowsWritten, rowsWritten)
+		})
+	}
+}
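Since the subtests share one table and run in order, the expected counts depend on earlier mutations. A small worked-arithmetic sketch, derived from the test's own values and comments, for the two non-obvious expectations:

package main

import "fmt"

// The initial setup seeds 10 rows and "simple write" adds 32 (rows 11..42),
// so "read with apply join" scans a 42-row table twice. "routine, write" then
// adds row 43, and "routine, multiple reads and writes" sees 43, 44, and 46
// rows across its three reads while performing 3 writes.
func main() {
    const seeded, bulkInsert = 10, 32
    afterBulk := seeded + bulkInsert              // 42 rows
    fmt.Println("apply join reads:", 2*afterBulk) // 84
    afterWrite43 := afterBulk + 1                 // 43 rows
    reads := afterWrite43 + (afterWrite43 + 1) + (afterWrite43 + 3)
    fmt.Println("multi-routine reads:", reads) // 43 + 44 + 46 = 133
}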

pkg/sql/inspect/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -120,6 +120,7 @@ go_test(
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_redact//:redact",
         "@com_github_gogo_protobuf//types",
+        "@com_github_lib_pq//:pq",
         "@com_github_stretchr_testify//require",
     ],
 )

pkg/sql/inspect/index_consistency_check_test.go

Lines changed: 10 additions & 2 deletions
@@ -26,10 +26,14 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/redact"
+	"github.com/lib/pq"
 	"github.com/stretchr/testify/require"
 )

-const expectedInspectFoundInconsistencies = "INSPECT found inconsistencies"
+const (
+	expectedInspectFoundInconsistencies = "INSPECT found inconsistencies"
+	expectedInspectInternalErrors       = "INSPECT encountered internal errors"
+)

 // requireCheckCountsMatch verifies that the job's total check count equals its completed check count.
 // This is used to verify that progress tracking correctly counted all checks.
@@ -231,7 +235,7 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
 			expectedIssues: []inspectIssue{
 				{ErrorType: "internal_error"},
 			},
-			expectedErrRegex: expectedInspectFoundInconsistencies,
+			expectedErrRegex: expectedInspectInternalErrors,
 			expectedInternalErrorPatterns: []map[string]string{
 				{
 					"error_message": "error decoding.*float64",
@@ -465,6 +469,10 @@
 			require.Error(t, err)
 			require.Regexp(t, tc.expectedErrRegex, err.Error())
+			var pqErr *pq.Error
+			require.True(t, errors.As(err, &pqErr), "expected pq.Error, got %T", err)
+			require.NotEmpty(t, pqErr.Hint, "expected error to have a hint")
+			require.Regexp(t, "SHOW INSPECT ERRORS FOR JOB [0-9]+ WITH DETAILS", pqErr.Hint)

 			numExpected := len(tc.expectedIssues)
 			numFound := issueLogger.numIssuesFound()

pkg/sql/inspect/inspect_job.go

Lines changed: 2 additions & 2 deletions
@@ -105,8 +105,8 @@ func (c *inspectResumer) OnFailOrCancel(
 	execCfg := jobExecCtx.ExecCfg()
 	c.maybeCleanupProtectedTimestamp(ctx, execCfg)

-	// Record RunsWithIssues metric if the job failed due to finding inconsistencies.
-	if jobErr != nil && errors.Is(jobErr, errInspectFoundInconsistencies) {
+	// Record RunsWithIssues metric if the job failed due to finding issues (including internal errors).
+	if errors.Is(jobErr, errInspectFoundInconsistencies) || errors.Is(jobErr, errInspectInternalErrors) {
 		execCfg.JobRegistry.MetricsStruct().Inspect.(*InspectMetrics).RunsWithIssues.Inc(1)
 	}
 	return nil
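Note that the rewritten condition drops the old `jobErr != nil &&` guard; that is safe because `errors.Is` reports false when the error being inspected is nil. A quick standalone check of that behavior with toy sentinel errors (stdlib errors shown here; cockroachdb/errors matches it for this case):

package main

import (
    "errors"
    "fmt"
)

var errFoundInconsistencies = errors.New("INSPECT found inconsistencies")

func main() {
    var jobErr error // nil: the job succeeded
    // errors.Is(nil, target) is false for a non-nil target, so the extra
    // nil guard in the old code was redundant.
    fmt.Println(errors.Is(jobErr, errFoundInconsistencies)) // false

    jobErr = fmt.Errorf("job failed: %w", errFoundInconsistencies)
    fmt.Println(errors.Is(jobErr, errFoundInconsistencies)) // true
}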
