1
+ using System . Collections . Concurrent ;
1
2
using System . Net ;
2
3
using System . Net . Sockets ;
3
4
using Microsoft . Extensions . Hosting ;
@@ -12,18 +13,15 @@ namespace NexusMods.SingleProcess;
12
13
/// A long-running service that listens for incoming connections from clients and executes them as if they ran
13
14
/// on as CLI command.
14
15
/// </summary>
15
- public class CliServer : IHostedService , IDisposable
16
+ public sealed class CliServer : IHostedService , IDisposable
16
17
{
17
18
private readonly CancellationTokenSource _cancellationTokenSource = new ( ) ;
18
19
private CancellationToken Token => _cancellationTokenSource . Token ;
19
20
20
21
private bool _started ;
21
22
22
23
private TcpListener ? _tcpListener ;
23
- private Task ? _listenerTask ;
24
-
25
- private readonly List < Task > _runningClients = [ ] ;
26
- private readonly CliSettings _settings ;
24
+ private readonly ConcurrentDictionary < Guid , Task > _runningClients = [ ] ;
27
25
28
26
private readonly IServiceProvider _serviceProvider ;
29
27
private readonly ILogger < CliServer > _logger ;
@@ -45,7 +43,7 @@ public CliServer(
45
43
_configurator = configurator ;
46
44
_syncFile = syncFile ;
47
45
48
- _settings = settingsManager . Get < CliSettings > ( ) ;
46
+ settingsManager . Get < CliSettings > ( ) ;
49
47
}
50
48
51
49
/// <summary>
@@ -65,7 +63,7 @@ private Task StartTcpListenerAsync()
65
63
{
66
64
_tcpListener = new TcpListener ( IPAddress . Loopback , 0 ) ;
67
65
_tcpListener . Start ( ) ;
68
- _listenerTask = Task . Run ( async ( ) => await StartListeningAsync ( ) , _cancellationTokenSource . Token ) ;
66
+ Task . Run ( async ( ) => await StartListeningAsync ( ) , _cancellationTokenSource . Token ) ;
69
67
var port = ( ( IPEndPoint ) _tcpListener . LocalEndpoint ) . Port ;
70
68
71
69
if ( ! _syncFile . TrySetMain ( port ) )
@@ -84,83 +82,60 @@ private async Task StartListeningAsync()
84
82
{
85
83
try
86
84
{
87
- CleanClosedConnections ( ) ;
88
-
89
- // Create a timeout token, and combine it with the main cancellation token
90
- var timeout = new CancellationTokenSource ( delay : _settings . ListenTimeout ) ;
91
- var combined = CancellationTokenSource . CreateLinkedTokenSource ( Token , timeout . Token ) ;
92
-
93
- var found = await _tcpListener ! . AcceptTcpClientAsync ( combined . Token ) ;
85
+ var found = await _tcpListener ! . AcceptTcpClientAsync ( Token ) ;
94
86
found . NoDelay = true ; // Disable Nagle's algorithm to reduce delay.
95
- _runningClients . Add ( Task . Run ( ( ) => HandleClientAsync ( found ) , Token ) ) ;
96
87
97
- _logger . LogInformation ( "Accepted TCP connection from {RemoteEndPoint}" ,
98
- ( ( IPEndPoint ) found . Client . RemoteEndPoint ! ) . Port
99
- ) ;
88
+ var id = Guid . NewGuid ( ) ;
89
+ var task = Task . Run ( ( ) => HandleClientAsync ( id , found ) , Token ) ;
90
+ _ = _runningClients . GetOrAdd ( id , task ) ;
91
+
92
+ _logger . LogInformation ( "Accepted TCP connection from {RemoteEndPoint}" , ( ( IPEndPoint ) found . Client . RemoteEndPoint ! ) . Port ) ;
100
93
}
101
94
catch ( OperationCanceledException )
102
95
{
103
- // The cancellation could be from the timeout, or the main cancellation token, if it's the
104
- // timeout, then we should just continue, if it's the main cancellation token, then we should stop
105
- if ( ! Token . IsCancellationRequested )
106
- continue ;
107
96
_logger . LogInformation ( "TCP listener was cancelled, stopping" ) ;
108
97
return ;
109
98
}
110
99
catch ( Exception ex )
111
100
{
112
101
_logger . LogError ( ex , "Got an exception while accepting a client connection" ) ;
113
102
}
114
-
115
103
}
116
104
}
117
-
105
+
118
106
/// <summary>
119
107
/// Handle a client connection
120
108
/// </summary>
121
- /// <param name="client"></param>
122
- /// <returns></returns>
123
- /// <exception cref="NotImplementedException"></exception>
124
- private async Task HandleClientAsync ( TcpClient client )
109
+ private async Task HandleClientAsync ( Guid id , TcpClient client )
125
110
{
126
- var stream = client . GetStream ( ) ;
111
+ try
112
+ {
113
+ var stream = client . GetStream ( ) ;
127
114
128
- var ( arguments , renderer ) = await ProxiedRenderer . Create ( _serviceProvider , stream ) ;
129
- await _configurator . RunAsync ( arguments , renderer , Token ) ;
130
- client . Dispose ( ) ;
131
- }
132
-
133
- /// <summary>
134
- /// Clears up any closed connections in the <see cref="_runningClients"/> dictionary
135
- /// </summary>
136
- /// <exception cref="NotImplementedException"></exception>
137
- private void CleanClosedConnections ( )
138
- {
139
- // Snapshot the dictionary before we modify it
140
- foreach ( var task in _runningClients . ToArray ( ) )
115
+ var ( arguments , renderer ) = await ProxiedRenderer . Create ( _serviceProvider , stream ) ;
116
+ await _configurator . RunAsync ( arguments , renderer , Token ) ;
117
+ }
118
+ finally
141
119
{
142
- if ( task . IsCompleted )
143
- _runningClients . Remove ( task ) ;
120
+ client . Dispose ( ) ;
121
+ _runningClients . Remove ( id , out _ ) ;
144
122
}
145
123
}
146
-
124
+
147
125
/// <inheritdoc />
148
- public Task StartAsync ( CancellationToken cancellationToken )
149
- {
150
- return Task . CompletedTask ;
151
- }
126
+ public Task StartAsync ( CancellationToken cancellationToken ) => Task . CompletedTask ;
152
127
153
128
/// <inheritdoc />
154
129
public async Task StopAsync ( CancellationToken cancellationToken )
155
130
{
156
131
if ( ! _started ) return ;
157
-
132
+
158
133
// Ditch this value and don't wait on it because it otherwise blocks the shutdown even when *no-one* is
159
134
// waiting on the token
160
135
_ = _cancellationTokenSource . CancelAsync ( ) ;
161
-
136
+
162
137
_tcpListener ? . Stop ( ) ;
163
- await Task . WhenAll ( _runningClients ) ;
138
+ await Task . WhenAll ( _runningClients . Values . ToArray ( ) ) ;
164
139
_started = false ;
165
140
}
166
141
0 commit comments