Skip to content

Commit a405e2c

Browse files
dhananjay-ng authored and GouthamML committed
Adding retry with exponential backoff to fetch NodeMetadata during node driver startup
1 parent 8cd0066 commit a405e2c

File tree

11 files changed

+2709
-132
lines changed

11 files changed

+2709
-132
lines changed

pkg/csi-util/utils.go

Lines changed: 79 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,8 @@ import (
2727
"sync"
2828
"time"
2929

30+
"k8s.io/apimachinery/pkg/util/wait"
31+
3032
"github.com/container-storage-interface/spec/lib/go/csi"
3133
"github.com/oracle/oci-go-sdk/v65/core"
3234
"go.uber.org/zap"
@@ -104,15 +106,19 @@ type FSSVolumeHandler struct {
104106
FsExportPath string
105107
}
106108

107-
type NodeIpFamily struct {
108-
PreferredNodeIpFamily string
109-
Ipv4Enabled bool
110-
Ipv6Enabled bool
109+
type NodeMetadata struct {
110+
PreferredNodeIpFamily string
111+
Ipv4Enabled bool
112+
Ipv6Enabled bool
113+
AvailabilityDomain string
114+
FullAvailabilityDomain string
115+
IsNodeMetadataLoaded bool
111116
}
112117

113118
// CSIConfig represents the structure of the ConfigMap data.
114119
type CSIConfig struct {
115120
Lustre *DriverConfig `yaml:"lustre"`
121+
IsLoaded bool
116122
}
117123

118124
// DriverConfig represents driver-specific configurations.
@@ -136,25 +142,75 @@ func (u *Util) LookupNodeID(k kubernetes.Interface, nodeName string) (string, er
136142
return n.Spec.ProviderID, nil
137143
}
138144

139-
func (u *Util) LookupNodeAvailableDomain(k kubernetes.Interface, nodeID string) (string, string, error) {
140-
n, err := k.CoreV1().Nodes().Get(context.Background(), nodeID, metav1.GetOptions{})
145+
func (u *Util) WaitForKubeApiServerToBeReachableWithContext(ctx context.Context, k kubernetes.Interface, backOffCap time.Duration) {
146+
147+
waitForKubeApiServerCtx, waitForKubeApiServerCtxCancel := context.WithTimeout(ctx, time.Second * 45)
148+
defer waitForKubeApiServerCtxCancel()
149+
150+
backoff := wait.Backoff{
151+
Duration: 1 * time.Second,
152+
Factor: 2.0,
153+
Steps: 5,
154+
Cap: backOffCap,
155+
}
156+
157+
wait.ExponentialBackoffWithContext(
158+
waitForKubeApiServerCtx,
159+
backoff,
160+
func(waitForKubeApiServerCtx context.Context) (bool, error) {
161+
attemptCtx, attemptCancel := context.WithTimeout(waitForKubeApiServerCtx, backoff.Step())
162+
defer attemptCancel()
163+
_, err := k.CoreV1().RESTClient().Get().AbsPath("/version").Do(attemptCtx).Raw()
164+
if err != nil {
165+
u.Logger.With(zap.Error(err)).Errorf("Waiting for kube api server to be reachable, Retrying..")
166+
return false, nil
167+
}
168+
u.Logger.Infof("Kube Api Server is Reachable")
169+
return true, nil
170+
},
171+
)
172+
}
173+
174+
func (u *Util) LoadNodeMetadataFromApiServer(ctx context.Context, k kubernetes.Interface, nodeID string, nodeMetadata *NodeMetadata) (error) {
175+
176+
u.WaitForKubeApiServerToBeReachableWithContext(ctx, k, time.Second * 30)
177+
178+
node, err := k.CoreV1().Nodes().Get(ctx, nodeID, metav1.GetOptions{})
179+
141180
if err != nil {
142-
u.Logger.With(zap.Error(err)).With("nodeId", nodeID).Error("Failed to get Node by name.")
143-
return "", "",fmt.Errorf("failed to get node %s", nodeID)
181+
u.Logger.With(zap.Error(err)).With("nodeId", nodeID).Error("Failed to get Node information from kube api server, Please check if kube api server is accessible.")
182+
return fmt.Errorf("Failed to get node information from kube api server, please check if kube api server is accessible.")
144183
}
145-
if n.Labels != nil {
146-
ad, ok := n.Labels[kubeAPI.LabelTopologyZone]
184+
185+
var ok bool
186+
if node.Labels != nil {
187+
nodeMetadata.AvailabilityDomain, ok = node.Labels[kubeAPI.LabelTopologyZone]
147188
if !ok {
148-
ad, ok = n.Labels[kubeAPI.LabelZoneFailureDomain]
189+
nodeMetadata.AvailabilityDomain, ok = node.Labels[kubeAPI.LabelZoneFailureDomain]
149190
}
150191
if ok {
151-
fullAdName, _ := n.Labels[AvailabilityDomainLabel]
152-
return ad, fullAdName, nil
192+
nodeMetadata.FullAvailabilityDomain, _ = node.Labels[AvailabilityDomainLabel]
193+
}
194+
195+
if preferredIpFamily, ok := node.Labels[LabelIpFamilyPreferred]; ok {
196+
nodeMetadata.PreferredNodeIpFamily = FormatValidIpStackInK8SConvention(preferredIpFamily)
197+
}
198+
if ipv4Enabled, ok := node.Labels[LabelIpFamilyIpv4]; ok && strings.EqualFold(ipv4Enabled, "true") {
199+
nodeMetadata.Ipv4Enabled = true
200+
}
201+
if ipv6Enabled, ok := node.Labels[LabelIpFamilyIpv6]; ok && strings.EqualFold(ipv6Enabled, "true") {
202+
nodeMetadata.Ipv6Enabled = true
153203
}
154204
}
155-
errMsg := fmt.Sprintf("Did not find the label for the fault domain. Checked Topology Labels: %s, %s", kubeAPI.LabelTopologyZone, kubeAPI.LabelZoneFailureDomain)
156-
u.Logger.With("nodeId", nodeID).Error(errMsg)
157-
return "","", fmt.Errorf(errMsg)
205+
if !nodeMetadata.Ipv4Enabled && !nodeMetadata.Ipv6Enabled {
206+
nodeMetadata.PreferredNodeIpFamily = Ipv4Stack
207+
nodeMetadata.Ipv4Enabled = true
208+
u.Logger.With("nodeId", nodeID, "nodeMetadata", nodeMetadata).Info("No IP family labels identified on node, defaulting to ipv4.")
209+
} else {
210+
u.Logger.With("nodeId", nodeID, "nodeMetadata", nodeMetadata).Info("Node IP family identified.")
211+
}
212+
nodeMetadata.IsNodeMetadataLoaded = true
213+
return nil
158214
}
159215

160216
// waitForPathToExist waits for a given filesystem path to exist.
@@ -524,37 +580,6 @@ func GetIsFeatureEnabledFromEnv(logger *zap.SugaredLogger, featureName string, d
524580
return enableFeature
525581
}
526582

527-
func GetNodeIpFamily(k kubernetes.Interface, nodeID string, logger *zap.SugaredLogger) (*NodeIpFamily, error) {
528-
n, err := k.CoreV1().Nodes().Get(context.Background(), nodeID, metav1.GetOptions{})
529-
530-
if err != nil {
531-
logger.With(zap.Error(err)).With("nodeId", nodeID).Error("Failed to get Node information from kube api server, Please check if kube api server is accessible.")
532-
return nil, fmt.Errorf("Failed to get node information from kube api server, please check if kube api server is accessible.")
533-
}
534-
535-
nodeIpFamily := &NodeIpFamily{}
536-
537-
if n.Labels != nil {
538-
if preferredIpFamily, ok := n.Labels[LabelIpFamilyPreferred]; ok {
539-
nodeIpFamily.PreferredNodeIpFamily = FormatValidIpStackInK8SConvention(preferredIpFamily)
540-
}
541-
if ipv4Enabled, ok := n.Labels[LabelIpFamilyIpv4]; ok && strings.EqualFold(ipv4Enabled, "true") {
542-
nodeIpFamily.Ipv4Enabled = true
543-
}
544-
if ipv6Enabled, ok := n.Labels[LabelIpFamilyIpv6]; ok && strings.EqualFold(ipv6Enabled, "true") {
545-
nodeIpFamily.Ipv6Enabled = true
546-
}
547-
}
548-
if !nodeIpFamily.Ipv4Enabled && !nodeIpFamily.Ipv6Enabled {
549-
nodeIpFamily.PreferredNodeIpFamily = Ipv4Stack
550-
nodeIpFamily.Ipv4Enabled = true
551-
logger.With("nodeId", nodeID, "nodeIpFamily", *nodeIpFamily).Info("No IP family labels identified on node, defaulting to ipv4.")
552-
} else {
553-
logger.With("nodeId", nodeID, "nodeIpFamily", *nodeIpFamily).Info("Node IP family identified.")
554-
}
555-
556-
return nodeIpFamily, nil
557-
}
558583
func ConvertIscsiIpFromIpv4ToIpv6(ipv4IscsiIp string) (string, error) {
559584
ipv4IscsiIP := net.ParseIP(ipv4IscsiIp).To4()
560585
if ipv4IscsiIP == nil {
@@ -609,30 +634,27 @@ func IsValidIpFamilyPresentInClusterIpFamily(clusterIpFamily string) bool {
609634
return len(clusterIpFamily) > 0 && (strings.Contains(clusterIpFamily, Ipv4Stack) || strings.Contains(clusterIpFamily, Ipv6Stack))
610635
}
611636

612-
func IsIpv6SingleStackNode(nodeIpFamily *NodeIpFamily) bool {
613-
if nodeIpFamily == nil {
637+
func IsIpv6SingleStackNode(nodeMetadata *NodeMetadata) bool {
638+
if nodeMetadata == nil {
614639
return false
615640
}
616-
return nodeIpFamily.Ipv6Enabled == true && nodeIpFamily.Ipv4Enabled == false
641+
return nodeMetadata.Ipv6Enabled == true && nodeMetadata.Ipv4Enabled == false
617642
}
618643

619-
func LoadCSIConfigFromConfigMap(k kubernetes.Interface, configMapName string, logger *zap.SugaredLogger) (*CSIConfig) {
644+
func LoadCSIConfigFromConfigMap(csiConfig *CSIConfig, k kubernetes.Interface, configMapName string, logger *zap.SugaredLogger) {
620645
// Get the ConfigMap
621646
// Parse the configuration for each driver
622-
config := &CSIConfig{}
623647
cm, err := k.CoreV1().ConfigMaps("kube-system").Get(context.Background(), configMapName, metav1.GetOptions{})
624648
if err != nil {
625649
logger.Debugf("Failed to load ConfigMap %v due to error %v. Using default configuration.", configMapName, err)
626-
return config
650+
return
627651
}
628652

629653
if lustreConfig, exists := cm.Data["lustre"]; exists {
630-
if err := yaml.Unmarshal([]byte(lustreConfig), &config.Lustre); err != nil {
654+
if err := yaml.Unmarshal([]byte(lustreConfig), &csiConfig.Lustre); err != nil {
631655
logger.Debugf("Failed to parse lustre key in config map %v. Error: %v",configMapName, err)
632-
return config
656+
return
633657
}
634658
logger.Infof("Successfully loaded ConfigMap %v. Using customized configuration for csi driver.", configMapName)
635659
}
636-
637-
return config
638660
}

pkg/csi-util/utils_test.go

Lines changed: 81 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -15,15 +15,20 @@
1515
package csi_util
1616

1717
import (
18+
"context"
1819
"fmt"
20+
"log"
21+
"os"
1922
"reflect"
2023
"regexp"
2124
"strings"
2225
"testing"
26+
"time"
2327

2428
"github.com/oracle/oci-cloud-controller-manager/pkg/util"
2529
"github.com/oracle/oci-go-sdk/v65/core"
2630
"go.uber.org/zap"
31+
"k8s.io/client-go/kubernetes"
2732
"k8s.io/utils/pointer"
2833
)
2934

@@ -344,18 +349,21 @@ func Test_DiskByPathPatternForPV(t *testing.T) {
344349
}
345350
}
346351

347-
func Test_GetNodeIpFamily(t *testing.T) {
352+
func Test_LoadNodeMetadataFromApiServer(t *testing.T) {
348353

349354
tests := []struct {
350355
name string
351356
nodeName string
352-
want *NodeIpFamily
357+
want *NodeMetadata
358+
kubeclient kubernetes.Interface
353359
err error
354360
}{
355361
{
356362
name: "should return ipv6 for ipv6 preferred node",
357363
nodeName: "ipv6Preferred",
358-
want: &NodeIpFamily{
364+
want: &NodeMetadata{
365+
FullAvailabilityDomain: "xyz:PHX-AD-3",
366+
AvailabilityDomain: "PHX-AD-3",
359367
PreferredNodeIpFamily: Ipv6Stack,
360368
Ipv4Enabled: true,
361369
Ipv6Enabled: true,
@@ -364,16 +372,18 @@ func Test_GetNodeIpFamily(t *testing.T) {
364372
{
365373
name: "should return ipv4 for ipv4 preferred node",
366374
nodeName: "ipv4Preferred",
367-
want: &NodeIpFamily{
375+
want: &NodeMetadata{
368376
PreferredNodeIpFamily: Ipv4Stack,
377+
AvailabilityDomain: "PHX-AD-3",
369378
Ipv4Enabled: true,
370379
Ipv6Enabled: true,
371380
},
372381
},
373382
{
374383
name: "should return default IPv4 family for no ip preference",
375384
nodeName: "noIpPreference",
376-
want: &NodeIpFamily{
385+
want: &NodeMetadata{
386+
AvailabilityDomain: "PHX-AD-3",
377387
PreferredNodeIpFamily: Ipv4Stack,
378388
Ipv4Enabled: true,
379389
Ipv6Enabled: false,
@@ -382,21 +392,73 @@ func Test_GetNodeIpFamily(t *testing.T) {
382392
{
383393
name: "should return error for invalid node",
384394
nodeName: "InvalidNode",
385-
want: nil,
395+
want: &NodeMetadata{},
386396
err: fmt.Errorf("Failed to get node information from kube api server, please check if kube api server is accessible."),
387397
},
398+
{
399+
name: "should return error for node with any ad labels",
400+
nodeName: "nodeWithMissingAdLabels",
401+
want: &NodeMetadata{
402+
PreferredNodeIpFamily: Ipv4Stack,
403+
Ipv4Enabled: true,
404+
Ipv6Enabled: false,
405+
},
406+
err: fmt.Errorf("Failed to get node information from kube api server, please check if kube api server is accessible."),
407+
},
408+
{
409+
name: "Call to get node info is done even if health check fails",
410+
nodeName: "ipv4Preferred",
411+
want: &NodeMetadata{
412+
PreferredNodeIpFamily: Ipv4Stack,
413+
AvailabilityDomain: "PHX-AD-3",
414+
Ipv4Enabled: true,
415+
Ipv6Enabled: true,
416+
},
417+
kubeclient: &util.MockKubeClientWithFailingRestClient{
418+
CoreClient: &util.MockCoreClientWithFailingRestClient{},
419+
},
420+
},
421+
{
422+
name: "should return error for invalid node and failing health check",
423+
nodeName: "InvalidNode",
424+
want: &NodeMetadata{},
425+
err: fmt.Errorf("Failed to get node information from kube api server, please check if kube api server is accessible."),
426+
kubeclient: &util.MockKubeClientWithFailingRestClient{
427+
CoreClient: &util.MockCoreClientWithFailingRestClient{},
428+
},
429+
},
430+
}
431+
432+
logger, _ := zap.NewDevelopment()
433+
sugar := logger.Sugar()
434+
u := &Util{
435+
Logger: sugar,
388436
}
437+
389438
for _, tt := range tests {
390439
t.Run(tt.name, func(t *testing.T) {
391-
got, err := GetNodeIpFamily(&util.MockKubeClient{
392-
CoreClient: &util.MockCoreClient{},
393-
}, tt.nodeName, zap.S())
394-
if (tt.want != got) && (tt.want.PreferredNodeIpFamily != got.PreferredNodeIpFamily ||
395-
tt.want.Ipv6Enabled != got.Ipv6Enabled || tt.want.Ipv4Enabled != got.Ipv4Enabled) {
396-
t.Errorf("GetNodeIpFamily() = %v, want %v", got, tt.want)
440+
441+
442+
log.SetOutput(os.Stdout)
443+
nodeMetadata := &NodeMetadata{}
444+
ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
445+
defer cancel()
446+
447+
448+
var k kubernetes.Interface
449+
if tt.kubeclient != nil {
450+
k = tt.kubeclient
451+
} else {
452+
k = &util.MockKubeClient{CoreClient: &util.MockCoreClient{}}
453+
}
454+
455+
err := u.LoadNodeMetadataFromApiServer(ctx, k, tt.nodeName, nodeMetadata)
456+
if (tt.want != nodeMetadata) && (tt.want.PreferredNodeIpFamily != nodeMetadata.PreferredNodeIpFamily ||
457+
tt.want.Ipv6Enabled != nodeMetadata.Ipv6Enabled || tt.want.Ipv4Enabled != nodeMetadata.Ipv4Enabled) {
458+
t.Errorf("LoadNodeMetadataFromApiServer() = %v, want %v", nodeMetadata, tt.want)
397459
}
398460
if err != nil && !strings.EqualFold(tt.err.Error(), err.Error()) {
399-
t.Errorf("GetNodeIpFamily() = %v, want %v", err, tt.err)
461+
t.Errorf("LoadNodeMetadataFromApiServer() = %v, want %v", err, tt.err)
400462
}
401463

402464
})
@@ -764,18 +826,20 @@ func Test_LoadCSIConfigFromConfigMap(t *testing.T) {
764826
{
765827
name: "Return default config if config map is not present",
766828
configMapName: "invalid",
767-
want: &CSIConfig{},
829+
want: &CSIConfig{
830+
},
768831
},
769832
}
770833

771834
for _, tt := range tests {
772835
t.Run(tt.name, func(t *testing.T) {
773-
got := LoadCSIConfigFromConfigMap(&util.MockKubeClient{
836+
csiConfig := &CSIConfig{}
837+
LoadCSIConfigFromConfigMap(csiConfig, &util.MockKubeClient{
774838
CoreClient: &util.MockCoreClient{},
775839
}, tt.configMapName, zap.S())
776840

777-
if !reflect.DeepEqual(tt.want, got) {
778-
t.Errorf("LoadCSIConfigFromConfigMap() => got : %v, want : %v", got, tt.want)
841+
if !reflect.DeepEqual(tt.want, csiConfig) {
842+
t.Errorf("LoadCSIConfigFromConfigMap() => got : %v, want : %v", csiConfig, tt.want)
779843
}
780844
})
781845
}

0 commit comments

Comments
 (0)