Skip to content

Commit c8648ec

Browse files
Move k8s leader election library to leader-election repo
1 parent 7064fbc commit c8648ec

File tree

10 files changed

+1185
-0
lines changed

10 files changed

+1185
-0
lines changed

README.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
## Kubernetes Leader Election Library for Go
2+
3+
This library provides a thin wrapper for kubernetes leader election.
4+
5+
```
6+
go get github.com/edgedelta/leader-election
7+
```
8+
9+
10+
### Example usage
11+
12+
```go
13+
func main() {
14+
le, err := New(
15+
WithleaseDuration(15*time.Second),
16+
WithRenewTime(10*time.Second),
17+
WithRetryPeriod(2*time.Second),
18+
WithLeaseNamespace("custom-ns"))
19+
20+
if err != nil {
21+
// Handle error
22+
}
23+
24+
if err := le.Start(); err != nil {
25+
// Handle error
26+
}
27+
28+
// run the leader election check
29+
ctx, cancel := context.WithCancel(context.Background())
30+
go run(le, ctx)
31+
32+
// wait for termination signal
33+
termSignal := make(chan os.Signal, 1)
34+
signal.Notify(termSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
35+
<-termSignal
36+
37+
// stop the leader election check
38+
cancel()
39+
40+
if err := le.Stop(); err != nil {
41+
// Handle error
42+
}
43+
}
44+
45+
func run(le *K8sLeaderEngine, ctx context.Context) {
46+
// do leader stuff as long as le.IsLeader() and ctx is not Done
47+
}
48+
49+
```

go.mod

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
module github.com/edgedelta/leader-election
2+
3+
go 1.16
4+
5+
require (
6+
github.com/cenkalti/backoff v2.2.1+incompatible
7+
github.com/pingcap/errors v0.11.4
8+
k8s.io/api v0.21.0
9+
k8s.io/apimachinery v0.21.0
10+
k8s.io/client-go v0.21.0
11+
)

go.sum

Lines changed: 441 additions & 0 deletions
Large diffs are not rendered by default.

leader_election.go

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package leaderelection
2+
3+
import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync/atomic"
	"time"

	"github.com/cenkalti/backoff"
	"github.com/pingcap/errors" // NOTE(review): remove if no longer referenced after the IsNotFound fix
	"k8s.io/client-go/rest"

	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8s "k8s.io/client-go/kubernetes"
)
18+
19+
const (
	// defaultleaseName is the lease object name used when WithLeaseName is not supplied.
	defaultleaseName = "leader-election-lease"
	// defaultleaseDuration / defaultrenewDeadline / defaultretryPeriod are the
	// election timing defaults, overridable via the With* options.
	// NOTE(review): presumably forwarded to the client-go leader elector in
	// newLeaderElector (not visible in this file) — confirm there.
	defaultleaseDuration = 60 * time.Second
	defaultrenewDeadline = 30 * time.Second
	defaultretryPeriod   = 15 * time.Second
	// timeoutRunLeaderEngine bounds how long Start waits to observe a leader.
	timeoutRunLeaderEngine = 15 * time.Second
	// defaultRandomizationFactor is the jitter applied to the init backoff (see doWithExpBackoff).
	defaultRandomizationFactor = 0.5
)
27+
28+
var (
	// GetAPIClient builds a Kubernetes clientset from the in-cluster config.
	// Declared as a package-level var rather than a func — presumably so
	// callers/tests can swap in a fake client; confirm before relying on it.
	GetAPIClient = func() (k8s.Interface, error) {
		config, err := rest.InClusterConfig()
		if err != nil {
			return nil, fmt.Errorf("failed to get in-cluster config, err: %v", err)
		}

		apiClient, err := k8s.NewForConfig(config)
		if err != nil {
			return nil, fmt.Errorf("failed to obtain K8s API client, err: %v", err)
		}
		return apiClient, nil
	}
	// ErrNotRunning reports that the leader engine is not running.
	// NOTE(review): not referenced anywhere in this file — verify callers use it.
	ErrNotRunning = fmt.Errorf("leader engine is not running")
)
43+
44+
var (
	// initialRetryDelay is the first wait of the initialization backoff
	// (InitialInterval in doWithExpBackoff).
	initialRetryDelay = 1 * time.Second
	// maxRetryDelay caps the total elapsed time of initialization retries
	// (MaxElapsedTime in doWithExpBackoff), after which New gives up.
	maxRetryDelay = 5 * time.Minute
)
48+
49+
func WithleaseDuration(dur time.Duration) K8sLeaderEngineOption {
50+
return func(le *K8sLeaderEngine) {
51+
le.leaseDuration = dur
52+
}
53+
}
54+
55+
func WithRenewTime(dur time.Duration) K8sLeaderEngineOption {
56+
return func(le *K8sLeaderEngine) {
57+
le.renewDeadline = dur
58+
}
59+
}
60+
61+
func WithRetryPeriod(dur time.Duration) K8sLeaderEngineOption {
62+
return func(le *K8sLeaderEngine) {
63+
le.retryPeriod = dur
64+
}
65+
}
66+
67+
func WithHolderIdentity(identity string) K8sLeaderEngineOption {
68+
return func(le *K8sLeaderEngine) {
69+
le.holderIdentity = identity
70+
}
71+
}
72+
73+
func WithLeaseName(name string) K8sLeaderEngineOption {
74+
return func(le *K8sLeaderEngine) {
75+
le.leaseName = name
76+
}
77+
}
78+
79+
func WithLeaseNamespace(namespace string) K8sLeaderEngineOption {
80+
return func(le *K8sLeaderEngine) {
81+
le.leaseNamespace = namespace
82+
}
83+
}
84+
85+
func WithContext(ctx context.Context) K8sLeaderEngineOption {
86+
return func(le *K8sLeaderEngine) {
87+
le.parentCtx = ctx
88+
}
89+
}
90+
91+
func WithLogger(logger Logger) K8sLeaderEngineOption {
92+
return func(le *K8sLeaderEngine) {
93+
le.logger = logger
94+
}
95+
}
96+
97+
func WithErrorLogger(logger Logger) K8sLeaderEngineOption {
98+
return func(le *K8sLeaderEngine) {
99+
le.errorLogger = logger
100+
}
101+
}
102+
103+
func New(opts ...K8sLeaderEngineOption) (*K8sLeaderEngine, error) {
104+
e := &K8sLeaderEngine{
105+
leaseName: defaultleaseName,
106+
leaseDuration: defaultleaseDuration,
107+
renewDeadline: defaultrenewDeadline,
108+
retryPeriod: defaultretryPeriod,
109+
logger: &defaultLogger{},
110+
errorLogger: &defaultLogger{},
111+
stopped: make(chan struct{}),
112+
}
113+
114+
for _, o := range opts {
115+
o(e)
116+
}
117+
118+
e.leaseNamespace = e.getResourceNamespace()
119+
if e.parentCtx == nil {
120+
e.parentCtx = context.Background()
121+
}
122+
123+
e.ctx, e.ctxCancel = context.WithCancel(e.parentCtx)
124+
err := doWithExpBackoff(e.initializeLeaderEngine, initialRetryDelay, maxRetryDelay)
125+
if err != nil {
126+
return nil, fmt.Errorf("failed to initialize leader engine, err: %v", err)
127+
}
128+
129+
e.logger.Log("Initialized leader engine with lease name: %q, namespace: %q", e.leaseName, e.leaseNamespace)
130+
return e, nil
131+
}
132+
133+
// Start the leader engine and block until a leader is elected.
134+
func (le *K8sLeaderEngine) Start() error {
135+
if !atomic.CompareAndSwapInt32(&le.running, 0, 1) {
136+
return fmt.Errorf("leader engine is already started")
137+
}
138+
139+
go le.runLeaderEngine()
140+
timeout := time.After(timeoutRunLeaderEngine)
141+
ticker := time.NewTicker(time.Second)
142+
defer ticker.Stop()
143+
for {
144+
select {
145+
case <-ticker.C:
146+
leaderIdentity := le.GetLeader()
147+
if leaderIdentity != "" {
148+
le.logger.Log("Leader election started. Current leader is %q", leaderIdentity)
149+
return nil
150+
}
151+
case <-timeout:
152+
return fmt.Errorf("leader check timed out after %s. Stop the engine and try again", timeoutRunLeaderEngine)
153+
}
154+
}
155+
}
156+
157+
// Stop the engine and wait until the goroutines have stopped
158+
func (le *K8sLeaderEngine) Stop() error {
159+
if !atomic.CompareAndSwapInt32(&le.running, 1, 0) {
160+
return fmt.Errorf("leader engine is not started or already stopped")
161+
}
162+
163+
le.ctxCancel()
164+
le.logger.Log("Leader election engine cancelled internal context, will wait until stopped signal is recieved")
165+
<-le.stopped
166+
le.logger.Log("Leader election engine stopped")
167+
return nil
168+
}
169+
170+
func (le *K8sLeaderEngine) GetLeader() string {
171+
le.leaderIdentityMutex.Lock()
172+
defer le.leaderIdentityMutex.Unlock()
173+
return le.currentLeaderIdentity
174+
}
175+
176+
func (le *K8sLeaderEngine) IsLeader() bool {
177+
return le.holderIdentity == le.GetLeader()
178+
}
179+
180+
func (le *K8sLeaderEngine) initializeLeaderEngine() error {
181+
var err error
182+
if le.holderIdentity == "" {
183+
le.holderIdentity, err = os.Hostname()
184+
if err != nil {
185+
return fmt.Errorf("failed to obtain hostname, err: %v", err)
186+
}
187+
}
188+
189+
le.logger.Log("Initializing leader engine with holder identity %q", le.holderIdentity)
190+
191+
apiClient, err := GetAPIClient()
192+
if err != nil {
193+
return err
194+
}
195+
le.apiClient = apiClient
196+
le.coreClient = apiClient.CoreV1()
197+
le.coordinationClient = apiClient.CoordinationV1()
198+
_, err = le.coordinationClient.Leases(le.leaseNamespace).Get(le.parentCtx, le.leaseName, metav1.GetOptions{})
199+
if err != nil && !errors.IsNotFound(err) {
200+
return fmt.Errorf("failed to obtain leases from namespace %q, err: %v", le.leaseNamespace, err)
201+
}
202+
203+
le.leaderElector, err = le.newLeaderElector(le.parentCtx)
204+
if err != nil {
205+
return fmt.Errorf("failed to initialize leader elector process, err: %v", err)
206+
}
207+
le.logger.Log("Leader election for %q is initialized", le.holderIdentity)
208+
return nil
209+
}
210+
211+
// runLeaderEngine drives the underlying elector in a loop until the engine
// context is cancelled, re-entering the election whenever the lease is lost.
// Runs in its own goroutine (started by Start).
func (le *K8sLeaderEngine) runLeaderEngine() {
	for {
		le.logger.Log("Starting leader election process for %q under namespace %q", le.holderIdentity, le.leaseNamespace)
		// Run blocks until the lease is lost or le.ctx is cancelled.
		le.leaderElector.Run(le.ctx)
		select {
		case <-le.ctx.Done():
			le.logger.Log("Leader election engine is stopping")
			// Unbuffered send: blocks until Stop receives on le.stopped.
			// NOTE(review): if the parent context is cancelled without Stop
			// ever being called, this goroutine blocks here — confirm the
			// intended lifecycle requires callers to invoke Stop.
			le.stopped <- struct{}{}
			return
		default:
			// Context still live: we simply lost the lease; loop and re-run.
			le.logger.Log("%q lost the leader lease", le.holderIdentity)
		}
	}
}
225+
226+
func (le *K8sLeaderEngine) getResourceNamespace() string {
227+
namespace := os.Getenv("K8S_RESOURCE_NAMESPACE")
228+
if namespace != "" {
229+
return namespace
230+
}
231+
le.logger.Log("Environment variable K8S_RESOURCE_NAMESPACE is unset. Falling back to service account's namespace")
232+
return le.getLocalResourceNamespace()
233+
}
234+
235+
func (le *K8sLeaderEngine) getLocalResourceNamespace() string {
236+
namespaceFilePath := filepath.Join(ServiceAccountMountPath, "namespace")
237+
if _, err := os.Stat(namespaceFilePath); os.IsNotExist(err) {
238+
le.logger.Log("Namespace file %q is not found. Will use 'default' namespace", namespaceFilePath)
239+
return "default"
240+
}
241+
ns, err := os.ReadFile(namespaceFilePath)
242+
if err != nil {
243+
le.logger.Log("Failed to access namespace file %q, err: %v. Will use 'default' namespace.", err)
244+
return "default"
245+
}
246+
return string(ns)
247+
}
248+
249+
func doWithExpBackoff(f func() error, initialInterval, timeout time.Duration) error {
250+
b := backoff.NewExponentialBackOff()
251+
b.RandomizationFactor = defaultRandomizationFactor
252+
b.MaxElapsedTime = timeout
253+
b.InitialInterval = initialInterval
254+
return backoff.Retry(f, b)
255+
}

0 commit comments

Comments
 (0)