Skip to content

Commit fe15853

Browse files
craig[bot]stevendanna
andcommitted
Merge #155518
155518: workload: add changefeed-max-rate, changefeed-start-delay, and changefeed-resolved-target r=wenyihu6 a=stevendanna ``` --changefeed Optionally run a changefeed over the tables --changefeed-max-rate float Maximum frequency of changefeed ingestion. If 0, no limit. --changefeed-resolved-target duration The target frequency of resolved messages. O to disable resolved reporting and accept server defaults. (default 5s) --changefeed-start-delay duration How long to wait before starting the changefeed ``` `--changefeed-max-rate` can be used in conjunction with `--max-rate` to simulate an under-provisioned or correctly provisioned sink for the given workload. `---changefeed-start-delay` can be used to force a catch-up scan. Note that I've also renamed --with-changefeed to --changefeed so that all the changefeed options show up next to each other in the help. The resolved timestamp reporting is a bit of a hack but it seems to work for now. Epic: none Release note: None Co-authored-by: Steven Danna <[email protected]>
2 parents cf01ffa + a2c6498 commit fe15853

File tree

3 files changed

+100
-17
lines changed

3 files changed

+100
-17
lines changed

pkg/workload/changefeeds/changefeeds.go

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"encoding/json"
1111
"fmt"
1212
"strings"
13+
"time"
1314

1415
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1516
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -25,6 +26,7 @@ func AddChangefeedToQueryLoad(
2526
ctx context.Context,
2627
gen workload.ConnFlagser,
2728
dbName string,
29+
resolvedTarget time.Duration,
2830
urls []string,
2931
reg *histogram.Registry,
3032
ql *workload.QueryLoad,
@@ -33,6 +35,14 @@ func AddChangefeedToQueryLoad(
3335
Method: gen.ConnFlags().Method,
3436
MaxTotalConnections: 2,
3537
}
38+
cfLatency := reg.GetHandle().Get("changefeed")
39+
// TODO(ssd): This metric makes no sense has a histogram. The idea that every
40+
// metric we might want to report is a histogram is baked in pretty deeply so
41+
// I'll address that in a different PR (maybe)
42+
var cfResolved *histogram.NamedHistogram
43+
if resolvedTarget > 0 {
44+
cfResolved = reg.GetHandle().Get("changefeed-resolved")
45+
}
3646

3747
mcp, err := workload.NewMultiConnPool(ctx, cfg, urls...)
3848
if err != nil {
@@ -51,8 +61,6 @@ func AddChangefeedToQueryLoad(
5161
return err
5262
}
5363

54-
cfLatency := reg.GetHandle().Get("changefeed")
55-
5664
var sessionID string
5765
if err := conn.QueryRow(ctx, "SHOW session_id").Scan(&sessionID); err != nil {
5866
return errors.Wrap(err, "getting session_id")
@@ -68,6 +76,11 @@ func AddChangefeedToQueryLoad(
6876
return err
6977
}
7078

79+
var cursorStr string
80+
if err := conn.QueryRow(ctx, "SELECT cluster_logical_timestamp()").Scan(&cursorStr); err != nil {
81+
return err
82+
}
83+
7184
tableNames := strings.Builder{}
7285
for i, table := range gen.Tables() {
7386
if i == 0 {
@@ -77,8 +90,23 @@ func AddChangefeedToQueryLoad(
7790
}
7891
}
7992

80-
stmt := fmt.Sprintf("EXPERIMENTAL CHANGEFEED FOR %s WITH updated, no_initial_scan, schema_change_policy=nobackfill",
81-
tableNames.String())
93+
opts := []string{
94+
"updated",
95+
"no_initial_scan",
96+
"schema_change_policy=nobackfill",
97+
"cursor=$1",
98+
}
99+
args := []any{
100+
cursorStr,
101+
}
102+
if resolvedTarget > 0 {
103+
opts = append(opts, []string{"resolved=$2", "min_checkpoint_frequency=$2"}...)
104+
args = append(args, resolvedTarget.String())
105+
}
106+
stmt := fmt.Sprintf(
107+
"CREATE CHANGEFEED FOR %s WITH %s",
108+
tableNames.String(), strings.Join(opts, ","),
109+
)
82110
cfCtx, cancel := context.WithCancel(ctx)
83111

84112
var doneErr error
@@ -97,33 +125,57 @@ func AddChangefeedToQueryLoad(
97125
return false
98126
}
99127
var err error
100-
rows, err = conn.Query(cfCtx, stmt)
128+
rows, err = conn.Query(cfCtx, stmt, args...)
101129
return maybeMarkDone(err)
102130
}
103-
ql.WorkerFns = append(ql.WorkerFns, func(ctx context.Context) error {
131+
132+
var lastResolved hlc.Timestamp
133+
134+
ql.ChangefeedFns = append(ql.ChangefeedFns, func(ctx context.Context) error {
104135
if doneErr != nil {
105136
return doneErr
106137
}
107138
if maybeSetupRows() {
108139
return doneErr
109140
}
141+
110142
if rows.Next() {
111143
values, err := rows.Values()
112144
if maybeMarkDone(err) {
113145
return doneErr
114146
}
115147
type updatedJSON struct {
116-
Updated string `json:"updated"`
148+
Updated string `json:"updated"`
149+
Resolved string `json:"resolved"`
117150
}
118151
var v updatedJSON
119152
if maybeMarkDone(json.Unmarshal(values[2].([]byte), &v)) {
120153
return doneErr
121154
}
122-
updated, err := hlc.ParseHLC(v.Updated)
123-
if maybeMarkDone(err) {
124-
return doneErr
155+
if v.Updated != "" {
156+
updated, err := hlc.ParseHLC(v.Updated)
157+
if maybeMarkDone(err) {
158+
return doneErr
159+
}
160+
cfLatency.Record(timeutil.Since(updated.GoTime()))
161+
} else if v.Resolved != "" {
162+
resolved, err := hlc.ParseHLC(v.Resolved)
163+
if maybeMarkDone(err) {
164+
return doneErr
165+
}
166+
if resolved.Less(lastResolved) {
167+
return errors.Errorf("resolved timestamp %s is less than last resolved timestamp %s", resolved, lastResolved)
168+
}
169+
lastResolved = resolved
170+
} else {
171+
return errors.Errorf("failed to parse CHANGEFEED event: %s", values[2])
172+
}
173+
// Resolved timestamps arrived infrequently. Always record the time since
174+
// our lastResolved so that we don't get long periods of 0 in the
175+
// histogram.
176+
if cfResolved != nil {
177+
cfResolved.Record(timeutil.Since(lastResolved.GoTime()))
125178
}
126-
cfLatency.Record(timeutil.Since(updated.GoTime()))
127179
return nil
128180
}
129181
if maybeMarkDone(rows.Err()) {

pkg/workload/cli/run.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@ var prometheusPort = sharedFlags.Int(
7171
2112,
7272
"Port to expose prometheus metrics if the workload has a prometheus gatherer set.",
7373
)
74-
var withChangefeed = runFlags.Bool("with-changefeed", false,
75-
"Optionally run a changefeed over the tables")
7674

7775
// individualOperationReceiverAddr is an address to send latency
7876
// measurements to. By default it will not send anything.
@@ -108,6 +106,18 @@ var secure = securityFlags.Bool("secure", false,
108106
var user = securityFlags.String("user", "root", "Specify a user to run the workload as")
109107
var password = securityFlags.String("password", "", "Optionally specify a password for the user")
110108

109+
// Options relating to the optional changefeed.
110+
var (
111+
changefeed = runFlags.Bool("changefeed", false,
112+
"Optionally run a changefeed over the tables")
113+
changefeedStartDelay = runFlags.Duration("changefeed-start-delay", 0*time.Second,
114+
"How long to wait before starting the changefeed")
115+
changefeedMaxRate = runFlags.Float64(
116+
"changefeed-max-rate", 0, "Maximum frequency of changefeed ingestion. If 0, no limit.")
117+
changefeedResolvedTarget = runFlags.Duration("changefeed-resolved-target", 5*time.Second,
118+
"The target frequency of resolved messages. O to disable resolved reporting and accept server defaults.")
119+
)
120+
111121
func init() {
112122

113123
_ = sharedFlags.MarkHidden("pprofport")
@@ -427,6 +437,10 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
427437
// with allowed burst of 1 at the maximum allowed rate.
428438
limiter = rate.NewLimiter(rate.Limit(*maxRate), 1)
429439
}
440+
var changefeedLimiter *rate.Limiter
441+
if *changefeedMaxRate > 0 {
442+
changefeedLimiter = rate.NewLimiter(rate.Limit(*changefeedMaxRate), 1)
443+
}
430444

431445
maybeLogRandomSeed(ctx, gen)
432446
o, ok := gen.(workload.Opser)
@@ -491,9 +505,9 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
491505
return errors.Wrapf(err, "failed to initialize the load generator")
492506
}
493507

494-
if *withChangefeed {
508+
if *changefeed {
495509
log.Dev.Infof(ctx, "adding changefeed to query load...")
496-
err = changefeeds.AddChangefeedToQueryLoad(ctx, gen.(workload.ConnFlagser), dbName, urls, reg, &ops)
510+
err = changefeeds.AddChangefeedToQueryLoad(ctx, gen.(workload.ConnFlagser), dbName, *changefeedResolvedTarget, urls, reg, &ops)
497511
if err != nil && !*tolerateErrors {
498512
return errors.Wrapf(err, "failed to initialize changefeed")
499513
}
@@ -537,8 +551,17 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
537551
workersCtx, cancelWorkers := context.WithCancel(ctx)
538552
defer cancelWorkers()
539553
var wg sync.WaitGroup
540-
wg.Add(len(ops.WorkerFns))
554+
wg.Add(len(ops.WorkerFns) + len(ops.ChangefeedFns))
541555
go func() {
556+
for _, workFn := range ops.ChangefeedFns {
557+
go func(workFn func(context.Context) error) {
558+
if *changefeedStartDelay > 0 {
559+
time.Sleep(*changefeedStartDelay)
560+
}
561+
workerRun(workersCtx, errCh, &wg, changefeedLimiter, workFn)
562+
}(workFn)
563+
}
564+
542565
// If a ramp period was specified, start all the workers gradually
543566
// with a new context.
544567
var rampCtx context.Context
@@ -609,6 +632,13 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
609632
log.Dev.Warningf(ctx, "histogram: %v", err)
610633
}
611634
}
635+
// TODO(ssd): Ugly hack. Until we support something other than
636+
// histograms here, for this particular metric, if we reset the
637+
// histogram. We don't reset the Cumulative histogram which lets us see
638+
// pMax.
639+
if t.Name == "changefeed-resolved" {
640+
t.Hist.Reset()
641+
}
612642
})
613643

614644
// Once the load generator is fully ramped up, we reset the histogram

pkg/workload/workload.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,8 @@ func (l requiresCCLBinaryDataLoader) InitialDataLoad(
419419
type QueryLoad struct {
420420
// WorkerFns is one function per worker. It is to be called once per unit of
421421
// work to be done.
422-
WorkerFns []func(context.Context) error
422+
WorkerFns []func(context.Context) error
423+
ChangefeedFns []func(context.Context) error
423424

424425
// Close, if set, is called before the process exits, giving workloads a
425426
// chance to print some information or perform cleanup.

0 commit comments

Comments
 (0)