Skip to content

Commit

Permalink
node IP manager should manage IPs on the dpu-host nodes
Browse files Browse the repository at this point in the history
Signed-off-by: Yun Zhou [email protected]
  • Loading branch information
cathy-zhou committed Aug 23, 2022
1 parent 550a7cb commit f138313
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 69 deletions.
3 changes: 3 additions & 0 deletions go-controller/pkg/factory/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type NodeWatchFactory interface {
AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error)
RemoveNamespaceHandler(handler *Handler)

AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error)
RemoveNodeHandler(handler *Handler)

NodeInformer() cache.SharedIndexInformer
LocalPodInformer() cache.SharedIndexInformer

Expand Down
24 changes: 21 additions & 3 deletions go-controller/pkg/node/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
type Gateway interface {
informer.ServiceAndEndpointsEventHandler
Init(factory.NodeWatchFactory) error
Start(<-chan struct{}, *sync.WaitGroup)
Start(<-chan struct{}, *sync.WaitGroup) error
GetGatewayBridgeIface() string
}

Expand Down Expand Up @@ -183,15 +183,18 @@ func (g *gateway) Init(wf factory.NodeWatchFactory) error {
return nil
}

func (g *gateway) Start(stopChan <-chan struct{}, wg *sync.WaitGroup) {
func (g *gateway) Start(stopChan <-chan struct{}, wg *sync.WaitGroup) error {
if g.nodeIPManager != nil {
g.nodeIPManager.Run(stopChan, wg)
if err := g.nodeIPManager.Run(stopChan, wg); err != nil {
return err
}
}

if g.openflowManager != nil {
klog.Info("Spawning Conntrack Rule Check Thread")
g.openflowManager.Run(stopChan, wg)
}
return nil
}

// sets up an uplink interface for UDP Generic Receive Offload forwarding as part of
Expand Down Expand Up @@ -287,6 +290,21 @@ func gatewayReady(patchPort string) (bool, error) {
return true, nil
}

func hostAddressAnnotationReady(watchFactory factory.NodeWatchFactory, nodeName string) (bool, error) {
node, err := watchFactory.GetNode(nodeName)
if err != nil {
return false, fmt.Errorf("failed to node %s: %v", nodeName, err)
}
_, err = util.ParseNodeHostAddresses(node)
if err != nil {
if !util.IsAnnotationNotSetError(err) {
return false, fmt.Errorf("failed to get host addresses for node: %s: %v", nodeName, err)
}
return false, nil
}
return true, nil
}

func (g *gateway) GetGatewayBridgeIface() string {
return g.openflowManager.defaultBridge.bridgeName
}
Expand Down
9 changes: 7 additions & 2 deletions go-controller/pkg/node/gateway_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func interfaceForEXGW(intfName string) string {
return intfName
}

func (n *OvnNode) initGatewayDPUHost(kubeNodeIP net.IP) error {
func (n *OvnNode) initGatewayDPUHost(cfg *managementPortConfig, kubeNodeIP net.IP) error {
// A DPU host gateway is complementary to the shared gateway running
// on the DPU embedded CPU. it performs some initializations and
// watch on services for iptable rule updates and run a loadBalancerHealth checker
Expand Down Expand Up @@ -423,7 +423,12 @@ func (n *OvnNode) initGatewayDPUHost(kubeNodeIP net.IP) error {
if err := initSharedGatewayIPTables(); err != nil {
return err
}
gw.nodePortWatcherIptables = newNodePortWatcherIptables()
gw.nodeIPManager, err = newAddressManager(n.name, n.Kube, cfg, n.watchFactory)
if err != nil {
return err
}

gw.nodePortWatcherIptables = newNodePortWatcherIptables(n.watchFactory, gw.nodeIPManager)
gw.loadBalancerHealthChecker = newLoadBalancerHealthChecker(n.name)
portClaimWatcher, err := newPortClaimWatcher(n.recorder)
if err != nil {
Expand Down
22 changes: 21 additions & 1 deletion go-controller/pkg/node/gateway_init_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ func shareGatewayInterfaceDPUHostTest(app *cli.App, testNS ns.NetNS, uplinkName,
clusterCIDR string = "10.1.0.0/16"
svcCIDR string = "172.16.1.0/24"
nodeName string = "node1"
nodeSubnet string = "10.1.1.0/24"
)

app.Action = func(ctx *cli.Context) error {
Expand All @@ -558,6 +559,25 @@ func shareGatewayInterfaceDPUHostTest(app *cli.App, testNS ns.NetNS, uplinkName,
EgressFirewallClient: egressFirewallFakeClient,
}

_, nodeNet, err := net.ParseCIDR(nodeSubnet)
Expect(err).NotTo(HaveOccurred())

// Make a fake MgmtPortConfig with only the fields we care about
fakeMgmtPortIPFamilyConfig := managementPortIPFamilyConfig{
ipt: nil,
allSubnets: nil,
ifAddr: nodeNet,
gwIP: nodeNet.IP,
}

fakeMgmtPortConfig := managementPortConfig{
ifName: nodeName,
link: nil,
routerMAC: nil,
ipv4: &fakeMgmtPortIPFamilyConfig,
ipv6: nil,
}

stop := make(chan struct{})
wf, err := factory.NewNodeWatchFactory(fakeClient, nodeName)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -573,7 +593,7 @@ func shareGatewayInterfaceDPUHostTest(app *cli.App, testNS ns.NetNS, uplinkName,
err = testNS.Do(func(ns.NetNS) error {
defer GinkgoRecover()

err := n.initGatewayDPUHost(net.ParseIP(hostIP))
err := n.initGatewayDPUHost(&fakeMgmtPortConfig, net.ParseIP(hostIP))
Expect(err).NotTo(HaveOccurred())

// Check svc and masquerade routes added towards eth0GWIP
Expand Down
25 changes: 20 additions & 5 deletions go-controller/pkg/node/gateway_localnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ func newLocalGateway(nodeName string, hostSubnets []*net.IPNet, gwNextHops []net
}
}

addrAnnotReady := true
if config.OvnKubeNode.Mode == types.NodeModeDPU {
addrAnnotReady, err = hostAddressAnnotationReady(watchFactory, nodeName)
if err != nil {
return nil, err
}
}

if exGwBridge != nil {
gw.readyFunc = func() (bool, error) {
ready, err := gatewayReady(gwBridge.patchPort)
Expand All @@ -65,11 +73,15 @@ func newLocalGateway(nodeName string, hostSubnets []*net.IPNet, gwNextHops []net
if err != nil {
return false, err
}
return ready && exGWReady, nil
return ready && exGWReady && addrAnnotReady, nil
}
} else {
gw.readyFunc = func() (bool, error) {
return gatewayReady(gwBridge.patchPort)
gwReady, err := gatewayReady(gwBridge.patchPort)
if err != nil {
return false, err
}
return gwReady && addrAnnotReady, nil
}
}

Expand All @@ -86,9 +98,12 @@ func newLocalGateway(nodeName string, hostSubnets []*net.IPNet, gwNextHops []net
}
}

gw.nodeIPManager = newAddressManager(nodeName, kube, cfg, watchFactory)

gw.openflowManager, err = newGatewayOpenFlowManager(gwBridge, exGwBridge, gw.nodeIPManager.ListAddresses())
gw.nodeIPManager, err = newAddressManager(nodeName, kube, cfg, watchFactory)
if err != nil {
return err
}
nodeIPs := gw.nodeIPManager.ListAddresses()
gw.openflowManager, err = newGatewayOpenFlowManager(gwBridge, exGwBridge, nodeIPs)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go-controller/pkg/node/gateway_localnet_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func initFakeNodePortWatcher(iptV4, iptV6 util.IPTablesHelper) *nodePortWatcher
ofportPatch: "patch-breth0_ov",
gatewayIPv4: v4localnetGatewayIP,
gatewayIPv6: v6localnetGatewayIP,
serviceInfo: make(map[k8stypes.NamespacedName]*serviceConfig),
SvcInfo: SvcInfo{serviceInfo: make(map[k8stypes.NamespacedName]*serviceConfig)},
ofm: &openflowManager{
flowCache: map[string][]string{},
},
Expand All @@ -59,7 +59,7 @@ func startNodePortWatcher(n *nodePortWatcher, fakeClient *util.OVNClientset, fak
}

k := &kube.Kube{fakeClient.KubeClient, nil, nil, nil}
n.nodeIPManager = newAddressManager(fakeNodeName, k, fakeMgmtPortConfig, n.watchFactory)
n.nodeIPManager, _ = newAddressManager(fakeNodeName, k, fakeMgmtPortConfig, n.watchFactory)
localHostNetEp := "192.168.18.15/32"
ip, _, _ := net.ParseCIDR(localHostNetEp)
n.nodeIPManager.addAddr(ip)
Expand Down
Loading

0 comments on commit f138313

Please sign in to comment.