Skip to content

[FME-4230] Events - Updated Segments #689

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: FME-4229-events-killedFlag
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Split/Events/SplitEventsManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class DefaultSplitEventsManager: SplitEventsManager {
}
}

// MARK: Notifiers
func notifyInternalEvent(_ event: SplitInternalEvent, metadata: EventMetadata? = nil) {
let event = SplitInternalEventWithMetadata(event, metadata: metadata)

Expand All @@ -64,7 +65,8 @@ class DefaultSplitEventsManager: SplitEventsManager {
func notifyInternalEvent(_ event: SplitInternalEvent) {
notifyInternalEvent(event, metadata: nil)
}


// MARK: Registers
func register(event: SplitEventWithMetadata, task: SplitEventActionTask) {
let eventName = event.type.toString()
processQueue.async { [weak self] in
Expand All @@ -83,6 +85,7 @@ class DefaultSplitEventsManager: SplitEventsManager {
register(event: SplitEventWithMetadata(type: event, metadata: nil), task: task)
}

// MARK: Flow
func start() {
dataAccessQueue.sync {
if self.isStarted {
Expand Down
4 changes: 3 additions & 1 deletion Split/Localhost/LocalhostSynchronizer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ class LocalhostSynchronizer: FeatureFlagsSynchronizer {
// Update will remove all records before insert new ones
_ = self.featureFlagsStorage.update(splitChange: change)

self.eventsManager.notifyInternalEvent(.splitsUpdated, metadata: EventMetadata(type: .FLAGS_UPDATED, data: values.map { $0.name ?? "" } ))
// Notify event
let metadata = EventMetadata(type: .FLAGS_UPDATED, data: values.map { $0.name ?? "" } )
self.eventsManager.notifyInternalEvent(.splitsUpdated, metadata: metadata)
}
}
}
15 changes: 8 additions & 7 deletions Split/Network/Streaming/SyncSegmentsUpdateWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class SegmentsUpdateWorker: UpdateWorker<MembershipsUpdateNotification> {
if segments.count > newSegments.count {
mySegmentsStorage.set(SegmentChange(segments: newSegments.asArray()),
forKey: key)
synchronizer.notifyUpdate(forKey: key)
synchronizer.notifyUpdate(forKey: key, EventMetadata(type: .SEGMENTS_UPDATED, data: newSegments.asArray() ))
telemetryProducer?.recordUpdatesFromSse(type: resource)
}
}
Expand All @@ -128,7 +128,7 @@ class SegmentsUpdateWorker: UpdateWorker<MembershipsUpdateNotification> {
if oldSegments.count < newSegments.count {
mySegmentsStorage.set(SegmentChange(segments: newSegments.asArray()),
forKey: userKey)
synchronizer.notifyUpdate(forKey: userKey)
synchronizer.notifyUpdate(forKey: userKey, EventMetadata(type: .SEGMENTS_UPDATED, data: newSegments.asArray() ))
telemetryProducer?.recordUpdatesFromSse(type: .mySegments)
}
return
Expand Down Expand Up @@ -171,7 +171,7 @@ class SegmentsUpdateWorker: UpdateWorker<MembershipsUpdateNotification> {

protocol SegmentsSynchronizerWrapper {
func fetch(byKey: String, changeNumbers: SegmentsChangeNumber, delay: Int64)
func notifyUpdate(forKey: String)
func notifyUpdate(forKey: String, _ metadata: EventMetadata?)
}

class MySegmentsSynchronizerWrapper: SegmentsSynchronizerWrapper {
Expand All @@ -185,12 +185,13 @@ class MySegmentsSynchronizerWrapper: SegmentsSynchronizerWrapper {
synchronizer.forceMySegmentsSync(forKey: key, changeNumbers: changeNumbers, delay: delay)
}

func notifyUpdate(forKey key: String) {
synchronizer.notifySegmentsUpdated(forKey: key)
func notifyUpdate(forKey key: String, _ metadata: EventMetadata? = nil) {
synchronizer.notifySegmentsUpdated(forKey: key, metadata: metadata)
}
}

class MyLargeSegmentsSynchronizerWrapper: SegmentsSynchronizerWrapper {

private let synchronizer: Synchronizer

init(synchronizer: Synchronizer) {
Expand All @@ -201,8 +202,8 @@ class MyLargeSegmentsSynchronizerWrapper: SegmentsSynchronizerWrapper {
synchronizer.forceMySegmentsSync(forKey: key, changeNumbers: changeNumbers, delay: delay)
}

func notifyUpdate(forKey key: String) {
synchronizer.notifyLargeSegmentsUpdated(forKey: key)
func notifyUpdate(forKey key: String, _ metadata: EventMetadata? = nil) {
synchronizer.notifyLargeSegmentsUpdated(forKey: key, metadata: metadata)
}
}

Expand Down
29 changes: 15 additions & 14 deletions Split/Network/Streaming/SyncUpdateWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,10 @@ class SplitsUpdateWorker: UpdateWorker<TargetingRuleUpdateNotification> {

let processedFlags = self.splitChangeProcessor.process(change)

if self.splitsStorage.update(splitChange: processedFlags) {
var updatedFlags: [String] = processedFlags.activeSplits.compactMap(\.name)
updatedFlags += processedFlags.archivedSplits.compactMap(\.name)
self.synchronizer.notifyFeatureFlagsUpdated(flagsList: updatedFlags)

if self.splitsStorage.update(splitChange: processedFlags) {
var updatedFlags: [String] = processedFlags.activeSplits.compactMap(\.name)
updatedFlags += processedFlags.archivedSplits.compactMap(\.name)
self.synchronizer.notifyFeatureFlagsUpdated(flagsList: updatedFlags)
}

self.telemetryProducer?.recordUpdatesFromSse(type: .splits)
Expand All @@ -166,25 +165,27 @@ class SplitsUpdateWorker: UpdateWorker<TargetingRuleUpdateNotification> {
previousChangeNumber: Int64,
changeNumber: Int64) -> Bool {
do {
let rbs = try self.ruleBasedSegmentsPayloadDecoder.decode(
let rbs = try ruleBasedSegmentsPayloadDecoder.decode(
payload: payload,
compressionUtil: self.decomProvider.decompressor(for: compressionType))
compressionUtil: decomProvider.decompressor(for: compressionType))

let change = RuleBasedSegmentChange(segments: [rbs],
since: previousChangeNumber,
till: changeNumber)

Logger.v("RBS update received: \(change)")

let processedChange = ruleBasedSegmentsChangeProcessor.process(change)
let processedSegments = ruleBasedSegmentsChangeProcessor.process(change)

if self.ruleBasedSegmentsStorage.update(toAdd: processedChange.toAdd,
toRemove: processedChange.toRemove,
changeNumber: processedChange.changeNumber) {
self.synchronizer.notifyFeatureFlagsUpdated(flagsList: []) //TODO: RBS Update
if ruleBasedSegmentsStorage.update(toAdd: processedSegments.toAdd,
toRemove: processedSegments.toRemove,
changeNumber: processedSegments.changeNumber) {
var updatedSegments: [String] = processedSegments.activeSegments.compactMap(\.name)
updatedSegments += processedSegments.archivedSegments.compactMap(\.name)
synchronizer.notifyFeatureFlagsUpdated(flagsList: []) //TODO: Make new notify segments updated (new notification method?)
}

self.telemetryProducer?.recordUpdatesFromSse(type: .splits)
telemetryProducer?.recordUpdatesFromSse(type: .splits)
return true
} catch {
Logger.e("Error decoding rule based segments payload from notification: \(error)")
Expand Down
12 changes: 6 additions & 6 deletions Split/Network/Sync/ByKeyFacade.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ protocol ByKeyRegistry {
protocol ByKeySynchronizer {
func loadMySegmentsFromCache(forKey: String)
func loadAttributesFromCache(forKey: String)
func notifyMySegmentsUpdated(forKey: String)
func notifyMyLargeSegmentsUpdated(forKey: String)
func notifyMySegmentsUpdated(forKey: String, metadata: EventMetadata?)
func notifyMyLargeSegmentsUpdated(forKey: String, metadata: EventMetadata?)
func startSync(forKey key: Key)
func startPeriodicSync()
func stopPeriodicSync()
Expand Down Expand Up @@ -120,15 +120,15 @@ class DefaultByKeyFacade: ByKeyFacade {
byKeyComponents.value(forKey: key)?.mySegmentsSynchronizer.synchronizeMySegments()
}

func notifyMySegmentsUpdated(forKey key: String) {
func notifyMySegmentsUpdated(forKey key: String, metadata: EventMetadata?) {
doInAll(forMatchingKey: key) { group in
group.eventsManager.notifyInternalEvent(.mySegmentsUpdated)
group.eventsManager.notifyInternalEvent(.mySegmentsUpdated, metadata: metadata)
}
}

func notifyMyLargeSegmentsUpdated(forKey key: String) {
func notifyMyLargeSegmentsUpdated(forKey key: String, metadata: EventMetadata? = nil) {
doInAll(forMatchingKey: key) { group in
group.eventsManager.notifyInternalEvent(.myLargeSegmentsUpdated)
group.eventsManager.notifyInternalEvent(.myLargeSegmentsUpdated, metadata: metadata)
}
}

Expand Down
12 changes: 6 additions & 6 deletions Split/Network/Sync/Synchronizer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ protocol Synchronizer: ImpressionLogger {
func stopRecordingTelemetry()
func pushEvent(event: EventDTO)
func notifyFeatureFlagsUpdated(flagsList: [String])
func notifySegmentsUpdated(forKey key: String)
func notifyLargeSegmentsUpdated(forKey key: String)
func notifySegmentsUpdated(forKey key: String, metadata: EventMetadata?)
func notifyLargeSegmentsUpdated(forKey key: String, metadata: EventMetadata?)
func notifySplitKilled(flag: String)
func pause()
func resume()
Expand Down Expand Up @@ -211,12 +211,12 @@ class DefaultSynchronizer: Synchronizer {
featureFlagsSynchronizer.notifyUpdated(flagsList: flagsList)
}

func notifySegmentsUpdated(forKey key: String) {
byKeySynchronizer.notifyMySegmentsUpdated(forKey: key)
func notifySegmentsUpdated(forKey key: String, metadata: EventMetadata? = nil) {
byKeySynchronizer.notifyMySegmentsUpdated(forKey: key, metadata: metadata)
}

func notifyLargeSegmentsUpdated(forKey key: String) {
byKeySynchronizer.notifyMyLargeSegmentsUpdated(forKey: key)
func notifyLargeSegmentsUpdated(forKey key: String, metadata: EventMetadata? = nil) {
byKeySynchronizer.notifyMyLargeSegmentsUpdated(forKey: key, metadata: metadata)
}

func notifySplitKilled(flag: String) {
Expand Down
8 changes: 6 additions & 2 deletions SplitTests/Fake/Service/ByKeyFacadeMock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,16 @@ class ByKeyFacadeMock: ByKeyFacade {
}

var notifyMySegmentsUpdatedCalled = false
func notifyMySegmentsUpdated(forKey key: String) {
var updatedSegmentsMetadataForKey = [String : EventMetadata?]()
func notifyMySegmentsUpdated(forKey key: String, metadata: EventMetadata? = nil) {
updatedSegmentsMetadataForKey[key] = metadata
notifyMySegmentsUpdatedCalled = true
}

var notifyMyLargeSegmentsUpdatedCalled = false
func notifyMyLargeSegmentsUpdated(forKey key: String) {
var updatedLargeSegmentsMetadataForKey = [String : EventMetadata?]()
func notifyMyLargeSegmentsUpdated(forKey key: String, metadata: EventMetadata? = nil) {
updatedLargeSegmentsMetadataForKey[key] = metadata
notifyMyLargeSegmentsUpdatedCalled = true
}

Expand Down
12 changes: 8 additions & 4 deletions SplitTests/Fake/Streaming/SynchronizerSpy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,18 @@ class SynchronizerSpy: Synchronizer {
splitSynchronizer.resume()
}

func notifySegmentsUpdated(forKey key: String) {
var updatedSegmentsMetadataForKey = [String : EventMetadata]()
func notifySegmentsUpdated(forKey key: String, metadata: EventMetadata? = nil) {
notifyMySegmentsUpdatedCalled = true
splitSynchronizer.notifySegmentsUpdated(forKey: key)
updatedSegmentsMetadataForKey[key] = metadata
splitSynchronizer.notifySegmentsUpdated(forKey: key, metadata: metadata)
}

func notifyLargeSegmentsUpdated(forKey key: String) {
var updatedLargeSegmentsMetadataForKey = [String : EventMetadata]()
func notifyLargeSegmentsUpdated(forKey key: String, metadata: EventMetadata? = nil) {
updatedLargeSegmentsMetadataForKey[key] = metadata
notifyMyLargeSegmentsUpdatedCalled = true
splitSynchronizer.notifyLargeSegmentsUpdated(forKey: key)
splitSynchronizer.notifyLargeSegmentsUpdated(forKey: key, metadata: metadata)
}

var notifyFeatureFlagsUpdatedCalled = false
Expand Down
8 changes: 6 additions & 2 deletions SplitTests/Fake/Streaming/SynchronizerStub.swift
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,20 @@ class SynchronizerStub: Synchronizer {
}

var notifySegmentsUpdatedForKeyCalled = [String: Bool]()
func notifySegmentsUpdated(forKey key: String) {
var updatedSegmentsMetadataForKey = [String: EventMetadata?]()
func notifySegmentsUpdated(forKey key: String, metadata: EventMetadata? = nil) {
notifySegmentsUpdatedForKeyCalled[key] = true
updatedLargeSegmentsMetadataForKey[key] = metadata
if let exp = notifyMySegmentsUpdatedExp[key] {
exp.fulfill()
}
}

var notifyLargeSegmentsUpdatedForKeyCalled = [String: Bool]()
func notifyLargeSegmentsUpdated(forKey key: String) {
var updatedLargeSegmentsMetadataForKey = [String: EventMetadata?]()
func notifyLargeSegmentsUpdated(forKey key: String, metadata: EventMetadata? = nil) {
notifyLargeSegmentsUpdatedForKeyCalled[key] = true
updatedLargeSegmentsMetadataForKey[key] = metadata
if let exp = notifyMyLargeSegmentsUpdatedExp[key] {
exp.fulfill()
}
Expand Down
Loading