
Commit a2ec1ec

julianKatz authored and hajiler committed
Hooked up NodeGetInfo to provide disk-type labels from nodes
1 parent 7bde031 commit a2ec1ec

File tree: 4 files changed (+75, -0 lines)

  cmd/gce-pd-csi-driver/main.go
  pkg/common/utils.go
  pkg/gce-pd-csi-driver/controller.go
  pkg/gce-pd-csi-driver/node.go

cmd/gce-pd-csi-driver/main.go

Lines changed: 27 additions & 0 deletions
@@ -27,6 +27,8 @@ import (
     "strings"
     "time"
 
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/rest"
     "k8s.io/klog/v2"
     "k8s.io/utils/strings/slices"
     "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/constants"
@@ -99,6 +101,8 @@ var (
 
     diskTopology = flag.Bool("disk-topology", false, "If set to true, the driver will add a disk-type.gke.io/[disk-type] topology label when the StorageClass has the use-allowed-disk-topology parameter set to true. That topology label is included in the Topologies returned in CreateVolumeResponse. This flag is disabled by default.")
 
+    dynamicVolumes = flag.Bool("dynamic-volumes", false, "If set to true, the CSI driver will automatically select a compatible disk type based on the presence of the dynamic-volume parameter and disk types defined in the StorageClass. Disabled by default.")
+
     diskCacheSyncPeriod = flag.Duration("disk-cache-sync-period", 10*time.Minute, "Period for the disk cache to check the /dev/disk/by-id/ directory and evaluate the symlinks")
 
     enableDiskSizeValidation = flag.Bool("enable-disk-size-validation", false, "If set to true, the driver will validate that the requested disk size is matches the physical disk size. This flag is disabled by default.")
@@ -256,6 +260,7 @@ func handle() {
     args := &driver.GCEControllerServerArgs{
         EnableDiskTopology:       *diskTopology,
         EnableDiskSizeValidation: *enableDiskSizeValidation,
+        EnableDynamicVolumes:     *dynamicVolumes,
     }
 
     controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag, args)
@@ -299,6 +304,16 @@ func handle() {
         MetricsManager: metricsManager,
         DeviceCache:    deviceCache,
     }
+
+    if *dynamicVolumes {
+        klog.V(2).Infof("Setting up kubeClient")
+        kc, err := instantiateKubeClient()
+        if err != nil {
+            klog.Fatalf("Failed to instantiate Kubernetes client: %v", err)
+        }
+        nsArgs.KubeClient = kc
+        nsArgs.EnableDynamicVolumes = *dynamicVolumes
+    }
     nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)
 
     if *maxConcurrentFormatAndMount > 0 {
@@ -467,3 +482,15 @@ func setupDataCache(ctx context.Context, nodeName string, nodeId string) error {
     klog.V(4).Infof("LSSD caching is setup for the Data Cache enabled node %s", nodeName)
     return nil
 }
+
+func instantiateKubeClient() (*kubernetes.Clientset, error) {
+    cfg, err := rest.InClusterConfig()
+    if err != nil {
+        return nil, fmt.Errorf("failed to create REST Config for k8s client: %w", err)
+    }
+    kubeClient, err := kubernetes.NewForConfig(cfg)
+    if err != nil {
+        return nil, fmt.Errorf("failed to create k8s client: %w", err)
+    }
+    return kubeClient, nil
+}
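
The new instantiateKubeClient helper relies on rest.InClusterConfig, so it only works when the driver runs inside a pod, and the driver's ServiceAccount will additionally need RBAC permission to get Node objects for the NodeGetInfo lookup added later in this commit. Below is a minimal sketch of the same setup with a kubeconfig fallback for running the binary outside a cluster; the newKubeClient helper and the --kubeconfig flag are illustrative only and are not part of this commit.

package main

import (
    "flag"
    "fmt"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

// newKubeClient mirrors instantiateKubeClient but falls back to a kubeconfig
// file when no in-cluster ServiceAccount credentials are available.
func newKubeClient(kubeconfigPath string) (*kubernetes.Clientset, error) {
    cfg, err := rest.InClusterConfig()
    if err != nil {
        // Not running in a pod; try the kubeconfig file instead (hypothetical fallback).
        cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfigPath)
        if err != nil {
            return nil, fmt.Errorf("failed to build REST config: %w", err)
        }
    }
    return kubernetes.NewForConfig(cfg)
}

func main() {
    kubeconfig := flag.String("kubeconfig", "", "Path to a kubeconfig file (hypothetical flag, not in this commit)")
    flag.Parse()
    if _, err := newKubeClient(*kubeconfig); err != nil {
        fmt.Println("kube client setup failed:", err)
    }
}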

pkg/common/utils.go

Lines changed: 5 additions & 0 deletions
@@ -491,6 +491,11 @@ func MapNumber(vCPUs int64, limitMap []constants.MachineHyperdiskLimit) int64 {
     return 15
 }
 
+// HasDiskTypeLabelKeyPrefix checks if the label key starts with the DiskTypeKeyPrefix.
+func HasDiskTypeLabelKeyPrefix(labelKey string) bool {
+    return strings.HasPrefix(labelKey, constants.DiskTypeKeyPrefix+"/")
+}
+
 func DiskTypeLabelKey(diskType string) string {
     return fmt.Sprintf("%s/%s", constants.DiskTypeKeyPrefix, diskType)
 }
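
HasDiskTypeLabelKeyPrefix is the read-side counterpart of the existing DiskTypeLabelKey helper. Here is a minimal sketch of how the two pair up, assuming constants.DiskTypeKeyPrefix is the disk-type.gke.io prefix referenced by the --disk-topology flag description; the hyperdisk-balanced disk type is illustrative only.

package main

import (
    "fmt"

    "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
)

func main() {
    // Build a disk-type label key, then confirm the prefix check recognizes it.
    key := common.DiskTypeLabelKey("hyperdisk-balanced") // e.g. "disk-type.gke.io/hyperdisk-balanced"
    fmt.Println(key, common.HasDiskTypeLabelKeyPrefix(key)) // true
    // Unrelated label keys are rejected.
    fmt.Println(common.HasDiskTypeLabelKeyPrefix("topology.kubernetes.io/zone")) // false
}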

pkg/gce-pd-csi-driver/controller.go

Lines changed: 1 addition & 0 deletions
@@ -132,6 +132,7 @@ type GCEControllerServer struct {
 type GCEControllerServerArgs struct {
     EnableDiskTopology       bool
     EnableDiskSizeValidation bool
+    EnableDynamicVolumes     bool
 }
 
 type MultiZoneVolumeHandleConfig struct {

pkg/gce-pd-csi-driver/node.go

Lines changed: 42 additions & 0 deletions
@@ -35,6 +35,8 @@ import (
     "k8s.io/klog/v2"
     "k8s.io/mount-utils"
 
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/kubernetes"
     "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
     "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/constants"
     "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
@@ -83,6 +85,9 @@ type GCENodeServer struct {
     metricsManager *metrics.MetricsManager
     // A cache of the device paths for the volumes that are attached to the node.
     DeviceCache *linkcache.DeviceCache
+
+    KubeClient           kubernetes.Interface
+    EnableDynamicVolumes bool
 }
 
 type NodeServerArgs struct {
@@ -101,6 +106,10 @@ type NodeServerArgs struct {
 
     MetricsManager *metrics.MetricsManager
     DeviceCache    *linkcache.DeviceCache
+
+    KubeClient           kubernetes.Interface
+    EnableDiskTopology   bool
+    EnableDynamicVolumes bool
 }
 
 var _ csi.NodeServer = &GCENodeServer{}
@@ -717,6 +726,17 @@ func (ns *GCENodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRe
         Segments: map[string]string{constants.TopologyKeyZone: ns.MetadataService.GetZone()},
     }
 
+    if ns.EnableDynamicVolumes {
+        labels, err := ns.fetchGKETopologyLabels(ctx, ns.MetadataService.GetName())
+        if err != nil {
+            return nil, fmt.Errorf("failed to fetch GKE topology labels: %v", err)
+        }
+
+        for k, v := range labels {
+            top.Segments[k] = v
+        }
+    }
+
     nodeID := common.CreateNodeID(ns.MetadataService.GetProject(), ns.MetadataService.GetZone(), ns.MetadataService.GetName())
 
     volumeLimits, err := ns.GetVolumeLimits(ctx)
@@ -970,3 +990,25 @@ func GetAttachLimitsOverrideFromNodeLabel(ctx context.Context, nodeName string)
     }
     return 0, nil
 }
+
+// fetchGKETopologyLabels retrieves the node labels that carry the disk-type
+// label prefix (constants.DiskTypeKeyPrefix) for the specified node.
+func (ns *GCENodeServer) fetchGKETopologyLabels(ctx context.Context, nodeName string) (map[string]string, error) {
+    klog.V(2).Infof("Retrieving node topology labels for node %q", nodeName)
+
+    node, err := ns.KubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+    if err != nil {
+        // Q: Should we retry if we fail to get the node?
+        return nil, err
+    }
+
+    topology := make(map[string]string)
+    for k, v := range node.GetLabels() {
+        if common.HasDiskTypeLabelKeyPrefix(k) {
+            klog.V(2).Infof("Including node topology label %q=%q", k, v)
+            topology[k] = v
+        }
+    }
+
+    return topology, nil
+}
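
Because fetchGKETopologyLabels only touches KubeClient, it can be exercised with client-go's fake clientset. Below is a minimal test sketch, assuming the test file sits next to node.go in the same package and that disk-type labels use the disk-type.gke.io/ prefix implied by the --disk-topology flag description; the node name and labels are illustrative only, not taken from this commit.

package gceGCEDriver // assumed package name of pkg/gce-pd-csi-driver

import (
    "context"
    "testing"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/fake"
)

func TestFetchGKETopologyLabels(t *testing.T) {
    // A node carrying one disk-type label and one unrelated label.
    node := &corev1.Node{
        ObjectMeta: metav1.ObjectMeta{
            Name: "test-node",
            Labels: map[string]string{
                "disk-type.gke.io/hyperdisk-balanced": "true", // assumed disk-type label shape
                "kubernetes.io/hostname":              "test-node",
            },
        },
    }
    ns := &GCENodeServer{KubeClient: fake.NewSimpleClientset(node)}

    got, err := ns.fetchGKETopologyLabels(context.Background(), "test-node")
    if err != nil {
        t.Fatalf("fetchGKETopologyLabels: %v", err)
    }
    // Only the disk-type label should be returned.
    if len(got) != 1 || got["disk-type.gke.io/hyperdisk-balanced"] != "true" {
        t.Errorf("unexpected labels: %v", got)
    }
}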
