Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions service/authorization/v2/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
otdf "github.com/opentdf/platform/sdk"
"github.com/opentdf/platform/service/internal/access/v2"
"github.com/opentdf/platform/service/logger"
"github.com/opentdf/platform/service/pkg/cache"
"github.com/opentdf/platform/service/pkg/serviceregistry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
Expand Down Expand Up @@ -78,20 +77,14 @@ func NewRegistration() *serviceregistry.Service[authzV2Connect.AuthorizationServ
return as, nil
}

cacheClient, err := srp.NewCacheClient(cache.Options{})
if err != nil || cacheClient == nil {
l.Error("failed to create platform cache client", slog.Any("error", err))
panic(fmt.Errorf("failed to create platform cache client: %w", err))
}

refreshInterval, err := time.ParseDuration(authZCfg.Cache.RefreshInterval)
if err != nil {
l.Error("failed to parse entitlement policy cache refresh interval", slog.Any("error", err))
panic(fmt.Errorf("failed to parse entitlement policy cache refresh interval [%s]: %w", authZCfg.Cache.RefreshInterval, err))
}

retriever := access.NewEntitlementPolicyRetriever(as.sdk)
as.cache, err = NewEntitlementPolicyCache(context.Background(), l, retriever, cacheClient, refreshInterval)
as.cache, err = NewEntitlementPolicyCache(context.Background(), l, retriever, refreshInterval)
if err != nil {
l.Error("failed to create entitlement policy cache", slog.Any("error", err))
panic(fmt.Errorf("failed to create entitlement policy cache: %w", err))
Expand Down
123 changes: 32 additions & 91 deletions service/authorization/v2/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,16 @@ package authorization
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"

"github.com/opentdf/platform/protocol/go/policy"
"github.com/opentdf/platform/service/internal/access/v2"
"github.com/opentdf/platform/service/logger"
"github.com/opentdf/platform/service/pkg/cache"
)

const (
attributesCacheKey = "attributes_cache_key"
subjectMappingsCacheKey = "subject_mappings_cache_key"
registeredResourcesCacheKey = "registered_resources_cache_key"
)

var (
// Cache tags for authorization-related data set in the cache
authzCacheTags = []string{"authorization", "policy", "entitlements"}

// stopTimeout is the maximum time to wait for the periodic refresh goroutine to stop
stopTimeout = 5 * time.Second

Expand All @@ -41,8 +31,9 @@ var (

// EntitlementPolicyCache caches attributes and subject mappings with periodic refresh
type EntitlementPolicyCache struct {
logger *logger.Logger
cacheClient *cache.Cache
logger *logger.Logger
policy access.EntitlementPolicy
mu sync.RWMutex

// SDK-connected retriever to fetch fresh data from policy services
retriever *access.EntitlementPolicyRetriever
Expand All @@ -56,21 +47,12 @@ type EntitlementPolicyCache struct {
isCacheFilled bool
}

// The EntitlementPolicy struct holds all the cached entitlement policy, as generics allow one
// data type per service cache instance.
type EntitlementPolicy struct {
Attributes []*policy.Attribute
SubjectMappings []*policy.SubjectMapping
RegisteredResources []*policy.RegisteredResource
}

// NewEntitlementPolicyCache holds a platform-provided cache client and manages a periodic refresh of
// cached entitlement policy data, fetching fresh data from the policy services at configured interval.
func NewEntitlementPolicyCache(
ctx context.Context,
l *logger.Logger,
retriever *access.EntitlementPolicyRetriever,
cacheClient *cache.Cache,
cacheRefreshInterval time.Duration,
) (*EntitlementPolicyCache, error) {
if cacheRefreshInterval == 0 {
Expand All @@ -82,7 +64,6 @@ func NewEntitlementPolicyCache(

instance := &EntitlementPolicyCache{
logger: l,
cacheClient: cacheClient,
retriever: retriever,
configuredRefreshInterval: cacheRefreshInterval,
stopRefresh: make(chan struct{}),
Expand Down Expand Up @@ -181,23 +162,12 @@ func (c *EntitlementPolicyCache) Refresh(ctx context.Context) error {
return err
}

// If there is an error when Setting with fresh data, mark not filled so IsReady() will re-attempt refresh
err = c.cacheClient.Set(ctx, attributesCacheKey, attributes, authzCacheTags)
if err != nil {
c.isCacheFilled = false
return errors.Join(ErrFailedToSet, err)
}

err = c.cacheClient.Set(ctx, subjectMappingsCacheKey, subjectMappings, authzCacheTags)
if err != nil {
c.isCacheFilled = false
return errors.Join(ErrFailedToSet, err)
}

err = c.cacheClient.Set(ctx, registeredResourcesCacheKey, registeredResources, authzCacheTags)
if err != nil {
c.isCacheFilled = false
return errors.Join(ErrFailedToSet, err)
c.mu.Lock()
defer c.mu.Unlock()
c.policy = access.EntitlementPolicy{
Attributes: attributes,
RegisteredResources: registeredResources,
SubjectMappings: subjectMappings,
}

c.logger.DebugContext(ctx,
Expand All @@ -214,71 +184,42 @@ func (c *EntitlementPolicyCache) Refresh(ctx context.Context) error {
}

// ListAllAttributes returns the cached attributes
func (c *EntitlementPolicyCache) ListAllAttributes(ctx context.Context) ([]*policy.Attribute, error) {
var (
attributes []*policy.Attribute
ok bool
)

cached, err := c.cacheClient.Get(ctx, attributesCacheKey)
if err != nil {
if errors.Is(err, cache.ErrCacheMiss) {
return attributes, nil
}
return nil, fmt.Errorf("%w, attributes: %w", ErrFailedToGet, err)
}
func (c *EntitlementPolicyCache) ListAllAttributes(_ context.Context) ([]*policy.Attribute, error) {
c.mu.RLock()
defer c.mu.RUnlock()

attributes, ok = cached.([]*policy.Attribute)
if !ok {
return nil, fmt.Errorf("%w: %T", ErrCachedTypeNotExpected, attributes)
}
var attributes []*policy.Attribute
attributes = c.policy.Attributes
return attributes, nil
}

// ListAllSubjectMappings returns the cached subject mappings
func (c *EntitlementPolicyCache) ListAllSubjectMappings(ctx context.Context) ([]*policy.SubjectMapping, error) {
var (
subjectMappings []*policy.SubjectMapping
ok bool
)

cached, err := c.cacheClient.Get(ctx, subjectMappingsCacheKey)
if err != nil {
if errors.Is(err, cache.ErrCacheMiss) {
return subjectMappings, nil
}
return nil, fmt.Errorf("%w, subject mappings: %w", ErrFailedToGet, err)
}
func (c *EntitlementPolicyCache) ListAllSubjectMappings(_ context.Context) ([]*policy.SubjectMapping, error) {
c.mu.RLock()
defer c.mu.RUnlock()

subjectMappings, ok = cached.([]*policy.SubjectMapping)
if !ok {
return nil, fmt.Errorf("%w: %T", ErrCachedTypeNotExpected, subjectMappings)
}
var subjectMappings []*policy.SubjectMapping
subjectMappings = c.policy.SubjectMappings
return subjectMappings, nil
}

// ListAllRegisteredResources returns the cached registered resources, or none in the event of a cache miss
func (c *EntitlementPolicyCache) ListAllRegisteredResources(ctx context.Context) ([]*policy.RegisteredResource, error) {
var (
registeredResources []*policy.RegisteredResource
ok bool
)
func (c *EntitlementPolicyCache) ListAllRegisteredResources(_ context.Context) ([]*policy.RegisteredResource, error) {
c.mu.RLock()
defer c.mu.RUnlock()

cached, err := c.cacheClient.Get(ctx, registeredResourcesCacheKey)
if err != nil {
if errors.Is(err, cache.ErrCacheMiss) {
return registeredResources, nil
}
return nil, fmt.Errorf("%w, registered resources: %w", ErrFailedToGet, err)
}

registeredResources, ok = cached.([]*policy.RegisteredResource)
if !ok {
return nil, fmt.Errorf("%w: %T", ErrCachedTypeNotExpected, registeredResources)
}
var registeredResources []*policy.RegisteredResource
registeredResources = c.policy.RegisteredResources
return registeredResources, nil
}

func (c *EntitlementPolicyCache) GetEntitlementPolicy(_ context.Context) (access.EntitlementPolicy, error) {
c.mu.RLock()
defer c.mu.RUnlock()

return c.policy, nil
}

// periodicRefresh refreshes the cache at the specified interval
func (c *EntitlementPolicyCache) periodicRefresh(ctx context.Context) {
waitTimeout := c.configuredRefreshInterval
Expand Down
33 changes: 13 additions & 20 deletions service/authorization/v2/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,19 @@ import (
"time"

"github.com/opentdf/platform/protocol/go/policy"
"github.com/opentdf/platform/service/internal/access/v2"
"github.com/opentdf/platform/service/logger"
"github.com/opentdf/platform/service/pkg/cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
mockCacheExpiry = 5 * time.Minute
l = logger.CreateTestLogger()
)
var l = logger.CreateTestLogger()

func Test_NewEntitlementPolicyCache(t *testing.T) {
ctx := t.Context()
refreshInterval := 10 * time.Second
mockCache, _ := cache.TestCacheClient(mockCacheExpiry)

c, err := NewEntitlementPolicyCache(ctx, l, nil, mockCache, refreshInterval)
c, err := NewEntitlementPolicyCache(ctx, l, nil, refreshInterval)
require.NoError(t, err)
assert.NotNil(t, c)
assert.Equal(t, refreshInterval, c.configuredRefreshInterval)
Expand All @@ -31,13 +27,12 @@ func Test_NewEntitlementPolicyCache(t *testing.T) {
func Test_EntitlementPolicyCache_RefreshInterval(t *testing.T) {
var refreshInterval time.Duration
ctx := t.Context()
mockCache, _ := cache.TestCacheClient(mockCacheExpiry)

_, err := NewEntitlementPolicyCache(ctx, l, nil, mockCache, refreshInterval)
_, err := NewEntitlementPolicyCache(ctx, l, nil, refreshInterval)
require.ErrorIs(t, err, ErrCacheDisabled)

refreshInterval = 10 * time.Second
c, err := NewEntitlementPolicyCache(ctx, l, nil, mockCache, refreshInterval)
c, err := NewEntitlementPolicyCache(ctx, l, nil, refreshInterval)
require.NoError(t, err)
assert.NotNil(t, c)
}
Expand All @@ -48,12 +43,11 @@ func Test_EntitlementPolicyCache_Enabled(t *testing.T) {
err error
ctx = t.Context()
refreshInterval = 10 * time.Second
mockCache, _ = cache.TestCacheClient(mockCacheExpiry)
)
assert.False(t, c.IsEnabled())
assert.False(t, c.IsReady(ctx))

c, err = NewEntitlementPolicyCache(ctx, l, nil, mockCache, refreshInterval)
c, err = NewEntitlementPolicyCache(ctx, l, nil, refreshInterval)
require.NoError(t, err)
assert.NotNil(t, c)
assert.True(t, c.IsEnabled())
Expand All @@ -63,9 +57,8 @@ func Test_EntitlementPolicyCache_Enabled(t *testing.T) {

func Test_EntitlementPolicyCache_CacheMiss(t *testing.T) {
ctx := t.Context()
mockCache, _ := cache.TestCacheClient(mockCacheExpiry)

c, err := NewEntitlementPolicyCache(ctx, l, nil, mockCache, 1*time.Hour)
c, err := NewEntitlementPolicyCache(ctx, l, nil, 1*time.Hour)
require.NoError(t, err)

// No errors, but empty lists on cache misses
Expand All @@ -84,17 +77,17 @@ func Test_EntitlementPolicyCache_CacheMiss(t *testing.T) {

func Test_EntitlementPolicyCache_CacheHits(t *testing.T) {
ctx := t.Context()
mockCache, _ := cache.TestCacheClient(mockCacheExpiry)

attrsList := []*policy.Attribute{{Name: "attr1"}}
subjMappingsList := []*policy.SubjectMapping{{Id: "id-123"}}
resourcesList := []*policy.RegisteredResource{{Name: "res1"}}
_ = mockCache.Set(ctx, attributesCacheKey, attrsList, nil)
_ = mockCache.Set(ctx, subjectMappingsCacheKey, subjMappingsList, nil)
_ = mockCache.Set(ctx, registeredResourcesCacheKey, resourcesList, nil)

c, err := NewEntitlementPolicyCache(ctx, l, nil, mockCache, 1*time.Hour)
c, err := NewEntitlementPolicyCache(ctx, l, nil, 1*time.Hour)
require.NoError(t, err)
c.policy = access.EntitlementPolicy{
Attributes: attrsList,
SubjectMappings: subjMappingsList,
RegisteredResources: resourcesList,
}

// Allow for some concurrency overhead in cache library to prevent flakiness in tests
time.Sleep(10 * time.Millisecond)
Expand Down
16 changes: 5 additions & 11 deletions service/internal/access/v2/just_in_time_pdp.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,13 @@ func NewJustInTimePDP(
l.DebugContext(ctx, "no EntitlementPolicyStore provided or not yet ready, will retrieve directly from policy services")
store = NewEntitlementPolicyRetriever(sdk)
}

allAttributes, err := store.ListAllAttributes(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list cached attributes: %w", err)
}
allSubjectMappings, err := store.ListAllSubjectMappings(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list cached subject mappings: %w", err)
}
allRegisteredResources, err := store.ListAllRegisteredResources(ctx)
entitlementPolicy, err := store.GetEntitlementPolicy(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch all registered resources: %w", err)
return nil, fmt.Errorf("failed to get entitlement policy from store: %w", err)
}
allAttributes := entitlementPolicy.Attributes
allSubjectMappings := entitlementPolicy.SubjectMappings
allRegisteredResources := entitlementPolicy.RegisteredResources

pdp, err := NewPolicyDecisionPoint(ctx, l, allAttributes, allSubjectMappings, allRegisteredResources)
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions service/internal/access/v2/policy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@ type EntitlementPolicyStore interface {
ListAllAttributes(ctx context.Context) ([]*policy.Attribute, error)
ListAllSubjectMappings(ctx context.Context) ([]*policy.SubjectMapping, error)
ListAllRegisteredResources(ctx context.Context) ([]*policy.RegisteredResource, error)
GetEntitlementPolicy(ctx context.Context) (EntitlementPolicy, error)
IsEnabled() bool
IsReady(context.Context) bool
}

// The EntitlementPolicy struct holds all the cached entitlement policy, as generics allow one
// data type per service cache instance.
type EntitlementPolicy struct {
Attributes []*policy.Attribute
SubjectMappings []*policy.SubjectMapping
RegisteredResources []*policy.RegisteredResource
}

var (
ErrFailedToFetchAttributes = errors.New("failed to fetch attributes from policy service")
ErrFailedToFetchSubjectMappings = errors.New("failed to fetch subject mappings from policy service")
Expand Down Expand Up @@ -126,3 +135,25 @@ func (p *EntitlementPolicyRetriever) ListAllRegisteredResources(ctx context.Cont

return rrList, nil
}

func (p *EntitlementPolicyRetriever) GetEntitlementPolicy(ctx context.Context) (EntitlementPolicy, error) {
var ep EntitlementPolicy
var err error

ep.Attributes, err = p.ListAllAttributes(ctx)
if err != nil {
return EntitlementPolicy{}, err
}

ep.SubjectMappings, err = p.ListAllSubjectMappings(ctx)
if err != nil {
return EntitlementPolicy{}, err
}

ep.RegisteredResources, err = p.ListAllRegisteredResources(ctx)
if err != nil {
return EntitlementPolicy{}, err
}

return ep, nil
}
4 changes: 4 additions & 0 deletions service/pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func (c *Cache) Delete(ctx context.Context, key string) error {
return c.manager.cache.Delete(ctx, c.getKey(key))
}

func (c *Cache) Wait() {
c.manager.underlyingStore.Wait()
}

func (c *Cache) getKey(key string) string {
return c.serviceName + ":" + key
}
Expand Down
Loading