@@ -14,23 +14,34 @@ import (
1414	"github.com/cockroachdb/cockroach/pkg/keys" 
1515	"github.com/cockroachdb/cockroach/pkg/roachpb" 
1616	"github.com/cockroachdb/cockroach/pkg/security/username" 
17+ 	"github.com/cockroachdb/cockroach/pkg/settings" 
1718	"github.com/cockroachdb/cockroach/pkg/sql/catalog" 
1819	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb" 
1920	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" 
2021	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" 
2122	"github.com/cockroachdb/cockroach/pkg/sql/execinfra" 
2223	"github.com/cockroachdb/cockroach/pkg/sql/isql" 
2324	"github.com/cockroachdb/cockroach/pkg/sql/lexbase" 
25+ 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" 
26+ 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" 
2427	"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype" 
2528	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree" 
2629	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata" 
2730	"github.com/cockroachdb/cockroach/pkg/sql/spanutils" 
2831	"github.com/cockroachdb/cockroach/pkg/sql/types" 
2932	"github.com/cockroachdb/cockroach/pkg/util/hlc" 
33+ 	"github.com/cockroachdb/cockroach/pkg/util/log" 
3034	"github.com/cockroachdb/errors" 
3135	"github.com/cockroachdb/redact" 
3236)
3337
38+ var  indexConsistencyHashEnabled  =  settings .RegisterBoolSetting (
39+ 	settings .ApplicationLevel ,
40+ 	"sql.inspect.index_consistency_hash.enabled" ,
41+ 	"if false, the index consistency check skips the hash precheck and always runs the full join" ,
42+ 	true ,
43+ )
44+ 
3445// indexConsistencyCheckApplicability is a lightweight version that only implements applicability logic. 
3546type  indexConsistencyCheckApplicability  struct  {
3647	tableID  descpb.ID 
@@ -45,6 +56,21 @@ func (c *indexConsistencyCheckApplicability) AppliesTo(
4556	return  spanContainsTable (c .tableID , codec , span )
4657}
4758
59+ // checkState represents the state of an index consistency check. 
60+ type  checkState  int 
61+ 
62+ const  (
63+ 	// checkNotStarted indicates Start() has not been called yet. 
64+ 	checkNotStarted  checkState  =  iota 
65+ 	// checkHashMatched indicates the hash precheck passed - no corruption detected, 
66+ 	// so the full check can be skipped. 
67+ 	checkHashMatched 
68+ 	// checkRunning indicates the full check is actively running and may produce more results. 
69+ 	checkRunning 
70+ 	// checkDone indicates the check has finished (iterator exhausted or error occurred). 
71+ 	checkDone 
72+ )
73+ 
4874// indexConsistencyCheck verifies consistency between a table's primary index 
4975// and a specified secondary index by streaming rows from both sides of a 
5076// query. It reports an issue if a key exists in the primary but not the 
@@ -56,11 +82,11 @@ type indexConsistencyCheck struct {
5682	indexID  descpb.IndexID 
5783	asOf     hlc.Timestamp 
5884
59- 	tableDesc       catalog.TableDescriptor 
60- 	secIndex        catalog.Index 
61- 	priIndex        catalog.Index 
62- 	rowIter         isql.Rows 
63- 	exhaustedIter   bool 
85+ 	tableDesc  catalog.TableDescriptor 
86+ 	secIndex   catalog.Index 
87+ 	priIndex   catalog.Index 
88+ 	rowIter    isql.Rows 
89+ 	state       checkState 
6490
6591	// columns is a list of the columns returned by one side of the 
6692	// queries join. The actual resulting rows from the RowContainer is 
@@ -77,7 +103,7 @@ var _ inspectCheckApplicability = (*indexConsistencyCheck)(nil)
77103
78104// Started implements the inspectCheck interface. 
79105func  (c  * indexConsistencyCheck ) Started () bool  {
80- 	return  c .rowIter  !=  nil 
106+ 	return  c .state  !=  checkNotStarted 
81107}
82108
83109// Start implements the inspectCheck interface. 
@@ -141,6 +167,10 @@ func (c *indexConsistencyCheck) Start(
141167		return  res 
142168	}
143169
170+ 	pkColNames  :=  colNames (pkColumns )
171+ 	otherColNames  :=  colNames (otherColumns )
172+ 	allColNames  :=  colNames (c .columns )
173+ 
144174	// Generate query bounds from the span to limit the query to the specified range 
145175	var  predicate  string 
146176	var  queryArgs  []interface {}
@@ -188,7 +218,6 @@ func (c *indexConsistencyCheck) Start(
188218		}
189219
190220		// Generate SQL predicate from the bounds 
191- 		pkColNames  :=  colNames (pkColumns )
192221		// Encode column names for SQL usage 
193222		encodedPkColNames  :=  make ([]string , len (pkColNames ))
194223		for  i , colName  :=  range  pkColNames  {
@@ -216,8 +245,27 @@ func (c *indexConsistencyCheck) Start(
216245		}
217246	}
218247
248+ 	if  indexConsistencyHashEnabled .Get (& c .flowCtx .Cfg .Settings .SV ) &&  len (allColNames ) >  0  {
249+ 		match , hashErr  :=  c .hashesMatch (ctx , allColNames , predicate , queryArgs )
250+ 		if  hashErr  !=  nil  {
251+ 			// If hashing fails, we usually fall back to the full check. But if the 
252+ 			// error stems from query construction, that's an internal bug and shouldn't 
253+ 			// be ignored. 
254+ 			if  isQueryConstructionError (hashErr ) {
255+ 				return  errors .WithAssertionFailure (hashErr )
256+ 			}
257+ 			log .Dev .Infof (ctx , "hash precheck for index consistency did not match; falling back to full check: %v" , hashErr )
258+ 		}
259+ 		if  match  {
260+ 			// Hashes match, no corruption detected - skip the full check. 
261+ 			c .state  =  checkHashMatched 
262+ 			return  nil 
263+ 		}
264+ 	}
265+ 
266+ 	joinColNames  :=  colNames (joinColumns )
219267	checkQuery  :=  c .createIndexCheckQuery (
220- 		colNames ( pkColumns ),  colNames ( otherColumns ),  colNames ( joinColumns ) ,
268+ 		pkColNames ,  otherColNames ,  joinColNames ,
221269		c .tableDesc .GetID (), c .secIndex , c .priIndex .GetID (), predicate ,
222270	)
223271
@@ -247,13 +295,19 @@ func (c *indexConsistencyCheck) Start(
247295	// do that here because the results of the iterator are used in the Next() 
248296	// function. 
249297	c .rowIter  =  it 
298+ 	c .state  =  checkRunning 
250299	return  nil 
251300}
252301
253302// Next implements the inspectCheck interface. 
254303func  (c  * indexConsistencyCheck ) Next (
255304	ctx  context.Context , cfg  * execinfra.ServerConfig ,
256305) (* inspectIssue , error ) {
306+ 	// If hashes matched, there's no corruption to report. 
307+ 	if  c .state  ==  checkHashMatched  {
308+ 		return  nil , nil 
309+ 	}
310+ 
257311	if  c .rowIter  ==  nil  {
258312		return  nil , errors .AssertionFailedf ("nil rowIter unexpected" )
259313	}
@@ -263,7 +317,7 @@ func (c *indexConsistencyCheck) Next(
263317		// Close the iterator to prevent further usage. The close may emit the 
264318		// internal error too, but we only need to capture it once. 
265319		_  =  c .Close (ctx )
266- 		c .exhaustedIter  =  true 
320+ 		c .state  =  checkDone 
267321
268322		// Convert internal errors to inspect issues rather than failing the entire job. 
269323		// This allows us to capture and log data corruption or encoding errors as 
@@ -284,7 +338,7 @@ func (c *indexConsistencyCheck) Next(
284338		}, nil 
285339	}
286340	if  ! ok  {
287- 		c .exhaustedIter  =  true 
341+ 		c .state  =  checkDone 
288342		return  nil , nil 
289343	}
290344
@@ -351,12 +405,8 @@ func (c *indexConsistencyCheck) Next(
351405
352406// Done implements the inspectCheck interface. 
353407func  (c  * indexConsistencyCheck ) Done (context.Context ) bool  {
354- 	// If we never started (rowIter is nil), we're done 
355- 	if  c .rowIter  ==  nil  {
356- 		return  true 
357- 	}
358- 	// Otherwise, we're done when the iterator is exhausted 
359- 	return  c .exhaustedIter 
408+ 	done  :=  c .state  ==  checkHashMatched  ||  c .state  ==  checkDone 
409+ 	return  done 
360410}
361411
362412// Close implements the inspectCheck interface. 
@@ -675,6 +725,93 @@ func (c *indexConsistencyCheck) createIndexCheckQuery(
675725	)
676726}
677727
728+ type  hashResult  struct  {
729+ 	rowCount  int64 
730+ 	hash      string 
731+ }
732+ 
733+ // hashesMatch performs a fast comparison of primary and secondary indexes by 
734+ // computing row counts and hash values. Returns true if both indexes have 
735+ // identical row counts and hash values, indicating no corruption. 
736+ func  (c  * indexConsistencyCheck ) hashesMatch (
737+ 	ctx  context.Context , columnNames  []string , predicate  string , queryArgs  []interface {},
738+ ) (bool , error ) {
739+ 	primary , err  :=  c .computeHashAndRowCount (ctx , c .priIndex , columnNames , predicate , queryArgs )
740+ 	if  err  !=  nil  {
741+ 		return  false , errors .Wrapf (err , "computing hash for primary index %s" , c .priIndex .GetName ())
742+ 	}
743+ 	secondary , err  :=  c .computeHashAndRowCount (ctx , c .secIndex , columnNames , predicate , queryArgs )
744+ 	if  err  !=  nil  {
745+ 		return  false , errors .Wrapf (err , "computing hash for secondary index %s" , c .secIndex .GetName ())
746+ 	}
747+ 	// Hashes match only if both row count and hash value are identical. 
748+ 	return  primary .rowCount  ==  secondary .rowCount  &&  primary .hash  ==  secondary .hash , nil 
749+ }
750+ 
751+ // computeHashAndRowCount executes a hash query for the specified index and 
752+ // returns the row count and XOR aggregate hash value. 
753+ func  (c  * indexConsistencyCheck ) computeHashAndRowCount (
754+ 	ctx  context.Context ,
755+ 	index  catalog.Index ,
756+ 	columnNames  []string ,
757+ 	predicate  string ,
758+ 	queryArgs  []interface {},
759+ ) (hashResult , error ) {
760+ 	query  :=  buildIndexHashQuery (c .tableDesc .GetID (), index , columnNames , predicate )
761+ 	queryWithAsOf  :=  fmt .Sprintf ("SELECT * FROM (%s) AS OF SYSTEM TIME %s" , query , c .asOf .AsOfSystemTime ())
762+ 
763+ 	qos  :=  getInspectQoS (& c .flowCtx .Cfg .Settings .SV )
764+ 	row , err  :=  c .flowCtx .Cfg .DB .Executor ().QueryRowEx (
765+ 		ctx , "inspect-index-consistency-hash" , nil , /* txn */ 
766+ 		sessiondata.InternalExecutorOverride {
767+ 			User :             username .NodeUserName (),
768+ 			QualityOfService : & qos ,
769+ 		},
770+ 		queryWithAsOf ,
771+ 		queryArgs ... ,
772+ 	)
773+ 	if  err  !=  nil  {
774+ 		return  hashResult {}, err 
775+ 	}
776+ 	if  len (row ) !=  2  {
777+ 		return  hashResult {}, errors .AssertionFailedf ("hash query returned unexpected column count: %d" , len (row ))
778+ 	}
779+ 	return  hashResult {
780+ 		rowCount : int64 (tree .MustBeDInt (row [0 ])),
781+ 		hash :     string (tree .MustBeDBytes (row [1 ])),
782+ 	}, nil 
783+ }
784+ 
785+ // buildIndexHashQuery constructs a query that computes row count and XOR 
786+ // aggregate hash for the specified index and columns. 
787+ func  buildIndexHashQuery (
788+ 	tableID  descpb.ID , index  catalog.Index , columnNames  []string , predicate  string ,
789+ ) string  {
790+ 	hashExpr  :=  hashInputExpression (columnNames )
791+ 	whereClause  :=  buildWhereClause (predicate , nil  /* nullFilters */ )
792+ 	return  fmt .Sprintf (` 
793+ SELECT 
794+   count(*) AS row_count, 
795+   crdb_internal.datums_to_bytes(xor_agg(fnv64(%s))) AS hash_value 
796+ FROM [%d AS t]@{FORCE_INDEX=[%d]}%s` ,
797+ 		hashExpr ,
798+ 		tableID ,
799+ 		index .GetID (),
800+ 		whereClause ,
801+ 	)
802+ }
803+ 
804+ // hashInputExpression creates a hash-friendly expression by encoding column 
805+ // values to bytes with NULL coalesced to empty bytes. 
806+ func  hashInputExpression (columnNames  []string ) string  {
807+ 	args  :=  make ([]string , len (columnNames ))
808+ 	for  i , col  :=  range  columnNames  {
809+ 		args [i ] =  colRef ("t" , col )
810+ 	}
811+ 	encoded  :=  fmt .Sprintf ("crdb_internal.datums_to_bytes(%s)" , strings .Join (args , ", " ))
812+ 	return  fmt .Sprintf ("COALESCE(%s, ''::BYTES)" , encoded )
813+ }
814+ 
678815// encodeColumnName properly encodes a column name for use in SQL. 
679816func  encodeColumnName (columnName  string ) string  {
680817	var  buf  bytes.Buffer 
@@ -766,3 +903,20 @@ func buildWhereClause(predicate string, nullFilters []string) string {
766903
767904	return  buf .String ()
768905}
906+ 
907+ // isQueryConstructionError checks if the given error is due to 
908+ // invalid syntax or references in the query construction. 
909+ func  isQueryConstructionError (err  error ) bool  {
910+ 	code  :=  pgerror .GetPGCode (err )
911+ 	switch  code  {
912+ 	case  pgcode .Syntax ,
913+ 		pgcode .UndefinedColumn ,
914+ 		pgcode .UndefinedTable ,
915+ 		pgcode .UndefinedFunction ,
916+ 		pgcode .DatatypeMismatch ,
917+ 		pgcode .InvalidColumnReference :
918+ 		return  true 
919+ 	default :
920+ 		return  false 
921+ 	}
922+ }
0 commit comments