Skip to content

Commit ad79c96

Browse files
authored
Merge pull request #2 from getsentry/fpacifici/internal_topicctl_deletion
Same as segmentio#198 This is a PR in our fork so we can merge it in master here and start working on that while the PR on upstream gets reviewed.
2 parents aedd232 + 6972a4c commit ad79c96

File tree

6 files changed

+69
-6
lines changed

6 files changed

+69
-6
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,8 @@ The `apply` subcommand can make changes, but under the following conditions:
523523
8. Before applying, the tool checks the cluster ID against the expected value in the
524524
cluster config. This can help prevent errors around applying in the wrong cluster when multiple
525525
clusters are accessed through the same address, e.g `localhost:2181`.
526+
9. If the `destructive` CLI argument is passed, `apply` deletes the settings that are
527+
set on the broker but not set in configuration.
526528

527529
The `reset-offsets` command can also make changes in the cluster and should be used carefully.
528530

cmd/topicctl/subcmd/apply.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type applyCmdConfig struct {
3636
retentionDropStepDurationStr string
3737
skipConfirm bool
3838
ignoreFewerPartitionsError bool
39+
destructive bool
3940
sleepLoopDuration time.Duration
4041
failFast bool
4142

@@ -107,6 +108,12 @@ func init() {
107108
false,
108109
"Don't return error when topic's config specifies fewer partitions than it currently has",
109110
)
111+
applyCmd.Flags().BoolVar(
112+
&applyConfig.destructive,
113+
"destructive",
114+
false,
115+
"Deletes topic settings from the broker if the settings are present on the broker but not in the config",
116+
)
110117
applyCmd.Flags().DurationVar(
111118
&applyConfig.sleepLoopDuration,
112119
"sleep-loop-duration",
@@ -259,6 +266,7 @@ func applyTopic(
259266
RetentionDropStepDuration: applyConfig.retentionDropStepDuration,
260267
SkipConfirm: applyConfig.skipConfirm,
261268
IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError,
269+
Destructive: applyConfig.destructive,
262270
SleepLoopDuration: applyConfig.sleepLoopDuration,
263271
TopicConfig: topicConfig,
264272
}

cmd/topicctl/subcmd/rebalance.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ func rebalanceApplyTopic(
307307
AutoContinueRebalance: true, // to continue without prompts
308308
RetentionDropStepDuration: retentionDropStepDuration, // not needed for rebalance
309309
SkipConfirm: true, // to enforce action: rebalance
310+
Destructive: false, // Irrelevant here
310311
SleepLoopDuration: rebalanceConfig.sleepLoopDuration,
311312
TopicConfig: topicConfig,
312313
}

pkg/apply/apply.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type TopicApplierConfig struct {
3737
RetentionDropStepDuration time.Duration
3838
SkipConfirm bool
3939
IgnoreFewerPartitionsError bool
40+
Destructive bool
4041
SleepLoopDuration time.Duration
4142
TopicConfig config.TopicConfig
4243
}
@@ -392,6 +393,8 @@ func (t *TopicApplier) updateSettings(
392393
return err
393394
}
394395

396+
configEntries := []kafka.ConfigEntry{}
397+
395398
if len(diffKeys) > 0 {
396399
diffsTable, err := FormatSettingsDiff(topicSettings, topicInfo.Config, diffKeys)
397400
if err != nil {
@@ -416,6 +419,23 @@ func (t *TopicApplier) updateSettings(
416419
)
417420
}
418421

422+
configEntries, err = topicSettings.ToConfigEntries(diffKeys)
423+
if err != nil {
424+
return err
425+
}
426+
}
427+
428+
if len(missingKeys) > 0 && t.config.Destructive {
429+
log.Infof(
430+
"Found %d key(s) set in cluster but missing from config to be deleted:\n%s",
431+
len(missingKeys),
432+
FormatMissingKeys(topicInfo.Config, missingKeys),
433+
)
434+
435+
configEntries = append(configEntries, topicSettings.ToEmptyConfigEntries(missingKeys)...)
436+
}
437+
438+
if len(configEntries) > 0 {
419439
if t.config.DryRun {
420440
log.Infof("Skipping update because dryRun is set to true")
421441
return nil
@@ -430,11 +450,6 @@ func (t *TopicApplier) updateSettings(
430450
}
431451
log.Infof("OK, updating")
432452

433-
configEntries, err := topicSettings.ToConfigEntries(diffKeys)
434-
if err != nil {
435-
return err
436-
}
437-
438453
_, err = t.adminClient.UpdateTopicConfig(
439454
ctx,
440455
t.topicName,
@@ -446,7 +461,7 @@ func (t *TopicApplier) updateSettings(
446461
}
447462
}
448463

449-
if len(missingKeys) > 0 {
464+
if len(missingKeys) > 0 && !t.config.Destructive {
450465
log.Warnf(
451466
"Found %d key(s) set in cluster but missing from config:\n%s\nThese will be left as-is.",
452467
len(missingKeys),

pkg/apply/apply_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,27 @@ func TestApplyBasicUpdates(t *testing.T) {
8080
applier.topicConfig.Spec.ReplicationFactor = 3
8181
err = applier.Apply(ctx)
8282
require.NotNil(t, err)
83+
applier.topicConfig.Spec.ReplicationFactor = 2
84+
85+
// Settings are not deleted if Destructive is false. They are
86+
// if it is true
87+
delete(applier.topicConfig.Spec.Settings, "cleanup.policy")
88+
err = applier.Apply(ctx)
89+
require.NoError(t, err)
90+
topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true)
91+
require.NoError(t, err)
92+
93+
assert.Equal(t, "delete", topicInfo.Config["cleanup.policy"])
94+
95+
applier.config.Destructive = true
96+
err = applier.Apply(ctx)
97+
require.NoError(t, err)
98+
topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true)
99+
require.NoError(t, err)
100+
101+
_, present := topicInfo.Config["cleanup.policy"]
102+
assert.False(t, present)
103+
83104
}
84105

85106
func TestApplyPlacementUpdates(t *testing.T) {

pkg/config/settings.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,22 @@ func (t TopicSettings) ToConfigEntries(keys []string) ([]kafka.ConfigEntry, erro
346346
return entries, nil
347347
}
348348

349+
// Produces a slice of kafka-go config entries with empty value. Thus used
350+
// for deletion of the setting.
351+
func (t TopicSettings) ToEmptyConfigEntries(keys []string) []kafka.ConfigEntry {
352+
entries := []kafka.ConfigEntry{}
353+
354+
if keys != nil {
355+
for _, key := range keys {
356+
entries = append(
357+
entries,
358+
kafka.ConfigEntry{ConfigName: key, ConfigValue: ""},
359+
)
360+
}
361+
}
362+
return entries
363+
}
364+
349365
// HasKey returns whether the current settings instance contains the argument key.
350366
func (t TopicSettings) HasKey(key string) bool {
351367
_, ok := t[key]

0 commit comments

Comments
 (0)