Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion magnum/api/controllers/v1/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,26 @@ def __init__(self, **kwargs):
def convert_with_links(rpc_clusters, limit, url=None, expand=False,
**kwargs):
collection = ClusterCollection()
# Pre-fetch all unique ClusterTemplates needed for this page in one
# batch to avoid N separate ClusterTemplate.get_by_uuid() RPC calls
# (one per cluster). Clusters in a project commonly share the same
# template, so this usually collapses to a single RPC regardless of
# page size.
template_cache = {}
for rpc_cluster in rpc_clusters:
tid = rpc_cluster.cluster_template_id
if tid and tid not in template_cache:
template_cache[tid] = objects.ClusterTemplate.get_by_uuid(
pecan.request.context, tid)
# Inject the pre-fetched template so obj_load_attr is never triggered
# during Cluster.convert_with_links below.
for rpc_cluster in rpc_clusters:
tid = rpc_cluster.cluster_template_id
if tid and tid in template_cache:
rpc_cluster.cluster_template = template_cache[tid]
collection.clusters = [Cluster.convert_with_links(p, expand)
for p in rpc_clusters]
collection.next = collection.get_next(limit, url=url, **kwargs)
collection.clusters = [Cluster.convert_with_links(p, expand)
for p in rpc_clusters]
collection.next = collection.get_next(limit, url=url, **kwargs)
Expand Down Expand Up @@ -442,7 +462,9 @@ def get_one(self, cluster_ident):
context.all_tenants = True

cluster = api_utils.get_resource('Cluster', cluster_ident)
policy.enforce(context, 'cluster:get', cluster.as_dict(),
# Compute as_dict() once and reuse it for policy enforcement.
cluster_dict = cluster.as_dict()
policy.enforce(context, 'cluster:get', cluster_dict,
action='cluster:get')

api_cluster = Cluster.convert_with_links(cluster)
Expand Down
31 changes: 27 additions & 4 deletions magnum/common/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@
LOG = logging.getLogger(__name__)
_ENFORCER = None
CONF = cfg.CONF
_TRUSTEE_DOMAIN_ID_CACHE = None


def _reset_trustee_domain_id_cache():
"""Reset the trustee_domain_id Keystone lookup cache.

Intended for use in test teardown only. In production the value is
either read from CONF (free) or fetched once and cached for the process
lifetime.
"""
global _TRUSTEE_DOMAIN_ID_CACHE
_TRUSTEE_DOMAIN_ID_CACHE = None


# we can get a policy enforcer by this init.
Expand Down Expand Up @@ -112,10 +124,21 @@ def enforce(context, rule=None, target=None,

def add_policy_attributes(target):
"""Adds extra information for policy enforcement to raw target object"""
context = importutils.import_module('magnum.common.context')
admin_context = context.make_admin_context()
admin_osc = clients.OpenStackClients(admin_context)
trustee_domain_id = admin_osc.keystone().trustee_domain_id
global _TRUSTEE_DOMAIN_ID_CACHE

# When trustee_domain_id is set - we don't need to do any operations
trustee_domain_id = CONF.trust.trustee_domain_id
if not trustee_domain_id:
# Fallback for deployments that rely on auto-discovery via Keystone.
# Cache the result for the process lifetime so the call happens
# at most once.
if _TRUSTEE_DOMAIN_ID_CACHE is None:
ctx = importutils.import_module('magnum.common.context')
admin_context = ctx.make_admin_context()
admin_osc = clients.OpenStackClients(admin_context)
_TRUSTEE_DOMAIN_ID_CACHE = admin_osc.keystone().trustee_domain_id
trustee_domain_id = _TRUSTEE_DOMAIN_ID_CACHE

target['trustee_domain_id'] = trustee_domain_id
return target

Expand Down
66 changes: 58 additions & 8 deletions magnum/common/rpc_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,74 @@ def create(cls, topic, server, handlers, binary):
return service_obj


# Share a single RPCClient per unique (topic, server, timeout) tuple
# for the lifetime of the worker process. The per-request context is injected
# via RPCClient.prepare(), which returns a lightweight _CallContext that reuses
# the same underlying transport connections.
_RPC_CLIENT_CACHE = {}
_RPC_CLIENT_CACHE_LOCK = None # initialised lazily (post-fork safe)


def _get_cached_client(topic, server, timeout):
"""Return a process-level cached RPCClient for the given target parameters.

The client is created once per (topic, server, timeout) combination and
reused across all requests. This keeps the RabbitMQ connection pool warm
and avoids per-request TCP connect/disconnect cycles.
"""
import threading
global _RPC_CLIENT_CACHE_LOCK
if _RPC_CLIENT_CACHE_LOCK is None:
# Initialise lazily so the lock is created in the worker process after
# werkzeug has forked, avoiding any shared-lock issues.
_RPC_CLIENT_CACHE_LOCK = threading.Lock()

key = (topic, server, timeout)
client = _RPC_CLIENT_CACHE.get(key)
if client is not None:
return client

with _RPC_CLIENT_CACHE_LOCK:
client = _RPC_CLIENT_CACHE.get(key)
if client is None:
target = messaging.Target(topic=topic, server=server)
client = rpc.get_client(
target,
serializer=objects_base.MagnumObjectSerializer(),
timeout=timeout,
)
_RPC_CLIENT_CACHE[key] = client
return client


class API(object):
def __init__(self, context=None, topic=None, server=None,
timeout=None):
self._context = context
if topic is None:
topic = ''
target = messaging.Target(topic=topic, server=server)
self._client = rpc.get_client(
target,
serializer=objects_base.MagnumObjectSerializer(),
timeout=timeout
)
self._topic = topic
self._server = server
self._timeout = timeout
# Fetch (or create) the shared client; do NOT store as a per-instance
# attribute called _client on the class so that subclasses that access
# self._client directly (conductor_api.API OVO indirection methods)
# still work – they use the shared client via the property below.
self.__shared_client = _get_cached_client(topic, server, timeout)

@property
def _client(self):
"""Shared RPCClient; context is injected per-call via prepare()."""
return self.__shared_client

def _call(self, method, *args, **kwargs):
return self._client.call(self._context, method, *args, **kwargs)
# prepare() binds the context without touching transport connections.
return self._client.prepare().call(
self._context, method, *args, **kwargs)

def _cast(self, method, *args, **kwargs):
self._client.cast(self._context, method, *args, **kwargs)
self._client.prepare().cast(
self._context, method, *args, **kwargs)

def echo(self, message):
self._cast('echo', message=message)
16 changes: 11 additions & 5 deletions magnum/db/sqlalchemy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,27 @@ def _add_tenant_filters(self, context, query):
if context.is_admin and context.all_tenants:
return query

admin_context = request_context.make_admin_context(all_tenants=True)
osc = clients.OpenStackClients(admin_context)
kst = osc.keystone()
# Read the trustee domain ID directly from configuration rather than
# authenticating to Keystone on every DB query. The value is
# operator-configured (CONF.trust.trustee_domain_id) and is stable for
# the lifetime of the service.
trustee_domain_id = CONF.trust.trustee_domain_id

# User in a regular project (not in the trustee domain)
if (
context.project_id
and context.user_domain_id != kst.trustee_domain_id
and context.user_domain_id != trustee_domain_id
):
query = query.filter_by(project_id=context.project_id)
# Match project ID component in trustee user's user name against
# cluster's project_id to associate per-cluster trustee users who have
# no project information with the project their clusters/cluster models
# reside in. This is equivalent to the project filtering above.
elif context.user_domain_id == kst.trustee_domain_id:
elif context.user_domain_id == trustee_domain_id:
admin_context = request_context.make_admin_context(
all_tenants=True)
osc = clients.OpenStackClients(admin_context)
kst = osc.keystone()
user_name = kst.client.users.get(context.user_id).name
user_project = user_name.split('_', 2)[1]
query = query.filter_by(project_id=user_project)
Expand Down
56 changes: 47 additions & 9 deletions magnum/objects/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,40 @@ def _from_db_object(cluster, db_cluster):
cluster.obj_reset_changes()
return cluster

# Call-scoped nodegroups cache: None means inactive (no as_dict() in
# progress). Set to a list by as_dict() before computing derived fields
# and cleared afterwards. Outside of as_dict() _get_nodegroups() always
# goes directly to the DB, so nodegroup mutations (create/delete) in
# conductor code and tests are always visible.
_nodegroups_cache = None

def _get_nodegroups(self):
"""Fetch nodegroups, using a call-scoped cache when active.

as_dict() accesses four derived properties (node_count, master_count,
node_addresses, master_addresses) that each call self.nodegroups.
Without caching, a single as_dict() triggers four identical
NodeGroup.list() RPC calls.

Rather than caching for the object lifetime (which would return stale
data if nodegroups are created/deleted after first access, as happens
in conductor and test code), the cache is scoped to a single
as_dict() call: as_dict() populates _nodegroups_cache before
computing derived fields and clears it on exit. At all other times
_nodegroups_cache is None and this method goes directly to the DB.
"""
if self._nodegroups_cache is not None:
return self._nodegroups_cache
return NodeGroup.list(self._context, self.uuid)

def _invalidate_nodegroups_cache(self):
"""Clear the call-scoped nodegroups cache."""
self._nodegroups_cache = None

@property
def nodegroups(self):
# Returns all nodegroups that belong to the cluster.
return NodeGroup.list(self._context, self.uuid)
return self._get_nodegroups()

@property
def default_ng_worker(self):
Expand Down Expand Up @@ -342,12 +372,20 @@ def obj_load_attr(self, attrname):

def as_dict(self):
dict_ = super(Cluster, self).as_dict()
# Update the dict with the attributes coming form
# the cluster's nodegroups.
dict_.update({
'node_count': self.node_count,
'master_count': self.master_count,
'node_addresses': self.node_addresses,
'master_addresses': self.master_addresses
})
# Populate the call-scoped nodegroups cache so that the four derived
# properties below (node_count, master_count, node_addresses,
# master_addresses) all share a single NodeGroup.list() fetch instead
# of each issuing their own RPC call. The cache is cleared on exit
# so that any subsequent access (e.g. from conductor code that has
# mutated nodegroups) sees a fresh DB result.
self._nodegroups_cache = NodeGroup.list(self._context, self.uuid)
try:
dict_.update({
'node_count': self.node_count,
'master_count': self.master_count,
'node_addresses': self.node_addresses,
'master_addresses': self.master_addresses
})
finally:
self._nodegroups_cache = None
return dict_
7 changes: 7 additions & 0 deletions magnum/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from magnum.common import context as magnum_context
from magnum.common import keystone as magnum_keystone
from magnum.common import policy as magnum_policy
from magnum.objects import base as objects_base
from magnum.tests import conf_fixture
from magnum.tests import fake_notifier
Expand Down Expand Up @@ -119,6 +120,12 @@ def make_context(*args, **kwargs):
self.mock_make_trustee_domain_id = q.start()
self.addCleanup(q.stop)

# Reset the module-level trustee_domain_id cache in policy.py so that
# it is re-resolved on the first policy.enforce() call of the next
# test. Without this, a test that exercises the Keystone-discovery
# path could leave a stale value that bypasses mocks in later tests.
self.addCleanup(magnum_policy._reset_trustee_domain_id_cache)

self.useFixture(conf_fixture.ConfFixture())
self.useFixture(fixtures.NestedTempfile())

Expand Down
8 changes: 8 additions & 0 deletions magnum/tests/conf_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,13 @@ def _setUp(self):
CONF.set_default('host', 'fake-mini')
CONF.set_default('connection', "sqlite://", group='database')
CONF.set_default('sqlite_synchronous', False, group='database')
# Set a fixed trustee_domain_id so that policy.add_policy_attributes()
# can read it directly from config without making any Keystone call.
# This matches the value used by the global trustee_domain_id mock in
# tests/base.py and avoids 'An auth plugin is required' errors in
# no-auth test configurations.
CONF.set_default('trustee_domain_id',
'12345678-9012-3456-7890-123456789abc',
group='trust')
config.parse_args([], default_config_files=[])
self.addCleanup(CONF.reset)
Loading