Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/burrow.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ min-distance=1

[notifier.default]
class-name="http"
cluster="local"
url-open="http://someservice.example.com:1467/v1/event"
interval=60
timeout=5
Expand Down
50 changes: 50 additions & 0 deletions config/default-wecom-post.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{"msgtype": "markdown","markdown": {"content": "
{{- $StatusURL := "https://pkg.go.dev/github.com/linkedin/Burrow/core/protocol#StatusConstant"}}
{{- $FormatString := "2006-01-02 15:04:05"}}
# Kafka: {{.Cluster}}
ConsumerGroup: {{.Group}}{{- with .Result.Status}}
{{- if eq . 0}}NotFound{{end}}
{{- if eq . 1}}<font color=\"info\">normal</font>{{end}}
{{- if eq . 2}}<font color=\"warning\">lagging</font>{{end}}
{{- if eq . 3}}<font color=\"comment\">abnormal</font>{{end}}
{{- end}}
**Status**: Total(Partitions={{.Result.TotalPartitions}},Lag={{.Result.TotalLag}})[{{- with .Result.Status}}
{{- if eq . 0}}NotFound{{end}}
{{- if eq . 1}}<font color=\"info\">{{.}}</font>{{end}}
{{- if eq . 2}}<font color=\"warning\">{{.}}</font>{{end}}
{{- if eq . 3}}<font color=\"comment\">{{.}}</font>{{end}}
{{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}}
{{printf "Time: %s" (formattimestamp 0 $FormatString)}}
**MaxLagDetails:**\tMaxLag={{.Result.Maxlag|maxlag}}{{- with .Result.Maxlag}}
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
\tCurrentLag={{.CurrentLag}}
\tPartition={{.Partition}}
{{- end}}
{{- $TotalErrors := len .Result.Partitions}}
{{- if $TotalErrors}}
### <font color=\"comment\">{{$TotalErrors}} partitions have problems</font>
>**CountPartitions:**
{{- range $k,$v := .Result.Partitions|partitioncounts}}
{{- if ne $v 0}}\n\t{{$k}}={{$v}}{{end}}
{{- end}}
**TopicsByStatus:**
{{- range $k,$v := .Result.Partitions|topicsbystatus}}
\t{{$k}}={{$v}}
{{- end}}
**PartitionDetails:**
{{- range .Result.Partitions}}
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
\tCurrentLag={{.CurrentLag}}
\tPartition={{.Partition}}
\tStart={{formattimestamp .Start.Timestamp $FormatString}}
\t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}}
\tEnd={{formattimestamp .End.Timestamp $FormatString}}
\t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}}
{{- end}}
{{- end}}
"
}}
6 changes: 6 additions & 0 deletions core/internal/helpers/coordinators.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ func (m *MockModule) GetName() string {
return args.String(0)
}

// GetCluster mocks the notifier.Module GetCluster func
func (m *MockModule) GetCluster() string {
args := m.Called()
return args.String(0)
}

// GetGroupAllowlist mocks the notifier.Module GetGroupAllowlist func
func (m *MockModule) GetGroupAllowlist() *regexp.Regexp {
args := m.Called()
Expand Down
2 changes: 2 additions & 0 deletions core/internal/httpserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (hc *Coordinator) configNotifierHTTP(w http.ResponseWriter, r *http.Request
SendClose: viper.GetBool(configRoot + ".send-close"),
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
NoVerify: viper.GetString(configRoot + ".noverify"),
Cluster: viper.GetString(configRoot + ".cluster"),
},
Request: requestInfo,
})
Expand Down Expand Up @@ -266,6 +267,7 @@ func (hc *Coordinator) configNotifierEmail(w http.ResponseWriter, r *http.Reques
To: viper.GetString(configRoot + ".to"),
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
NoVerify: viper.GetString(configRoot + ".noverify"),
Cluster: viper.GetString(configRoot + ".cluster"),
},
Request: requestInfo,
})
Expand Down
2 changes: 2 additions & 0 deletions core/internal/httpserver/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ type httpResponseConfigModuleNotifierHTTP struct {
SendClose bool `json:"send-close"`
ExtraCa string `json:"extra-ca"`
NoVerify string `json:"noverify"`
Cluster string `json:"cluster"`
}

type httpResponseConfigModuleNotifierSlack struct {
Expand Down Expand Up @@ -239,6 +240,7 @@ type httpResponseConfigModuleNotifierEmail struct {
To string `json:"to"`
ExtraCa string `json:"extra-ca"`
NoVerify string `json:"noverify"`
Cluster string `json:"cluster"`
}

type httpResponseConfigModuleNotifierNull struct {
Expand Down
13 changes: 11 additions & 2 deletions core/internal/notifier/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
type Module interface {
protocol.Module
GetName() string
GetCluster() string
GetGroupAllowlist() *regexp.Regexp
GetGroupDenylist() *regexp.Regexp
GetLogger() *zap.Logger
Expand Down Expand Up @@ -96,7 +97,7 @@ type Coordinator struct {

// getModuleForClass returns the correct module based on the passed className. As part of the Configure steps, if there
// is any error, it will panic with an appropriate message describing the problem.
func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template) protocol.Module {
func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template, cluster string) protocol.Module {
logger := app.Logger.With(
zap.String("type", "module"),
zap.String("coordinator", "notifier"),
Expand All @@ -114,6 +115,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
case "email":
return &EmailNotifier{
Expand All @@ -124,6 +126,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
case "null":
return &NullNotifier{
Expand All @@ -134,6 +137,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
default:
panic("Unknown notifier className provided: " + className)
Expand Down Expand Up @@ -195,6 +199,8 @@ func (nc *Coordinator) Configure() {
groupAllowlist = re
}

cluster := viper.GetString(configRoot + ".cluster")

// Compile the denylist for the consumer groups to not notify for
var groupDenylist *regexp.Regexp
denylist := viper.GetString(configRoot + ".group-denylist")
Expand Down Expand Up @@ -228,7 +234,7 @@ func (nc *Coordinator) Configure() {
templateClose = tmpl.Templates()[0]
}

module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose)
module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose, cluster)
module.Configure(name, configRoot)
nc.modules[name] = module
interval := viper.GetInt64(configRoot + ".interval")
Expand Down Expand Up @@ -437,6 +443,9 @@ func (nc *Coordinator) checkAndSendResponseToModules(response *protocol.Consumer
for _, genericModule := range nc.modules {
module := genericModule.(Module)

if module.GetCluster() != "" && response.Cluster != module.GetCluster() {
continue
}
// No allowlist means everything passes
groupAllowlist := module.GetGroupAllowlist()
groupDenylist := module.GetGroupDenylist()
Expand Down
52 changes: 31 additions & 21 deletions core/internal/notifier/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,34 +473,39 @@ var notifyModuleTests = []struct {
ExpectClose bool
ExpectID bool
SendOnce bool
Cluster string
}{
/*{1, 0, false, false, false, false, false},
{2, 0, false, false, false, false, false},
{1, 0, true, false, false, false, false},
{1, 0, false, true, false, false, false},
{1, 0, true, true, false, false, false}, */

{1, 1, false, false, true, false, false, false},
{1, 1, false, true, true, false, false, false},
{1, 1, true, false, true, false, false, false},
{1, 1, true, true, true, true, false, true},

{1, 2, false, false, true, false, true, false},
{1, 2, false, true, true, false, true, false},
{1, 2, true, false, true, false, true, false},
{1, 2, true, true, true, false, true, false},
{1, 2, true, true, false, false, true, true},
{1, 2, false, true, true, false, true, true},

{3, 2, false, false, false, false, true, false},
{3, 2, false, true, false, false, true, false},
{3, 2, true, false, false, false, true, false},
{3, 2, true, true, false, false, true, false},

{2, 1, false, false, false, false, false, false},
{2, 1, false, true, false, false, false, false},
{2, 1, true, false, false, false, false, false},
{2, 1, true, true, true, true, false, false},
{1, 1, false, false, true, false, false, false, ""},
{1, 1, false, true, true, false, false, false, "testcluster"},
{1, 1, true, false, true, false, false, false, "unmatchedCluster"},
{1, 1, true, true, true, true, false, true, ""},

{1, 2, false, false, true, false, true, false, ""},
{1, 2, false, true, true, false, true, false, ""},
{1, 2, true, false, true, false, true, false, ""},
{1, 2, true, true, true, false, true, false, ""},
{1, 2, true, true, false, false, true, true, ""},
{1, 2, false, true, true, false, true, true, ""},

{3, 2, false, false, false, false, true, false, ""},
{3, 2, false, true, false, false, true, false, ""},
{3, 2, true, false, false, false, true, false, ""},
{3, 2, true, true, false, false, true, false, ""},

{2, 1, false, false, false, false, false, false, ""},
{2, 1, false, true, false, false, false, false, ""},
{2, 1, true, false, false, false, false, false, ""},
{2, 1, true, true, true, true, false, false, ""},
}

func checkNotifierClusterMatch(cluster string) bool {
return cluster == "" || cluster == "testcluster"
}

func TestCoordinator_checkAndSendResponseToModules(t *testing.T) {
Expand Down Expand Up @@ -559,10 +564,15 @@ func TestCoordinator_checkAndSendResponseToModules(t *testing.T) {
// Set up the mock module and expected calls
mockModule := &helpers.MockModule{}
coordinator.modules["test"] = mockModule
mockModule.On("GetCluster").Return(testSet.Cluster)

if checkNotifierClusterMatch(testSet.Cluster) {
mockModule.On("GetName").Return("test")
mockModule.On("GetGroupAllowlist").Return((*regexp.Regexp)(nil))
mockModule.On("GetGroupDenylist").Return((*regexp.Regexp)(nil))
mockModule.On("AcceptConsumerGroup", response).Return(true)
}

if testSet.ExpectSend {
mockModule.On("Notify", response, mock.MatchedBy(func(s string) bool { return true }), mock.MatchedBy(func(t time.Time) bool { return true }), testSet.ExpectClose).Return()
}
Expand Down
6 changes: 6 additions & 0 deletions core/internal/notifier/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type EmailNotifier struct {
Log *zap.Logger

name string
cluster string
groupAllowlist *regexp.Regexp
groupDenylist *regexp.Regexp
extras map[string]string
Expand Down Expand Up @@ -140,6 +141,11 @@ func (module *EmailNotifier) GetName() string {
return module.name
}

// GetCluster returns the configured name of this module
func (module *EmailNotifier) GetCluster() string {
return module.cluster
}

// GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one)
func (module *EmailNotifier) GetGroupAllowlist() *regexp.Regexp {
return module.groupAllowlist
Expand Down
6 changes: 5 additions & 1 deletion core/internal/notifier/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,5 +164,9 @@ func maxLagHelper(a *protocol.PartitionStatus) uint64 {
}

func formatTimestamp(timestamp int64, formatString string) string {
return time.Unix(0, timestamp*int64(time.Millisecond)).Format(formatString)
if timestamp > 0 {
return time.Unix(0, timestamp*int64(time.Millisecond)).Format(formatString)
} else {
return time.Now().Format(formatString)
}
}
5 changes: 5 additions & 0 deletions core/internal/notifier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type HTTPNotifier struct {
Log *zap.Logger

name string
cluster string
groupAllowlist *regexp.Regexp
groupDenylist *regexp.Regexp
extras map[string]string
Expand Down Expand Up @@ -125,6 +126,10 @@ func (module *HTTPNotifier) GetName() string {
return module.name
}

func (module *HTTPNotifier) GetCluster() string {
return module.cluster
}

// GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one)
func (module *HTTPNotifier) GetGroupAllowlist() *regexp.Regexp {
return module.groupAllowlist
Expand Down
6 changes: 6 additions & 0 deletions core/internal/notifier/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type NullNotifier struct {
Log *zap.Logger

name string
cluster string
groupAllowlist *regexp.Regexp
groupDenylist *regexp.Regexp
extras map[string]string
Expand Down Expand Up @@ -76,6 +77,11 @@ func (module *NullNotifier) GetName() string {
return module.name
}

// GetCluster returns the configured name of this module
func (module *NullNotifier) GetCluster() string {
return module.cluster
}

// GetGroupAllowlist returns the compiled group allowlist (or nil, if there is not one)
func (module *NullNotifier) GetGroupAllowlist() *regexp.Regexp {
return module.groupAllowlist
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ go 1.17

require (
github.com/OneOfOne/xxhash v1.2.8
github.com/Shopify/sarama v1.31.1
github.com/Shopify/sarama v1.32.0
github.com/julienschmidt/httprouter v1.3.0
github.com/karrick/goswarm v1.10.0
github.com/pborman/uuid v1.2.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414
github.com/spf13/viper v1.10.1
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
github.com/xdg/scram v1.0.5
go.uber.org/zap v1.20.0
go.uber.org/zap v1.21.0
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
Expand All @@ -37,7 +37,7 @@ require (
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.14.2 // indirect
github.com/klauspost/compress v1.14.4 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
Expand All @@ -57,7 +57,7 @@ require (
github.com/xdg/stringprep v1.0.3 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
Loading