@@ -14,23 +14,34 @@ import (
1414 "github.com/cockroachdb/cockroach/pkg/keys"
1515 "github.com/cockroachdb/cockroach/pkg/roachpb"
1616 "github.com/cockroachdb/cockroach/pkg/security/username"
17+ "github.com/cockroachdb/cockroach/pkg/settings"
1718 "github.com/cockroachdb/cockroach/pkg/sql/catalog"
1819 "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
1920 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2021 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2122 "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
2223 "github.com/cockroachdb/cockroach/pkg/sql/isql"
2324 "github.com/cockroachdb/cockroach/pkg/sql/lexbase"
25+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
26+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
2427 "github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
2528 "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2629 "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
2730 "github.com/cockroachdb/cockroach/pkg/sql/spanutils"
2831 "github.com/cockroachdb/cockroach/pkg/sql/types"
2932 "github.com/cockroachdb/cockroach/pkg/util/hlc"
33+ "github.com/cockroachdb/cockroach/pkg/util/log"
3034 "github.com/cockroachdb/errors"
3135 "github.com/cockroachdb/redact"
3236)
3337
// indexConsistencyHashEnabled controls whether the index consistency check
// runs a hash precheck before the full join query. When the setting is
// false, the precheck is skipped and the full join always runs. It defaults
// to true.
var indexConsistencyHashEnabled = settings.RegisterBoolSetting(
	settings.ApplicationLevel,
	"sql.inspect.index_consistency_hash.enabled",
	"if false, the index consistency check skips the hash precheck and always runs the full join",
	true,
)
44+
3445// indexConsistencyCheckApplicability is a lightweight version that only implements applicability logic.
3546type indexConsistencyCheckApplicability struct {
3647 tableID descpb.ID
@@ -45,6 +56,21 @@ func (c *indexConsistencyCheckApplicability) AppliesTo(
4556 return spanContainsTable (c .tableID , codec , span )
4657}
4758
// checkState represents the state of an index consistency check. The zero
// value is checkNotStarted.
type checkState int

const (
	// checkNotStarted indicates Start() has not been called yet, or that it
	// has not moved the check into one of the states below.
	checkNotStarted checkState = iota
	// checkHashMatched indicates the hash precheck passed - no corruption detected,
	// so the full check can be skipped. This is a terminal state: Done() reports
	// true and Next() returns no issue.
	checkHashMatched
	// checkRunning indicates the full check is actively running and may produce more results.
	// Start() enters this state once the row iterator has been stored.
	checkRunning
	// checkDone indicates the check has finished (iterator exhausted or error occurred).
	// Next() enters this state; like checkHashMatched it is terminal.
	checkDone
)
73+
4874// indexConsistencyCheck verifies consistency between a table's primary index
4975// and a specified secondary index by streaming rows from both sides of a
5076// query. It reports an issue if a key exists in the primary but not the
@@ -56,11 +82,11 @@ type indexConsistencyCheck struct {
5682 indexID descpb.IndexID
5783 asOf hlc.Timestamp
5884
59- tableDesc catalog.TableDescriptor
60- secIndex catalog.Index
61- priIndex catalog.Index
62- rowIter isql.Rows
63- exhaustedIter bool
85+ tableDesc catalog.TableDescriptor
86+ secIndex catalog.Index
87+ priIndex catalog.Index
88+ rowIter isql.Rows
89+ state checkState
6490
6591 // columns is a list of the columns returned by one side of the
6692 // queries join. The actual resulting rows from the RowContainer is
@@ -77,7 +103,7 @@ var _ inspectCheckApplicability = (*indexConsistencyCheck)(nil)
77103
78104// Started implements the inspectCheck interface.
79105func (c * indexConsistencyCheck ) Started () bool {
80- return c .rowIter != nil
106+ return c .state != checkNotStarted
81107}
82108
83109// Start implements the inspectCheck interface.
@@ -141,6 +167,10 @@ func (c *indexConsistencyCheck) Start(
141167 return res
142168 }
143169
170+ pkColNames := colNames (pkColumns )
171+ otherColNames := colNames (otherColumns )
172+ allColNames := colNames (c .columns )
173+
144174 // Generate query bounds from the span to limit the query to the specified range
145175 var predicate string
146176 var queryArgs []interface {}
@@ -188,7 +218,6 @@ func (c *indexConsistencyCheck) Start(
188218 }
189219
190220 // Generate SQL predicate from the bounds
191- pkColNames := colNames (pkColumns )
192221 // Encode column names for SQL usage
193222 encodedPkColNames := make ([]string , len (pkColNames ))
194223 for i , colName := range pkColNames {
@@ -216,8 +245,27 @@ func (c *indexConsistencyCheck) Start(
216245 }
217246 }
218247
248+ if indexConsistencyHashEnabled .Get (& c .flowCtx .Cfg .Settings .SV ) && len (allColNames ) > 0 {
249+ match , hashErr := c .hashesMatch (ctx , allColNames , predicate , queryArgs )
250+ if hashErr != nil {
251+ // If hashing fails, we usually fall back to the full check. But if the
252+ // error stems from query construction, that's an internal bug and shouldn't
253+ // be ignored.
254+ if isQueryConstructionError (hashErr ) {
255+ return errors .WithAssertionFailure (hashErr )
256+ }
257+ log .Dev .Infof (ctx , "hash precheck for index consistency did not match; falling back to full check: %v" , hashErr )
258+ }
259+ if match {
260+ // Hashes match, no corruption detected - skip the full check.
261+ c .state = checkHashMatched
262+ return nil
263+ }
264+ }
265+
266+ joinColNames := colNames (joinColumns )
219267 checkQuery := c .createIndexCheckQuery (
220- colNames ( pkColumns ), colNames ( otherColumns ), colNames ( joinColumns ) ,
268+ pkColNames , otherColNames , joinColNames ,
221269 c .tableDesc .GetID (), c .secIndex , c .priIndex .GetID (), predicate ,
222270 )
223271
@@ -247,13 +295,19 @@ func (c *indexConsistencyCheck) Start(
247295 // do that here because the results of the iterator are used in the Next()
248296 // function.
249297 c .rowIter = it
298+ c .state = checkRunning
250299 return nil
251300}
252301
253302// Next implements the inspectCheck interface.
254303func (c * indexConsistencyCheck ) Next (
255304 ctx context.Context , cfg * execinfra.ServerConfig ,
256305) (* inspectIssue , error ) {
306+ // If hashes matched, there's no corruption to report.
307+ if c .state == checkHashMatched {
308+ return nil , nil
309+ }
310+
257311 if c .rowIter == nil {
258312 return nil , errors .AssertionFailedf ("nil rowIter unexpected" )
259313 }
@@ -263,7 +317,7 @@ func (c *indexConsistencyCheck) Next(
263317 // Close the iterator to prevent further usage. The close may emit the
264318 // internal error too, but we only need to capture it once.
265319 _ = c .Close (ctx )
266- c .exhaustedIter = true
320+ c .state = checkDone
267321
268322 // Convert internal errors to inspect issues rather than failing the entire job.
269323 // This allows us to capture and log data corruption or encoding errors as
@@ -284,7 +338,7 @@ func (c *indexConsistencyCheck) Next(
284338 }, nil
285339 }
286340 if ! ok {
287- c .exhaustedIter = true
341+ c .state = checkDone
288342 return nil , nil
289343 }
290344
@@ -351,12 +405,8 @@ func (c *indexConsistencyCheck) Next(
351405
352406// Done implements the inspectCheck interface.
353407func (c * indexConsistencyCheck ) Done (context.Context ) bool {
354- // If we never started (rowIter is nil), we're done
355- if c .rowIter == nil {
356- return true
357- }
358- // Otherwise, we're done when the iterator is exhausted
359- return c .exhaustedIter
408+ done := c .state == checkHashMatched || c .state == checkDone
409+ return done
360410}
361411
362412// Close implements the inspectCheck interface.
@@ -675,6 +725,93 @@ func (c *indexConsistencyCheck) createIndexCheckQuery(
675725 )
676726}
677727
// hashResult holds the two values produced by a single index hash query.
type hashResult struct {
	// rowCount is the row_count column (count(*)) of the hash query.
	rowCount int64
	// hash is the hash_value column of the hash query, stored as a string of
	// the returned bytes so two results can be compared with ==.
	hash string
}
732+
733+ // hashesMatch performs a fast comparison of primary and secondary indexes by
734+ // computing row counts and hash values. Returns true if both indexes have
735+ // identical row counts and hash values, indicating no corruption.
736+ func (c * indexConsistencyCheck ) hashesMatch (
737+ ctx context.Context , columnNames []string , predicate string , queryArgs []interface {},
738+ ) (bool , error ) {
739+ primary , err := c .computeHashAndRowCount (ctx , c .priIndex , columnNames , predicate , queryArgs )
740+ if err != nil {
741+ return false , errors .Wrapf (err , "computing hash for primary index %s" , c .priIndex .GetName ())
742+ }
743+ secondary , err := c .computeHashAndRowCount (ctx , c .secIndex , columnNames , predicate , queryArgs )
744+ if err != nil {
745+ return false , errors .Wrapf (err , "computing hash for secondary index %s" , c .secIndex .GetName ())
746+ }
747+ // Hashes match only if both row count and hash value are identical.
748+ return primary .rowCount == secondary .rowCount && primary .hash == secondary .hash , nil
749+ }
750+
751+ // computeHashAndRowCount executes a hash query for the specified index and
752+ // returns the row count and XOR aggregate hash value.
753+ func (c * indexConsistencyCheck ) computeHashAndRowCount (
754+ ctx context.Context ,
755+ index catalog.Index ,
756+ columnNames []string ,
757+ predicate string ,
758+ queryArgs []interface {},
759+ ) (hashResult , error ) {
760+ query := buildIndexHashQuery (c .tableDesc .GetID (), index , columnNames , predicate )
761+ queryWithAsOf := fmt .Sprintf ("SELECT * FROM (%s) AS OF SYSTEM TIME %s" , query , c .asOf .AsOfSystemTime ())
762+
763+ qos := getInspectQoS (& c .flowCtx .Cfg .Settings .SV )
764+ row , err := c .flowCtx .Cfg .DB .Executor ().QueryRowEx (
765+ ctx , "inspect-index-consistency-hash" , nil , /* txn */
766+ sessiondata.InternalExecutorOverride {
767+ User : username .NodeUserName (),
768+ QualityOfService : & qos ,
769+ },
770+ queryWithAsOf ,
771+ queryArgs ... ,
772+ )
773+ if err != nil {
774+ return hashResult {}, err
775+ }
776+ if len (row ) != 2 {
777+ return hashResult {}, errors .AssertionFailedf ("hash query returned unexpected column count: %d" , len (row ))
778+ }
779+ return hashResult {
780+ rowCount : int64 (tree .MustBeDInt (row [0 ])),
781+ hash : string (tree .MustBeDBytes (row [1 ])),
782+ }, nil
783+ }
784+
785+ // buildIndexHashQuery constructs a query that computes row count and XOR
786+ // aggregate hash for the specified index and columns.
787+ func buildIndexHashQuery (
788+ tableID descpb.ID , index catalog.Index , columnNames []string , predicate string ,
789+ ) string {
790+ hashExpr := hashInputExpression (columnNames )
791+ whereClause := buildWhereClause (predicate , nil /* nullFilters */ )
792+ return fmt .Sprintf (`
793+ SELECT
794+ count(*) AS row_count,
795+ crdb_internal.datums_to_bytes(xor_agg(fnv64(%s))) AS hash_value
796+ FROM [%d AS t]@{FORCE_INDEX=[%d]}%s` ,
797+ hashExpr ,
798+ tableID ,
799+ index .GetID (),
800+ whereClause ,
801+ )
802+ }
803+
804+ // hashInputExpression creates a hash-friendly expression by encoding column
805+ // values to bytes with NULL coalesced to empty bytes.
806+ func hashInputExpression (columnNames []string ) string {
807+ args := make ([]string , len (columnNames ))
808+ for i , col := range columnNames {
809+ args [i ] = colRef ("t" , col )
810+ }
811+ encoded := fmt .Sprintf ("crdb_internal.datums_to_bytes(%s)" , strings .Join (args , ", " ))
812+ return fmt .Sprintf ("COALESCE(%s, ''::BYTES)" , encoded )
813+ }
814+
678815// encodeColumnName properly encodes a column name for use in SQL.
679816func encodeColumnName (columnName string ) string {
680817 var buf bytes.Buffer
@@ -766,3 +903,20 @@ func buildWhereClause(predicate string, nullFilters []string) string {
766903
767904 return buf .String ()
768905}
906+
907+ // isQueryConstructionError checks if the given error is due to
908+ // invalid syntax or references in the query construction.
909+ func isQueryConstructionError (err error ) bool {
910+ code := pgerror .GetPGCode (err )
911+ switch code {
912+ case pgcode .Syntax ,
913+ pgcode .UndefinedColumn ,
914+ pgcode .UndefinedTable ,
915+ pgcode .UndefinedFunction ,
916+ pgcode .DatatypeMismatch ,
917+ pgcode .InvalidColumnReference :
918+ return true
919+ default :
920+ return false
921+ }
922+ }
0 commit comments