Skip to content

Commit c078c7b

Browse files
authored
Enforce worker capacity limit across both client and server tcp connections (#1448)
1 parent 1dc7016 commit c078c7b

File tree

9 files changed

+170
-32
lines changed

9 files changed

+170
-32
lines changed

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
*/
1616
package io.aklivity.zilla.runtime.binding.tcp.internal;
1717

18+
import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_WORKER_CAPACITY;
1819
import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT;
1920
import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER;
2021

2122
import java.util.EnumMap;
2223
import java.util.Map;
2324

25+
import org.agrona.collections.MutableInteger;
26+
2427
import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpClientFactory;
2528
import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpServerFactory;
2629
import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpStreamFactory;
@@ -38,9 +41,10 @@ final class TcpBindingContext implements BindingContext
3841
TcpConfiguration config,
3942
EngineContext context)
4043
{
44+
MutableInteger capacity = new MutableInteger(ENGINE_WORKER_CAPACITY.getAsInt(config));
4145
Map<KindConfig, TcpStreamFactory> factories = new EnumMap<>(KindConfig.class);
42-
factories.put(SERVER, new TcpServerFactory(config, context));
43-
factories.put(CLIENT, new TcpClientFactory(config, context));
46+
factories.put(SERVER, new TcpServerFactory(config, context, capacity));
47+
factories.put(CLIENT, new TcpClientFactory(config, context, capacity));
4448

4549
this.factories = factories;
4650
}

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.agrona.CloseHelper;
4141
import org.agrona.DirectBuffer;
4242
import org.agrona.MutableDirectBuffer;
43+
import org.agrona.collections.MutableInteger;
4344
import org.agrona.concurrent.UnsafeBuffer;
4445

4546
import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig;
@@ -98,9 +99,10 @@ public class TcpClientFactory implements TcpStreamFactory
9899

99100
public TcpClientFactory(
100101
TcpConfiguration config,
101-
EngineContext context)
102+
EngineContext context,
103+
MutableInteger capacity)
102104
{
103-
this.router = new TcpClientRouter(context);
105+
this.router = new TcpClientRouter(context, capacity);
104106
this.writeBuffer = context.writeBuffer();
105107
this.writeByteBuffer = ByteBuffer.allocateDirect(writeBuffer.capacity()).order(nativeOrder());
106108
this.bufferPool = context.bufferPool();
@@ -207,6 +209,7 @@ private void closeNet(
207209
SocketChannel network)
208210
{
209211
CloseHelper.quietClose(network);
212+
router.close();
210213
}
211214

212215
private final class TcpClient

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java

+35-15
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import java.util.Optional;
2626
import java.util.function.Function;
2727
import java.util.function.Predicate;
28-
import java.util.stream.Collectors;
2928

3029
import org.agrona.collections.Long2ObjectHashMap;
30+
import org.agrona.collections.MutableInteger;
3131

3232
import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig;
3333
import io.aklivity.zilla.runtime.binding.tcp.internal.TcpEventContext;
@@ -49,13 +49,16 @@ public final class TcpClientRouter
4949
private final byte[] ipv6ros = new byte[16];
5050

5151
private final Function<String, InetAddress[]> resolveHost;
52+
private final MutableInteger capacity;
5253
private final Long2ObjectHashMap<TcpBindingConfig> bindings;
5354
private final TcpEventContext event;
5455

5556
public TcpClientRouter(
56-
EngineContext context)
57+
EngineContext context,
58+
MutableInteger capacity)
5759
{
5860
this.resolveHost = context::resolveHost;
61+
this.capacity = capacity;
5962
this.bindings = new Long2ObjectHashMap<>();
6063
this.event = new TcpEventContext(context);
6164
}
@@ -83,8 +86,14 @@ public InetSocketAddress resolve(
8386

8487
InetSocketAddress resolved = null;
8588

89+
resolve:
8690
try
8791
{
92+
if (capacity.get() <= 0)
93+
{
94+
break resolve;
95+
}
96+
8897
if (beginEx == null)
8998
{
9099
InetAddress[] addresses = options != null ? resolveHost(options.host) : null;
@@ -113,7 +122,7 @@ else if (binding.routes == TcpBindingConfig.DEFAULT_CLIENT_ROUTES)
113122
final List<InetSocketAddress> authorities = Arrays
114123
.stream(resolveHost(authorityInfo.authority().asString()))
115124
.map(a -> new InetSocketAddress(a, port))
116-
.collect(Collectors.toList());
125+
.toList();
117126

118127
for (InetSocketAddress authority : authorities)
119128
{
@@ -144,7 +153,7 @@ else if (binding.routes == TcpBindingConfig.DEFAULT_CLIENT_ROUTES)
144153
final List<InetSocketAddress> host = Arrays
145154
.stream(resolveHost(options.host))
146155
.map(a -> new InetSocketAddress(a, port))
147-
.collect(Collectors.toList());
156+
.toList();
148157

149158
for (TcpRouteConfig route : binding.routes)
150159
{
@@ -167,20 +176,13 @@ else if (binding.routes == TcpBindingConfig.DEFAULT_CLIENT_ROUTES)
167176
{
168177
event.dnsFailed(traceId, binding.id, ex.hostname);
169178
}
170-
return resolved;
171-
}
172179

173-
private InetAddress[] resolveHost(
174-
String hostname)
175-
{
176-
try
177-
{
178-
return resolveHost.apply(hostname);
179-
}
180-
catch (Throwable ex)
180+
if (resolved != null)
181181
{
182-
throw new TcpDnsFailedException(ex, hostname);
182+
capacity.decrementAndGet();
183183
}
184+
185+
return resolved;
184186
}
185187

186188
public void detach(
@@ -189,12 +191,30 @@ public void detach(
189191
bindings.remove(bindingId);
190192
}
191193

194+
public void close()
195+
{
196+
capacity.decrementAndGet();
197+
}
198+
192199
@Override
193200
public String toString()
194201
{
195202
return String.format("%s %s", getClass().getSimpleName(), bindings);
196203
}
197204

205+
private InetAddress[] resolveHost(
206+
String hostname)
207+
{
208+
try
209+
{
210+
return resolveHost.apply(hostname);
211+
}
212+
catch (Throwable ex)
213+
{
214+
throw new TcpDnsFailedException(ex, hostname);
215+
}
216+
}
217+
198218
private InetSocketAddress resolve(
199219
ProxyAddressFW address,
200220
long authorization,

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.agrona.DirectBuffer;
4141
import org.agrona.LangUtil;
4242
import org.agrona.MutableDirectBuffer;
43+
import org.agrona.collections.MutableInteger;
4344
import org.agrona.concurrent.UnsafeBuffer;
4445

4546
import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig;
@@ -101,9 +102,10 @@ public class TcpServerFactory implements TcpStreamFactory
101102

102103
public TcpServerFactory(
103104
TcpConfiguration config,
104-
EngineContext context)
105+
EngineContext context,
106+
MutableInteger capacity)
105107
{
106-
this.router = new TcpServerRouter(config, context, this::handleAccept);
108+
this.router = new TcpServerRouter(context, this::handleAccept, capacity);
107109
this.writeBuffer = context.writeBuffer();
108110
this.writeByteBuffer = ByteBuffer.allocateDirect(writeBuffer.capacity()).order(nativeOrder());
109111
this.bufferPool = context.bufferPool();

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java

+9-11
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package io.aklivity.zilla.runtime.binding.tcp.internal.stream;
1717

18-
import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_WORKER_CAPACITY;
1918
import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER;
2019
import static java.nio.channels.SelectionKey.OP_ACCEPT;
2120

@@ -28,8 +27,8 @@
2827

2928
import org.agrona.CloseHelper;
3029
import org.agrona.collections.Long2ObjectHashMap;
30+
import org.agrona.collections.MutableInteger;
3131

32-
import io.aklivity.zilla.runtime.binding.tcp.internal.TcpConfiguration;
3332
import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig;
3433
import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpServerBindingConfig;
3534
import io.aklivity.zilla.runtime.engine.EngineContext;
@@ -41,20 +40,20 @@ public final class TcpServerRouter
4140
private final ToIntFunction<PollerKey> acceptHandler;
4241
private final Function<SelectableChannel, PollerKey> supplyPollerKey;
4342
private final Long2ObjectHashMap<TcpServerBindingConfig> serversById;
43+
private final MutableInteger capacity;
4444

45-
private int capacity;
4645
private boolean unbound;
4746

4847
public TcpServerRouter(
49-
TcpConfiguration config,
5048
EngineContext context,
51-
ToIntFunction<PollerKey> acceptHandler)
49+
ToIntFunction<PollerKey> acceptHandler,
50+
MutableInteger capacity)
5251
{
53-
this.capacity = ENGINE_WORKER_CAPACITY.getAsInt(config);
5452
this.bindings = new Long2ObjectHashMap<>();
5553
this.supplyPollerKey = context::supplyPollerKey;
5654
this.acceptHandler = acceptHandler;
5755
this.serversById = new Long2ObjectHashMap<>();
56+
this.capacity = capacity;
5857
}
5958

6059
public void attach(
@@ -90,17 +89,17 @@ public SocketChannel accept(
9089
{
9190
SocketChannel channel = null;
9291

93-
if (capacity > 0)
92+
if (capacity.get() > 0)
9493
{
9594
channel = server.accept();
9695

9796
if (channel != null)
9897
{
99-
capacity--;
98+
capacity.decrementAndGet();
10099
}
101100
}
102101

103-
if (!unbound && capacity <= 0)
102+
if (!unbound && capacity.get() <= 0)
104103
{
105104
bindings.values().stream()
106105
.filter(b -> b.kind == SERVER)
@@ -115,9 +114,8 @@ public void close(
115114
SocketChannel channel)
116115
{
117116
CloseHelper.quietClose(channel);
118-
capacity++;
119117

120-
if (unbound && capacity > 0)
118+
if (unbound && capacity.incrementAndGet() > 0)
121119
{
122120
bindings.values().stream()
123121
.filter(b -> b.kind == SERVER)

runtime/binding-tcp/src/test/java/io/aklivity/zilla/runtime/binding/tcp/internal/streams/ClientIT.java

+29
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,35 @@ public void shouldWriteDataAfterReceiveEnd() throws Exception
342342
}
343343
}
344344

345+
@Test
346+
@Configuration("client.host.yaml")
347+
@Specification({
348+
"${app}/max.connections.reset/client"
349+
})
350+
@Configure(
351+
name = "zilla.engine.worker.capacity",
352+
value = "2")
353+
public void shouldResetWhenConnectionsExceeded() throws Exception
354+
{
355+
try (ServerSocketChannel server = ServerSocketChannel.open())
356+
{
357+
server.setOption(SO_REUSEADDR, true);
358+
server.bind(new InetSocketAddress("127.0.0.1", 12345));
359+
360+
k3po.start();
361+
362+
for (int i = 1; i < 3; i++)
363+
{
364+
try (SocketChannel channel = server.accept())
365+
{
366+
k3po.notifyBarrier("CONNECTION_ACCEPTED_" + i);
367+
}
368+
}
369+
370+
k3po.finish();
371+
}
372+
}
373+
345374
public static InetAddress[] resolveHost(
346375
String host) throws UnknownHostException
347376
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#
2+
# Copyright 2021-2024 Aklivity Inc.
3+
#
4+
# Aklivity licenses this file to you under the Apache License,
5+
# version 2.0 (the "License"); you may not use this file except in compliance
6+
# with the License. You may obtain a copy of the License at:
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
# License for the specific language governing permissions and limitations
14+
# under the License.
15+
#
16+
17+
connect "zilla://streams/app0"
18+
option zilla:window 8192
19+
option zilla:transmission "duplex"
20+
connected
21+
write await CONNECTION_ACCEPTED_1
22+
write close
23+
read closed
24+
25+
connect "zilla://streams/app0"
26+
option zilla:window 8192
27+
option zilla:transmission "duplex"
28+
connected
29+
write await CONNECTION_ACCEPTED_2
30+
write close
31+
read closed
32+
33+
connect "zilla://streams/app0"
34+
option zilla:window 8192
35+
option zilla:transmission "duplex"
36+
connect aborted
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#
2+
# Copyright 2021-2024 Aklivity Inc.
3+
#
4+
# Aklivity licenses this file to you under the Apache License,
5+
# version 2.0 (the "License"); you may not use this file except in compliance
6+
# with the License. You may obtain a copy of the License at:
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
# License for the specific language governing permissions and limitations
14+
# under the License.
15+
#
16+
17+
property serverInitialWindow 8192
18+
19+
accept "zilla://streams/app0"
20+
option zilla:window ${serverInitialWindow}
21+
option zilla:transmission "duplex"
22+
accepted
23+
connected
24+
read notify CONNECTION_ACCEPTED_1
25+
read closed
26+
write close
27+
write notify CLOSED
28+
29+
30+
accepted
31+
connected
32+
read notify CONNECTION_ACCEPTED_2
33+
read closed
34+
write close
35+
write notify CLOSED
36+
37+
rejected

specs/binding-tcp.spec/src/test/java/io/aklivity/zilla/specs/binding/tcp/streams/ApplicationIT.java

+9
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,15 @@ public void maxConnections() throws Exception
212212
k3po.finish();
213213
}
214214

215+
@Test
216+
@Specification({
217+
"${app}/max.connections.reset/client",
218+
"${app}/max.connections.reset/server" })
219+
public void maxConnectionsReset() throws Exception
220+
{
221+
k3po.finish();
222+
}
223+
215224
@Test
216225
@Specification({
217226
"${app}/connection.established/client",

0 commit comments

Comments
 (0)