Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,14 @@ var targets: [PackageDescription.Target] = [
.product(name: "ArgumentParser", package: "swift-argument-parser"),
]
),

.executableTarget(
name: "swift-clusterd",
dependencies: [
"DistributedCluster",
.product(name: "ArgumentParser", package: "swift-argument-parser"),
],
path: "Sources/Clusterd"
),
// ==== ------------------------------------------------------------------------------------------------------------
// MARK: Multi Node Tests

Expand Down
47 changes: 47 additions & 0 deletions Sources/Clusterd/boot+ClusterD.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2020-2024 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import ArgumentParser
import DistributedCluster
import Logging

@main
struct ClusterDBoot: AsyncParsableCommand {
@Option(name: .shortAndLong, help: "The port to bind the cluster daemon on.")
var port: Int = ClusterDaemon.defaultEndpoint.port

@Option(help: "The host address to bid the cluster daemon on.")
var host: String = ClusterDaemon.defaultEndpoint.host

func run() async throws {
let daemon = await ClusterSystem.startClusterDaemon(configuredWith: self.configure)

#if DEBUG
daemon.system.log.warning("RUNNING ClusterD DEBUG binary, operation is likely to be negatively affected. Please build/run the ClusterD process using '-c release' configuration!")
#endif

try daemon.system.park()
}

func configure(_ settings: inout ClusterSystemSettings) {
// other nodes will be discovering us, not the opposite
settings.discovery = .init(static: [])

settings.endpoint = Cluster.Endpoint(
systemName: "clusterd",
host: self.host,
port: self.port
)
}
}
6 changes: 6 additions & 0 deletions Sources/DistributedCluster/Cluster/DiscoveryShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ final class DiscoveryShell {

var behavior: _Behavior<Message> {
.setup { context in
// FIXME: should have a behavior to bridge the async world...
context.log.info("Initializing discovery: \(self.settings.implementation)")
// Try to initialise clusterd if needed
self.settings.initializeClusterd(context.system)
context.log.info("Initializing discovery, done.")

self.subscription = self.settings.subscribe(
onNext: { result in
switch result {
Expand Down
88 changes: 88 additions & 0 deletions Sources/DistributedCluster/ClusterSystem+Clusterd.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Atomics
import Backtrace
import CDistributedActorsMailbox
import Dispatch
import Distributed
import DistributedActorsConcurrencyHelpers
import Foundation // for UUID
import Logging
import NIO

extension ClusterSystem {
public static func startClusterDaemon(configuredWith configureSettings: (inout ClusterSystemSettings) -> Void = { _ in () }) async -> ClusterDaemon {
let system = await ClusterSystem("clusterd") { settings in
settings.endpoint = ClusterDaemon.defaultEndpoint
configureSettings(&settings)
}

return ClusterDaemon(system: system)
}
}

public struct ClusterDaemon {
public let system: ClusterSystem
public var settings: ClusterSystemSettings {
self.system.settings
}

public init(system: ClusterSystem) {
self.system = system
}
}

extension ClusterDaemon {
/// Suspends until the ``ClusterSystem`` is terminated by a call to ``shutdown()``.
public var terminated: Void {
get async throws {
try await self.system.terminated
}
}

/// Returns `true` if the system was already successfully terminated (i.e. awaiting ``terminated`` would resume immediately).
public var isTerminated: Bool {
self.system.isTerminated
}

/// Forcefully stops this actor system and all actors that live within it.
/// This is an asynchronous operation and will be executed on a separate thread.
///
/// You can use `shutdown().wait()` to synchronously await on the system's termination,
/// or provide a callback to be executed after the system has completed it's shutdown.
///
/// - Returns: A `Shutdown` value that can be waited upon until the system has completed the shutdown.
@discardableResult
public func shutdown() throws -> ClusterSystem.Shutdown {
try self.system.shutdown()
}
}

extension ClusterDaemon {
/// The default endpoint
public static let defaultEndpoint = Cluster.Endpoint(host: "127.0.0.1", port: 3137)
}

internal distributed actor ClusterDaemonServant {
typealias ActorSystem = ClusterSystem

@ActorID.Metadata(\.wellKnown)
public var wellKnownName: String

init(system: ClusterSystem) async {
self.actorSystem = system
self.wellKnownName = "$cluster-daemon-servant"
}
}
86 changes: 66 additions & 20 deletions Sources/DistributedCluster/ClusterSystemSettings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -405,18 +405,19 @@ protocol ClusterSystemInstrumentationProvider {
/// all the nodes of an existing cluster.
public struct ServiceDiscoverySettings {
let implementation: ServiceDiscoveryImplementation
private let _subscribe: (@escaping (Result<[Cluster.Endpoint], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken?

public init<Discovery, S>(_ implementation: Discovery, service: S)
where
Discovery: ServiceDiscovery,
Discovery.Instance == Cluster.Endpoint,
S == Discovery.Service
{
self.implementation = .dynamic(AnyServiceDiscovery(implementation))
self._subscribe = { onNext, onComplete in
implementation.subscribe(to: service, onNext: onNext, onComplete: onComplete)
}
self.implementation = .dynamic(
serviceDiscovery: AnyServiceDiscovery(implementation),
subscribe: { onNext, onComplete in
implementation.subscribe(to: service, onNext: onNext, onComplete: onComplete)
}
)
}

public init<Discovery, S>(_ implementation: Discovery, service: S, mapInstanceToNode transformer: @escaping (Discovery.Instance) throws -> Cluster.Endpoint)
Expand All @@ -425,33 +426,78 @@ public struct ServiceDiscoverySettings {
S == Discovery.Service
{
let mappedDiscovery: MapInstanceServiceDiscovery<Discovery, Cluster.Endpoint> = implementation.mapInstance(transformer)
self.implementation = .dynamic(AnyServiceDiscovery(mappedDiscovery))
self._subscribe = { onNext, onComplete in
mappedDiscovery.subscribe(to: service, onNext: onNext, onComplete: onComplete)
self.implementation = .dynamic(
serviceDiscovery: AnyServiceDiscovery(mappedDiscovery),
subscribe: { onNext, onComplete in
mappedDiscovery.subscribe(to: service, onNext: onNext, onComplete: onComplete)
}
)
}

init(clusterdEndpoint: Cluster.Endpoint) {
self.implementation = .clusterDaemon(
endpoint: clusterdEndpoint,
initialize: { system in
system.log.info("Joining [clusterd] at \(clusterdEndpoint)")
system.cluster.join(endpoint: clusterdEndpoint)
}
)
}

/// Locate the default `ClusterD` process and use it for discovering cluster nodes.
public static var clusterd: Self {
get {
Self.clusterd(endpoint: nil)
}
}

public static func clusterd(endpoint: Cluster.Endpoint?) -> Self {
ServiceDiscoverySettings(clusterdEndpoint: endpoint ?? ClusterDaemon.defaultEndpoint)
}

public static func seed(nodes: Set<Cluster.Endpoint>) -> Self {
.init(static: nodes)
}

public init(static nodes: Set<Cluster.Endpoint>) {
self.implementation = .static(nodes)
self._subscribe = { onNext, _ in
// Call onNext once and never again since the list of nodes doesn't change
onNext(.success(Array(nodes)))
// Ignore onComplete because static service discovery never terminates

// No cancellation token
return nil
}
self.implementation = .static(
endpoints: nodes,
subscribe: { onNext, _ in
// Call onNext once and never again since the list of nodes doesn't change
onNext(.success(Array(nodes)))
// Ignore onComplete because static service discovery never terminates

// No cancellation token
return nil
}
)
}

/// Similar to `ServiceDiscovery.subscribe` however it allows the handling of the listings to be generic and handled by the cluster system.
/// This function is only intended for internal use by the `DiscoveryShell`.
func subscribe(onNext nextResultHandler: @escaping (Result<[Cluster.Endpoint], Error>) -> Void, onComplete completionHandler: @escaping (CompletionReason) -> Void) -> CancellationToken? {
self._subscribe(nextResultHandler, completionHandler)
switch self.implementation {
case .static(_, let subscribe),
.dynamic(_, let subscribe):
subscribe(nextResultHandler, completionHandler)
case .clusterDaemon:
.none
}
}

func initializeClusterd(_ system: ClusterSystem) {
switch self.implementation {
case .clusterDaemon(_, let initialize):
initialize(system)
default:
break
}
}

enum ServiceDiscoveryImplementation {
case `static`(Set<Cluster.Endpoint>)
case dynamic(AnyServiceDiscovery)
case `static`(endpoints: Set<Cluster.Endpoint>, subscribe: (@escaping (Result<[Cluster.Endpoint], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken?)
case dynamic(serviceDiscovery: AnyServiceDiscovery, subscribe: (@escaping (Result<[Cluster.Endpoint], Error>) -> Void, @escaping (CompletionReason) -> Void) -> CancellationToken?)
case clusterDaemon(endpoint: Cluster.Endpoint, initialize: (ClusterSystem) -> Void)
Comment on lines +498 to +500
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to put subscribe and initialize closures here, as otherwise it's quite confusing to have optional values in settings itself.

}
}

Expand Down
7 changes: 5 additions & 2 deletions Sources/MultiNodeTestKit/MultiNodeTestKit.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public struct MultiNodeTest {
public let crashRegex: String?
public let runTest: (any MultiNodeTestControlProtocol) async throws -> Void
public let configureActorSystem: (inout ClusterSystemSettings) -> Void
public let startNode: (ClusterSystemSettings) async throws -> ClusterSystem
public let configureMultiNodeTest: (inout MultiNodeTestSettings) -> Void
public let makeControl: (String) -> any MultiNodeTestControlProtocol

Expand All @@ -51,6 +52,7 @@ public struct MultiNodeTest {
}

self.configureActorSystem = TestSuite.configureActorSystem
self.startNode = TestSuite.startNode
self.configureMultiNodeTest = TestSuite.configureMultiNodeTest

self.makeControl = { nodeName -> Control<TestSuite.Nodes> in
Expand Down Expand Up @@ -80,6 +82,7 @@ public protocol MultiNodeTestSuite {
init()
associatedtype Nodes: MultiNodeNodes
static func configureActorSystem(settings: inout ClusterSystemSettings)
static func startNode(settings: ClusterSystemSettings) async throws -> ClusterSystem
static func configureMultiNodeTest(settings: inout MultiNodeTestSettings)
}

Expand All @@ -88,8 +91,8 @@ extension MultiNodeTestSuite {
"\(Self.self)".split(separator: ".").last.map(String.init) ?? ""
}

public func configureActorSystem(settings: inout ClusterSystemSettings) {
// do nothing by default
public static func startNode(settings: ClusterSystemSettings) async throws -> ClusterSystem {
await ClusterSystem(settings: settings)
}

var nodeNames: [String] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,34 @@ extension MultiNodeTestKitRunnerBoot {
)
}

let actorSystem = await ClusterSystem(nodeName) { settings in
settings.bindHost = myNode.host
settings.bindPort = myNode.port

/// Configure a nicer logger, that pretty prints metadata and also includes source location of logs
if multiNodeSettings.installPrettyLogger {
settings.logging.baseLogger = Logger(
label: nodeName,
factory: { label in
PrettyMultiNodeLogHandler(nodeName: label, settings: multiNodeSettings.logCapture)
}
)
}
var settings = ClusterSystemSettings(name: nodeName)
settings.bindHost = myNode.host
settings.bindPort = myNode.port

/// Configure a nicer logger, that pretty prints metadata and also includes source location of logs
if multiNodeSettings.installPrettyLogger {
settings.logging.baseLogger = Logger(
label: nodeName,
factory: { label in
PrettyMultiNodeLogHandler(nodeName: label, settings: multiNodeSettings.logCapture)
}
)
}

// we use the singleton to implement a simple Coordinator
// TODO: if the node hosting the coordinator dies we'd potentially have some races at hand
// there's a few ways to solve this... but for now this is good enough.
settings += ClusterSingletonPlugin()
// we use the singleton to implement a simple Coordinator
// TODO: if the node hosting the coordinator dies we'd potentially have some races at hand
// there's a few ways to solve this... but for now this is good enough.
settings += ClusterSingletonPlugin()

multiNodeTest.configureActorSystem(&settings)
}
multiNodeTest.configureActorSystem(&settings)

// we use the singleton to implement a simple Coordinator
// TODO: if the node hosting the coordinator dies we'd potentially have some races at hand
// there's a few ways to solve this... but for now this is good enough.
settings += ClusterSingletonPlugin()
multiNodeTest.configureActorSystem(&settings)

let actorSystem = try await multiNodeTest.startNode(settings)
control._actorSystem = actorSystem

let signalQueue = DispatchQueue(label: "multi.node.\(multiNodeTest.testSuiteName).\(multiNodeTest.testName).\(nodeName).SignalHandlerQueue")
Expand Down
Loading
Loading