From 58205f7b139a133e4f01411eb198c77d580ec674 Mon Sep 17 00:00:00 2001 From: Surya Seetharaman Date: Sat, 7 Dec 2024 19:46:47 +0100 Subject: [PATCH] Make ANPController networks aware Signed-off-by: Surya Seetharaman --- .../admin_network_policy.go | 532 ++++++++++++------ .../baseline_admin_network_policy.go | 25 +- .../controller/admin_network_policy/types.go | 129 ++--- 3 files changed, 444 insertions(+), 242 deletions(-) diff --git a/go-controller/pkg/ovn/controller/admin_network_policy/admin_network_policy.go b/go-controller/pkg/ovn/controller/admin_network_policy/admin_network_policy.go index 1520b4561e..cdb49919da 100644 --- a/go-controller/pkg/ovn/controller/admin_network_policy/admin_network_policy.go +++ b/go-controller/pkg/ovn/controller/admin_network_policy/admin_network_policy.go @@ -14,6 +14,7 @@ import ( libovsdbutil "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdb/util" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -121,11 +122,9 @@ func (c *Controller) ensureAdminNetworkPolicy(anp *anpapi.AdminNetworkPolicy) er // fetch the anpState from our cache if it exists currentANPState, loaded := c.anpCache[anp.Name] // Based on the latest kapi ANP, namespace and pod objects: - // 1) Construct Port Group name using ANP name and ports of pods in ANP subject - // 2) Construct Address-sets with IPs of the peers in the rules - // 3) Construct ACLs using AS-es and PGs - portGroupName := c.getANPPortGroupName(desiredANPState.name, false) - + // 1) Construct Address-sets with IPs of the peers in the rules + // 2) Construct ACLs using AS-es and PGs + // for all networks in the cluster desiredPorts, err := c.convertANPSubjectToLSPs(desiredANPState) if err != nil { return fmt.Errorf("unable to fetch ports for anp %s: %v", desiredANPState.name, err) @@ -135,7 +134,7 @@ func (c *Controller) ensureAdminNetworkPolicy(anp *anpapi.AdminNetworkPolicy) er return fmt.Errorf("unable to convert peers to addresses for anp %s: %v", desiredANPState.name, err) } atLeastOneRuleUpdated := false - desiredACLs := c.convertANPRulesToACLs(desiredANPState, currentANPState, portGroupName, &atLeastOneRuleUpdated, false) + desiredACLs := c.convertANPRulesToACLs(desiredANPState, currentANPState, &atLeastOneRuleUpdated, false) if !loaded { // this is a fresh ANP create @@ -206,17 +205,16 @@ func (c *Controller) ensureAdminNetworkPolicy(anp *anpapi.AdminNetworkPolicy) er // convertANPRulesToACLs takes all the rules belonging to the ANP and initiates the conversion of rule->acl // if currentANPState exists; then we also see if any of the current v/s desired ACLs had a state change // and if so, we return atLeastOneRuleUpdated=true -func (c *Controller) convertANPRulesToACLs(desiredANPState, currentANPState *adminNetworkPolicyState, pgName string, - atLeastOneRuleUpdated *bool, isBanp bool) []*nbdb.ACL { - acls := []*nbdb.ACL{} +func (c *Controller) convertANPRulesToACLs(desiredANPState, currentANPState *adminNetworkPolicyState, + atLeastOneRuleUpdated *bool, isBanp bool) map[string][]*nbdb.ACL { + acls := make(map[string][]*nbdb.ACL) // isAtLeastOneRuleUpdatedCheckRequired is set to true, if we had an anp already in cache (update) AND the rule lengths are the same // if the rule lengths are different we do a full peer recompute in ensureAdminNetworkPolicy anyways isAtLeastOneRuleUpdatedCheckRequired := (currentANPState != nil && currentANPState.name != "" && len(currentANPState.ingressRules) == len(desiredANPState.ingressRules) && len(currentANPState.egressRules) == len(desiredANPState.egressRules)) for i, ingressRule := range desiredANPState.ingressRules { - acl := c.convertANPRuleToACL(ingressRule, pgName, desiredANPState.name, desiredANPState.aclLoggingParams, isBanp) - acls = append(acls, acl...) + c.convertANPRuleToACL(ingressRule, desiredANPState.name, desiredANPState.managedNetworks, desiredANPState.aclLoggingParams, isBanp, acls) if isAtLeastOneRuleUpdatedCheckRequired && !*atLeastOneRuleUpdated && (ingressRule.action != currentANPState.ingressRules[i].action || @@ -227,8 +225,7 @@ func (c *Controller) convertANPRulesToACLs(desiredANPState, currentANPState *adm } } for i, egressRule := range desiredANPState.egressRules { - acl := c.convertANPRuleToACL(egressRule, pgName, desiredANPState.name, desiredANPState.aclLoggingParams, isBanp) - acls = append(acls, acl...) + c.convertANPRuleToACL(egressRule, desiredANPState.name, desiredANPState.managedNetworks, desiredANPState.aclLoggingParams, isBanp, acls) if isAtLeastOneRuleUpdatedCheckRequired && !*atLeastOneRuleUpdated && (egressRule.action != currentANPState.egressRules[i].action || @@ -244,16 +241,11 @@ func (c *Controller) convertANPRulesToACLs(desiredANPState, currentANPState *adm // convertANPRuleToACL takes the given gressRule and converts it into an ACL(0 ports rule) or // multiple ACLs(ports are set) and returns those ACLs for a given gressRule -func (c *Controller) convertANPRuleToACL(rule *gressRule, pgName, anpName string, aclLoggingParams *libovsdbutil.ACLLoggingLevels, isBanp bool) []*nbdb.ACL { +func (c *Controller) convertANPRuleToACL(rule *gressRule, anpName string, managedNetworks sets.Set[string], + aclLoggingParams *libovsdbutil.ACLLoggingLevels, isBanp bool, acls map[string][]*nbdb.ACL) { klog.V(5).Infof("Creating ACL for rule %d/%s belonging to ANP %s", rule.priority, rule.gressPrefix, anpName) - // create match based on direction and address-set name - asIndex := GetANPPeerAddrSetDbIDs(anpName, rule.gressPrefix, fmt.Sprintf("%d", rule.gressIndex), c.controllerName, isBanp) - l3Match := constructMatchFromAddressSet(rule.gressPrefix, asIndex) - // create match based on rule type (ingress/egress) and port-group - lportMatch := libovsdbutil.GetACLMatch(pgName, "", libovsdbutil.ACLDirection(rule.gressPrefix)) var match string hasNamedPorts := len(rule.namedPorts) > 0 - acls := []*nbdb.ACL{} // We will have // - one single ACL if len(rule.ports) == 0 && len(rule.namedPorts) == 0 // - one ACL per protocol if len(rule.ports) > 0 and len(rule.namedPorts) == 0 @@ -262,44 +254,67 @@ func (c *Controller) convertANPRuleToACL(rule *gressRule, pgName, anpName string // (so max 3 ACLs (tcp,udp,sctp) per rule {namedPort type ports ONLY}) // - one ACL per protocol if len(rule.ports) > 0 and one ACL per protocol if len(rule.namedPorts) > 0 // (so max 6 ACLs (2tcp,2udp,2sctp) per rule {{portNumber, portRange, namedPorts ALL PRESENT}}) + // for each network matching this ANP for protocol, l4Match := range libovsdbutil.GetL4MatchesFromNetworkPolicyPorts(rule.ports) { - if l4Match == libovsdbutil.UnspecifiedL4Match { - if hasNamedPorts { - continue - } // if we have namedPorts we shouldn't add the noneProtocol ACL even if the namedPort doesn't match any pods - match = fmt.Sprintf("%s && %s", lportMatch, l3Match) - } else { - match = fmt.Sprintf("%s && %s && %s", lportMatch, l3Match, l4Match) - } - acl := libovsdbutil.BuildANPACL( - getANPRuleACLDbIDs(anpName, rule.gressPrefix, fmt.Sprintf("%d", rule.gressIndex), protocol, c.controllerName, isBanp), - int(rule.priority), - match, - rule.action, - libovsdbutil.ACLDirectionToACLPipeline(libovsdbutil.ACLDirection(rule.gressPrefix)), - aclLoggingParams, - ) - acls = append(acls, acl) + for netName := range managedNetworks { + // create match based on direction and address-set name + asIndex := GetANPPeerAddrSetDbIDs(anpName, rule.gressPrefix, fmt.Sprintf("%d", rule.gressIndex), netName, isBanp) + l3Match := constructMatchFromAddressSet(rule.gressPrefix, asIndex) + // create match based on rule type (ingress/egress) and port-group + pgName := c.getANPPortGroupName(anpName, netName, isBanp) + lportMatch := libovsdbutil.GetACLMatch(pgName, "", libovsdbutil.ACLDirection(rule.gressPrefix)) + if l4Match == libovsdbutil.UnspecifiedL4Match { + if hasNamedPorts { + continue + } // if we have namedPorts we shouldn't add the noneProtocol ACL even if the namedPort doesn't match any pods + match = fmt.Sprintf("%s && %s", lportMatch, l3Match) + } else { + match = fmt.Sprintf("%s && %s && %s", lportMatch, l3Match, l4Match) + } + acl := libovsdbutil.BuildANPACL( + getANPRuleACLDbIDs(anpName, rule.gressPrefix, fmt.Sprintf("%d", rule.gressIndex), protocol, netName, isBanp), + int(rule.priority), + match, + rule.action, + libovsdbutil.ACLDirectionToACLPipeline(libovsdbutil.ACLDirection(rule.gressPrefix)), + aclLoggingParams, + ) + _, ok := acls[netName] + if !ok { + acls[netName] = []*nbdb.ACL{} + } + acls[netName] = append(acls[netName], acl) + } } // Process match for NamedPorts if any for protocol, l3l4Match := range libovsdbutil.GetL3L4MatchesFromNamedPorts(rule.namedPorts) { - if rule.gressPrefix == string(libovsdbutil.ACLIngress) { - match = fmt.Sprintf("%s && %s", l3Match, l3l4Match) - } else { - match = fmt.Sprintf("%s && %s", lportMatch, l3l4Match) + for netName := range managedNetworks { + // create match based on direction and address-set name + asIndex := GetANPPeerAddrSetDbIDs(anpName, rule.gressPrefix, fmt.Sprintf("%d", rule.gressIndex), netName, isBanp) + l3Match := constructMatchFromAddressSet(rule.gressPrefix, asIndex) + // create match based on rule type (ingress/egress) and port-group + pgName := c.getANPPortGroupName(anpName, netName, isBanp) + lportMatch := libovsdbutil.GetACLMatch(pgName, "", libovsdbutil.ACLDirection(rule.gressPrefix)) + if rule.gressPrefix == string(libovsdbutil.ACLIngress) { + match = fmt.Sprintf("%s && %s", l3Match, l3l4Match) + } else { + match = fmt.Sprintf("%s && %s", lportMatch, l3l4Match) + } + acl := libovsdbutil.BuildANPACL( + getANPRuleACLDbIDs(anpName, rule.gressPrefix, fmt.Sprintf("%d", rule.gressIndex), protocol+libovsdbutil.NamedPortL4MatchSuffix, netName, isBanp), + int(rule.priority), + match, + rule.action, + libovsdbutil.ACLDirectionToACLPipeline(libovsdbutil.ACLDirection(rule.gressPrefix)), + aclLoggingParams, + ) + _, ok := acls[netName] + if !ok { + acls[netName] = []*nbdb.ACL{} + } + acls[netName] = append(acls[netName], acl) } - acl := libovsdbutil.BuildANPACL( - getANPRuleACLDbIDs(anpName, rule.gressPrefix, fmt.Sprintf("%d", rule.gressIndex), protocol+libovsdbutil.NamedPortL4MatchSuffix, c.controllerName, isBanp), - int(rule.priority), - match, - rule.action, - libovsdbutil.ACLDirectionToACLPipeline(libovsdbutil.ACLDirection(rule.gressPrefix)), - aclLoggingParams, - ) - acls = append(acls, acl) } - - return acls } // expandANPRulePeers takes all the peers belonging to each of the ANP rule and initiates the conversion @@ -307,14 +322,14 @@ func (c *Controller) convertANPRuleToACL(rule *gressRule, pgName, anpName string func (c *Controller) expandANPRulePeers(anp *adminNetworkPolicyState) error { var err error for _, ingressRule := range anp.ingressRules { - err = c.expandRulePeers(ingressRule) // namedPorts has to be processed for subject in case of ingress rules + err = c.expandRulePeers(ingressRule, anp.managedNetworks) // namedPorts has to be processed for subject in case of ingress rules if err != nil { return fmt.Errorf("unable to create address set for "+ " rule %s with priority %d: %w", ingressRule.name, ingressRule.priority, err) } } for _, egressRule := range anp.egressRules { - err = c.expandRulePeers(egressRule) + err = c.expandRulePeers(egressRule, anp.managedNetworks) if err != nil { return fmt.Errorf("unable to create address set for "+ " rule %s with priority %d: %w", egressRule.name, egressRule.priority, err) @@ -328,7 +343,8 @@ func (c *Controller) expandANPRulePeers(anp *adminNetworkPolicyState) error { // This function also takes care of populating the adminNetworkPolicyPeer.namespaces cache // It also adds up all the peerAddresses that are supposed to be present in the created AddressSet and returns them on // a per-rule basis so that the actual ops to transact these into the AddressSet can be constructed using that -func (c *Controller) expandRulePeers(rule *gressRule) error { +func (c *Controller) expandRulePeers(rule *gressRule, managedNetworks sets.Set[string]) error { + networkScopedPeerAddresses := make(map[string]sets.Set[string]) for _, peer := range rule.peers { namespaces, err := c.anpNamespaceLister.List(peer.namespaceSelector) if err != nil { @@ -347,12 +363,22 @@ func (c *Controller) expandRulePeers(rule *gressRule) error { if err != nil { return err } + netInfo, err := c.networkManager.GetActiveNetworkForNamespace(namespace.Name) + if err != nil { + return err + } + managedNetworks.Insert(netInfo.GetNetworkName()) + peerAddresses, ok := networkScopedPeerAddresses[netInfo.GetNetworkName()] + if !ok { + peerAddresses = sets.Set[string]{} + networkScopedPeerAddresses[netInfo.GetNetworkName()] = peerAddresses + } for _, pod := range pods { // we don't handle HostNetworked or completed pods; unscheduled pods shall be handled via pod update path if util.PodWantsHostNetwork(pod) || util.PodCompleted(pod) || !util.PodScheduled(pod) { continue } - podIPs, err := util.GetPodIPsOfNetwork(pod, &util.DefaultNetInfo{}) + podIPs, err := util.GetPodIPsOfNetwork(pod, netInfo) if err != nil { if errors.Is(err, util.ErrNoPodIPFound) { // we ignore podIPsNotFound error here because onANPPodUpdate @@ -362,7 +388,7 @@ func (c *Controller) expandRulePeers(rule *gressRule) error { } return err // we won't hit this TBH because the only error that GetPodIPsOfNetwork returns is podIPsNotFound } - rule.peerAddresses.Insert(util.StringSlice(podIPs)...) + peerAddresses.Insert(util.StringSlice(podIPs)...) podCache.Insert(pod.Name) // Process NamedPorts if any if len(rule.namedPorts) == 0 { @@ -398,21 +424,42 @@ func (c *Controller) expandRulePeers(rule *gressRule) error { if err != nil { // Annotation not found errors are ignored, they will come as node updates return err } - rule.peerAddresses.Insert(nodeIPs...) + // for each network's set of peerIPs, add the nodeIPs + for networkName := range managedNetworks { + _, ok := networkScopedPeerAddresses[networkName] + if !ok { + // means a representation exists for this network matching a subject namespace since subject is processed first + // so let's add the nodeIPs as peers + networkScopedPeerAddresses[networkName] = make(sets.Set[string]) + } + networkScopedPeerAddresses[networkName].Insert(nodeIPs...) + } nodeCache.Insert(node.Name) } peer.nodes = nodeCache + // for each network's set of peerIPs, add the CIDR ranges + for networkName := range managedNetworks { + _, ok := networkScopedPeerAddresses[networkName] + if !ok { + // means a representation exists for this network matching a subject namespace since subject is processed first + // so let's add the nodeIPs as peers + networkScopedPeerAddresses[networkName] = make(sets.Set[string]) + } + networkScopedPeerAddresses[networkName].Insert(peer.networks.UnsortedList()...) + } } + rule.peerAddresses = networkScopedPeerAddresses return nil } -// convertANPSubjectToLSPs calculates all the LSP's that match for the provided anp's subject and returns them +// convertANPSubjectToLSPs calculates all the LSP's that match for the provided anp's subject across all networks +// and returns them // It also populates the adminNetworkPolicySubject.namespaces and adminNetworkPolicySubject.podPorts // pieces of the cache // Since we have to loop through all the pods here, we also take the opportunity to update our namedPorts cache -func (c *Controller) convertANPSubjectToLSPs(anp *adminNetworkPolicyState) ([]*nbdb.LogicalSwitchPort, error) { - lsports := []*nbdb.LogicalSwitchPort{} - anp.subject.podPorts = sets.Set[string]{} +func (c *Controller) convertANPSubjectToLSPs(anp *adminNetworkPolicyState) (map[string][]*nbdb.LogicalSwitchPort, error) { + networkScopedLSPs := make(map[string][]*nbdb.LogicalSwitchPort) + networkScopedPodPorts := make(map[string]sets.Set[string]) namespaces, err := c.anpNamespaceLister.List(anp.subject.namespaceSelector) if err != nil { return nil, err @@ -441,11 +488,39 @@ func (c *Controller) convertANPSubjectToLSPs(anp *adminNetworkPolicyState) ([]*n if err != nil { return nil, err } + netInfo, err := c.networkManager.GetActiveNetworkForNamespace(namespace.Name) + if err != nil { + return nil, err + } + anp.managedNetworks.Insert(netInfo.GetNetworkName()) + podPorts, ok := networkScopedPodPorts[netInfo.GetNetworkName()] + if !ok { + podPorts = sets.Set[string]{} + networkScopedPodPorts[netInfo.GetNetworkName()] = podPorts + } + lsPorts, ok := networkScopedLSPs[netInfo.GetNetworkName()] + if !ok { + lsPorts = []*nbdb.LogicalSwitchPort{} + } for _, pod := range pods { if util.PodWantsHostNetwork(pod) || util.PodCompleted(pod) || !util.PodScheduled(pod) || !c.isPodScheduledinLocalZone(pod) { continue } - logicalPortName := util.GetLogicalPortName(pod.Namespace, pod.Name) + var logicalPortName string + if netInfo.IsDefault() { + logicalPortName = util.GetLogicalPortName(pod.Namespace, pod.Name) + } else { + nadNames, err := util.PodNadNames(pod, netInfo) + if err != nil { + return nil, err + } + if len(nadNames) == 0 { + return nil, fmt.Errorf("pod %s/%s must contain network attach definition for its user defined network %s", + pod.Namespace, pod.Name, netInfo.GetNetworkName()) + } + logicalPortName = util.GetSecondaryNetworkLogicalPortName(pod.Namespace, pod.Name, nadNames[0]) + } + lsp := &nbdb.LogicalSwitchPort{Name: logicalPortName} lsp, err = libovsdbops.GetLogicalSwitchPort(c.nbClient, lsp) if err != nil { @@ -463,8 +538,8 @@ func (c *Controller) convertANPSubjectToLSPs(anp *adminNetworkPolicyState) ([]*n return nil, fmt.Errorf("error retrieving logical switch port with name %s "+ " from libovsdb cache: %w", logicalPortName, err) } - lsports = append(lsports, lsp) - anp.subject.podPorts.Insert(lsp.UUID) + lsPorts = append(lsPorts, lsp) + podPorts.Insert(lsp.UUID) podCache.Insert(pod.Name) if len(namedPortMatchingRulesIndexes) == 0 { continue @@ -496,6 +571,7 @@ func (c *Controller) convertANPSubjectToLSPs(anp *adminNetworkPolicyState) ([]*n } } } + networkScopedLSPs[netInfo.GetNetworkName()] = lsPorts } // we have to store the sorted slice here because in convertANPRulesToACLs // we use DeepEqual to compare ports which doesn't do well with unordered slices @@ -505,8 +581,9 @@ func (c *Controller) convertANPSubjectToLSPs(anp *adminNetworkPolicyState) ([]*n } } anp.subject.namespaces = namespaceCache + anp.subject.podPorts = networkScopedPodPorts - return lsports, nil + return networkScopedLSPs, nil } // clearAdminNetworkPolicy will handle the logic for deleting all db objects related @@ -523,12 +600,16 @@ func (c *Controller) clearAdminNetworkPolicy(anpName string) error { // clear NBDB objects for the given ANP (PG, ACLs on that PG, AddrSets used by the ACLs) var err error - // remove PG for Subject (ACLs will get cleaned up automatically) - portGroupName := c.getANPPortGroupName(anp.name, false) + // remove PG for Subject (ACLs will get cleaned up automatically) across all networks + predicateIDs := libovsdbops.NewDbObjectIDsAcrossAllContollers(libovsdbops.AddressSetAdminNetworkPolicy, + map[libovsdbops.ExternalIDKey]string{ + libovsdbops.ObjectNameKey: anp.name, + }) + pgPredicate := libovsdbops.GetPredicateAcrossAllControllers[*nbdb.PortGroup](predicateIDs, nil) // no need to batch this with address-set deletes since this itself will contain a bunch of ACLs that need to be deleted which is heavy enough. - err = libovsdbops.DeletePortGroups(c.nbClient, portGroupName) + err = libovsdbops.DeletePortGroupsWithPredicate(c.nbClient, pgPredicate) if err != nil { - return fmt.Errorf("unable to delete PG %s for ANP %s: %w", portGroupName, anp.name, err) + return fmt.Errorf("unable to delete PGs for ANP %s: %w", anp.name, err) } // remove address-sets that were created for the peers of each rule fpr the whole ANP // do this after ACLs are gone so that there is no lingering references @@ -549,11 +630,11 @@ func (c *Controller) clearAdminNetworkPolicy(anpName string) error { // clearASForPeers takes the externalID objectIDs and uses them to delete all the address-sets // that were owned by anpName func (c *Controller) clearASForPeers(anpName string, idType *libovsdbops.ObjectIDsType) error { - predicateIDs := libovsdbops.NewDbObjectIDs(idType, c.controllerName, + predicateIDs := libovsdbops.NewDbObjectIDsAcrossAllContollers(idType, map[libovsdbops.ExternalIDKey]string{ libovsdbops.ObjectNameKey: anpName, }) - asPredicate := libovsdbops.GetPredicate[*nbdb.AddressSet](predicateIDs, nil) + asPredicate := libovsdbops.GetPredicateAcrossAllControllers[*nbdb.AddressSet](predicateIDs, nil) if err := libovsdbops.DeleteAddressSetsWithPredicate(c.nbClient, asPredicate); err != nil { return fmt.Errorf("failed to destroy address-set for ANP %s, err: %v", anpName, err) } @@ -561,26 +642,34 @@ func (c *Controller) clearASForPeers(anpName string, idType *libovsdbops.ObjectI } // createNewANP takes the desired state of the anp and creates the corresponding objects in the NBDB -func (c *Controller) createNewANP(desiredANPState *adminNetworkPolicyState, desiredACLs []*nbdb.ACL, - desiredPorts []*nbdb.LogicalSwitchPort, isBanp bool) error { +func (c *Controller) createNewANP(desiredANPState *adminNetworkPolicyState, desiredACLs map[string][]*nbdb.ACL, + desiredPorts map[string][]*nbdb.LogicalSwitchPort, isBanp bool) error { ops := []ovsdb.Operation{} // now CreateOrUpdate the address-sets; add the right IPs - we treat the rest of the address-set cases as a fresh add or update - addrSetOps, err := c.constructOpsForRuleChanges(desiredANPState, isBanp) + addrSetOps, err := c.constructOpsForRuleChanges(desiredANPState, isBanp, nil) if err != nil { return fmt.Errorf("failed to create address-sets, %v", err) } ops = append(ops, addrSetOps...) - ops, err = libovsdbops.CreateOrUpdateACLsOps(c.nbClient, ops, c.GetSamplingConfig(), desiredACLs...) + var aclsAcrossAllNetworks []*nbdb.ACL + for _, acls := range desiredACLs { + aclsAcrossAllNetworks = append(aclsAcrossAllNetworks, acls...) + } + ops, err = libovsdbops.CreateOrUpdateACLsOps(c.nbClient, ops, c.GetSamplingConfig(), aclsAcrossAllNetworks...) if err != nil { return fmt.Errorf("failed to create ACL ops: %v", err) } - pgDbIDs := GetANPPortGroupDbIDs(desiredANPState.name, isBanp, c.controllerName) - pg := libovsdbutil.BuildPortGroup(pgDbIDs, desiredPorts, desiredACLs) - ops, err = libovsdbops.CreateOrUpdatePortGroupsOps(c.nbClient, ops, pg) - if err != nil { - return fmt.Errorf("failed to create ops to add port to a port group: %v", err) + // For a given ANP there will be 1PG representation for each network in the cluster + for networkName := range desiredANPState.managedNetworks { + pgDbIDs := GetANPPortGroupDbIDs(desiredANPState.name, isBanp, networkName) + pg := libovsdbutil.BuildPortGroup(pgDbIDs, desiredPorts[networkName], desiredACLs[networkName]) + ops, err = libovsdbops.CreateOrUpdatePortGroupsOps(c.nbClient, ops, pg) + if err != nil { + return fmt.Errorf("failed to create ops to add port to a port group: %v", err) + } } + _, err = libovsdbops.TransactAndCheck(c.nbClient, ops) if err != nil { return fmt.Errorf("failed to run ovsdb txn to add ports to port group: %v", err) @@ -589,24 +678,34 @@ func (c *Controller) createNewANP(desiredANPState *adminNetworkPolicyState, desi } func (c *Controller) updateExistingANP(currentANPState, desiredANPState *adminNetworkPolicyState, atLeastOneRuleUpdated, - hasPriorityChanged, isBanp bool, desiredACLs []*nbdb.ACL) error { + hasPriorityChanged, isBanp bool, desiredACLs map[string][]*nbdb.ACL) error { var ops []ovsdb.Operation var err error - portGroupName := c.getANPPortGroupName(desiredANPState.name, isBanp) - // Did ANP.Spec.Ingress Change (rule inserts/deletes)? && || Did ANP.Spec.Egress Change (rule inserts/deletes)? && || - // If yes we need to fully recompute the acls present in our ANP's port group; Let's do a full recompute and return. + // Based on network adds/deletes the port-groups will have to be added/deleted + // Hence we need to transact the ops for port-group changes first so that + // subsequent ACL/Port changes all reference existing port-groups + // NOTE that this will also remove any ACLs for the deleted networks + networksToAdd := desiredANPState.managedNetworks.Difference(currentANPState.managedNetworks) + networksToDelete := currentANPState.managedNetworks.Difference(desiredANPState.managedNetworks) + if err := c.updatePortGroupsForNetworkChanges(networksToAdd, networksToDelete, desiredANPState.name, isBanp); err != nil { + return fmt.Errorf("failed to create or delete port groups for ANP %s, err: %v", desiredANPState.name, err) + } + // Did ANP.Spec.Ingress Change (rule inserts/deletes)? && || Did ANP.Spec.Egress Change (rule inserts/deletes)? && || networkChanges? + // If yes we need to fully recompute the acls present in our ANP's port groups for each network; Let's do a full recompute and return. + // constructOpsForRuleChanges updates all the address-sets for our ANPs across all networks + // NOTE that it is also called to create new address-sets for newly added networks + // and delete address-sets for deleted networks => for rest of the networks it will be no-op // Reason behind a full recompute: Each rule has precedence based on its position and priority of ANP; if any of that changes - // better to delete and recreate ACLs rather than figure out from caches - // rather than always cleaning up everything and recreating them. But this is tricky since rules have precedence - // from their ordering. - // NOTE: Changes to admin policies should be a rare action (can be improved post user feedback) - usually churn would be around namespaces and pods + // better to delete and recreate ACLs rather than figure out from caches a diff of what needs change since that would be more complicated + // NOTE: Changes to admin policies should be a rare action (so this can be improved post user feedback) - usually churn would be around namespaces and pods fullPeerRecompute := (len(currentANPState.egressRules) != len(desiredANPState.egressRules) || len(currentANPState.ingressRules) != len(desiredANPState.ingressRules)) - if fullPeerRecompute { + networkChanges := len(networksToAdd) > 0 || len(networksToDelete) > 0 + if fullPeerRecompute || networkChanges { // full recompute // which means update all ACLs and address-sets klog.V(3).Infof("ANP %s with priority (old %d, new %d) was updated", desiredANPState.name, currentANPState.anpPriority, desiredANPState.anpPriority) - ops, err = c.constructOpsForRuleChanges(desiredANPState, isBanp) + ops, err = c.constructOpsForRuleChanges(desiredANPState, isBanp, networksToDelete) if err != nil { return fmt.Errorf("failed to create update ANP ops %s: %v", desiredANPState.name, err) } @@ -623,10 +722,10 @@ func (c *Controller) updateExistingANP(currentANPState, desiredANPState *adminNe // 2) ANP.Spec.Ingress.Peers.Pods changed && || // 3) A namespace started or stopped matching the peer && || // 4) A pod started or stopped matching the peer - // If yes we need to recompute the IPs present in our ANP's peer's address-sets - if !fullPeerRecompute && !reflect.DeepEqual(desiredANPState.ingressRules, currentANPState.ingressRules) { + // If yes we need to recompute the IPs present in our ANP's peer's address-sets for all networks + if !fullPeerRecompute && !networkChanges && !reflect.DeepEqual(desiredANPState.ingressRules, currentANPState.ingressRules) { addrOps, err := c.constructOpsForPeerChanges(desiredANPState.ingressRules, - currentANPState.ingressRules, desiredANPState.name, isBanp) + currentANPState.ingressRules, desiredANPState.name, isBanp, desiredANPState.managedNetworks) if err != nil { return fmt.Errorf("failed to create ops for changes to ANP ingress peers: %v", err) } @@ -646,10 +745,10 @@ func (c *Controller) updateExistingANP(currentANPState, desiredANPState *adminNe // 4) A namespace started or stopped matching the peer && || // 5) A pod started or stopped matching the peer && || // 6) A node started or stopped matching the peer - // If yes we need to recompute the IPs present in our ANP's peer's address-sets - if !fullPeerRecompute && !reflect.DeepEqual(desiredANPState.egressRules, currentANPState.egressRules) { + // If yes we need to recompute the IPs present in our ANP's peer's address-sets for all networks + if !fullPeerRecompute && !networkChanges && !reflect.DeepEqual(desiredANPState.egressRules, currentANPState.egressRules) { addrOps, err := c.constructOpsForPeerChanges(desiredANPState.egressRules, - currentANPState.egressRules, desiredANPState.name, isBanp) + currentANPState.egressRules, desiredANPState.name, isBanp, desiredANPState.managedNetworks) if err != nil { return fmt.Errorf("failed to create ops for changes to ANP egress peers: %v", err) } @@ -669,18 +768,26 @@ func (c *Controller) updateExistingANP(currentANPState, desiredANPState *adminNe // (2) atLeastOneRuleUpdated=true which means the gress rules were of same lengths but action or ports changed on at least one rule // (3) hasPriorityChanged=true which means we should update acl.Priority for every ACL // (4) hasACLLoggingParamsChanged=true which means we should update acl.Severity/acl.Log for every ACL - if fullPeerRecompute || atLeastOneRuleUpdated || hasPriorityChanged || hasACLLoggingParamsChanged { + // (5) len(networksToAdd) > 0 which means new networks were added (delete case is already covered when PGs were deleted) + if fullPeerRecompute || atLeastOneRuleUpdated || hasPriorityChanged || hasACLLoggingParamsChanged || len(networksToAdd) > 0 { klog.V(3).Infof("ANP %s with priority %d was updated", desiredANPState.name, desiredANPState.anpPriority) // now update the acls to the desired ones - ops, err = libovsdbops.CreateOrUpdateACLsOps(c.nbClient, ops, c.GetSamplingConfig(), desiredACLs...) - if err != nil { - return fmt.Errorf("failed to create new ACL ops for anp %s: %v", desiredANPState.name, err) - } - // since we update the portgroup with the new set of ACLs, any unreferenced set of ACLs - // will be automatically removed - ops, err = libovsdbops.UpdatePortGroupSetACLsOps(c.nbClient, ops, portGroupName, desiredACLs) - if err != nil { - return fmt.Errorf("failed to create ACL-on-PG update ops for anp %s: %v", desiredANPState.name, err) + for networkName := range desiredANPState.managedNetworks { + acls, ok := desiredACLs[networkName] + if !ok { + acls = []*nbdb.ACL{} + } + ops, err = libovsdbops.CreateOrUpdateACLsOps(c.nbClient, ops, c.GetSamplingConfig(), acls...) + if err != nil { + return fmt.Errorf("failed to create new ACL ops for anp %s: %v", desiredANPState.name, err) + } + // since we update the portgroup with the new set of ACLs, any unreferenced set of ACLs + // will be automatically removed + portGroupName := c.getANPPortGroupName(desiredANPState.name, networkName, isBanp) + ops, err = libovsdbops.UpdatePortGroupSetACLsOps(c.nbClient, ops, portGroupName, acls) + if err != nil { + return fmt.Errorf("failed to create ACL-on-PG update ops for anp %s: %v", desiredANPState.name, err) + } } } @@ -689,8 +796,8 @@ func (c *Controller) updateExistingANP(currentANPState, desiredANPState *adminNe // 2) ANP.Spec.Pods changed && || // 3) A namespace started or stopped matching the subject && || // 4) A pod started or stopped matching the subject - // If yes we need to recompute the ports present in our ANP's port group - subjectOps, err := c.constructOpsForSubjectChanges(currentANPState, desiredANPState, portGroupName) + // If yes we need to recompute the ports present in our ANP's port groups for all networks + subjectOps, err := c.constructOpsForSubjectChanges(currentANPState, desiredANPState, isBanp) if err != nil { return fmt.Errorf("failed to create ops for changes to ANP %s subject: %v", desiredANPState.name, err) } @@ -702,8 +809,40 @@ func (c *Controller) updateExistingANP(currentANPState, desiredANPState *adminNe return nil } +// updatePortGroupsForNetworkChanges takes the newly added and deleted networks and +// creates/deletes the correspnding port groups for those networks for the given ANP +func (c *Controller) updatePortGroupsForNetworkChanges(networksToAdd, networksToDelete sets.Set[string], + anpName string, isBanp bool) error { + var pgOps []ovsdb.Operation + var err error + // If there are new networks then it means we need to create new PGs + // before we process any ACL or peer updates + for networkName := range networksToAdd { + pgDbIDs := GetANPPortGroupDbIDs(anpName, isBanp, networkName) + pg := libovsdbutil.BuildPortGroup(pgDbIDs, nil, nil) + pgOps, err = libovsdbops.CreateOrUpdatePortGroupsOps(c.nbClient, pgOps, pg) + if err != nil { + return fmt.Errorf("failed to create ops to add port to a port group: %v", err) + } + } + // If there are stale networks then it means we need to delete the PGs + // before we process any ACL or peer updates + for networkName := range networksToDelete { + portGroupName := c.getANPPortGroupName(anpName, networkName, isBanp) + pgOps, err = libovsdbops.DeletePortGroupsOps(c.nbClient, pgOps, portGroupName) + if err != nil { + return fmt.Errorf("failed to create ops to add port to a port group: %v", err) + } + } + if _, err := libovsdbops.TransactAndCheck(c.nbClient, pgOps); err != nil { + return fmt.Errorf("failed to run ovsdb txn to update portgroups for ANP %s: %v", + anpName, err) + } + return nil +} + // constructOpsForRuleChanges takes the desired state of the anp and returns the corresponding ops for updating NBDB objects -func (c *Controller) constructOpsForRuleChanges(desiredANPState *adminNetworkPolicyState, isBanp bool) ([]ovsdb.Operation, error) { +func (c *Controller) constructOpsForRuleChanges(desiredANPState *adminNetworkPolicyState, isBanp bool, networksToDelete sets.Set[string]) ([]ovsdb.Operation, error) { var ops []ovsdb.Operation var err error // Logic to delete address-sets: @@ -714,7 +853,7 @@ func (c *Controller) constructOpsForRuleChanges(desiredANPState *adminNetworkPol if isBanp { idType = libovsdbops.AddressSetBaselineAdminNetworkPolicy } - predicateIDs := libovsdbops.NewDbObjectIDs(idType, c.controllerName, + predicateIDs := libovsdbops.NewDbObjectIDsAcrossAllContollers(idType, map[libovsdbops.ExternalIDKey]string{ libovsdbops.ObjectNameKey: desiredANPState.name, }) @@ -725,31 +864,62 @@ func (c *Controller) constructOpsForRuleChanges(desiredANPState *adminNetworkPol (as.ExternalIDs[libovsdbops.PolicyDirectionKey.String()] == string(libovsdbutil.ACLIngress) && asIndex >= len(desiredANPState.ingressRules)) } - asPredicate := libovsdbops.GetPredicate[*nbdb.AddressSet](predicateIDs, predicateFunc) + asPredicate := libovsdbops.GetPredicateAcrossAllControllers[*nbdb.AddressSet](predicateIDs, predicateFunc) ops, err = libovsdbops.DeleteAddressSetsWithPredicateOps(c.nbClient, ops, asPredicate) if err != nil { return nil, fmt.Errorf("failed to create address-set destroy ops for ANP %s, err: %v", desiredANPState.name, err) } + if len(networksToDelete) > 0 { + for networkName := range networksToDelete { + if networkName == types.DefaultNetworkName { + networkName = defaultNetworkControllerName + } + // network delete event: so let's remove all the address-sets for this network + predicateIDs := libovsdbops.NewDbObjectIDs(idType, networkName, + map[libovsdbops.ExternalIDKey]string{ + libovsdbops.ObjectNameKey: desiredANPState.name, + }) + asPredicate := libovsdbops.GetPredicate[*nbdb.AddressSet](predicateIDs, nil) + ops, err = libovsdbops.DeleteAddressSetsWithPredicateOps(c.nbClient, ops, asPredicate) + if err != nil { + return nil, fmt.Errorf("failed to create address-set destroy ops for ANP %s, err: %v", desiredANPState.name, err) + } + } + } // TODO (tssurya): Revisit this logic to see if its better to do one address-set per peer instead of one address-set for all peers // Had briefly discussed this OVN team. We are not yet clear which is better since both have advantages and disadvantages. // Decide this after doing some scale runs. for _, rule := range desiredANPState.ingressRules { - asIndex := GetANPPeerAddrSetDbIDs(desiredANPState.name, rule.gressPrefix, fmt.Sprintf("%d", rule.gressIndex), c.controllerName, isBanp) - _, addrSetOps, err := c.addressSetFactory.NewAddressSetOps(asIndex, rule.peerAddresses.UnsortedList()) - if err != nil { - return nil, fmt.Errorf("failed to create address-sets for ANP %s's"+ - " ingress rule %s/%s/%d: %v", desiredANPState.name, rule.name, rule.gressPrefix, rule.priority, err) + for networkName := range desiredANPState.managedNetworks { + peerAddresses, ok := rule.peerAddresses[networkName] + if !ok { + // network add event: then we should create an empty representation of peers + peerAddresses = sets.Set[string]{} + } + asIndex := GetANPPeerAddrSetDbIDs(desiredANPState.name, rule.gressPrefix, fmt.Sprintf("%d", rule.gressIndex), networkName, isBanp) + _, addrSetOps, err := c.addressSetFactory.NewAddressSetOps(asIndex, peerAddresses.UnsortedList()) + if err != nil { + return nil, fmt.Errorf("failed to create address-sets for ANP %s's"+ + " ingress rule %s/%s/%d: %v as part of network %s", desiredANPState.name, rule.name, rule.gressPrefix, rule.priority, err, networkName) + } + ops = append(ops, addrSetOps...) } - ops = append(ops, addrSetOps...) } for _, rule := range desiredANPState.egressRules { - asIndex := GetANPPeerAddrSetDbIDs(desiredANPState.name, rule.gressPrefix, fmt.Sprintf("%d", rule.gressIndex), c.controllerName, isBanp) - _, addrSetOps, err := c.addressSetFactory.NewAddressSetOps(asIndex, rule.peerAddresses.UnsortedList()) - if err != nil { - return nil, fmt.Errorf("failed to create address-sets for ANP %s's"+ - " egress rule %s/%s/%d: %v", desiredANPState.name, rule.name, rule.gressPrefix, rule.priority, err) + for networkName := range desiredANPState.managedNetworks { + peerAddresses, ok := rule.peerAddresses[networkName] + if !ok { + // If this network has no matching peers, then we should create an empty representation of peers + peerAddresses = sets.Set[string]{} + } + asIndex := GetANPPeerAddrSetDbIDs(desiredANPState.name, rule.gressPrefix, fmt.Sprintf("%d", rule.gressIndex), networkName, isBanp) + _, addrSetOps, err := c.addressSetFactory.NewAddressSetOps(asIndex, peerAddresses.UnsortedList()) + if err != nil { + return nil, fmt.Errorf("failed to create address-sets for ANP %s's"+ + " egress rule %s/%s/%d: %v as part of network %s", desiredANPState.name, rule.name, rule.gressPrefix, rule.priority, err, networkName) + } + ops = append(ops, addrSetOps...) } - ops = append(ops, addrSetOps...) } return ops, nil } @@ -758,41 +928,56 @@ func (c *Controller) constructOpsForRuleChanges(desiredANPState *adminNetworkPol // for updating NBDB AddressSet objects for those peers // This should be called if namespace/pod is being created/updated func (c *Controller) constructOpsForPeerChanges(desiredRules, currentRules []*gressRule, - anpName string, isBanp bool) ([]ovsdb.Operation, error) { + anpName string, isBanp bool, managedNetworks sets.Set[string]) ([]ovsdb.Operation, error) { var ops []ovsdb.Operation for i := range desiredRules { desiredRule := desiredRules[i] currentRule := currentRules[i] - addressesToAdd := desiredRule.peerAddresses.Difference(currentRule.peerAddresses) - asIndex := GetANPPeerAddrSetDbIDs(anpName, desiredRule.gressPrefix, fmt.Sprintf("%d", desiredRule.gressIndex), c.controllerName, isBanp) - if len(addressesToAdd) > 0 { - as, err := c.addressSetFactory.GetAddressSet(asIndex) - if err != nil { - return nil, fmt.Errorf("cannot ensure that addressSet %+v exists: err %v", asIndex.GetExternalIDs(), err) + for networkName := range managedNetworks { + desiredPeerAddresses, ok := desiredRule.peerAddresses[networkName] + if !ok { + // empty peers for this network: so let's set the desired value to empty set for this network + desiredPeerAddresses = sets.Set[string]{} } - klog.V(5).Infof("Adding peerAddresses %+v to address-set %s for ANP %s", addressesToAdd, as.GetName(), anpName) - addrOps, err := as.AddAddressesReturnOps(addressesToAdd.UnsortedList()) - if err != nil { - return nil, fmt.Errorf("failed to construct address-set %s's IP add ops for anp %s's rule"+ - " %s/%s/%d: %v", as.GetName(), anpName, desiredRule.name, - desiredRule.gressPrefix, desiredRule.priority, err) + currentPeerAddresses, ok := currentRule.peerAddresses[networkName] + if !ok { + // network add event or empty network peers: so let's set the current value to empty set for this network + currentPeerAddresses = sets.Set[string]{} } - ops = append(ops, addrOps...) - } - addressesToRemove := currentRule.peerAddresses.Difference(desiredRule.peerAddresses) - if len(addressesToRemove) > 0 { - as, err := c.addressSetFactory.GetAddressSet(asIndex) - if err != nil { - return nil, fmt.Errorf("cannot ensure that addressSet %+v exists: err %v", asIndex.GetExternalIDs(), err) + addressesToAdd := desiredPeerAddresses.Difference(currentPeerAddresses) + asIndex := GetANPPeerAddrSetDbIDs(anpName, desiredRule.gressPrefix, + fmt.Sprintf("%d", desiredRule.gressIndex), networkName, isBanp) + if len(addressesToAdd) > 0 { + as, err := c.addressSetFactory.GetAddressSet(asIndex) + if err != nil { + return nil, fmt.Errorf("cannot ensure that addressSet %+v exists: err %v", asIndex.GetExternalIDs(), err) + } + klog.V(5).Infof("Adding peerAddresses %+v to address-set %s for ANP %s for network %s", + addressesToAdd, as.GetName(), anpName, networkName) + addrOps, err := as.AddAddressesReturnOps(addressesToAdd.UnsortedList()) + if err != nil { + return nil, fmt.Errorf("failed to construct address-set %s's IP add ops for anp %s's rule"+ + " %s/%s/%d: %v for network %s", as.GetName(), anpName, desiredRule.name, + desiredRule.gressPrefix, desiredRule.priority, err, networkName) + } + ops = append(ops, addrOps...) } - klog.V(5).Infof("Deleting peerAddresses %+v from address-set %s for ANP %s", addressesToRemove, as.GetName(), anpName) - addrOps, err := as.DeleteAddressesReturnOps(addressesToRemove.UnsortedList()) - if err != nil { - return nil, fmt.Errorf("failed to construct address-set %s's IP delete ops for anp %s's rule"+ - " %s/%s/%d: %v", as.GetName(), anpName, desiredRule.name, - desiredRule.gressPrefix, desiredRule.priority, err) + addressesToRemove := currentPeerAddresses.Difference(desiredPeerAddresses) + if len(addressesToRemove) > 0 { + as, err := c.addressSetFactory.GetAddressSet(asIndex) + if err != nil { + return nil, fmt.Errorf("cannot ensure that addressSet %+v exists: err %v", asIndex.GetExternalIDs(), err) + } + klog.V(5).Infof("Deleting peerAddresses %+v to address-set %s for ANP %s for network %s", + addressesToRemove, as.GetName(), anpName, networkName) + addrOps, err := as.DeleteAddressesReturnOps(addressesToRemove.UnsortedList()) + if err != nil { + return nil, fmt.Errorf("failed to construct address-set %s's IP delete ops for anp %s's rule"+ + " %s/%s/%d: %v", as.GetName(), anpName, desiredRule.name, + desiredRule.gressPrefix, desiredRule.priority, err) + } + ops = append(ops, addrOps...) } - ops = append(ops, addrOps...) } } return ops, nil @@ -800,23 +985,40 @@ func (c *Controller) constructOpsForPeerChanges(desiredRules, currentRules []*gr // constructOpsForSubjectChanges takes the current and desired cache states for a given ANP and returns the ops // required to construct the transact to insert/delete ports to/from port-groups according to the ANP subject changes -func (c *Controller) constructOpsForSubjectChanges(currentANPState, desiredANPState *adminNetworkPolicyState, portGroupName string) ([]ovsdb.Operation, error) { +func (c *Controller) constructOpsForSubjectChanges(currentANPState, desiredANPState *adminNetworkPolicyState, isBanp bool) ([]ovsdb.Operation, error) { var ops []ovsdb.Operation var err error - portsToAdd := desiredANPState.subject.podPorts.Difference(currentANPState.subject.podPorts).UnsortedList() - portsToDelete := currentANPState.subject.podPorts.Difference(desiredANPState.subject.podPorts).UnsortedList() - if len(portsToAdd) > 0 { - klog.V(5).Infof("Adding ports %+v to port-group %s for ANP %s", portsToAdd, portGroupName, desiredANPState.name) - ops, err = libovsdbops.AddPortsToPortGroupOps(c.nbClient, ops, portGroupName, portsToAdd...) - if err != nil { - return nil, fmt.Errorf("failed to create Port-to-PG add ops for anp %s: %v", desiredANPState.name, err) + // loop through all networks to get the state change across all networks so that we can + // construct ops to update per network portgroup of this ANP's subject + // In the beginning of updateExistingANP we have already taken care of network add/delete + // cases for port groups. So here only updates need to be handled since + for networkName := range desiredANPState.managedNetworks { + desiredANPStatePodPorts, ok := desiredANPState.subject.podPorts[networkName] + if !ok { + // means it is a network with empty subjects + desiredANPStatePodPorts = make(sets.Set[string]) } - } - if len(portsToDelete) > 0 { - klog.V(5).Infof("Deleting ports %+v from port-group %s for ANP %s", portsToDelete, portGroupName, desiredANPState.name) - ops, err = libovsdbops.DeletePortsFromPortGroupOps(c.nbClient, ops, portGroupName, portsToDelete...) - if err != nil { - return nil, fmt.Errorf("failed to create Port-from-PG delete ops for anp %s: %v", desiredANPState.name, err) + currentANPStatePodPorts, ok := currentANPState.subject.podPorts[networkName] + if !ok { + // means it is a new network add OR a network that had empty subjects + currentANPStatePodPorts = make(sets.Set[string]) + } + portsToAdd := desiredANPStatePodPorts.Difference(currentANPStatePodPorts).UnsortedList() + portsToDelete := currentANPStatePodPorts.Difference(desiredANPStatePodPorts).UnsortedList() + portGroupName := c.getANPPortGroupName(desiredANPState.name, networkName, isBanp) + if len(portsToAdd) > 0 { + klog.V(5).Infof("Adding ports %+v to port-group %s for ANP %s", portsToAdd, portGroupName, desiredANPState.name) + ops, err = libovsdbops.AddPortsToPortGroupOps(c.nbClient, ops, portGroupName, portsToAdd...) + if err != nil { + return nil, fmt.Errorf("failed to create Port-to-PG add ops for anp %s: %v", desiredANPState.name, err) + } + } + if len(portsToDelete) > 0 { + klog.V(5).Infof("Deleting ports %+v from port-group %s for ANP %s", portsToDelete, portGroupName, desiredANPState.name) + ops, err = libovsdbops.DeletePortsFromPortGroupOps(c.nbClient, ops, portGroupName, portsToDelete...) + if err != nil { + return nil, fmt.Errorf("failed to create Port-from-PG delete ops for anp %s: %v", desiredANPState.name, err) + } } } return ops, nil diff --git a/go-controller/pkg/ovn/controller/admin_network_policy/baseline_admin_network_policy.go b/go-controller/pkg/ovn/controller/admin_network_policy/baseline_admin_network_policy.go index 7337084d42..a76e285aef 100644 --- a/go-controller/pkg/ovn/controller/admin_network_policy/baseline_admin_network_policy.go +++ b/go-controller/pkg/ovn/controller/admin_network_policy/baseline_admin_network_policy.go @@ -7,6 +7,7 @@ import ( libovsdbops "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdb/ops" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" apierrors "k8s.io/apimachinery/pkg/api/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" @@ -93,15 +94,20 @@ func (c *Controller) clearBaselineAdminNetworkPolicy(banpName string) error { } // clear NBDB objects for the given BANP (PG, ACLs on that PG, AddrSets used by the ACLs) - // remove PG for Subject (ACLs will get cleaned up automatically) - portGroupName := c.getANPPortGroupName(banp.name, true) + // remove PG for Subject (ACLs will get cleaned up automatically) across all networks + predicateIDs := libovsdbops.NewDbObjectIDsAcrossAllContollers(libovsdbops.AddressSetBaselineAdminNetworkPolicy, + map[libovsdbops.ExternalIDKey]string{ + libovsdbops.ObjectNameKey: banp.name, + }) + pgPredicate := libovsdbops.GetPredicateAcrossAllControllers[*nbdb.PortGroup](predicateIDs, nil) // no need to batch this with address-set deletes since this itself will contain a bunch of ACLs that need to be deleted which is heavy enough. - err := libovsdbops.DeletePortGroups(c.nbClient, portGroupName) + err := libovsdbops.DeletePortGroupsWithPredicate(c.nbClient, pgPredicate) if err != nil { - return fmt.Errorf("unable to delete PG %s for BANP %s: %w", portGroupName, banp.name, err) + return fmt.Errorf("unable to delete PGs for BANP %s: %w", banpName, err) } - // remove address-sets that were created for the peers of each rule fpr the whole ANP + // remove address-sets that were created for the peers of each rule fpr the whole BANP // do this after ACLs are gone so that there is no lingering references + // do this across all networks for this BANP err = c.clearASForPeers(banp.name, libovsdbops.AddressSetBaselineAdminNetworkPolicy) if err != nil { return fmt.Errorf("failed to delete address-sets for BANP %s: %w", banp.name, err) @@ -124,10 +130,9 @@ func (c *Controller) ensureBaselineAdminNetworkPolicy(banp *anpapi.BaselineAdmin // fetch the banpState from our cache currentBANPState := c.banpCache // Based on the latest kapi BANP, namespace and pod objects: - // 1) Construct Port Group name using ANP name - // 2) Construct Address-sets with IPs of the peers in the rules - // 3) Construct ACLs using AS-es and PGs - portGroupName := c.getANPPortGroupName(desiredBANPState.name, true) + // 1) Construct Address-sets with IPs of the peers in the rules + // 2) Construct ACLs using AS-es and PGs + // acrss all networks in the cluster desiredPorts, err := c.convertANPSubjectToLSPs(desiredBANPState) if err != nil { return fmt.Errorf("unable to fetch ports for banp %s: %v", desiredBANPState.name, err) @@ -137,7 +142,7 @@ func (c *Controller) ensureBaselineAdminNetworkPolicy(banp *anpapi.BaselineAdmin return fmt.Errorf("unable to convert peers to addresses for banp %s: %v", desiredBANPState.name, err) } atLeastOneRuleUpdated := false - desiredACLs := c.convertANPRulesToACLs(desiredBANPState, currentBANPState, portGroupName, &atLeastOneRuleUpdated, true) + desiredACLs := c.convertANPRulesToACLs(desiredBANPState, currentBANPState, &atLeastOneRuleUpdated, true) // Comparing names for figuring out if cache is populated or not is safe // because the singleton BANP will always be called "default" in any cluster diff --git a/go-controller/pkg/ovn/controller/admin_network_policy/types.go b/go-controller/pkg/ovn/controller/admin_network_policy/types.go index 2b1391180f..3870ae694a 100644 --- a/go-controller/pkg/ovn/controller/admin_network_policy/types.go +++ b/go-controller/pkg/ovn/controller/admin_network_policy/types.go @@ -39,7 +39,8 @@ type adminNetworkPolicySubject struct { // transact ops calculation. If not, for every pod/namespace update // we would need to do a lookup in the libovsdb cache for the ns_name // LSP index. - podPorts sets.Set[string] + // {K: networkName; V: {set of podLSPs matching the provided podSelector}} + podPorts map[string]sets.Set[string] } type adminNetworkPolicyPeer struct { @@ -52,7 +53,8 @@ type adminNetworkPolicyPeer struct { // {K: namespace name; V: {set of pods matching the provided podSelector}} namespaces map[string]sets.Set[string] // set of nodes matching the provided nodeSelector - nodes sets.Set[string] + nodes sets.Set[string] + networks sets.Set[string] } type gressRule struct { @@ -68,7 +70,9 @@ type gressRule struct { peers []*adminNetworkPolicyPeer ports []*libovsdbutil.NetworkPolicyPort // all the peerAddresses of the peer entities (podIPs, nodeIPs, CIDR ranges) selected by this ANP Rule - peerAddresses sets.Set[string] + // for all networks + // {K: networkName; V: {set of IPs}} + peerAddresses map[string]sets.Set[string] // saves NamedPort representation; // key is the name of the Port // value is an array of possible representations of this port (relevance wrt to rule, peers) @@ -94,17 +98,21 @@ type adminNetworkPolicyState struct { // aclLoggingParams stores the log levels for the ACLs created for this ANP // this is based off the "k8s.ovn.org/acl-logging" annotation set on the ANP's aclLoggingParams *libovsdbutil.ACLLoggingLevels + // set of networkNames selected either as part of ANP's + // subject OR peers in any one of the rules + managedNetworks sets.Set[string] } // newAdminNetworkPolicyState takes the provided ANP API object and creates a new corresponding // adminNetworkPolicyState cache object for that API object. func newAdminNetworkPolicyState(raw *anpapi.AdminNetworkPolicy) (*adminNetworkPolicyState, error) { anp := &adminNetworkPolicyState{ - name: raw.Name, - anpPriority: raw.Spec.Priority, - ovnPriority: (ANPFlowStartPriority - raw.Spec.Priority*ANPMaxRulesPerObject), - ingressRules: make([]*gressRule, 0), - egressRules: make([]*gressRule, 0), + name: raw.Name, + anpPriority: raw.Spec.Priority, + ovnPriority: (ANPFlowStartPriority - raw.Spec.Priority*ANPMaxRulesPerObject), + ingressRules: make([]*gressRule, 0), + egressRules: make([]*gressRule, 0), + managedNetworks: make(sets.Set[string]), } var err error anp.subject, err = newAdminNetworkPolicySubject(raw.Spec.Subject) @@ -271,6 +279,14 @@ func newAdminNetworkPolicyEgressPeer(raw anpapi.AdminNetworkPolicyEgressPeer) (* namespaceSelector: labels.Nothing(), // doesn't match any namespaces podSelector: labels.Nothing(), // doesn't match any pods nodeSelector: labels.Nothing(), // doesn't match any nodes + networks: make(sets.Set[string]), + } + for _, cidr := range raw.Networks { + _, ipNet, err := net.ParseCIDR(string(cidr)) + if err != nil { + return nil, err + } + anpPeer.networks.Insert(ipNet.String()) } } return anpPeer, nil @@ -280,15 +296,14 @@ func newAdminNetworkPolicyEgressPeer(raw anpapi.AdminNetworkPolicyEgressPeer) (* // gressRule cache object for that Rule. func newAdminNetworkPolicyIngressRule(raw anpapi.AdminNetworkPolicyIngressRule, index, priority int32) (*gressRule, error) { anpRule := &gressRule{ - name: raw.Name, - priority: priority, - gressIndex: index, - action: GetACLActionForANPRule(raw.Action), - gressPrefix: string(libovsdbutil.ACLIngress), - peers: make([]*adminNetworkPolicyPeer, 0), - ports: make([]*libovsdbutil.NetworkPolicyPort, 0), - namedPorts: make(map[string][]libovsdbutil.NamedNetworkPolicyPort, 0), - peerAddresses: sets.New[string](), + name: raw.Name, + priority: priority, + gressIndex: index, + action: GetACLActionForANPRule(raw.Action), + gressPrefix: string(libovsdbutil.ACLIngress), + peers: make([]*adminNetworkPolicyPeer, 0), + ports: make([]*libovsdbutil.NetworkPolicyPort, 0), + namedPorts: make(map[string][]libovsdbutil.NamedNetworkPolicyPort, 0), } for _, peer := range raw.From { anpPeer, err := newAdminNetworkPolicyIngressPeer(peer) @@ -315,15 +330,14 @@ func newAdminNetworkPolicyIngressRule(raw anpapi.AdminNetworkPolicyIngressRule, // gressRule cache object for that Rule. func newAdminNetworkPolicyEgressRule(raw anpapi.AdminNetworkPolicyEgressRule, index, priority int32) (*gressRule, error) { anpRule := &gressRule{ - name: raw.Name, - priority: priority, - gressIndex: index, - action: GetACLActionForANPRule(raw.Action), - gressPrefix: string(libovsdbutil.ACLEgress), - peers: make([]*adminNetworkPolicyPeer, 0), - ports: make([]*libovsdbutil.NetworkPolicyPort, 0), - namedPorts: make(map[string][]libovsdbutil.NamedNetworkPolicyPort, 0), - peerAddresses: sets.New[string](), + name: raw.Name, + priority: priority, + gressIndex: index, + action: GetACLActionForANPRule(raw.Action), + gressPrefix: string(libovsdbutil.ACLEgress), + peers: make([]*adminNetworkPolicyPeer, 0), + ports: make([]*libovsdbutil.NetworkPolicyPort, 0), + namedPorts: make(map[string][]libovsdbutil.NamedNetworkPolicyPort, 0), } for _, peer := range raw.To { anpPeer, err := newAdminNetworkPolicyEgressPeer(peer) @@ -331,15 +345,6 @@ func newAdminNetworkPolicyEgressRule(raw anpapi.AdminNetworkPolicyEgressRule, in return nil, err } anpRule.peers = append(anpRule.peers, anpPeer) - if len(peer.Networks) > 0 { - for _, cidr := range peer.Networks { - _, ipNet, err := net.ParseCIDR(string(cidr)) - if err != nil { - return nil, err - } - anpRule.peerAddresses.Insert(ipNet.String()) - } - } } if raw.Ports != nil { for _, port := range *raw.Ports { @@ -358,11 +363,12 @@ func newAdminNetworkPolicyEgressRule(raw anpapi.AdminNetworkPolicyEgressRule, in // adminNetworkPolicyState cache object for that API object. func newBaselineAdminNetworkPolicyState(raw *anpapi.BaselineAdminNetworkPolicy) (*adminNetworkPolicyState, error) { banp := &adminNetworkPolicyState{ - name: raw.Name, - anpPriority: 0, // since BANP does not have priority, we hardcode to 0 - ovnPriority: BANPFlowPriority, - ingressRules: make([]*gressRule, 0), - egressRules: make([]*gressRule, 0), + name: raw.Name, + anpPriority: 0, // since BANP does not have priority, we hardcode to 0 + ovnPriority: BANPFlowPriority, + ingressRules: make([]*gressRule, 0), + egressRules: make([]*gressRule, 0), + managedNetworks: make(sets.Set[string]), } var err error banp.subject, err = newAdminNetworkPolicySubject(raw.Spec.Subject) @@ -402,15 +408,14 @@ func newBaselineAdminNetworkPolicyState(raw *anpapi.BaselineAdminNetworkPolicy) // gressRule cache object for that Rule. func newBaselineAdminNetworkPolicyIngressRule(raw anpapi.BaselineAdminNetworkPolicyIngressRule, index, priority int32) (*gressRule, error) { banpRule := &gressRule{ - name: raw.Name, - priority: priority, - gressIndex: index, - action: GetACLActionForBANPRule(raw.Action), - gressPrefix: string(libovsdbutil.ACLIngress), - peers: make([]*adminNetworkPolicyPeer, 0), - ports: make([]*libovsdbutil.NetworkPolicyPort, 0), - namedPorts: make(map[string][]libovsdbutil.NamedNetworkPolicyPort, 0), - peerAddresses: sets.New[string](), + name: raw.Name, + priority: priority, + gressIndex: index, + action: GetACLActionForBANPRule(raw.Action), + gressPrefix: string(libovsdbutil.ACLIngress), + peers: make([]*adminNetworkPolicyPeer, 0), + ports: make([]*libovsdbutil.NetworkPolicyPort, 0), + namedPorts: make(map[string][]libovsdbutil.NamedNetworkPolicyPort, 0), } for _, peer := range raw.From { anpPeer, err := newAdminNetworkPolicyIngressPeer(peer) @@ -437,15 +442,14 @@ func newBaselineAdminNetworkPolicyIngressRule(raw anpapi.BaselineAdminNetworkPol // gressRule cache object for that Rule. func newBaselineAdminNetworkPolicyEgressRule(raw anpapi.BaselineAdminNetworkPolicyEgressRule, index, priority int32) (*gressRule, error) { banpRule := &gressRule{ - name: raw.Name, - priority: priority, - gressIndex: index, - action: GetACLActionForBANPRule(raw.Action), - gressPrefix: string(libovsdbutil.ACLEgress), - peers: make([]*adminNetworkPolicyPeer, 0), - ports: make([]*libovsdbutil.NetworkPolicyPort, 0), - namedPorts: make(map[string][]libovsdbutil.NamedNetworkPolicyPort, 0), - peerAddresses: sets.New[string](), + name: raw.Name, + priority: priority, + gressIndex: index, + action: GetACLActionForBANPRule(raw.Action), + gressPrefix: string(libovsdbutil.ACLEgress), + peers: make([]*adminNetworkPolicyPeer, 0), + ports: make([]*libovsdbutil.NetworkPolicyPort, 0), + namedPorts: make(map[string][]libovsdbutil.NamedNetworkPolicyPort, 0), } for _, peer := range raw.To { banpPeer, err := newAdminNetworkPolicyEgressPeer(peer) @@ -453,15 +457,6 @@ func newBaselineAdminNetworkPolicyEgressRule(raw anpapi.BaselineAdminNetworkPoli return nil, err } banpRule.peers = append(banpRule.peers, banpPeer) - if len(peer.Networks) > 0 { - for _, cidr := range peer.Networks { - _, ipNet, err := net.ParseCIDR(string(cidr)) - if err != nil { - return nil, err - } - banpRule.peerAddresses.Insert(ipNet.String()) - } - } } if raw.Ports != nil { for _, port := range *raw.Ports {