Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ let system = await ClusterSystem("System") { settings in
}

Task {
for await event in system.cluster.events {
for await event in system.cluster.events() {
system.log.info("Event: \(event)")
}
}
Expand Down
38 changes: 26 additions & 12 deletions Sources/DistributedCluster/Cluster/ClusterControl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,18 @@ public struct ClusterControl {
/// Settings the cluster node is configured with.
public let settings: ClusterSystemSettings

let events: ClusterEventStream

/// Sequence of cluster events.
///
/// This sequence begins with a snapshot of the current cluster state and continues with events representing changes
/// since the snapshot.
public let events: ClusterEventStream
/// since the snapshot.
/// - Parameters:
///   - file: Source file recorded on the returned stream as the subscriber's location; defaults to the caller's `#fileID`.
///   - line: Source line recorded on the returned stream as the subscriber's location; defaults to the caller's `#line`.
/// - Returns: A copy of the stored event stream, annotated with the given source location.
public func events(file: String = #fileID, line: UInt = #line) -> ClusterEventStream {
    // Work on a local copy; the stored stream itself is an immutable `let`.
    var annotatedStream = self.events
    annotatedStream.sourceLineLoc = line
    annotatedStream.sourceFileLoc = file
    return annotatedStream
}

/// Offers a snapshot of membership, which may be used to perform ad-hoc tests against the membership.
/// Note that this view may be outdated immediately after checking it, e.g. if a membership change is just being processed.
Expand Down Expand Up @@ -232,7 +239,9 @@ public struct ClusterControl {
/// If the expected status is `.down` or `.removed`, and the node is already known to have been removed from the cluster
/// a synthesized `Cluster/MemberStatus/removed` (and `.unreachable`) member is returned.
@discardableResult
public func waitFor(_ node: UniqueNode, _ status: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member {
public func waitFor(_ node: UniqueNode, _ status: Cluster.MemberStatus, within: Duration,
file: String = #fileID, line: UInt = #line) async throws -> Cluster.Member
{
try await self.waitForMembershipEventually(within: within) { membership in
if status == .down || status == .removed {
if let cluster = self.cluster, cluster.getExistingAssociationTombstone(with: node) != nil {
Expand All @@ -245,11 +254,11 @@ public struct ClusterControl {
// so we're seeing an already removed member, this can indeed happen and is okay
return Cluster.Member(node: node, status: .removed).asUnreachable
}
throw Cluster.MembershipError(.notFound(node, in: membership))
throw Cluster.MembershipError(.notFound(node, in: membership), file: file, line: line)
}

if status != foundMember.status {
throw Cluster.MembershipError(.statusRequirementNotMet(expected: status, found: foundMember))
throw Cluster.MembershipError(.statusRequirementNotMet(expected: status, found: foundMember), file: file, line: line)
}
return foundMember
}
Expand All @@ -266,17 +275,19 @@ public struct ClusterControl {
/// If the expected status is `.down` or `.removed`, and the node is already known to have been removed from the cluster
/// a synthesized `Cluster/MemberStatus/removed` (and `.unreachable`) member is returned.
@discardableResult
public func waitFor(_ node: Node, _ status: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member? {
public func waitFor(_ node: Node, _ status: Cluster.MemberStatus, within: Duration,
file: String = #fileID, line: UInt = #line) async throws -> Cluster.Member?
{
try await self.waitForMembershipEventually(Cluster.Member?.self, within: within) { membership in
guard let foundMember = membership.member(node) else {
if status == .down || status == .removed {
return nil
}
throw Cluster.MembershipError(.notFoundAny(node, in: membership))
throw Cluster.MembershipError(.notFoundAny(node, in: membership), file: file, line: line)
}

if status != foundMember.status {
throw Cluster.MembershipError(.statusRequirementNotMet(expected: status, found: foundMember))
throw Cluster.MembershipError(.statusRequirementNotMet(expected: status, found: foundMember), file: file, line: line)
}
return foundMember
}
Expand All @@ -293,7 +304,9 @@ public struct ClusterControl {
/// If the expected status is at least `.down` or `.removed`, and either a tombstone exists for the node or the associated
/// membership is not found, the `Cluster.Member` returned would have `.removed` status and *unreachable*.
@discardableResult
public func waitFor(_ node: UniqueNode, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration) async throws -> Cluster.Member {
public func waitFor(_ node: UniqueNode, atLeast atLeastStatus: Cluster.MemberStatus, within: Duration,
file: String = #fileID, line: UInt = #line) async throws -> Cluster.Member
{
try await self.waitForMembershipEventually(within: within) { membership in
if atLeastStatus == .down || atLeastStatus == .removed {
if let cluster = self.cluster, cluster.getExistingAssociationTombstone(with: node) != nil {
Expand All @@ -306,11 +319,11 @@ public struct ClusterControl {
// so we're seeing an already removed member, this can indeed happen and is okay
return Cluster.Member(node: node, status: .removed).asUnreachable
}
throw Cluster.MembershipError(.notFound(node, in: membership))
throw Cluster.MembershipError(.notFound(node, in: membership), file: file, line: line)
}

if atLeastStatus <= foundMember.status {
throw Cluster.MembershipError(.atLeastStatusRequirementNotMet(expectedAtLeast: atLeastStatus, found: foundMember))
throw Cluster.MembershipError(.atLeastStatusRequirementNotMet(expectedAtLeast: atLeastStatus, found: foundMember), file: file, line: line)
}
return foundMember
}
Expand All @@ -319,6 +332,7 @@ public struct ClusterControl {
private func waitForMembershipEventually<T>(_: T.Type = T.self,
within: Duration,
interval: Duration = .milliseconds(100),
file: String = #fileID, line: UInt = #line,
_ block: (Cluster.Membership) async throws -> T) async throws -> T
{
let deadline = ContinuousClock.Instant.fromNow(within)
Expand All @@ -335,6 +349,6 @@ public struct ClusterControl {
}
}

throw Cluster.MembershipError(.awaitStatusTimedOut(within, lastError))
throw Cluster.MembershipError(.awaitStatusTimedOut(within, lastError), file: file, line: line)
}
}
70 changes: 60 additions & 10 deletions Sources/DistributedCluster/Cluster/ClusterEventStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public struct ClusterEventStream: AsyncSequence {

private let actor: ClusterEventStreamActor?

internal var sourceFileLoc: String = ""
internal var sourceLineLoc: UInt = 0

internal init(_ system: ClusterSystem, customName: String? = nil) {
var props = ClusterEventStreamActor.props
if let customName = customName {
Expand Down Expand Up @@ -69,11 +72,16 @@ public struct ClusterEventStream: AsyncSequence {
}
}

private func subscribe(_ oid: ObjectIdentifier, eventHandler: @escaping (Cluster.Event) -> Void) async {
private func subscribe(
_ oid: ObjectIdentifier,
subscriberFile subLocFile: String,
subscriberLine subLocLine: UInt,
eventHandler: @escaping (Cluster.Event) -> Void
) async {
guard let actor = self.actor else { return }

await actor.whenLocal { __secretlyKnownToBeLocal in // TODO(distributed): this is annoying, we must track "known to be local" in typesystem instead
__secretlyKnownToBeLocal.subscribe(oid, eventHandler: eventHandler)
__secretlyKnownToBeLocal.subscribe(oid, subscriberFile: subLocFile, subscriberLine: subLocLine, eventHandler: eventHandler)
}
}

Expand All @@ -94,17 +102,19 @@ public struct ClusterEventStream: AsyncSequence {
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(self)
AsyncIterator(self, subscriberFile: self.sourceFileLoc, subscriberLine: self.sourceLineLoc)
}

public class AsyncIterator: AsyncIteratorProtocol {
var underlying: AsyncStream<Cluster.Event>.Iterator!

init(_ eventStream: ClusterEventStream) {
init(_ eventStream: ClusterEventStream,
subscriberFile subLocFile: String, subscriberLine subLocLine: UInt)
{
let id = ObjectIdentifier(self)
self.underlying = AsyncStream<Cluster.Event> { continuation in
Task {
await eventStream.subscribe(id) { event in
await eventStream.subscribe(id, subscriberFile: subLocFile, subscriberLine: subLocLine) { event in
continuation.yield(event)
}
}
Expand All @@ -128,7 +138,7 @@ internal distributed actor ClusterEventStreamActor: LifecycleWatch {

static var props: _Props {
var ps = _Props()
ps._knownActorName = "clustEventStream"
ps._knownActorName = "clusterEventStream"
ps._systemActor = true
ps._wellKnown = true
return ps
Expand All @@ -145,7 +155,44 @@ internal distributed actor ClusterEventStreamActor: LifecycleWatch {
private var snapshot = Cluster.Membership.empty

private var subscribers: [ActorID: _ActorRef<Cluster.Event>] = [:]
private var asyncSubscribers: [ObjectIdentifier: (Cluster.Event) -> Void] = [:]
private var asyncSubscribers: [SubscriberID: (Cluster.Event) -> Void] = [:]

/// Key identifying an async subscriber.
///
/// Identity is determined solely by `oid`: `file` and `line` only carry the subscription's
/// source location for diagnostics and take no part in hashing or equality. This allows a key
/// created with `init(forRemovalOf:)` (which has no location) to match the key that was stored
/// with a location at subscription time.
struct SubscriberID: Hashable, CustomStringConvertible {
    /// Identifier of the subscribing object; the only component used for identity.
    let oid: ObjectIdentifier
    /// Source file recorded for the subscription; empty when no location was provided.
    let file: String
    /// Source line recorded for the subscription; `0` when no location was provided.
    let line: UInt

    /// Creates an identifier carrying the subscriber's source location.
    init(oid: ObjectIdentifier, file: String, line: UInt) {
        self.oid = oid
        self.file = file
        self.line = line
    }

    /// Creates a location-less identifier, sufficient for looking up or removing the entry for `oid`.
    init(forRemovalOf oid: ObjectIdentifier) {
        self.init(oid: oid, file: "", line: 0)
    }

    /// Textual form; the source location is included only when a non-zero line was recorded.
    var description: String {
        guard self.line > 0 else {
            return "SubscriberID(oid: \(self.oid))"
        }
        return "SubscriberID(oid: \(self.oid), loc: \(self.file):\(self.line))"
    }

    // Hash only the `oid`, to stay consistent with `==` below.
    func hash(into hasher: inout Hasher) {
        hasher.combine(self.oid)
    }

    // Two identifiers are equal when they refer to the same `oid`, regardless of location.
    static func == (lhs: SubscriberID, rhs: SubscriberID) -> Bool {
        lhs.oid == rhs.oid
    }
}

private lazy var log = Logger(actor: self)

Expand All @@ -167,14 +214,17 @@ internal distributed actor ClusterEventStreamActor: LifecycleWatch {
}
}

func subscribe(_ oid: ObjectIdentifier, eventHandler: @escaping (Cluster.Event) -> Void) {
self.asyncSubscribers[oid] = eventHandler
/// Registers an async subscriber and immediately offers it the current membership snapshot.
///
/// - Parameters:
///   - oid: Identifier of the subscribing object; used as the subscriber's identity.
///   - subscriberFile: Source file of the subscription, stored in the subscriber's key.
///   - subscriberLine: Source line of the subscription, stored in the subscriber's key.
///   - eventHandler: Closure stored for this subscriber; it is invoked right away with a `.snapshot` event.
func subscribe(_ oid: ObjectIdentifier,
               subscriberFile subLocFile: String, subscriberLine subLocLine: UInt,
               eventHandler: @escaping (Cluster.Event) -> Void)
{
    let subscriberID = SubscriberID(oid: oid, file: subLocFile, line: subLocLine)
    self.asyncSubscribers[subscriberID] = eventHandler
    self.log.trace("Successfully added async subscriber [\(oid)], offering membership snapshot")

    // New subscribers always start from the current membership snapshot.
    let currentSnapshot = self.snapshot
    eventHandler(.snapshot(currentSnapshot))
}

func unsubscribe(_ oid: ObjectIdentifier) {
if self.asyncSubscribers.removeValue(forKey: oid) != nil {
if self.asyncSubscribers.removeValue(forKey: .init(forRemovalOf: oid)) != nil {
self.log.trace("Successfully removed async subscriber [\(oid)]")
} else {
self.log.warning("Received async `.unsubscribe` for non-subscriber [\(oid)]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ internal distributed actor DowningStrategyShell {
self.eventsListeningTask = Task { [weak self] in
guard let __secretlyKnownToBeLocal = self else { return } // FIXME(local): we really need `local` here

for await event in system.cluster.events {
for await event in system.cluster.events() {
try __secretlyKnownToBeLocal.receiveClusterEvent(event)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public distributed actor OpLogDistributedReceptionist: DistributedReceptionist,
assert(self.id.path.description == "/system/receptionist") // TODO(distributed): remove when we remove paths entirely

self.eventsListeningTask = Task { [weak self, system] in
for try await event in system.cluster.events {
for try await event in system.cluster.events() {
guard let __secretlyKnownToBeLocal = self else { return }
__secretlyKnownToBeLocal.onClusterEvent(event: event)
}
Expand Down
8 changes: 4 additions & 4 deletions Sources/DistributedCluster/Concurrency/_Condition.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,18 @@ public final class _Condition {
}

@inlinable
public func wait(_ mutex: _Mutex) {
public func wait(_ mutex: _Mutex, file: String = #fileID, line: UInt = #line) {
let error = pthread_cond_wait(&self.condition, &mutex.mutex)

switch error {
case 0:
return
case EPERM:
fatalError("Wait failed, mutex is not owned by this thread")
fatalError("[\(file):\(line)] Wait failed, mutex is not owned by this thread")
case EINVAL:
fatalError("Wait failed, condition is not valid")
fatalError("[\(file):\(line)] Wait failed, condition is not valid")
default:
fatalError("Wait failed with unspecified error: \(error)")
fatalError("[\(file):\(line)] Wait failed with unspecified error: \(error)")
}
}

Expand Down
4 changes: 2 additions & 2 deletions Sources/DistributedCluster/Docs.docc/Clustering.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ A cluster member goes through the following phases in its lifecycle:
You can listen to cluster events by subscribing to their async sequence available on the cluster control object, like this:

```swift
for await event in system.cluster.events {
for await event in system.cluster.events() {
switch event {
case .snapshot(let membership):
// handle a snapshot of the current state of the cluster,
Expand Down Expand Up @@ -157,7 +157,7 @@ by applying all the incoming events one by one:
```swift
var membership = Cluster.Membership.empty

for await event in system.cluster.events {
for await event in system.cluster.events() {
if case .membershipChanged(let change) = event {
guard change.node == system.cluster.uniqueNode else {
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ internal distributed actor ClusterSingletonBoss<Act: ClusterSingleton>: ClusterS
if system.settings.enabled {
self.clusterEventsSubscribeTask = Task {
// Subscribe to ``Cluster/Event`` in order to update `targetNode`
for await event in system.cluster.events {
for await event in system.cluster.events() {
try await self.receiveClusterEvent(event)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ final class ClusterSingletonPluginClusteredTests: ClusteredActorSystemsXCTestCas
try await self.assertSingletonRequestReply(third, singleton: ref3, greetingName: "Charlie", expectedPrefix: "Hello-1 Charlie!")
}

// Overrides the inherited `alwaysPrintCaptureLogs` setting to `true` for this test case.
// NOTE(review): presumably this prints captured logs even for passing tests — confirm it is
// meant to stay enabled and is not a debugging leftover.
override var alwaysPrintCaptureLogs: Bool {
    return true
}

func test_singleton_lifecycle() async throws {
var singletonSettings = ClusterSingletonSettings()
singletonSettings.allocationStrategy = .byLeadership
Expand Down