@@ -14,22 +14,33 @@ import (
1414 "github.com/cockroachdb/cockroach/pkg/keys"
1515 "github.com/cockroachdb/cockroach/pkg/roachpb"
1616 "github.com/cockroachdb/cockroach/pkg/security/username"
17+ "github.com/cockroachdb/cockroach/pkg/settings"
1718 "github.com/cockroachdb/cockroach/pkg/sql/catalog"
1819 "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
1920 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2021 "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2122 "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
2223 "github.com/cockroachdb/cockroach/pkg/sql/isql"
2324 "github.com/cockroachdb/cockroach/pkg/sql/lexbase"
25+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
26+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
2427 "github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
2528 "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2629 "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
2730 "github.com/cockroachdb/cockroach/pkg/sql/spanutils"
2831 "github.com/cockroachdb/cockroach/pkg/util/hlc"
32+ "github.com/cockroachdb/cockroach/pkg/util/log"
2933 "github.com/cockroachdb/errors"
3034 "github.com/cockroachdb/redact"
3135)
3236
37+ var indexConsistencyHashEnabled = settings .RegisterBoolSetting (
38+ settings .ApplicationLevel ,
39+ "sql.inspect.index_consistency_hash.enabled" ,
40+ "if false, the index consistency check skips the hash precheck and always runs the full join" ,
41+ true ,
42+ )
43+
3344// indexConsistencyCheckApplicability is a lightweight version that only implements applicability logic.
3445type indexConsistencyCheckApplicability struct {
3546 tableID descpb.ID
@@ -44,6 +55,21 @@ func (c *indexConsistencyCheckApplicability) AppliesTo(
4455 return spanContainsTable (c .tableID , codec , span )
4556}
4657
58+ // checkState represents the state of an index consistency check.
59+ type checkState int
60+
61+ const (
62+ // checkNotStarted indicates Start() has not been called yet.
63+ checkNotStarted checkState = iota
64+ // checkHashMatched indicates the hash precheck passed - no corruption detected,
65+ // so the full check can be skipped.
66+ checkHashMatched
67+ // checkRunning indicates the full check is actively running and may produce more results.
68+ checkRunning
69+ // checkDone indicates the check has finished (iterator exhausted or error occurred).
70+ checkDone
71+ )
72+
4773// indexConsistencyCheck verifies consistency between a table's primary index
4874// and a specified secondary index by streaming rows from both sides of a
4975// query. It reports an issue if a key exists in the primary but not the
@@ -55,11 +81,11 @@ type indexConsistencyCheck struct {
5581 indexID descpb.IndexID
5682 asOf hlc.Timestamp
5783
58- tableDesc catalog.TableDescriptor
59- secIndex catalog.Index
60- priIndex catalog.Index
61- rowIter isql.Rows
62- exhaustedIter bool
84+ tableDesc catalog.TableDescriptor
85+ secIndex catalog.Index
86+ priIndex catalog.Index
87+ rowIter isql.Rows
88+ state checkState
6389
6490 // columns is a list of the columns returned by one side of the
6591 // queries join. The actual resulting rows from the RowContainer is
@@ -76,7 +102,7 @@ var _ inspectCheckApplicability = (*indexConsistencyCheck)(nil)
76102
77103// Started implements the inspectCheck interface.
78104func (c * indexConsistencyCheck ) Started () bool {
79- return c .rowIter != nil
105+ return c .state != checkNotStarted
80106}
81107
82108// Start implements the inspectCheck interface.
@@ -132,6 +158,10 @@ func (c *indexConsistencyCheck) Start(
132158 return res
133159 }
134160
161+ pkColNames := colNames (pkColumns )
162+ otherColNames := colNames (otherColumns )
163+ allColNames := colNames (c .columns )
164+
135165 // Generate query bounds from the span to limit the query to the specified range
136166 var predicate string
137167 var queryArgs []interface {}
@@ -167,6 +197,7 @@ func (c *indexConsistencyCheck) Start(
167197
168198 // Nothing to do if no rows exist in the span.
169199 if ! hasRows {
200+ c .state = checkDone
170201 return nil
171202 }
172203
@@ -175,7 +206,6 @@ func (c *indexConsistencyCheck) Start(
175206 }
176207
177208 // Generate SQL predicate from the bounds
178- pkColNames := colNames (pkColumns )
179209 // Encode column names for SQL usage
180210 encodedPkColNames := make ([]string , len (pkColNames ))
181211 for i , colName := range pkColNames {
@@ -202,8 +232,26 @@ func (c *indexConsistencyCheck) Start(
202232 queryArgs = append (queryArgs , datum )
203233 }
204234
235+ if indexConsistencyHashEnabled .Get (& c .flowCtx .Cfg .Settings .SV ) && len (allColNames ) > 0 {
236+ match , hashErr := c .hashesMatch (ctx , allColNames , predicate , queryArgs )
237+ if hashErr != nil {
238+ // If hashing fails, we usually fall back to the full check. But if the
239+ // error stems from query construction, that's an internal bug and shouldn't
240+ // be ignored.
241+ if isQueryConstructionError (hashErr ) {
242+ return errors .WithAssertionFailure (hashErr )
243+ }
244+ log .Dev .Infof (ctx , "hash precheck for index consistency did not match; falling back to full check: %v" , hashErr )
245+ }
246+ if match {
247+ // Hashes match, no corruption detected - skip the full check.
248+ c .state = checkHashMatched
249+ return nil
250+ }
251+ }
252+
205253 checkQuery := c .createIndexCheckQuery (
206- colNames ( pkColumns ), colNames ( otherColumns ) , c .tableDesc .GetID (), c .secIndex , c .priIndex .GetID (), predicate ,
254+ pkColNames , otherColNames , c .tableDesc .GetID (), c .secIndex , c .priIndex .GetID (), predicate ,
207255 )
208256
209257 // Wrap the query with AS OF SYSTEM TIME to ensure it uses the specified timestamp
@@ -232,13 +280,19 @@ func (c *indexConsistencyCheck) Start(
232280 // do that here because the results of the iterator are used in the Next()
233281 // function.
234282 c .rowIter = it
283+ c .state = checkRunning
235284 return nil
236285}
237286
238287// Next implements the inspectCheck interface.
239288func (c * indexConsistencyCheck ) Next (
240289 ctx context.Context , cfg * execinfra.ServerConfig ,
241290) (* inspectIssue , error ) {
291+ // If hashes matched, there's no corruption to report.
292+ if c .state == checkHashMatched {
293+ return nil , nil
294+ }
295+
242296 if c .rowIter == nil {
243297 return nil , errors .AssertionFailedf ("nil rowIter unexpected" )
244298 }
@@ -248,7 +302,7 @@ func (c *indexConsistencyCheck) Next(
248302 // Close the iterator to prevent further usage. The close may emit the
249303 // internal error too, but we only need to capture it once.
250304 _ = c .Close (ctx )
251- c .exhaustedIter = true
305+ c .state = checkDone
252306
253307 // Convert internal errors to inspect issues rather than failing the entire job.
254308 // This allows us to capture and log data corruption or encoding errors as
@@ -269,7 +323,7 @@ func (c *indexConsistencyCheck) Next(
269323 }, nil
270324 }
271325 if ! ok {
272- c .exhaustedIter = true
326+ c .state = checkDone
273327 return nil , nil
274328 }
275329
@@ -336,12 +390,8 @@ func (c *indexConsistencyCheck) Next(
336390
337391// Done implements the inspectCheck interface.
338392func (c * indexConsistencyCheck ) Done (context.Context ) bool {
339- // If we never started (rowIter is nil), we're done
340- if c .rowIter == nil {
341- return true
342- }
343- // Otherwise, we're done when the iterator is exhausted
344- return c .exhaustedIter
393+ done := c .state == checkHashMatched || c .state == checkDone
394+ return done
345395}
346396
347397// Close implements the inspectCheck interface.
@@ -660,6 +710,93 @@ func (c *indexConsistencyCheck) createIndexCheckQuery(
660710 )
661711}
662712
713+ type hashResult struct {
714+ rowCount int64
715+ hash string
716+ }
717+
718+ // hashesMatch performs a fast comparison of primary and secondary indexes by
719+ // computing row counts and hash values. Returns true if both indexes have
720+ // identical row counts and hash values, indicating no corruption.
721+ func (c * indexConsistencyCheck ) hashesMatch (
722+ ctx context.Context , columnNames []string , predicate string , queryArgs []interface {},
723+ ) (bool , error ) {
724+ primary , err := c .computeHashAndRowCount (ctx , c .priIndex , columnNames , predicate , queryArgs )
725+ if err != nil {
726+ return false , errors .Wrapf (err , "computing hash for primary index %s" , c .priIndex .GetName ())
727+ }
728+ secondary , err := c .computeHashAndRowCount (ctx , c .secIndex , columnNames , predicate , queryArgs )
729+ if err != nil {
730+ return false , errors .Wrapf (err , "computing hash for secondary index %s" , c .secIndex .GetName ())
731+ }
732+ // Hashes match only if both row count and hash value are identical.
733+ return primary .rowCount == secondary .rowCount && primary .hash == secondary .hash , nil
734+ }
735+
736+ // computeHashAndRowCount executes a hash query for the specified index and
737+ // returns the row count and XOR aggregate hash value.
738+ func (c * indexConsistencyCheck ) computeHashAndRowCount (
739+ ctx context.Context ,
740+ index catalog.Index ,
741+ columnNames []string ,
742+ predicate string ,
743+ queryArgs []interface {},
744+ ) (hashResult , error ) {
745+ query := buildIndexHashQuery (c .tableDesc .GetID (), index , columnNames , predicate )
746+ queryWithAsOf := fmt .Sprintf ("SELECT * FROM (%s) AS OF SYSTEM TIME %s" , query , c .asOf .AsOfSystemTime ())
747+
748+ qos := getInspectQoS (& c .flowCtx .Cfg .Settings .SV )
749+ row , err := c .flowCtx .Cfg .DB .Executor ().QueryRowEx (
750+ ctx , "inspect-index-consistency-hash" , nil , /* txn */
751+ sessiondata.InternalExecutorOverride {
752+ User : username .NodeUserName (),
753+ QualityOfService : & qos ,
754+ },
755+ queryWithAsOf ,
756+ queryArgs ... ,
757+ )
758+ if err != nil {
759+ return hashResult {}, err
760+ }
761+ if len (row ) != 2 {
762+ return hashResult {}, errors .AssertionFailedf ("hash query returned unexpected column count: %d" , len (row ))
763+ }
764+ return hashResult {
765+ rowCount : int64 (tree .MustBeDInt (row [0 ])),
766+ hash : string (tree .MustBeDBytes (row [1 ])),
767+ }, nil
768+ }
769+
770+ // buildIndexHashQuery constructs a query that computes row count and XOR
771+ // aggregate hash for the specified index and columns.
772+ func buildIndexHashQuery (
773+ tableID descpb.ID , index catalog.Index , columnNames []string , predicate string ,
774+ ) string {
775+ hashExpr := hashInputExpression (columnNames )
776+ whereClause := buildWhereClause (predicate , nil /* nullFilters */ )
777+ return fmt .Sprintf (`
778+ SELECT
779+ count(*) AS row_count,
780+ crdb_internal.datums_to_bytes(xor_agg(fnv64(%s))) AS hash_value
781+ FROM [%d AS t]@{FORCE_INDEX=[%d]}%s` ,
782+ hashExpr ,
783+ tableID ,
784+ index .GetID (),
785+ whereClause ,
786+ )
787+ }
788+
789+ // hashInputExpression creates a hash-friendly expression by encoding column
790+ // values to bytes with NULL coalesced to empty bytes.
791+ func hashInputExpression (columnNames []string ) string {
792+ args := make ([]string , len (columnNames ))
793+ for i , col := range columnNames {
794+ args [i ] = colRef ("t" , col )
795+ }
796+ encoded := fmt .Sprintf ("crdb_internal.datums_to_bytes(%s)" , strings .Join (args , ", " ))
797+ return fmt .Sprintf ("COALESCE(%s, ''::BYTES)" , encoded )
798+ }
799+
663800// encodeColumnName properly encodes a column name for use in SQL.
664801func encodeColumnName (columnName string ) string {
665802 var buf bytes.Buffer
@@ -751,3 +888,20 @@ func buildWhereClause(predicate string, nullFilters []string) string {
751888
752889 return buf .String ()
753890}
891+
892+ // isQueryConstructionError checks if the given error is due to
893+ // invalid syntax or references in the query construction.
894+ func isQueryConstructionError (err error ) bool {
895+ code := pgerror .GetPGCode (err )
896+ switch code {
897+ case pgcode .Syntax ,
898+ pgcode .UndefinedColumn ,
899+ pgcode .UndefinedTable ,
900+ pgcode .UndefinedFunction ,
901+ pgcode .DatatypeMismatch ,
902+ pgcode .InvalidColumnReference :
903+ return true
904+ default :
905+ return false
906+ }
907+ }
0 commit comments