Fixes leftover podref issue
Signed-off-by: Muhammad Adil Ghaffar <[email protected]>
adilGhaffarDev committed Feb 6, 2025
1 parent c4d2f71 commit 14f993e
Showing 3 changed files with 95 additions and 23 deletions.
23 changes: 23 additions & 0 deletions pkg/allocate/allocate.go
@@ -72,6 +72,29 @@ func getMatchingIPReservationIndex(reservelist []types.IPReservation, id, ifName
return -1
}

// DeallocateIPWithPodRef removes the allocation matching podRef from the reserve list. It returns the updated reserve list and the deallocated IP, or the original list and a nil IP when no matching allocation exists.
func DeallocateIPWithPodRef(reservelist []types.IPReservation, podRef string) ([]types.IPReservation, net.IP) {
index := getMatchingIPReservationIndexByPodRef(reservelist, podRef)
if index < 0 {
// Allocation not found. Return the original reserve list and nil IP.
return reservelist, nil
}

ip := reservelist[index].IP
logging.Debugf("Deallocating given previously used IP: %v", ip.String())

return removeIdxFromSlice(reservelist, index), ip
}

func getMatchingIPReservationIndexByPodRef(reservelist []types.IPReservation, podRef string) int {
for idx, v := range reservelist {
if v.PodRef == podRef {
return idx
}
}
return -1
}

func removeIdxFromSlice(s []types.IPReservation, i int) []types.IPReservation {
s[i] = s[len(s)-1]
return s[:len(s)-1]
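
For illustration, a minimal standalone sketch of how the new helper behaves. This is not part of the commit; it assumes the usual whereabouts import paths, and the addresses and pod references are made up for the example:

package main

import (
	"fmt"
	"net"

	"github.com/k8snetworkplumbingwg/whereabouts/pkg/allocate"
	"github.com/k8snetworkplumbingwg/whereabouts/pkg/types"
)

func main() {
	reservelist := []types.IPReservation{
		{IP: net.ParseIP("10.0.0.2"), PodRef: "default/other-pod"},
		{IP: net.ParseIP("10.0.0.3"), PodRef: "default/deleted-pod"},
	}

	// Match found: the entry is removed and its IP is returned.
	updated, ip := allocate.DeallocateIPWithPodRef(reservelist, "default/deleted-pod")
	fmt.Println(len(updated), ip) // 1 10.0.0.3

	// No match: the list comes back unchanged and the IP is nil, which is
	// the condition callers should test for.
	same, none := allocate.DeallocateIPWithPodRef(updated, "default/missing-pod")
	fmt.Println(len(same), none) // 1 <nil>
}
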
55 changes: 45 additions & 10 deletions pkg/controlloop/pod.go
@@ -27,6 +27,7 @@ import (
nadlister "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
"github.com/pkg/errors"

"github.com/k8snetworkplumbingwg/whereabouts/pkg/allocate"
whereaboutsv1alpha1 "github.com/k8snetworkplumbingwg/whereabouts/pkg/api/whereabouts.cni.cncf.io/v1alpha1"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/config"
wbclientset "github.com/k8snetworkplumbingwg/whereabouts/pkg/generated/clientset/versioned"
@@ -224,27 +225,53 @@ func (pc *PodController) garbageCollectPodIPs(pod *v1.Pod) error {
if allocation.PodRef == podID(podNamespace, podName) {
logging.Verbosef("stale allocation to cleanup: %+v", allocation)

client := *wbclient.NewKubernetesClient(nil, pc.k8sClient)
client := *wbclient.NewKubernetesClient(pc.wbClient, pc.k8sClient)
wbClient := &wbclient.KubernetesIPAM{
Client: client,
Config: *ipamConfig,
Namespace: pool.Namespace,
}

if err != nil {
logging.Debugf("error while generating the IPAM client: %v", err)
continue
}
if _, err := pc.cleanupFunc(context.TODO(), types.Deallocate, *ipamConfig, wbClient); err != nil {
logging.Errorf("failed to cleanup allocation: %v", err)
}
if err := pc.addressGarbageCollected(pod, nad.GetName(), pool.Spec.Range, allocationIndex); err != nil {
logging.Errorf("failed to issue event for successful IP address cleanup: %v", err)
podExist, _ := podExists(wbClient, podNamespace, podName)
if podExist {
logging.Verbosef("Pod: %s exists", podName)
if _, err := pc.cleanupFunc(context.TODO(), types.Deallocate, *ipamConfig, wbClient); err != nil {
logging.Errorf("failed to cleanup allocation: %v", err)
}
if err := pc.addressGarbageCollected(pod, nad.GetName(), pool.Spec.Range, allocationIndex); err != nil {
logging.Errorf("failed to issue event for successful IP address cleanup: %v", err)
}
} else {
logging.Verbosef("Pod: %s does not exist", podName)
logging.Verbosef("pool.Namespace: %s ", pool.Namespace)
if err = cleanUpLeftoverPodRefs(context.TODO(), wbClient, *ipamConfig, podName, podNamespace, pool); err != nil {
logging.Errorf("failed to clean up Leftover PodRefs: %v", err)
}
}
}
}
}
}
return nil
}

// cleanUpLeftoverPodRefs cleans up leftover pod references from the pool
func cleanUpLeftoverPodRefs(ctx context.Context, ipam *wbclient.KubernetesIPAM, ipamConf types.IPAMConfig, podName string, podNamespace string, poolInput *whereaboutsv1alpha1.IPPool) error {
firstIP, _, err := poolInput.ParseCIDR()
if err != nil {
return err
}
reservelist := wbclient.ToIPReservationList(poolInput.Spec.Allocations, firstIP)
updatedreservelist, deallocatedIP := allocate.DeallocateIPWithPodRef(reservelist, podID(podNamespace, podName))
if deallocatedIP == nil {
// Do not fail if the allocation was not found.
logging.Debugf("Failed to find allocation for Pod: %s", podID(podNamespace, podName))
return nil
}
if err := wbclient.UpdatePool(ctx, ipam, poolInput.Spec.Range, ipamConf, updatedreservelist); err != nil {
logging.Debugf("Failed to UpdatePool: %s", err)
}
return nil
}

@@ -357,6 +384,14 @@ func onPodDelete(queue workqueue.TypedRateLimitingInterface[*v1.Pod], obj interf
func podID(podNamespace string, podName string) string {
return fmt.Sprintf("%s/%s", podNamespace, podName)
}

// podExists checks if a pod exists in the cluster
func podExists(client *wbclient.KubernetesIPAM, namespace, podName string) (bool, error) {
pod, err := client.GetPod(namespace, podName)
if err != nil {
return false, err
}
return pod != nil, nil
}

func podNetworkStatus(pod *v1.Pod) ([]nadv1.NetworkStatus, error) {
var ifaceStatuses []nadv1.NetworkStatus
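
The leftover cleanup only succeeds when the pod reference rebuilt by podID matches the string stored on the allocation (compare the allocation.PodRef check earlier in garbageCollectPodIPs). A tiny standalone sketch of that contract, with the helper body copied here only for illustration:

package main

import "fmt"

// Same formatting as the podID helper above: allocations are keyed by "<namespace>/<name>".
func podID(podNamespace string, podName string) string {
	return fmt.Sprintf("%s/%s", podNamespace, podName)
}

func main() {
	storedPodRef := "default/deleted-pod" // as recorded on the IPPool allocation
	fmt.Println(podID("default", "deleted-pod") == storedPodRef) // true: DeallocateIPWithPodRef will find it
}
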
40 changes: 27 additions & 13 deletions pkg/storage/kubernetes/ipam.go
@@ -36,7 +36,7 @@ const UnnamedNetwork string = ""
type KubernetesIPAM struct {
Client
Config whereaboutstypes.IPAMConfig
namespace string
Namespace string
containerID string
IfName string
}
@@ -46,7 +46,7 @@ func newKubernetesIPAM(containerID, ifName string, ipamConf whereaboutstypes.IPA
Config: ipamConf,
containerID: containerID,
IfName: ifName,
namespace: namespace,
Namespace: namespace,
Client: kubernetesClient,
}
}
@@ -76,7 +76,7 @@ func NewKubernetesIPAMWithNamespace(containerID, ifName string, ipamConf whereab
if err != nil {
return nil, err
}
k8sIPAM.namespace = namespace
k8sIPAM.Namespace = namespace
return k8sIPAM, nil
}

@@ -137,14 +137,14 @@ func (i *KubernetesIPAM) getPool(ctx context.Context, name string, iprange strin
ctxWithTimeout, cancel := context.WithTimeout(ctx, storage.RequestTimeout)
defer cancel()

pool, err := i.client.WhereaboutsV1alpha1().IPPools(i.namespace).Get(ctxWithTimeout, name, metav1.GetOptions{})
pool, err := i.client.WhereaboutsV1alpha1().IPPools(i.Namespace).Get(ctxWithTimeout, name, metav1.GetOptions{})
if err != nil && errors.IsNotFound(err) {
// pool does not exist, create it
newPool := &whereaboutsv1alpha1.IPPool{}
newPool.ObjectMeta.Name = name
newPool.Spec.Range = iprange
newPool.Spec.Allocations = make(map[string]whereaboutsv1alpha1.IPAllocation)
_, err = i.client.WhereaboutsV1alpha1().IPPools(i.namespace).Create(ctxWithTimeout, newPool, metav1.CreateOptions{})
_, err = i.client.WhereaboutsV1alpha1().IPPools(i.Namespace).Create(ctxWithTimeout, newPool, metav1.CreateOptions{})
if err != nil && errors.IsAlreadyExists(err) {
// the pool was just created -- allow retry
return nil, &temporaryError{err}
@@ -162,7 +162,7 @@ func (i *KubernetesIPAM) getPool(ctx context.Context, name string, iprange strin

// Status tests connectivity to the kubernetes backend
func (i *KubernetesIPAM) Status(ctx context.Context) error {
_, err := i.client.WhereaboutsV1alpha1().IPPools(i.namespace).List(ctx, metav1.ListOptions{})
_, err := i.client.WhereaboutsV1alpha1().IPPools(i.Namespace).List(ctx, metav1.ListOptions{})
return err
}

@@ -180,7 +180,7 @@ type KubernetesIPPool struct {

// Allocations returns the initially retrieved set of allocations for this pool
func (p *KubernetesIPPool) Allocations() []whereaboutstypes.IPReservation {
return toIPReservationList(p.pool.Spec.Allocations, p.firstIP)
return ToIPReservationList(p.pool.Spec.Allocations, p.firstIP)
}

// Update sets the pool allocated IP list to the given IP reservations
@@ -240,7 +240,7 @@ func (p *KubernetesIPPool) Update(ctx context.Context, reservations []whereabout
return nil
}

func toIPReservationList(allocations map[string]whereaboutsv1alpha1.IPAllocation, firstip net.IP) []whereaboutstypes.IPReservation {
func ToIPReservationList(allocations map[string]whereaboutsv1alpha1.IPAllocation, firstip net.IP) []whereaboutstypes.IPReservation {
reservelist := []whereaboutstypes.IPReservation{}
for offset, a := range allocations {
numOffset, err := strconv.ParseInt(offset, 10, 64)
Expand Down Expand Up @@ -276,7 +276,7 @@ type KubernetesOverlappingRangeStore struct {

// GetOverlappingRangeStore returns a clusterstore interface
func (i *KubernetesIPAM) GetOverlappingRangeStore() (storage.OverlappingRangeStore, error) {
return &KubernetesOverlappingRangeStore{i.client, i.namespace}, nil
return &KubernetesOverlappingRangeStore{i.client, i.Namespace}, nil
}

// IsAllocatedInOverlappingRange checks for IP addresses to see if they're allocated cluster wide, for overlapping
@@ -457,7 +457,7 @@ func IPManagement(ctx context.Context, mode int, ipamConf whereaboutstypes.IPAMC
}

// setup leader election
le, leader, deposed := newLeaderElector(ctx, client.clientSet, client.namespace, client)
le, leader, deposed := newLeaderElector(ctx, client.clientSet, client.Namespace, client)
var wg sync.WaitGroup
wg.Add(2)

@@ -510,11 +510,25 @@ func IPManagement(ctx context.Context, mode int, ipamConf whereaboutstypes.IPAMC
return newips, err
}

// UpdatePool replaces the IPPool's allocations for the given range with the supplied reservation list.
func UpdatePool(ctx context.Context, ipam *KubernetesIPAM, ipRange string, ipamConf whereaboutstypes.IPAMConfig, usereservelist []whereaboutstypes.IPReservation) error {
pool, err := ipam.GetIPPool(ctx, PoolIdentifier{IpRange: ipRange, NetworkName: ipamConf.NetworkName})
if err != nil {
logging.Errorf("Failed to get pool: %v", err)
return err
}

if err := pool.Update(ctx, usereservelist); err != nil {
logging.Errorf("Failed to update pool: %v", err)
return err
}
return nil
}

func GetNodeSlicePoolRange(ctx context.Context, ipam *KubernetesIPAM, nodeName string) (string, error) {
logging.Debugf("ipam namespace is %v", ipam.namespace)
nodeSlice, err := ipam.client.WhereaboutsV1alpha1().NodeSlicePools(ipam.namespace).Get(ctx, getNodeSliceName(ipam), metav1.GetOptions{})
logging.Debugf("ipam namespace is %v", ipam.Namespace)
nodeSlice, err := ipam.client.WhereaboutsV1alpha1().NodeSlicePools(ipam.Namespace).Get(ctx, getNodeSliceName(ipam), metav1.GetOptions{})
if err != nil {
logging.Errorf("error getting node slice %s/%s %v", ipam.namespace, getNodeSliceName(ipam), err)
logging.Errorf("error getting node slice %s/%s %v", ipam.Namespace, getNodeSliceName(ipam), err)
return "", err
}
for _, allocation := range nodeSlice.Status.Allocations {
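
Taken together, the newly exported ToIPReservationList and the new UpdatePool give the control loop a way to rewrite a pool's allocations outside the normal CNI teardown path. The following is an illustrative sketch of that round trip, not code from this commit; it assumes it lives alongside UpdatePool in pkg/storage/kubernetes and that pkg/allocate is imported as allocate:

// cleanStalePodRef flattens the pool's allocation map, drops the reservation
// still held by a deleted pod, and writes the trimmed list back to the IPPool.
func cleanStalePodRef(ctx context.Context, ipam *KubernetesIPAM, ipamConf whereaboutstypes.IPAMConfig, pool *whereaboutsv1alpha1.IPPool, podRef string) error {
	firstIP, _, err := pool.ParseCIDR()
	if err != nil {
		return err
	}
	reservations := ToIPReservationList(pool.Spec.Allocations, firstIP)
	updated, deallocatedIP := allocate.DeallocateIPWithPodRef(reservations, podRef)
	if deallocatedIP == nil {
		// Nothing recorded for this pod reference; nothing to rewrite.
		return nil
	}
	return UpdatePool(ctx, ipam, pool.Spec.Range, ipamConf, updated)
}
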
