diff --git a/go-controller/pkg/factory/types.go b/go-controller/pkg/factory/types.go index 5a31d07d9ea..c844561340b 100644 --- a/go-controller/pkg/factory/types.go +++ b/go-controller/pkg/factory/types.go @@ -45,6 +45,9 @@ type NodeWatchFactory interface { AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) RemoveNamespaceHandler(handler *Handler) + AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error) + RemoveNodeHandler(handler *Handler) + NodeInformer() cache.SharedIndexInformer LocalPodInformer() cache.SharedIndexInformer diff --git a/go-controller/pkg/node/gateway.go b/go-controller/pkg/node/gateway.go index f882943faaa..dc84729e266 100644 --- a/go-controller/pkg/node/gateway.go +++ b/go-controller/pkg/node/gateway.go @@ -25,7 +25,7 @@ import ( type Gateway interface { informer.ServiceAndEndpointsEventHandler Init(factory.NodeWatchFactory) error - Start(<-chan struct{}, *sync.WaitGroup) + Start(<-chan struct{}, *sync.WaitGroup) error GetGatewayBridgeIface() string } @@ -183,15 +183,18 @@ func (g *gateway) Init(wf factory.NodeWatchFactory) error { return nil } -func (g *gateway) Start(stopChan <-chan struct{}, wg *sync.WaitGroup) { +func (g *gateway) Start(stopChan <-chan struct{}, wg *sync.WaitGroup) error { if g.nodeIPManager != nil { - g.nodeIPManager.Run(stopChan, wg) + if err := g.nodeIPManager.Run(stopChan, wg); err != nil { + return err + } } if g.openflowManager != nil { klog.Info("Spawning Conntrack Rule Check Thread") g.openflowManager.Run(stopChan, wg) } + return nil } // sets up an uplink interface for UDP Generic Receive Offload forwarding as part of @@ -287,6 +290,21 @@ func gatewayReady(patchPort string) (bool, error) { return true, nil } +func hostAddressAnnotationReady(watchFactory factory.NodeWatchFactory, nodeName string) (bool, error) { + node, err := watchFactory.GetNode(nodeName) + if err != nil { + return 
false, fmt.Errorf("failed to get node %s: %v", nodeName, err) + } + _, err = util.ParseNodeHostAddresses(node) + if err != nil { + if !util.IsAnnotationNotSetError(err) { + return false, fmt.Errorf("failed to get host addresses for node: %s: %v", nodeName, err) + } + return false, nil + } + return true, nil +} + func (g *gateway) GetGatewayBridgeIface() string { return g.openflowManager.defaultBridge.bridgeName } diff --git a/go-controller/pkg/node/gateway_init.go b/go-controller/pkg/node/gateway_init.go index 9bd9c85ab7f..4d42502eb56 100644 --- a/go-controller/pkg/node/gateway_init.go +++ b/go-controller/pkg/node/gateway_init.go @@ -382,7 +382,7 @@ func interfaceForEXGW(intfName string) string { return intfName } -func (n *OvnNode) initGatewayDPUHost(kubeNodeIP net.IP) error { +func (n *OvnNode) initGatewayDPUHost(cfg *managementPortConfig, kubeNodeIP net.IP) error { // A DPU host gateway is complementary to the shared gateway running // on the DPU embedded CPU. it performs some initializations and // watch on services for iptable rule updates and run a loadBalancerHealth checker @@ -423,7 +423,12 @@ func (n *OvnNode) initGatewayDPUHost(kubeNodeIP net.IP) error { if err := initSharedGatewayIPTables(); err != nil { return err } - gw.nodePortWatcherIptables = newNodePortWatcherIptables() + gw.nodeIPManager, err = newAddressManager(n.name, n.Kube, cfg, n.watchFactory) + if err != nil { + return err + } + + gw.nodePortWatcherIptables = newNodePortWatcherIptables(n.watchFactory, gw.nodeIPManager) gw.loadBalancerHealthChecker = newLoadBalancerHealthChecker(n.name) portClaimWatcher, err := newPortClaimWatcher(n.recorder) if err != nil { diff --git a/go-controller/pkg/node/gateway_init_linux_test.go b/go-controller/pkg/node/gateway_init_linux_test.go index f2c03d6b1bd..0069f53f066 100644 --- a/go-controller/pkg/node/gateway_init_linux_test.go +++ b/go-controller/pkg/node/gateway_init_linux_test.go @@ -535,6 +535,7 @@ func shareGatewayInterfaceDPUHostTest(app *cli.App, testNS 
ns.NetNS, uplinkName, clusterCIDR string = "10.1.0.0/16" svcCIDR string = "172.16.1.0/24" nodeName string = "node1" + nodeSubnet string = "10.1.1.0/24" ) app.Action = func(ctx *cli.Context) error { @@ -558,6 +559,25 @@ func shareGatewayInterfaceDPUHostTest(app *cli.App, testNS ns.NetNS, uplinkName, EgressFirewallClient: egressFirewallFakeClient, } + _, nodeNet, err := net.ParseCIDR(nodeSubnet) + Expect(err).NotTo(HaveOccurred()) + + // Make a fake MgmtPortConfig with only the fields we care about + fakeMgmtPortIPFamilyConfig := managementPortIPFamilyConfig{ + ipt: nil, + allSubnets: nil, + ifAddr: nodeNet, + gwIP: nodeNet.IP, + } + + fakeMgmtPortConfig := managementPortConfig{ + ifName: nodeName, + link: nil, + routerMAC: nil, + ipv4: &fakeMgmtPortIPFamilyConfig, + ipv6: nil, + } + stop := make(chan struct{}) wf, err := factory.NewNodeWatchFactory(fakeClient, nodeName) Expect(err).NotTo(HaveOccurred()) @@ -573,7 +593,7 @@ func shareGatewayInterfaceDPUHostTest(app *cli.App, testNS ns.NetNS, uplinkName, err = testNS.Do(func(ns.NetNS) error { defer GinkgoRecover() - err := n.initGatewayDPUHost(net.ParseIP(hostIP)) + err := n.initGatewayDPUHost(&fakeMgmtPortConfig, net.ParseIP(hostIP)) Expect(err).NotTo(HaveOccurred()) // Check svc and masquerade routes added towards eth0GWIP diff --git a/go-controller/pkg/node/gateway_localnet.go b/go-controller/pkg/node/gateway_localnet.go index f77ddf910a1..5779c75774d 100644 --- a/go-controller/pkg/node/gateway_localnet.go +++ b/go-controller/pkg/node/gateway_localnet.go @@ -55,6 +55,14 @@ func newLocalGateway(nodeName string, hostSubnets []*net.IPNet, gwNextHops []net } } + addrAnnotReady := true + if config.OvnKubeNode.Mode == types.NodeModeDPU { + addrAnnotReady, err = hostAddressAnnotationReady(watchFactory, nodeName) + if err != nil { + return nil, err + } + } + if exGwBridge != nil { gw.readyFunc = func() (bool, error) { ready, err := gatewayReady(gwBridge.patchPort) @@ -65,11 +73,15 @@ func newLocalGateway(nodeName string, 
hostSubnets []*net.IPNet, gwNextHops []net if err != nil { return false, err } - return ready && exGWReady, nil + return ready && exGWReady && addrAnnotReady, nil } } else { gw.readyFunc = func() (bool, error) { - return gatewayReady(gwBridge.patchPort) + gwReady, err := gatewayReady(gwBridge.patchPort) + if err != nil { + return false, err + } + return gwReady && addrAnnotReady, nil } } @@ -86,9 +98,12 @@ func newLocalGateway(nodeName string, hostSubnets []*net.IPNet, gwNextHops []net } } - gw.nodeIPManager = newAddressManager(nodeName, kube, cfg, watchFactory) - - gw.openflowManager, err = newGatewayOpenFlowManager(gwBridge, exGwBridge, gw.nodeIPManager.ListAddresses()) + gw.nodeIPManager, err = newAddressManager(nodeName, kube, cfg, watchFactory) + if err != nil { + return err + } + nodeIPs := gw.nodeIPManager.ListAddresses() + gw.openflowManager, err = newGatewayOpenFlowManager(gwBridge, exGwBridge, nodeIPs) if err != nil { return err } diff --git a/go-controller/pkg/node/gateway_localnet_linux_test.go b/go-controller/pkg/node/gateway_localnet_linux_test.go index 89f898a52ff..b08e6a3d8c4 100644 --- a/go-controller/pkg/node/gateway_localnet_linux_test.go +++ b/go-controller/pkg/node/gateway_localnet_linux_test.go @@ -45,7 +45,7 @@ func initFakeNodePortWatcher(iptV4, iptV6 util.IPTablesHelper) *nodePortWatcher ofportPatch: "patch-breth0_ov", gatewayIPv4: v4localnetGatewayIP, gatewayIPv6: v6localnetGatewayIP, - serviceInfo: make(map[k8stypes.NamespacedName]*serviceConfig), + SvcInfo: SvcInfo{serviceInfo: make(map[k8stypes.NamespacedName]*serviceConfig)}, ofm: &openflowManager{ flowCache: map[string][]string{}, }, @@ -59,7 +59,7 @@ func startNodePortWatcher(n *nodePortWatcher, fakeClient *util.OVNClientset, fak } k := &kube.Kube{fakeClient.KubeClient, nil, nil, nil} - n.nodeIPManager = newAddressManager(fakeNodeName, k, fakeMgmtPortConfig, n.watchFactory) + n.nodeIPManager, _ = newAddressManager(fakeNodeName, k, fakeMgmtPortConfig, n.watchFactory) localHostNetEp := 
"192.168.18.15/32" ip, _, _ := net.ParseCIDR(localHostNetEp) n.nodeIPManager.addAddr(ip) diff --git a/go-controller/pkg/node/gateway_shared_intf.go b/go-controller/pkg/node/gateway_shared_intf.go index 1e75ee048e0..56929c4f6a8 100644 --- a/go-controller/pkg/node/gateway_shared_intf.go +++ b/go-controller/pkg/node/gateway_shared_intf.go @@ -50,30 +50,42 @@ var ( HostNodePortCTZone = config.Default.ConntrackZone + 3 //64003 ) +type SvcInfo struct { + // Map of service name to programmed iptables/OF rules + serviceInfo map[ktypes.NamespacedName]*serviceConfig + serviceInfoLock sync.Mutex +} + // nodePortWatcherIptables manages iptables rules for shared gateway // to ensure that services using NodePorts are accessible. type nodePortWatcherIptables struct { + SvcInfo + nodeIPManager *addressManager + watchFactory factory.NodeWatchFactory } -func newNodePortWatcherIptables() *nodePortWatcherIptables { - return &nodePortWatcherIptables{} +func newNodePortWatcherIptables(watchFactory factory.NodeWatchFactory, nodeIPManager *addressManager) *nodePortWatcherIptables { + npw := &nodePortWatcherIptables{ + SvcInfo: SvcInfo{serviceInfo: make(map[ktypes.NamespacedName]*serviceConfig)}, + nodeIPManager: nodeIPManager, + watchFactory: watchFactory, + } + return npw } // nodePortWatcher manages OpenFlow and iptables rules // to ensure that services using NodePorts are accessible type nodePortWatcher struct { - dpuMode bool - gatewayIPv4 string - gatewayIPv6 string - ofportPhys string - ofportPatch string - gwBridge string - // Map of service name to programmed iptables/OF rules - serviceInfo map[ktypes.NamespacedName]*serviceConfig - serviceInfoLock sync.Mutex - ofm *openflowManager - nodeIPManager *addressManager - watchFactory factory.NodeWatchFactory + SvcInfo + dpuMode bool + gatewayIPv4 string + gatewayIPv6 string + ofportPhys string + ofportPatch string + gwBridge string + ofm *openflowManager + nodeIPManager *addressManager + watchFactory factory.NodeWatchFactory } type 
serviceConfig struct { @@ -343,40 +355,40 @@ func (npw *nodePortWatcher) generateArpBypassFlow(protocol string, ipAddr string } // getAndDeleteServiceInfo returns the serviceConfig for a service and if it exists and then deletes the entry -func (npw *nodePortWatcher) getAndDeleteServiceInfo(index ktypes.NamespacedName) (out *serviceConfig, exists bool) { - npw.serviceInfoLock.Lock() - defer npw.serviceInfoLock.Unlock() - out, exists = npw.serviceInfo[index] - delete(npw.serviceInfo, index) +func (svcInfo *SvcInfo) getAndDeleteServiceInfo(index ktypes.NamespacedName) (out *serviceConfig, exists bool) { + svcInfo.serviceInfoLock.Lock() + defer svcInfo.serviceInfoLock.Unlock() + out, exists = svcInfo.serviceInfo[index] + delete(svcInfo.serviceInfo, index) return out, exists } // getServiceInfo returns the serviceConfig for a service and if it exists -func (npw *nodePortWatcher) getServiceInfo(index ktypes.NamespacedName) (out *serviceConfig, exists bool) { - npw.serviceInfoLock.Lock() - defer npw.serviceInfoLock.Unlock() - out, exists = npw.serviceInfo[index] +func (svcInfo *SvcInfo) getServiceInfo(index ktypes.NamespacedName) (out *serviceConfig, exists bool) { + svcInfo.serviceInfoLock.Lock() + defer svcInfo.serviceInfoLock.Unlock() + out, exists = svcInfo.serviceInfo[index] return out, exists } // getAndSetServiceInfo creates and sets the serviceConfig, returns if it existed and whatever was there -func (npw *nodePortWatcher) getAndSetServiceInfo(index ktypes.NamespacedName, service *kapi.Service, hasLocalHostNetworkEp bool) (old *serviceConfig, exists bool) { - npw.serviceInfoLock.Lock() - defer npw.serviceInfoLock.Unlock() +func (svcInfo *SvcInfo) getAndSetServiceInfo(index ktypes.NamespacedName, service *kapi.Service, hasLocalHostNetworkEp bool) (old *serviceConfig, exists bool) { + svcInfo.serviceInfoLock.Lock() + defer svcInfo.serviceInfoLock.Unlock() - old, exists = npw.serviceInfo[index] - npw.serviceInfo[index] = &serviceConfig{service: service, 
hasLocalHostNetworkEp: hasLocalHostNetworkEp} + old, exists = svcInfo.serviceInfo[index] + svcInfo.serviceInfo[index] = &serviceConfig{service: service, hasLocalHostNetworkEp: hasLocalHostNetworkEp} return old, exists } // addOrSetServiceInfo creates and sets the serviceConfig if it doesn't exist -func (npw *nodePortWatcher) addOrSetServiceInfo(index ktypes.NamespacedName, service *kapi.Service, hasLocalHostNetworkEp bool) (exists bool) { - npw.serviceInfoLock.Lock() - defer npw.serviceInfoLock.Unlock() +func (svcInfo *SvcInfo) addOrSetServiceInfo(index ktypes.NamespacedName, service *kapi.Service, hasLocalHostNetworkEp bool) (exists bool) { + svcInfo.serviceInfoLock.Lock() + defer svcInfo.serviceInfoLock.Unlock() - if _, exists := npw.serviceInfo[index]; !exists { + if _, exists := svcInfo.serviceInfo[index]; !exists { // Only set this if it doesn't exist - npw.serviceInfo[index] = &serviceConfig{service: service, hasLocalHostNetworkEp: hasLocalHostNetworkEp} + svcInfo.serviceInfo[index] = &serviceConfig{service: service, hasLocalHostNetworkEp: hasLocalHostNetworkEp} return false } return true @@ -385,22 +397,22 @@ func (npw *nodePortWatcher) addOrSetServiceInfo(index ktypes.NamespacedName, ser // updateServiceInfo sets the serviceConfig for a service and returns the existing serviceConfig, if inputs are nil // do not update those fields, if it does not exist return nil. 
-func (npw *nodePortWatcher) updateServiceInfo(index ktypes.NamespacedName, service *kapi.Service, hasLocalHostNetworkEp *bool) (old *serviceConfig, exists bool) { +func (svcInfo *SvcInfo) updateServiceInfo(index ktypes.NamespacedName, service *kapi.Service, hasLocalHostNetworkEp *bool) (old *serviceConfig, exists bool) { - npw.serviceInfoLock.Lock() - defer npw.serviceInfoLock.Unlock() + svcInfo.serviceInfoLock.Lock() + defer svcInfo.serviceInfoLock.Unlock() - if old, exists = npw.serviceInfo[index]; !exists { + if old, exists = svcInfo.serviceInfo[index]; !exists { klog.V(5).Infof("No serviceConfig found for service %s in namespace %s", index.Name, index.Namespace) return nil, exists } if service != nil { - npw.serviceInfo[index].service = service + svcInfo.serviceInfo[index].service = service } if hasLocalHostNetworkEp != nil { - npw.serviceInfo[index].hasLocalHostNetworkEp = *hasLocalHostNetworkEp + svcInfo.serviceInfo[index].hasLocalHostNetworkEp = *hasLocalHostNetworkEp } return old, exists @@ -725,26 +737,55 @@ func (npw *nodePortWatcher) UpdateEndpoints(old *kapi.Endpoints, new *kapi.Endpo } func (npwipt *nodePortWatcherIptables) AddService(service *kapi.Service) { + var hasLocalHostNetworkEp bool // don't process headless service or services that doesn't have NodePorts or ExternalIPs if !util.ServiceTypeHasClusterIP(service) || !util.IsClusterIPSet(service) { return } - addServiceRules(service, false, nil) + klog.V(5).Infof("Adding service %s in namespace %s", service.Name, service.Namespace) + name := ktypes.NamespacedName{Namespace: service.Namespace, Name: service.Name} + ep, err := npwipt.watchFactory.GetEndpoint(service.Namespace, service.Name) + if err != nil { + klog.V(5).Infof("No endpoint found for service %s in namespace %s during service Add", service.Name, service.Namespace) + // No endpoint object exists yet so default to false + hasLocalHostNetworkEp = false + } else { + nodeIPs := npwipt.nodeIPManager.ListAddresses() + hasLocalHostNetworkEp 
= hasLocalHostNetworkEndpoints(ep, nodeIPs) + } + + // If something didn't already do it add correct Service rules + if exists := npwipt.addOrSetServiceInfo(name, service, hasLocalHostNetworkEp); !exists { + klog.V(5).Infof("Service Add %s event in namespace %s came before endpoint event setting svcConfig", service.Name, service.Namespace) + addServiceRules(service, hasLocalHostNetworkEp, nil) + } else { + klog.V(5).Infof("Rules already programmed for %s in namespace %s", service.Name, service.Namespace) + } } func (npwipt *nodePortWatcherIptables) UpdateService(old, new *kapi.Service) { + name := ktypes.NamespacedName{Namespace: old.Namespace, Name: old.Name} + if serviceUpdateNotNeeded(old, new) { klog.V(5).Infof("Skipping service update for: %s as change does not apply to any of .Spec.Ports, "+ ".Spec.ExternalIP, .Spec.ClusterIP, .Spec.ClusterIPs, .Spec.Type, .Status.LoadBalancer.Ingress", new.Name) return } + // Update the service in svcConfig if we need to so that other handler + // threads do the correct thing, leave hasLocalHostNetworkEp alone in the cache + svcConfig, exists := npwipt.updateServiceInfo(name, new, nil) + if !exists { + klog.V(5).Infof("Service %s in namespace %s was deleted during service Update", old.Name, old.Namespace) + return + } + if util.ServiceTypeHasClusterIP(old) && util.IsClusterIPSet(old) { delServiceRules(old, nil) } if util.ServiceTypeHasClusterIP(new) && util.IsClusterIPSet(new) { - addServiceRules(new, false, nil) + addServiceRules(new, svcConfig.hasLocalHostNetworkEp, nil) } } @@ -753,21 +794,39 @@ func (npwipt *nodePortWatcherIptables) DeleteService(service *kapi.Service) { if !util.ServiceTypeHasClusterIP(service) || !util.IsClusterIPSet(service) { return } - delServiceRules(service, nil) + klog.V(5).Infof("Deleting service %s in namespace %s", service.Name, service.Namespace) + name := ktypes.NamespacedName{Namespace: service.Namespace, Name: service.Name} + if svcConfig, exists := npwipt.getAndDeleteServiceInfo(name); 
exists { + delServiceRules(svcConfig.service, nil) + } else { + klog.Warningf("Deletion failed No service found in cache for endpoint %s in namespace %s", service.Name, service.Namespace) + } } func (npwipt *nodePortWatcherIptables) SyncServices(services []interface{}) error { keepIPTRules := []iptRule{} for _, serviceInterface := range services { service, ok := serviceInterface.(*kapi.Service) if !ok { klog.Errorf("Spurious object in syncServices: %v", serviceInterface) continue } + + name := ktypes.NamespacedName{Namespace: service.Namespace, Name: service.Name} + + ep, err := npwipt.watchFactory.GetEndpoint(service.Namespace, service.Name) + if err != nil { + klog.V(5).Infof("No endpoint found for service %s in namespace %s during sync", service.Name, service.Namespace) + continue + } + nodeIPs := npwipt.nodeIPManager.ListAddresses() + hasLocalHostNetworkEp := hasLocalHostNetworkEndpoints(ep, nodeIPs) + npwipt.getAndSetServiceInfo(name, service, hasLocalHostNetworkEp) + // Add correct iptables rules. // TODO: ETP and ITP is not implemented for smart NIC mode. - keepIPTRules = append(keepIPTRules, getGatewayIPTRules(service, false)...) + keepIPTRules = append(keepIPTRules, getGatewayIPTRules(service, hasLocalHostNetworkEp)...) 
} // sync IPtables rules once @@ -1313,6 +1372,14 @@ func newSharedGateway(nodeName string, subnets []*net.IPNet, gwNextHops []net.IP } } + addrAnnotReady := true + if config.OvnKubeNode.Mode == types.NodeModeDPU { + addrAnnotReady, err = hostAddressAnnotationReady(watchFactory, nodeName) + if err != nil { + return nil, err + } + } + if exGwBridge != nil { gw.readyFunc = func() (bool, error) { ready, err := gatewayReady(gwBridge.patchPort) @@ -1323,11 +1390,15 @@ func newSharedGateway(nodeName string, subnets []*net.IPNet, gwNextHops []net.IP if err != nil { return false, err } - return ready && exGWReady, nil + return ready && exGWReady && addrAnnotReady, nil } } else { gw.readyFunc = func() (bool, error) { - return gatewayReady(gwBridge.patchPort) + gwReady, err := gatewayReady(gwBridge.patchPort) + if err != nil { + return false, err + } + return gwReady && addrAnnotReady, nil } } @@ -1345,7 +1416,12 @@ func newSharedGateway(nodeName string, subnets []*net.IPNet, gwNextHops []net.IP return err } } - gw.nodeIPManager = newAddressManager(nodeName, kube, cfg, watchFactory) + + gw.nodeIPManager, err = newAddressManager(nodeName, kube, cfg, watchFactory) + if err != nil { + return err + } + nodeIPs := gw.nodeIPManager.ListAddresses() gw.openflowManager, err = newGatewayOpenFlowManager(gwBridge, exGwBridge, nodeIPs) @@ -1356,7 +1432,8 @@ func newSharedGateway(nodeName string, subnets []*net.IPNet, gwNextHops []net.IP // resync flows on IP change gw.nodeIPManager.OnChanged = func() { klog.V(5).Info("Node addresses changed, re-syncing bridge flows") - if err := gw.openflowManager.updateBridgeFlowCache(gw.nodeIPManager.ListAddresses()); err != nil { + nodeIPs = gw.nodeIPManager.ListAddresses() + if err := gw.openflowManager.updateBridgeFlowCache(nodeIPs); err != nil { // very unlikely - somehow node has lost its IP address klog.Errorf("Failed to re-generate gateway flows after address change: %v", err) } @@ -1438,7 +1515,7 @@ func newNodePortWatcher(patchPort, gwBridge, 
gwIntf string, ips []*net.IPNet, of ofportPhys: ofportPhys, ofportPatch: ofportPatch, gwBridge: gwBridge, - serviceInfo: make(map[ktypes.NamespacedName]*serviceConfig), + SvcInfo: SvcInfo{serviceInfo: make(map[ktypes.NamespacedName]*serviceConfig)}, nodeIPManager: nodeIPManager, ofm: ofm, watchFactory: watchFactory, diff --git a/go-controller/pkg/node/node.go b/go-controller/pkg/node/node.go index 1c202785a23..cd8a3cfc0df 100644 --- a/go-controller/pkg/node/node.go +++ b/go-controller/pkg/node/node.go @@ -411,7 +411,7 @@ func (n *OvnNode) Start(ctx context.Context, wg *sync.WaitGroup) error { // Initialize gateway if config.OvnKubeNode.Mode == types.NodeModeDPUHost { - err = n.initGatewayDPUHost(nodeAddr) + err = n.initGatewayDPUHost(mgmtPortConfig, nodeAddr) if err != nil { return err } @@ -431,7 +431,9 @@ func (n *OvnNode) Start(ctx context.Context, wg *sync.WaitGroup) error { if err := waiter.Wait(); err != nil { return err } - n.gateway.Start(n.stopChan, wg) + if err = n.gateway.Start(n.stopChan, wg); err != nil { + return err + } klog.Infof("Gateway and management port readiness took %v", time.Since(start)) // Note(adrianc): DPU deployments are expected to support the new shared gateway changes, upgrade flow diff --git a/go-controller/pkg/node/node_ip_handler_linux.go b/go-controller/pkg/node/node_ip_handler_linux.go index eb0854c1605..42381a57d30 100644 --- a/go-controller/pkg/node/node_ip_handler_linux.go +++ b/go-controller/pkg/node/node_ip_handler_linux.go @@ -4,16 +4,21 @@ package node import ( + "fmt" "net" "sync" "time" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" "github.com/vishvananda/netlink" + kapi "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" 
"k8s.io/klog/v2" utilnet "k8s.io/utils/net" ) @@ -30,17 +35,30 @@ type addressManager struct { } // initializes a new address manager which will hold all the IPs on a node -func newAddressManager(nodeName string, k kube.Interface, config *managementPortConfig, watchFactory factory.NodeWatchFactory) *addressManager { +func newAddressManager(nodeName string, k kube.Interface, mpConfig *managementPortConfig, watchFactory factory.NodeWatchFactory) (*addressManager, error) { mgr := &addressManager{ nodeName: nodeName, watchFactory: watchFactory, addresses: sets.NewString(), - mgmtPortConfig: config, + mgmtPortConfig: mpConfig, OnChanged: func() {}, } - mgr.nodeAnnotator = kube.NewNodeAnnotator(k, nodeName) - mgr.sync() - return mgr + if config.OvnKubeNode.Mode != types.NodeModeDPU { + mgr.nodeAnnotator = kube.NewNodeAnnotator(k, nodeName) + mgr.sync() + } else { + node, err := mgr.watchFactory.GetNode(nodeName) + if err != nil { + return nil, fmt.Errorf("failed to get node %s: %v", nodeName, err) + } + currAddresses, err := util.ParseNodeHostAddresses(node) + if err != nil && !util.IsAnnotationNotSetError(err) { + return nil, fmt.Errorf("failed to get host addresses for node: %s: %v", nodeName, err) + } else { + mgr.assignAddresses(currAddresses) + } + } + return mgr, nil } // updates the address manager with a new IP @@ -87,8 +105,54 @@ func (c *addressManager) ListAddresses() []net.IP { return out } -func (c *addressManager) Run(stopChan <-chan struct{}, doneWg *sync.WaitGroup) { +func (c *addressManager) syncHostAddr() error { + _, err := c.watchFactory.AddNodeHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + node := obj.(*kapi.Node) + if node.Name != c.nodeName { + return + } + + klog.V(5).Infof("Added event for Node %q", node.Name) + currAddresses, err := util.ParseNodeHostAddresses(node) + if err != nil && !util.IsAnnotationNotSetError(err) { + klog.Errorf("Failed to get host addresses for node: %s: %v", node.Name, err) + return + } + 
addrChanged := c.assignAddresses(currAddresses) + if addrChanged { + c.OnChanged() + } + }, + UpdateFunc: func(old, new interface{}) { + node := new.(*kapi.Node) + if node.Name != c.nodeName { + return + } + + currAddresses, err := util.ParseNodeHostAddresses(node) + if err != nil && !util.IsAnnotationNotSetError(err) { + klog.Errorf("Failed to get host addresses for node: %s: %v", node.Name, err) + return + } + addrChanged := c.assignAddresses(currAddresses) + if addrChanged { + c.OnChanged() + } + }, + DeleteFunc: func(obj interface{}) { + }, + }, nil) + return err +} + +func (c *addressManager) Run(stopChan <-chan struct{}, doneWg *sync.WaitGroup) error { var addrChan chan netlink.AddrUpdate + + if config.OvnKubeNode.Mode == types.NodeModeDPU { + return c.syncHostAddr() + } + addrSubscribeOptions := netlink.AddrSubscribeOptions{ ErrorCallback: func(err error) { klog.Errorf("Failed during AddrSubscribe callback: %v", err) @@ -161,6 +225,7 @@ func (c *addressManager) Run(stopChan <-chan struct{}, doneWg *sync.WaitGroup) { }() klog.Info("Node IP manager is running") + return nil } func (c *addressManager) assignAddresses(nodeHostAddresses sets.String) bool {