Skip to content

Commit b873cdb

Browse files
Add stream
1 parent 0f9b20d commit b873cdb

18 files changed

+318
-65
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using System.Runtime.Serialization;
2+
3+
namespace Nameless.Orleans.Client.Contracts;
4+
5+
[DataContract]
6+
public record CustomerAccount {
7+
[DataMember]
8+
public Guid AccountId { get; init; }
9+
}

Nameless.Orleans.Client/Program.cs

+41-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Azure.Data.Tables;
22
using Nameless.Orleans.Client.Contracts;
3+
using Nameless.Orleans.Core;
34
using Nameless.Orleans.Grains.Abstractions;
45
using Orleans.Configuration;
56

@@ -11,12 +12,12 @@ public static void Main(string[] args) {
1112

1213
builder.Host.UseOrleansClient((_, client) => {
1314
client.UseAzureStorageClustering(options => {
14-
options.TableServiceClient = new TableServiceClient("UseDevelopmentStorage=true;");
15+
options.TableServiceClient = new TableServiceClient(Constants.ConnectionString);
1516
});
1617

1718
client.Configure<ClusterOptions>(options => {
18-
options.ClusterId = "Nameless_Orleans_Cluster";
19-
options.ServiceId = "Nameless_Orleans_Service";
19+
options.ClusterId = Constants.ClusterId;
20+
options.ServiceId = Constants.ServiceId;
2021
});
2122

2223
client.UseTransactions();
@@ -31,7 +32,10 @@ public static void Main(string[] args) {
3132

3233
await atmGrain.InitializeAsync(createAtm.OpeningBalance);
3334

34-
return TypedResults.Created($"/atm/{atmId}", new { AtmId = atmId });
35+
return TypedResults.Created($"/atm/{atmId}", new {
36+
AtmId = atmId,
37+
createAtm.OpeningBalance
38+
});
3539
});
3640

3741
// Retrieves the ATM current balance
@@ -49,11 +53,18 @@ await transactionClient.RunTransaction(TransactionOption.Create, async () => {
4953
});
5054

5155
// Withdraw from ATM
52-
app.MapPost("/atm/{atmId}/withdraw", async (Guid atmId, AtmWithdraw atmWithdraw, IClusterClient clusterClient) => {
56+
app.MapPost("/atm/{atmId}/withdraw", async (Guid atmId,
57+
AtmWithdraw atmWithdraw,
58+
IClusterClient clusterClient,
59+
ITransactionClient transactionClient) => {
5360
var atmGrain = clusterClient.GetGrain<IAtmGrain>(atmId);
5461
var accountGrain = clusterClient.GetGrain<IAccountGrain>(atmWithdraw.AccountId);
5562

56-
await atmGrain.WithdrawAsync(atmWithdraw.AccountId, atmWithdraw.Amount);
63+
await transactionClient.RunTransaction(TransactionOption.Create, async () => {
64+
await atmGrain.WithdrawAsync(atmWithdraw.AccountId, atmWithdraw.Amount);
65+
await accountGrain.DebitAsync(atmWithdraw.Amount);
66+
});
67+
5768
var currentAtmBalance = await atmGrain.GetCurrentBalanceAsync();
5869
var currentAccountBalance = await accountGrain.GetCurrentBalanceAsync();
5970

@@ -140,6 +151,30 @@ await transactionClient.RunTransaction(TransactionOption.Create, async () => {
140151
});
141152
});
142153

154+
// Get customer net worth
155+
app.MapGet("/customer/{customerId}/net_worth", async (Guid customerId, IClusterClient clusterClient) => {
156+
var customerGrain = clusterClient.GetGrain<ICustomerGrain>(customerId);
157+
158+
var netWorth = await customerGrain.GetNetWorthAsync(customerId);
159+
160+
return TypedResults.Ok(new {
161+
CustomerId = customerId,
162+
NetWorth = netWorth
163+
});
164+
});
165+
166+
// Get customer net worth
167+
app.MapPost("/customer/{customerId:guid}/add_account", async (Guid customerId, CustomerAccount customerAccount, IClusterClient clusterClient) => {
168+
var customerGrain = clusterClient.GetGrain<ICustomerGrain>(customerId);
169+
170+
await customerGrain.AddAccountAsync(customerAccount.AccountId);
171+
172+
return TypedResults.Ok(new {
173+
CustomerId = customerId,
174+
customerAccount.AccountId
175+
});
176+
});
177+
143178
app.Run();
144179
}
145180
}

Nameless.Orleans.Client/atm.http

+47-7
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,18 @@ GET {{HostAddress}}/atm/{{CreateAtm.response.body.$.AtmId}}/balance
1616

1717
###
1818

19+
# Withdraw from ATM
20+
21+
POST {{HostAddress}}/atm/{{CreateAtm.response.body.$.AtmId}}/withdraw
22+
Content-Type: application/json
23+
24+
{
25+
"AccountId": "{{CreateAccount.response.body.$.AccountId}}",
26+
"Amount": 50.00
27+
}
28+
29+
###
30+
1931
# Create Account
2032
# @name CreateAccount
2133

@@ -28,32 +40,60 @@ Content-Type: application/json
2840

2941
###
3042

31-
# Get Account current balance
43+
# Retrieves Account current balance
3244

3345
GET {{HostAddress}}/account/{{CreateAccount.response.body.$.AccountId}}/balance
3446

3547
###
3648

37-
# Withdraw from ATM
49+
# Creates a debit in an account
3850

39-
POST {{HostAddress}}/atm/{{CreateAtm.response.body.$.AtmId}}/withdraw
51+
POST {{HostAddress}}/account/{{CreateAccount.response.body.$.AccountId}}/debit
4052
Content-Type: application/json
4153

4254
{
43-
"AccountId": "{{CreateAccount.response.body.$.AccountId}}",
44-
"Amount": 50.00
55+
"Amount": 15.00
4556
}
4657

4758
###
4859

49-
# Add recurring payment for Account
60+
# Creates a credit into an account
61+
62+
POST {{HostAddress}}/account/{{CreateAccount.response.body.$.AccountId}}/credit
63+
Content-Type: application/json
64+
65+
{
66+
"Amount": 30.00
67+
}
68+
69+
###
70+
71+
# Creates a recurring payment on an Account
5072

5173
POST {{HostAddress}}/account/{{CreateAccount.response.body.$.AccountId}}/recurring_payment
5274
Content-Type: application/json
5375

5476
{
55-
"Amount": 5.00,
77+
"Amount": 10.00,
5678
"PeriodInMinutes": 1
5779
}
5880

81+
###
82+
83+
# Get customer net worth
84+
# For this example's sake, the customer ID will be the same as account ID
85+
86+
GET {{HostAddress}}/customer/{{CreateAccount.response.body.$.AccountId}}/net_worth
87+
88+
###
89+
90+
# Add account to customer.
91+
92+
POST {{HostAddress}}/customer/{{CreateAccount.response.body.$.AccountId}}/add_account
93+
Content-Type: application/json
94+
95+
{
96+
"AccountId": "{{CreateAccount.response.body.$.AccountId}}"
97+
}
98+
5999
###

Nameless.Orleans.Core/Constants.cs

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace Nameless.Orleans.Core;
2+
3+
public static class Constants {
4+
public const string ClusterId = "nameless-orleans-cluster";
5+
public const string ServiceId = "nameless-orleans-sercice";
6+
public const string ConnectionString = "UseDevelopmentStorage=true;";
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net9.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
</PropertyGroup>
8+
9+
</Project>

Nameless.Orleans.Core/StorageNames.cs

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace Nameless.Orleans.Core;
2+
3+
public static class StorageNames {
4+
public const string TableStorage = nameof(TableStorage);
5+
public const string BlobStorage = nameof(BlobStorage);
6+
public const string PubSubStore = nameof(PubSubStore);
7+
}

Nameless.Orleans.Core/StreamNames.cs

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
namespace Nameless.Orleans.Core;
2+
3+
public static class StreamNames {
4+
public const string StreamProvider = nameof(StreamProvider);
5+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace Nameless.Orleans.Grains.Abstractions;
2+
3+
public interface ICustomerGrain : IGrainWithGuidKey {
4+
Task AddAccountAsync(Guid accountId);
5+
6+
Task<decimal> GetNetWorthAsync(Guid accountId);
7+
}

Nameless.Orleans.Grains/AccountGrain.cs

+27-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using Nameless.Orleans.Grains.Abstractions;
1+
using Nameless.Orleans.Core;
2+
using Nameless.Orleans.Grains.Abstractions;
3+
using Nameless.Orleans.Grains.Events;
24
using Nameless.Orleans.Grains.States;
35
using Orleans.Concurrency;
46
using Orleans.Transactions.Abstractions;
@@ -17,7 +19,7 @@ public AccountGrain(
1719
[TransactionalState(nameof(BalanceState))]
1820
ITransactionalState<BalanceState> balanceState,
1921

20-
[PersistentState(nameof(AccountState), "blobStorage")]
22+
[PersistentState(nameof(AccountState), StorageNames.BlobStorage)]
2123
IPersistentState<AccountState> accountState) {
2224
_transactionClient = transactionClient;
2325
_balanceState = balanceState;
@@ -42,16 +44,22 @@ await _balanceState.PerformUpdate(state => {
4244
public Task<decimal> GetCurrentBalanceAsync()
4345
=> _balanceState.PerformRead(state => state.Amount);
4446

45-
public Task DebitAsync(decimal value)
46-
=> _balanceState.PerformUpdate(state => {
47+
public async Task DebitAsync(decimal value) {
48+
await _balanceState.PerformUpdate(state => {
4749
state.Amount -= value;
4850
});
4951

50-
public Task CreditAsync(decimal value)
51-
=> _balanceState.PerformUpdate(state => {
52+
await PublishBalanceChangeEventAsync();
53+
}
54+
55+
public async Task CreditAsync(decimal value) {
56+
await _balanceState.PerformUpdate(state => {
5257
state.Amount += value;
5358
});
5459

60+
await PublishBalanceChangeEventAsync();
61+
}
62+
5563
public async Task AddRecurringPaymentAsync(decimal amount, int periodInMinutes) {
5664
var recurringPayment = new RecurringPayment {
5765
Id = Guid.NewGuid(),
@@ -83,4 +91,17 @@ public Task ReceiveReminder(string reminderName, TickStatus status) {
8391
return _transactionClient.RunTransaction(transactionOption: TransactionOption.Create,
8492
transactionDelegate: () => DebitAsync(recurringPayment.Amount));
8593
}
94+
95+
private async Task PublishBalanceChangeEventAsync() {
96+
var accountId = this.GetGrainId().GetGuidKey();
97+
var streamProvider = this.GetStreamProvider(StreamNames.StreamProvider);
98+
var streamId = StreamId.Create(Constants.BalanceStreamNamespace, accountId);
99+
var stream = streamProvider.GetStream<BalanceChangeEvent>(streamId);
100+
var currentBalance = await GetCurrentBalanceAsync();
101+
102+
await stream.OnNextAsync(new BalanceChangeEvent {
103+
AccountId = accountId,
104+
CurrentBalance = currentBalance
105+
});
106+
}
86107
}

Nameless.Orleans.Grains/Constants.cs

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
namespace Nameless.Orleans.Grains;
2+
3+
internal static class Constants {
4+
internal const string BalanceStreamNamespace = "BalanceStream";
5+
}
+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
using Nameless.Orleans.Core;
2+
using Nameless.Orleans.Grains.Abstractions;
3+
using Nameless.Orleans.Grains.Events;
4+
using Nameless.Orleans.Grains.States;
5+
using Orleans.Streams;
6+
7+
namespace Nameless.Orleans.Grains;
8+
9+
public class CustomerGrain : Grain, ICustomerGrain, IAsyncObserver<BalanceChangeEvent> {
10+
private readonly IPersistentState<CustomerState> _customerState;
11+
12+
public CustomerGrain(
13+
[PersistentState(nameof(CustomerState), StorageNames.TableStorage)]
14+
IPersistentState<CustomerState> customerState) {
15+
_customerState = customerState;
16+
}
17+
18+
public async Task AddAccountAsync(Guid accountId) {
19+
_customerState.State.BalanceByAccountId[accountId] = 0M;
20+
21+
var streamProvider = this.GetStreamProvider(StreamNames.StreamProvider);
22+
var streamId = StreamId.Create(Constants.BalanceStreamNamespace, accountId);
23+
var stream = streamProvider.GetStream<BalanceChangeEvent>(streamId);
24+
25+
await stream.SubscribeAsync(this);
26+
27+
await _customerState.WriteStateAsync();
28+
}
29+
30+
public Task<decimal> GetNetWorthAsync(Guid accountId) {
31+
_customerState.State
32+
.BalanceByAccountId
33+
.TryGetValue(accountId, out var balance);
34+
35+
return Task.FromResult(balance);
36+
}
37+
38+
public Task OnNextAsync(BalanceChangeEvent item, StreamSequenceToken? token = null) {
39+
_customerState.State.BalanceByAccountId[item.AccountId] = item.CurrentBalance;
40+
41+
return _customerState.WriteStateAsync();
42+
}
43+
44+
public Task OnErrorAsync(Exception ex) => Task.CompletedTask;
45+
46+
public override async Task OnActivateAsync(CancellationToken cancellationToken) {
47+
var streamProvider = this.GetStreamProvider(StreamNames.StreamProvider);
48+
49+
foreach (var accountId in _customerState.State.BalanceByAccountId.Keys) {
50+
var streamId = StreamId.Create(Constants.BalanceStreamNamespace, accountId);
51+
var stream = streamProvider.GetStream<BalanceChangeEvent>(streamId);
52+
var handles = await stream.GetAllSubscriptionHandles();
53+
54+
foreach (var handle in handles) {
55+
await handle.ResumeAsync(this);
56+
}
57+
}
58+
59+
await base.OnActivateAsync(cancellationToken);
60+
}
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace Nameless.Orleans.Grains.Events;
2+
3+
[GenerateSerializer]
4+
public record BalanceChangeEvent {
5+
[Id(0)]
6+
public Guid AccountId { get; init; }
7+
8+
[Id(1)]
9+
public decimal CurrentBalance { get; init; }
10+
}

Nameless.Orleans.Grains/Nameless.Orleans.Grains.csproj

+5
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
<PackageReference Include="Microsoft.Orleans.Reminders.AzureStorage" Version="9.1.2" />
1212
<PackageReference Include="Microsoft.Orleans.Runtime" Version="9.1.2" />
1313
<PackageReference Include="Microsoft.Orleans.Sdk" Version="9.1.2" />
14+
<PackageReference Include="Microsoft.Orleans.Streaming" Version="9.1.2" />
1415
<PackageReference Include="Microsoft.Orleans.Transactions" Version="9.1.2" />
1516
</ItemGroup>
1617

18+
<ItemGroup>
19+
<ProjectReference Include="..\Nameless.Orleans.Core\Nameless.Orleans.Core.csproj" />
20+
</ItemGroup>
21+
1722
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace Nameless.Orleans.Grains.States;
2+
3+
[GenerateSerializer]
4+
public record CustomerState {
5+
[Id(0)]
6+
public Dictionary<Guid, decimal> BalanceByAccountId { get; set; } = [];
7+
}

0 commit comments

Comments
 (0)