Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3f55f89
feat: add support for buffer size and read chunk streams
kanya-approve May 27, 2025
146fbfd
feat: enhance docker build
kanya-approve May 27, 2025
c2a9385
fix: shouldn't have been pulling in build image as build platform
kanya-approve May 27, 2025
b29559b
fix: correct chunk streams
kanya-approve May 27, 2025
0a4fc5a
fix: BufferSize isn't needed with vfs read ahead
kanya-approve May 27, 2025
f43981b
style: move ChunkStreams next to ChunkSizeLimit
kanya-approve May 27, 2025
2150458
Merge branch 'master' into master
rokroskar Jun 4, 2025
5a62a10
remove new folder for building
kanya-approve Jul 4, 2025
82e6e9a
CGO_ENABLED back from 0 to 1
kanya-approve Jul 4, 2025
b082fc0
remove unneeded -w on go build
kanya-approve Jul 4, 2025
a305892
use here-documents from # syntax=docker/dockerfile:1.7
kanya-approve Jul 4, 2025
f2b143d
Merge branch 'master' of https://github.com/kanya-approve/csi-rclone
kanya-approve Jul 4, 2025
24587f6
Merge branch 'master' into master
kanya-approve Jul 4, 2025
364dd14
ran format
kanya-approve Jul 4, 2025
45257f8
add extraEnv to csiNodepluginRclone.rclone
kanya-approve Jul 15, 2025
b674afa
Update csi-nodeplugin-rclone.yaml
kanya-approve Jul 15, 2025
d52de2c
Update values.yaml
kanya-approve Jul 15, 2025
01c9b0a
Merge branch 'SwissDataScienceCenter:master' into master
kanya-approve Sep 14, 2025
c3dfb47
add extraArgs for rclone
kanya-approve Sep 14, 2025
7956a57
semicolon delimiter instead
kanya-approve Sep 16, 2025
5c3cfec
switch to json EXTRA_ARGS
kanya-approve Sep 16, 2025
2fe5b14
attempt to persist volumes for easy remounting
kanya-approve Oct 22, 2025
f9ef3a2
Merge branch 'SwissDataScienceCenter:master' into master
kanya-approve Oct 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions pkg/rclone/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,26 @@ func NewNodeServer(csiDriver *csicommon.CSIDriver, cacheDir string, cacheSize st
}
rcloneOps := NewRclone(kubeClient, rclonePort, cacheDir, cacheSize)

return &nodeServer{
// Use kubelet plugin directory for state persistence
stateFile := "/var/lib/kubelet/plugins/csi-rclone/mounted_volumes.json"

ns := &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(csiDriver),
mounter: &mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: utilexec.New(),
},
RcloneOps: rcloneOps,
}, nil
RcloneOps: rcloneOps,
mountedVolumes: make(map[string]*MountedVolume),
stateFile: stateFile,
}

// Load persisted state on startup
if err := ns.loadState(); err != nil {
klog.Warningf("Failed to load persisted volume state: %v", err)
}

return ns, nil
}

func NewControllerServer(csiDriver *csicommon.CSIDriver) *controllerServer {
Expand Down Expand Up @@ -114,7 +126,13 @@ func (d *Driver) Run() error {
)
d.server = s
if d.ns != nil && d.ns.RcloneOps != nil {
return d.ns.RcloneOps.Run()
onDaemonReady := func() error {
if d.ns != nil {
return d.ns.remountTrackedVolumes(context.Background())
}
return nil
}
return d.ns.RcloneOps.Run(onDaemonReady)
}
s.Wait()
return nil
Expand Down
158 changes: 158 additions & 0 deletions pkg/rclone/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ package rclone

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"

"gopkg.in/ini.v1"
Expand All @@ -37,6 +40,23 @@ type nodeServer struct {
*csicommon.DefaultNodeServer
mounter *mount.SafeFormatAndMount
RcloneOps Operations

// Track mounted volumes for automatic remounting
mountedVolumes map[string]*MountedVolume
mutex sync.RWMutex
stateFile string
}

type MountedVolume struct {
VolumeId string
TargetPath string
Remote string
RemotePath string
ConfigData string
ReadOnly bool
Parameters map[string]string
SecretName string
SecretNamespace string
}

// Mounting Volume (Preparation)
Expand Down Expand Up @@ -141,6 +161,10 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}
return nil, status.Error(codes.Internal, err.Error())
}

// Track the mounted volume for automatic remounting
ns.trackMountedVolume(volumeId, targetPath, remote, remotePath, configData, readOnly, flags, secretName, secretNamespace)

// err = ns.WaitForMountAvailable(targetPath)
// if err != nil {
// return nil, status.Error(codes.Internal, err.Error())
Expand Down Expand Up @@ -323,6 +347,10 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
if err := ns.RcloneOps.Unmount(ctx, req.GetVolumeId(), targetPath); err != nil {
klog.Warningf("Unmounting volume failed: %s", err)
}

// Remove the volume from tracking
ns.removeTrackedVolume(req.GetVolumeId())

mount.CleanupMountPoint(req.GetTargetPath(), ns.mounter, false)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
Expand All @@ -344,6 +372,82 @@ func (*nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolu
return nil, status.Errorf(codes.Unimplemented, "method NodeExpandVolume not implemented")
}

// Track mounted volume for automatic remounting
func (ns *nodeServer) trackMountedVolume(volumeId, targetPath, remote, remotePath, configData string, readOnly bool, parameters map[string]string, secretName, secretNamespace string) {
ns.mutex.Lock()
defer ns.mutex.Unlock()

ns.mountedVolumes[volumeId] = &MountedVolume{
VolumeId: volumeId,
TargetPath: targetPath,
Remote: remote,
RemotePath: remotePath,
ConfigData: configData,
ReadOnly: readOnly,
Parameters: parameters,
SecretName: secretName,
SecretNamespace: secretNamespace,
}
klog.Infof("Tracked mounted volume %s at path %s", volumeId, targetPath)

if err := ns.persistState(); err != nil {
klog.Errorf("Failed to persist volume state: %v", err)
}
}

// Remove tracked volume when unmounted
func (ns *nodeServer) removeTrackedVolume(volumeId string) {
ns.mutex.Lock()
defer ns.mutex.Unlock()

delete(ns.mountedVolumes, volumeId)
klog.Infof("Removed tracked volume %s", volumeId)

if err := ns.persistState(); err != nil {
klog.Errorf("Failed to persist volume state: %v", err)
}
}

// Automatically remount all tracked volumes after daemon restart
func (ns *nodeServer) remountTrackedVolumes(ctx context.Context) error {
ns.mutex.RLock()
defer ns.mutex.RUnlock()

if len(ns.mountedVolumes) == 0 {
klog.Info("No tracked volumes to remount")
return nil
}

klog.Infof("Remounting %d tracked volumes", len(ns.mountedVolumes))

for volumeId, mv := range ns.mountedVolumes {
klog.Infof("Remounting volume %s to %s", volumeId, mv.TargetPath)

// Create the mount directory if it doesn't exist
if err := os.MkdirAll(mv.TargetPath, 0750); err != nil {
klog.Errorf("Failed to create mount directory %s: %v", mv.TargetPath, err)
continue
}

// Remount the volume
rcloneVol := &RcloneVolume{
ID: mv.VolumeId,
Remote: mv.Remote,
RemotePath: mv.RemotePath,
}

err := ns.RcloneOps.Mount(ctx, rcloneVol, mv.TargetPath, mv.ConfigData, mv.ReadOnly, mv.Parameters)
if err != nil {
klog.Errorf("Failed to remount volume %s: %v", volumeId, err)
// Don't return error here - continue with other volumes
} else {
klog.Infof("Successfully remounted volume %s", volumeId)
}
}

return nil
}

func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error {
for {
select {
Expand All @@ -357,3 +461,57 @@ func (ns *nodeServer) WaitForMountAvailable(mountpoint string) error {
}
}
}

// Persist volume state to disk
func (ns *nodeServer) persistState() error {
ns.mutex.RLock()
defer ns.mutex.RUnlock()

if ns.stateFile == "" {
return nil
}

data, err := json.Marshal(ns.mountedVolumes)
if err != nil {
return fmt.Errorf("failed to marshal volume state: %v", err)
}

if err := os.MkdirAll(filepath.Dir(ns.stateFile), 0755); err != nil {
return fmt.Errorf("failed to create state directory: %v", err)
}

if err := os.WriteFile(ns.stateFile, data, 0600); err != nil {
return fmt.Errorf("failed to write state file: %v", err)
}

klog.Infof("Persisted volume state to %s", ns.stateFile)
return nil
}

// Load volume state from disk
func (ns *nodeServer) loadState() error {
ns.mutex.Lock()
defer ns.mutex.Unlock()

if ns.stateFile == "" {
return nil
}

data, err := os.ReadFile(ns.stateFile)
if err != nil {
if os.IsNotExist(err) {
klog.Info("No persisted volume state found, starting fresh")
return nil
}
return fmt.Errorf("failed to read state file: %v", err)
}

var volumes map[string]*MountedVolume
if err := json.Unmarshal(data, &volumes); err != nil {
return fmt.Errorf("failed to unmarshal volume state: %v", err)
}

ns.mountedVolumes = volumes
klog.Infof("Loaded %d tracked volumes from %s", len(ns.mountedVolumes), ns.stateFile)
return nil
}
9 changes: 7 additions & 2 deletions pkg/rclone/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Operations interface {
Unmount(ctx context.Context, volumeId string, targetPath string) error
GetVolumeById(ctx context.Context, volumeId string) (*RcloneVolume, error)
Cleanup() error
Run() error
Run(onDaemonReady func() error) error
}

type Rclone struct {
Expand Down Expand Up @@ -472,11 +472,16 @@ func (r *Rclone) start_daemon() error {
return nil
}

func (r *Rclone) Run() error {
func (r *Rclone) Run(onDaemonReady func() error) error {
err := r.start_daemon()
if err != nil {
return err
}
if onDaemonReady != nil {
if err := onDaemonReady(); err != nil {
klog.Warningf("Error in onDaemonReady callback: %v", err)
}
}
// blocks until the rclone daemon is stopped
return r.daemonCmd.Wait()
}
Expand Down
Loading