Skip to content

Commit efaea2d

Browse files
ShardedDaemonProcess push mode (#7229)
* stashing work * finished first pass at push-based `ShardedDaemonProcess` close #7195 * fixed issues with message propagation * made `ShardedDaemonProcessSettings` support `nullable` * cleanup logging * added standard function for name formatting * Fix unit test * added documentation --------- Co-authored-by: Gregorius Soedharmo <[email protected]>
1 parent 356c911 commit efaea2d

File tree

7 files changed

+295
-44
lines changed

7 files changed

+295
-44
lines changed

docs/articles/clustering/cluster-sharded-daemon-process.md

+17-2
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,20 @@ when starting up:
3232

3333
## Scalability
3434

35-
This cluster tool is intended for small numbers of consumers and will not scale well to a large set. In large clusters
36-
it is recommended to limit the nodes the sharded daemon process will run on using a role.
35+
This cluster tool is intended for small numbers of consumers and will not scale well to a large set. In large clusters it is recommended to limit the nodes the sharded daemon process will run on using a role.
36+
37+
## Push-Based Communication
38+
39+
[`ShardedDaemonProcess`](xref:Akka.Cluster.Sharding.ShardedDaemonProcess) also supports push-based communication not too dissimilar from a round-robin `Router`:
40+
41+
[!code-csharp[IActorRef returned by ShardedDaemonProcess](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessProxySpec.cs?name=PushDaemon)]
42+
43+
The `ShardedDaemonProcess.Init` call returns an `IActorRef` - any messages you send to this `IActorRef` will be routed to one of the `n` worker instances you specified.
44+
45+
## Daemon Proxies
46+
47+
You can also interact with `ShardedDaemonProcess` on nodes that don't use the same [Akka.Cluster role](xref:member-roles) as the `ShardedDaemonProcess` host itself.
48+
49+
[!code-csharp[IActorRef returned by ShardedDaemonProcess](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessProxySpec.cs?name=PushDaemonProxy)]
50+
51+
Under the covers we use a [`ShardRegionProxy`](xref:Akka.Cluster.Sharding.ClusterSharding#Akka_Cluster_Sharding_ClusterSharding_StartProxy_System_String_System_String_Akka_Cluster_Sharding_IMessageExtractor_) to forward any messages you send to the `IActorRef` returned by the `ShardedDaemonProcess.InitProxy` method.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="ShardedDaemonProcessProxySpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
// -----------------------------------------------------------------------
7+
8+
using System.Linq;
9+
using System.Threading.Tasks;
10+
using Akka.Actor;
11+
using Akka.Cluster.Tools.Singleton;
12+
using Akka.Configuration;
13+
using Akka.TestKit;
14+
using Xunit;
15+
using Xunit.Abstractions;
16+
using FluentAssertions;
17+
18+
namespace Akka.Cluster.Sharding.Tests;
19+
20+
public class ShardedDaemonProcessProxySpec : AkkaSpec
21+
{
22+
private static Config GetConfig()
23+
{
24+
return ConfigurationFactory.ParseString(@"
25+
akka.loglevel = DEBUG
26+
akka.actor.provider = cluster
27+
akka.remote.dot-netty.tcp.port = 0
28+
akka.remote.dot-netty.tcp.hostname = localhost
29+
30+
# ping often/start fast for test
31+
akka.cluster.sharded-daemon-process.keep-alive-interval = 1s
32+
akka.cluster.roles = [workers]
33+
akka.coordinated-shutdown.terminate-actor-system = off
34+
akka.coordinated-shutdown.run-by-actor-system-terminate = off")
35+
.WithFallback(ClusterSharding.DefaultConfig())
36+
.WithFallback(ClusterSingletonProxy.DefaultConfig())
37+
.WithFallback(DistributedData.DistributedData.DefaultConfig());
38+
}
39+
40+
private readonly ActorSystem _proxySystem;
41+
42+
public ShardedDaemonProcessProxySpec(ITestOutputHelper output)
43+
: base(GetConfig(), output: output)
44+
{
45+
_proxySystem = ActorSystem.Create(Sys.Name,
46+
ConfigurationFactory
47+
.ParseString("akka.cluster.roles=[proxy]").WithFallback(Sys.Settings.Config));
48+
InitializeLogger(_proxySystem, "PROXY");
49+
}
50+
51+
private class EchoActor : ReceiveActor
52+
{
53+
public static Props EchoProps(int i) => Props.Create(() => new EchoActor());
54+
55+
public EchoActor()
56+
{
57+
ReceiveAny(msg => Sender.Tell(msg));
58+
}
59+
}
60+
61+
[Fact]
62+
public async Task ShardedDaemonProcessProxy_must_start_daemon_process_on_proxy()
63+
{
64+
// form a cluster with one node
65+
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);
66+
Cluster.Get(_proxySystem).Join(Cluster.Get(Sys).SelfAddress);
67+
68+
// validate that we have a 2 node cluster with both members marked as up
69+
await AwaitAssertAsync(() =>
70+
{
71+
Cluster.Get(Sys).State.Members.Count(x => x.Status == MemberStatus.Up).Should().Be(2);
72+
Cluster.Get(_proxySystem).State.Members.Count(x => x.Status == MemberStatus.Up).Should().Be(2);
73+
});
74+
75+
// <PushDaemon>
76+
// start the daemon process on the host
77+
var name = "daemonTest";
78+
var targetRole = "workers";
79+
var numWorkers = 10;
80+
var settings = ShardedDaemonProcessSettings.Create(Sys).WithRole(targetRole);
81+
IActorRef host = ShardedDaemonProcess.Get(Sys).Init(name, numWorkers, EchoActor.EchoProps, settings, PoisonPill.Instance);
82+
83+
// ping some of the workers via the host
84+
for(var i = 0; i < numWorkers; i++)
85+
{
86+
var result = await host.Ask<int>(i);
87+
result.Should().Be(i);
88+
}
89+
// </PushDaemon>
90+
91+
// <PushDaemonProxy>
92+
// start the proxy on the proxy system, which runs on a different role not capable of hosting workers
93+
IActorRef proxy = ShardedDaemonProcess.Get(_proxySystem).InitProxy(name, numWorkers, targetRole);
94+
95+
// ping some of the workers via the proxy
96+
for(var i = 0; i < numWorkers; i++)
97+
{
98+
var result = await proxy.Ask<int>(i);
99+
result.Should().Be(i);
100+
}
101+
// </PushDaemonProxy>
102+
}
103+
104+
protected override void AfterAll()
105+
{
106+
Shutdown(_proxySystem);
107+
base.AfterAll();
108+
}
109+
}

src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardedDaemonProcessSpec.cs

+15-5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using Akka.Configuration;
1313
using Akka.Event;
1414
using Akka.TestKit;
15+
using FluentAssertions;
1516
using Xunit;
1617

1718
namespace Akka.Cluster.Sharding.Tests
@@ -21,7 +22,10 @@ public class ShardedDaemonProcessSpec : AkkaSpec
2122
private sealed class Stop
2223
{
2324
public static Stop Instance { get; } = new();
24-
private Stop() { }
25+
26+
private Stop()
27+
{
28+
}
2529
}
2630

2731
private sealed class Started
@@ -81,14 +85,16 @@ private static Config GetConfig()
8185

8286
public ShardedDaemonProcessSpec()
8387
: base(GetConfig())
84-
{ }
88+
{
89+
}
8590

8691
[Fact]
8792
public void ShardedDaemonProcess_must_have_a_single_node_cluster_running_first()
8893
{
8994
var probe = CreateTestProbe();
9095
Cluster.Get(Sys).Join(Cluster.Get(Sys).SelfAddress);
91-
probe.AwaitAssert(() => Cluster.Get(Sys).SelfMember.Status.ShouldBe(MemberStatus.Up), TimeSpan.FromSeconds(3));
96+
probe.AwaitAssert(() => Cluster.Get(Sys).SelfMember.Status.ShouldBe(MemberStatus.Up),
97+
TimeSpan.FromSeconds(3));
9298
}
9399

94100
[Fact]
@@ -126,7 +132,9 @@ public void ShardedDaemonProcess_must_not_run_if_the_role_does_not_match_node_ro
126132

127133
var probe = CreateTestProbe();
128134
var settings = ShardedDaemonProcessSettings.Create(Sys).WithRole("workers");
129-
ShardedDaemonProcess.Get(Sys).Init("roles", 3, id => MyActor.Props(id, probe.Ref), settings, PoisonPill.Instance);
135+
var actorRef = ShardedDaemonProcess.Get(Sys).Init("roles", 3, id => MyActor.Props(id, probe.Ref), settings,
136+
PoisonPill.Instance);
137+
actorRef.Should().BeNull();
130138

131139
probe.ExpectNoMsg();
132140
}
@@ -152,9 +160,11 @@ protected override void PreStart()
152160
private void DocExample()
153161
{
154162
#region tag-processing
163+
155164
var tags = new[] { "tag-1", "tag-2", "tag-3" };
156165
ShardedDaemonProcess.Get(Sys).Init("TagProcessors", tags.Length, id => TagProcessor.Props(tags[id]));
166+
157167
#endregion
158168
}
159169
}
160-
}
170+
}

0 commit comments

Comments
 (0)