18
18
import static java .util .concurrent .TimeUnit .MILLISECONDS ;
19
19
import static java .util .concurrent .TimeUnit .NANOSECONDS ;
20
20
import static java .util .concurrent .TimeUnit .SECONDS ;
21
+ import static org .agrona .BitUtil .findNextPositivePowerOfTwo ;
21
22
22
23
import java .io .File ;
23
24
import java .lang .invoke .MethodHandle ;
24
25
import java .lang .invoke .MethodHandles ;
25
26
import java .lang .invoke .MethodType ;
27
+ import java .lang .management .ManagementFactory ;
26
28
import java .net .InetAddress ;
27
29
import java .net .MalformedURLException ;
28
30
import java .net .URI ;
37
39
38
40
import org .agrona .LangUtil ;
39
41
42
+ import com .sun .management .OperatingSystemMXBean ;
43
+
40
44
import io .aklivity .zilla .runtime .engine .internal .layouts .BudgetsLayout ;
41
45
42
46
public class EngineConfiguration extends Configuration
@@ -55,6 +59,7 @@ public class EngineConfiguration extends Configuration
55
59
public static final PropertyDef <Path > ENGINE_CACHE_DIRECTORY ;
56
60
public static final PropertyDef <HostResolver > ENGINE_HOST_RESOLVER ;
57
61
public static final IntPropertyDef ENGINE_WORKER_CAPACITY ;
62
+ public static final DoublePropertyDef ENGINE_MEMORY_PERCENTAGE ;
58
63
public static final IntPropertyDef ENGINE_BUFFER_POOL_CAPACITY ;
59
64
public static final IntPropertyDef ENGINE_BUFFER_SLOT_CAPACITY ;
60
65
public static final IntPropertyDef ENGINE_STREAMS_BUFFER_CAPACITY ;
@@ -101,13 +106,13 @@ public class EngineConfiguration extends Configuration
101
106
ENGINE_CACHE_DIRECTORY = config .property (Path .class , "cache.directory" , EngineConfiguration ::cacheDirectory , "cache" );
102
107
ENGINE_HOST_RESOLVER = config .property (HostResolver .class , "host.resolver" ,
103
108
EngineConfiguration ::decodeHostResolver , EngineConfiguration ::defaultHostResolver );
104
- ENGINE_WORKER_CAPACITY = config .property ("worker.capacity" , 64 );
109
+ ENGINE_MEMORY_PERCENTAGE = config .property ("memory.percentage" , 0.75 );
110
+ ENGINE_WORKER_CAPACITY = config .property ("worker.capacity" , EngineConfiguration ::defaultWorkersCapacity );
105
111
ENGINE_BUFFER_POOL_CAPACITY = config .property ("buffer.pool.capacity" , EngineConfiguration ::defaultBufferPoolCapacity );
106
112
ENGINE_BUFFER_SLOT_CAPACITY = config .property ("buffer.slot.capacity" , 64 * 1024 );
107
113
ENGINE_STREAMS_BUFFER_CAPACITY = config .property ("streams.buffer.capacity" ,
108
114
EngineConfiguration ::defaultStreamsBufferCapacity );
109
- ENGINE_EVENTS_BUFFER_CAPACITY = config .property ("events.buffer.capacity" ,
110
- EngineConfiguration ::defaultEventsBufferCapacity );
115
+ ENGINE_EVENTS_BUFFER_CAPACITY = config .property ("events.buffer.capacity" , 4 * 64 * 1024 );
111
116
ENGINE_BUDGETS_BUFFER_CAPACITY = config .property ("budgets.buffer.capacity" ,
112
117
EngineConfiguration ::defaultBudgetsBufferCapacity );
113
118
ENGINE_COUNTERS_BUFFER_CAPACITY = config .property ("counters.buffer.capacity" , 1024 * 1024 );
@@ -356,17 +361,37 @@ private static int defaultStreamsBufferCapacity(
356
361
return ENGINE_BUFFER_SLOT_CAPACITY .get (config ) * ENGINE_WORKER_CAPACITY .getAsInt (config );
357
362
}
358
363
359
- private static int defaultEventsBufferCapacity (
364
+ private static int defaultBudgetsBufferCapacity (
360
365
Configuration config )
361
366
{
362
- return ENGINE_BUFFER_SLOT_CAPACITY .get (config ) * ENGINE_WORKER_CAPACITY .getAsInt (config );
367
+ // more consistent with original defaults
368
+ return BudgetsLayout .SIZEOF_BUDGET_ENTRY * 512 * ENGINE_WORKER_CAPACITY .getAsInt (config );
363
369
}
364
370
365
- private static int defaultBudgetsBufferCapacity (
371
+ private static int defaultWorkersCapacity (
366
372
Configuration config )
367
373
{
368
- // more consistent with original defaults
369
- return BudgetsLayout .SIZEOF_BUDGET_ENTRY * 512 * ENGINE_WORKER_CAPACITY .getAsInt (config );
374
+ OperatingSystemMXBean osBean =
375
+ (OperatingSystemMXBean ) ManagementFactory .getOperatingSystemMXBean ();
376
+
377
+ final int numberOfCores = osBean .getAvailableProcessors ();
378
+ final long totalMemorySize = osBean .getTotalMemorySize ();
379
+
380
+ final int slotCapacity = ENGINE_BUFFER_SLOT_CAPACITY .get (config );
381
+ final double percentMemory = ENGINE_MEMORY_PERCENTAGE .get (config );
382
+
383
+ long maxAllowedForBuffers = (long ) (percentMemory * totalMemorySize );
384
+
385
+ // Streams + Pool
386
+ long bufferCapacity = slotCapacity + slotCapacity ;
387
+ long eventsBufferCapacity = ENGINE_EVENTS_BUFFER_CAPACITY .get (config );
388
+ long budgetBufferCapacity = BudgetsLayout .SIZEOF_BUDGET_ENTRY * 512L ;
389
+ long totalBufferCapacity = numberOfCores * (bufferCapacity + budgetBufferCapacity + eventsBufferCapacity );
390
+ int newWorkersCapacity = (int ) (maxAllowedForBuffers / totalBufferCapacity );
391
+
392
+ newWorkersCapacity = findNextPositivePowerOfTwo (newWorkersCapacity );
393
+
394
+ return newWorkersCapacity ;
370
395
}
371
396
372
397
private static URL configURL (
0 commit comments