Skip to content

Commit 5bababb

Browse files
committed
split up the limiter from the tracker
1 parent 2fba68d commit 5bababb

File tree

6 files changed

+124
-92
lines changed

6 files changed

+124
-92
lines changed

packages/orchestrator/internal/metrics/tracker.go

Lines changed: 14 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,13 @@ import (
1414

1515
"github.com/fsnotify/fsnotify"
1616
"go.uber.org/zap"
17-
"golang.org/x/sync/semaphore"
1817

1918
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox"
20-
featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags"
2119
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
22-
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
2320
)
2421

2522
type Tracker struct {
26-
featureFlags *featureflags.Client
27-
watcher *fsnotify.Watcher
28-
startingSandboxes *semaphore.Weighted
23+
watcher *fsnotify.Watcher
2924

3025
selfPath string
3126
selfSandboxResources *smap.Map[sandbox.Config]
@@ -42,7 +37,7 @@ func (t *Tracker) OnRemove(sandboxID string) {
4237
t.selfSandboxResources.Remove(sandboxID)
4338
}
4439

45-
func NewTracker(maxStartingInstancesPerNode int64, directory string, selfWriteInterval time.Duration, featureFlags *featureflags.Client) (*Tracker, error) {
40+
func NewTracker(directory string, selfWriteInterval time.Duration) (*Tracker, error) {
4641
filename := fmt.Sprintf("%d.json", os.Getpid())
4742
selfPath := filepath.Join(directory, filename)
4843

@@ -59,17 +54,27 @@ func NewTracker(maxStartingInstancesPerNode int64, directory string, selfWriteIn
5954
}
6055

6156
return &Tracker{
62-
featureFlags: featureFlags,
6357
watcher: watcher,
6458
otherMetrics: map[int]Allocations{},
6559

6660
selfPath: selfPath,
6761
selfWriteInterval: selfWriteInterval,
6862
selfSandboxResources: smap.New[sandbox.Config](),
69-
startingSandboxes: semaphore.NewWeighted(maxStartingInstancesPerNode),
7063
}, nil
7164
}
7265

66+
func (t *Tracker) TotalRunningCount() int {
67+
count := t.selfSandboxResources.Count()
68+
69+
t.otherLock.RLock()
70+
for _, item := range t.otherMetrics {
71+
count += int(item.Sandboxes)
72+
}
73+
t.otherLock.RUnlock()
74+
75+
return count
76+
}
77+
7378
func (t *Tracker) getSelfAllocated() Allocations {
7479
var allocated Allocations
7580
for _, item := range t.selfSandboxResources.Items() {
@@ -185,55 +190,6 @@ func (t *Tracker) handleOtherWrite(name string) error {
185190
return nil
186191
}
187192

188-
var ErrTooManyStarting = errors.New("too many starting sandboxes")
189-
190-
type TooManySandboxesRunningError struct {
191-
Current, Max int
192-
}
193-
194-
func (t TooManySandboxesRunningError) Error() string {
195-
return fmt.Sprintf("max number of running sandboxes on node reached (%d), please retry", t.Max)
196-
}
197-
198-
var _ error = TooManySandboxesRunningError{}
199-
200-
type TooManySandboxesStartingError struct {
201-
Current, Max int
202-
}
203-
204-
var _ error = TooManySandboxesStartingError{}
205-
206-
func (t TooManySandboxesStartingError) Error() string {
207-
return fmt.Sprintf("max number of starting sandboxes on node reached (%d), please retry", t.Max)
208-
}
209-
210-
func (t *Tracker) AcquireStarting(ctx context.Context) error {
211-
maxRunningSandboxesPerNode, err := t.featureFlags.IntFlag(ctx, featureflags.MaxSandboxesPerNode)
212-
if err != nil {
213-
zap.L().Error("Failed to get MaxSandboxesPerNode flag", zap.Error(err))
214-
}
215-
216-
runningSandboxes := t.selfSandboxResources.Count()
217-
if runningSandboxes >= maxRunningSandboxesPerNode {
218-
telemetry.ReportEvent(ctx, "max number of running sandboxes reached")
219-
220-
return TooManySandboxesRunningError{runningSandboxes, maxRunningSandboxesPerNode}
221-
}
222-
223-
// Check if we've reached the max number of starting instances on this node
224-
acquired := t.startingSandboxes.TryAcquire(1)
225-
if !acquired {
226-
telemetry.ReportEvent(ctx, "too many starting sandboxes on node")
227-
return ErrTooManyStarting
228-
}
229-
230-
return nil
231-
}
232-
233-
func (t *Tracker) ReleaseStarting() {
234-
defer t.startingSandboxes.Release(1)
235-
}
236-
237193
type Allocations struct {
238194
DiskBytes uint64 `json:"disk_bytes"`
239195
MemoryBytes uint64 `json:"memory_bytes"`

packages/orchestrator/internal/metrics/tracker_test.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,12 @@ import (
1212
"github.com/stretchr/testify/require"
1313

1414
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox"
15-
featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags"
1615
)
1716

1817
func TestTrackerRoundTrip(t *testing.T) {
1918
tempDir := t.TempDir()
2019

21-
flags, err := featureflags.NewClient()
22-
require.NoError(t, err)
23-
24-
tracker, err := NewTracker(1, tempDir, time.Millisecond*100, flags)
20+
tracker, err := NewTracker(tempDir, time.Millisecond*100)
2521
require.NoError(t, err)
2622

2723
ctx, cancel := context.WithCancel(t.Context())
@@ -152,10 +148,7 @@ func TestTrackerRoundTrip(t *testing.T) {
152148
func TestTracker_handleWriteSelf(t *testing.T) {
153149
tempDir := t.TempDir()
154150

155-
flags, err := featureflags.NewClient()
156-
require.NoError(t, err)
157-
158-
tracker, err := NewTracker(1, tempDir, 10*time.Second, flags)
151+
tracker, err := NewTracker(tempDir, 10*time.Second)
159152
require.NoError(t, err)
160153

161154
tracker.OnInsert(&sandbox.Sandbox{
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"go.uber.org/zap"
9+
"golang.org/x/sync/semaphore"
10+
11+
"github.com/e2b-dev/infra/packages/orchestrator/internal/metrics"
12+
featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags"
13+
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
14+
)
15+
16+
type Limiter struct {
17+
featureFlags *featureflags.Client
18+
startingSandboxes *semaphore.Weighted
19+
metricsTracker *metrics.Tracker
20+
}
21+
22+
func NewLimiter(
23+
maxStartingSandboxes int64,
24+
featureFlags *featureflags.Client,
25+
metricsTracker *metrics.Tracker,
26+
) *Limiter {
27+
return &Limiter{
28+
featureFlags: featureFlags,
29+
metricsTracker: metricsTracker,
30+
startingSandboxes: semaphore.NewWeighted(maxStartingSandboxes),
31+
}
32+
}
33+
34+
// ErrTooManyStarting is returned by Limiter.AcquireStarting when the
// starting-sandbox semaphore is exhausted.
var ErrTooManyStarting = errors.New("too many starting sandboxes")

// TooManySandboxesRunningError reports that the node has already reached the
// maximum number of running sandboxes. Current is the observed count and Max
// is the configured limit (only Max appears in the message).
type TooManySandboxesRunningError struct {
	Current, Max int
}

var _ error = TooManySandboxesRunningError{}

func (t TooManySandboxesRunningError) Error() string {
	return fmt.Sprintf("max number of running sandboxes on node reached (%d), please retry", t.Max)
}

// TooManySandboxesStartingError reports that the node has already reached the
// maximum number of concurrently starting sandboxes. Current is the observed
// count and Max is the configured limit (only Max appears in the message).
//
// NOTE(review): AcquireStarting currently returns the ErrTooManyStarting
// sentinel rather than this type, so callers matching it with errors.As never
// hit that branch — confirm which error callers should rely on.
type TooManySandboxesStartingError struct {
	Current, Max int
}

var _ error = TooManySandboxesStartingError{}

func (t TooManySandboxesStartingError) Error() string {
	return fmt.Sprintf("max number of starting sandboxes on node reached (%d), please retry", t.Max)
}
55+
56+
func (t *Limiter) AcquireStarting(ctx context.Context) error {
57+
maxRunningSandboxesPerNode, err := t.featureFlags.IntFlag(ctx, featureflags.MaxSandboxesPerNode)
58+
if err != nil {
59+
zap.L().Error("Failed to get MaxSandboxesPerNode flag", zap.Error(err))
60+
}
61+
62+
runningSandboxes := t.metricsTracker.TotalRunningCount()
63+
if runningSandboxes >= maxRunningSandboxesPerNode {
64+
telemetry.ReportEvent(ctx, "max number of running sandboxes reached")
65+
66+
return TooManySandboxesRunningError{runningSandboxes, maxRunningSandboxesPerNode}
67+
}
68+
69+
// Check if we've reached the max number of starting instances on this node
70+
acquired := t.startingSandboxes.TryAcquire(1)
71+
if !acquired {
72+
telemetry.ReportEvent(ctx, "too many starting sandboxes on node")
73+
return ErrTooManyStarting
74+
}
75+
76+
return nil
77+
}
78+
79+
func (t *Limiter) ReleaseStarting() {
80+
defer t.startingSandboxes.Release(1)
81+
}

packages/orchestrator/internal/server/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
type server struct {
2727
orchestrator.UnimplementedSandboxServiceServer
2828

29+
limiter *Limiter
2930
metricsTracker *metrics.Tracker
3031
sandboxFactory *sandbox.Factory
3132
info *service.ServiceInfo
@@ -62,6 +63,7 @@ type ServiceConfig struct {
6263
Persistence storage.StorageProvider
6364
FeatureFlags *featureflags.Client
6465
SbxEventsService events.EventsService[event.SandboxEvent]
66+
Limiter *Limiter
6567
}
6668

6769
func New(cfg ServiceConfig) *Service {
@@ -82,6 +84,7 @@ func New(cfg ServiceConfig) *Service {
8284
featureFlags: cfg.FeatureFlags,
8385
sbxEventsService: cfg.SbxEventsService,
8486
metricsTracker: cfg.MetricsTracker,
87+
limiter: cfg.Limiter,
8588
}
8689

8790
meter := cfg.Tel.MeterProvider.Meter("orchestrator.sandbox")

packages/orchestrator/internal/server/sandboxes.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"google.golang.org/protobuf/types/known/timestamppb"
1919

2020
clickhouse "github.com/e2b-dev/infra/packages/clickhouse/pkg"
21-
"github.com/e2b-dev/infra/packages/orchestrator/internal/metrics"
2221
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox"
2322
"github.com/e2b-dev/infra/packages/shared/pkg/events/event"
2423
featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags"
@@ -32,10 +31,7 @@ import (
3231

3332
var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/orchestrator/internal/server")
3433

35-
const (
36-
requestTimeout = 60 * time.Second
37-
maxStartingInstancesPerNode = 3
38-
)
34+
const requestTimeout = 60 * time.Second
3935

4036
func (s *server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequest) (*orchestrator.SandboxCreateResponse, error) {
4137
// set max request timeout for this request
@@ -69,9 +65,9 @@ func (s *server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ
6965
)
7066

7167
// Check if we've reached the max number of starting instances on this node
72-
if err := s.metricsTracker.AcquireStarting(ctx); err != nil {
73-
var tooManyRunning metrics.TooManySandboxesRunningError
74-
var tooManyStarting metrics.TooManySandboxesStartingError
68+
if err := s.limiter.AcquireStarting(ctx); err != nil {
69+
var tooManyRunning TooManySandboxesRunningError
70+
var tooManyStarting TooManySandboxesStartingError
7571
switch {
7672
case errors.As(err, &tooManyRunning):
7773
telemetry.ReportEvent(ctx, "max number of running sandboxes reached")
@@ -84,7 +80,7 @@ func (s *server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ
8480
}
8581
}
8682
defer func() {
87-
s.metricsTracker.ReleaseStarting()
83+
s.limiter.ReleaseStarting()
8884
}()
8985

9086
template, err := s.templateCache.GetTemplate(

packages/orchestrator/main.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -229,12 +229,12 @@ func run(config cfg.Config) (success bool) {
229229
zap.L().Fatal("failed to create feature flags client", zap.Error(err))
230230
}
231231

232-
limiter, err := limit.New(ctx, featureFlags)
232+
googleStorageLimiter, err := limit.New(ctx, featureFlags)
233233
if err != nil {
234234
zap.L().Fatal("failed to create limiter", zap.Error(err))
235235
}
236236

237-
persistence, err := storage.GetTemplateStorageProvider(ctx, limiter)
237+
persistence, err := storage.GetTemplateStorageProvider(ctx, googleStorageLimiter)
238238
if err != nil {
239239
zap.L().Fatal("failed to create template storage provider", zap.Error(err))
240240
}
@@ -334,6 +334,20 @@ func run(config cfg.Config) (success bool) {
334334

335335
sandboxFactory := sandbox.NewFactory(networkPool, devicePool, featureFlags, defaultAllowSandboxInternet)
336336

337+
metricsTracker, err := metrics.NewTracker(config.MetricsDirectory, config.MetricsWriteInterval)
338+
if err != nil {
339+
zap.L().Fatal("failed to create metrics tracker", zap.Error(err))
340+
}
341+
sandboxes.Subscribe(metricsTracker)
342+
g.Go(func() error {
343+
if err := metricsTracker.Run(ctx); err != nil {
344+
zap.L().Error("metrics tracker failed", zap.Error(err))
345+
}
346+
return nil
347+
})
348+
349+
limiter := server.NewLimiter(config.MaxStartingInstances, featureFlags, metricsTracker)
350+
337351
server.New(server.ServiceConfig{
338352
SandboxFactory: sandboxFactory,
339353
GRPC: grpcSrv,
@@ -347,6 +361,7 @@ func run(config cfg.Config) (success bool) {
347361
Persistence: persistence,
348362
FeatureFlags: featureFlags,
349363
SbxEventsService: sbxEventsService,
364+
Limiter: limiter,
350365
})
351366

352367
tmplSbxLoggerExternal := sbxlogger.NewLogger(
@@ -379,7 +394,7 @@ func run(config cfg.Config) (success bool) {
379394
sandboxProxy,
380395
featureFlags,
381396
sandboxObserver,
382-
limiter,
397+
googleStorageLimiter,
383398
sandboxEventBatcher,
384399
)
385400

@@ -396,7 +411,7 @@ func run(config cfg.Config) (success bool) {
396411
sandboxes,
397412
templateCache,
398413
persistence,
399-
limiter,
414+
googleStorageLimiter,
400415
serviceInfo,
401416
)
402417
if err != nil {
@@ -406,18 +421,6 @@ func run(config cfg.Config) (success bool) {
406421
closers = append([]Closeable{tmpl}, closers...)
407422
}
408423

409-
metricsTracker, err := metrics.NewTracker(config.MaxStartingInstances, config.MetricsDirectory, config.MetricsWriteInterval, featureFlags)
410-
if err != nil {
411-
zap.L().Fatal("failed to create metrics tracker", zap.Error(err))
412-
}
413-
sandboxes.Subscribe(metricsTracker)
414-
g.Go(func() error {
415-
if err := metricsTracker.Run(ctx); err != nil {
416-
zap.L().Error("metrics tracker failed", zap.Error(err))
417-
}
418-
return nil
419-
})
420-
421424
service.NewInfoService(ctx, grpcSrv.GRPCServer(), serviceInfo, metricsTracker)
422425

423426
g.Go(func() error {

0 commit comments

Comments
 (0)