@@ -19,10 +19,8 @@ import (
19
19
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
20
20
)
21
21
22
- var (
23
- //go:embed ext/flipt_engine_wasm.wasm
24
- wasm []byte
25
- )
22
+ //go:embed ext/flipt_engine_wasm.wasm
23
+ var wasm []byte
26
24
27
25
const (
28
26
statusSuccess = "success"
@@ -65,11 +63,6 @@ type EvaluationClient struct {
65
63
errChan chan error
66
64
snapshotChan chan snapshot
67
65
68
- // cached WASM functions
69
- allocFunc api.Function
70
- deallocFunc api.Function
71
- snapshotFunc api.Function
72
-
73
66
closeOnce sync.Once
74
67
}
75
68
@@ -178,13 +171,6 @@ func NewEvaluationClient(ctx context.Context, opts ...ClientOption) (_ *Evaluati
178
171
return
179
172
}
180
173
181
- var (
182
- // cache common WASM functions
183
- allocFunc = mod .ExportedFunction (fAllocate )
184
- deallocFunc = mod .ExportedFunction (fDeallocate )
185
- snapshotFunc = mod .ExportedFunction (fSnapshot )
186
- )
187
-
188
174
ctx , cancel := context .WithCancel (ctx )
189
175
190
176
client := & EvaluationClient {
@@ -202,11 +188,6 @@ func NewEvaluationClient(ctx context.Context, opts ...ClientOption) (_ *Evaluati
202
188
cancel : cancel ,
203
189
errChan : make (chan error , 1 ),
204
190
snapshotChan : make (chan snapshot , 1 ),
205
-
206
- // cache WASM functions
207
- allocFunc : allocFunc ,
208
- deallocFunc : deallocFunc ,
209
- snapshotFunc : snapshotFunc ,
210
191
}
211
192
212
193
for _ , opt := range opts {
@@ -250,7 +231,10 @@ func NewEvaluationClient(ctx context.Context, opts ...ClientOption) (_ *Evaluati
250
231
client .httpClient .Timeout = client .requestTimeout
251
232
}
252
233
253
- var initializeEngine = mod .ExportedFunction (fInitializeEngine )
234
+ var (
235
+ initializeEngine = mod .ExportedFunction (fInitializeEngine )
236
+ allocFunc = mod .ExportedFunction (fAllocate )
237
+ )
254
238
255
239
// allocate namespace
256
240
nsPtr , err := allocFunc .Call (ctx , uint64 (len (client .namespace )))
@@ -463,11 +447,11 @@ func (e *EvaluationClient) EvaluateBatch(ctx context.Context, requests []*Evalua
463
447
// ListFlags lists all flags.
464
448
func (e * EvaluationClient ) ListFlags (ctx context.Context ) ([]Flag , error ) {
465
449
e .mu .RLock ()
450
+ defer e .mu .RUnlock ()
451
+
466
452
if e .err != nil && e .errorStrategy == ErrorStrategyFail {
467
- e .mu .RUnlock ()
468
453
return nil , e .err
469
454
}
470
- e .mu .RUnlock ()
471
455
472
456
if e .engine == 0 {
473
457
return nil , errors .New ("engine not initialized" )
@@ -484,27 +468,35 @@ func (e *EvaluationClient) ListFlags(ctx context.Context) ([]Flag, error) {
484
468
}
485
469
486
470
ptr , length := decodePtr (res [0 ])
487
- defer e . deallocFunc . Call ( ctx , uint64 ( ptr ), uint64 ( length ) )
471
+ deallocFunc := e . mod . ExportedFunction ( fDeallocate )
488
472
489
473
b , ok := e .mod .Memory ().Read (ptr , length )
490
474
if ! ok {
475
+ deallocFunc .Call (ctx , uint64 (ptr ), uint64 (length ))
491
476
return nil , fmt .Errorf ("failed to read result from memory" )
492
477
}
493
478
494
- var result * ListFlagsResult
495
- if err := json .Unmarshal (b , & result ); err != nil {
479
+ // make a copy of the result before deallocating
480
+ result := make ([]byte , len (b ))
481
+ copy (result , b )
482
+
483
+ // clean up memory
484
+ deallocFunc .Call (ctx , uint64 (ptr ), uint64 (length ))
485
+
486
+ var listResult * ListFlagsResult
487
+ if err := json .Unmarshal (result , & listResult ); err != nil {
496
488
return nil , fmt .Errorf ("failed to unmarshal flags: %w" , err )
497
489
}
498
490
499
- if result == nil {
491
+ if listResult == nil {
500
492
return nil , errors .New ("failed to unmarshal flags: nil" )
501
493
}
502
494
503
- if result .Status != statusSuccess {
504
- return nil , errors .New (result .ErrorMessage )
495
+ if listResult .Status != statusSuccess {
496
+ return nil , errors .New (listResult .ErrorMessage )
505
497
}
506
498
507
- return * result .Result , nil
499
+ return * listResult .Result , nil
508
500
}
509
501
510
502
// Close cleans up the allocated resources.
@@ -544,47 +536,57 @@ type snapshot struct {
544
536
}
545
537
546
538
func (e * EvaluationClient ) handleUpdates (ctx context.Context ) error {
539
+ var (
540
+ allocFunc = e .mod .ExportedFunction (fAllocate )
541
+ deallocFunc = e .mod .ExportedFunction (fDeallocate )
542
+ snapshotFunc = e .mod .ExportedFunction (fSnapshot )
543
+ )
544
+
547
545
for {
548
546
select {
549
547
case <- ctx .Done ():
550
548
close (e .snapshotChan )
551
549
return nil
552
550
case s , ok := <- e .snapshotChan :
553
551
if ! ok {
554
- // we are likely shutting down
555
552
return nil
556
553
}
557
554
558
555
e .mu .Lock ()
559
556
e .etag = s .etag
560
557
e .mu .Unlock ()
561
558
562
- // skip update if no changes (304 response) or error
563
559
if len (s .payload ) == 0 {
564
560
continue
565
561
}
566
562
563
+ e .mu .Lock ()
567
564
// allocate memory for the new payload
568
- pmPtr , err := e . allocFunc .Call (ctx , uint64 (len (s .payload )))
565
+ pmPtr , err := allocFunc .Call (ctx , uint64 (len (s .payload )))
569
566
if err != nil {
567
+ e .mu .Unlock ()
570
568
return fmt .Errorf ("failed to allocate memory for payload: %w" , err )
571
569
}
572
570
573
571
// write the new payload to memory
574
572
if ! e .mod .Memory ().Write (uint32 (pmPtr [0 ]), s .payload ) {
575
- e .deallocFunc .Call (ctx , uint64 (pmPtr [0 ]), uint64 (len (s .payload )))
573
+ e .mu .Unlock ()
574
+ deallocFunc .Call (ctx , uint64 (pmPtr [0 ]), uint64 (len (s .payload )))
576
575
return fmt .Errorf ("failed to write payload to memory" )
577
576
}
578
577
579
- // update the engine with the new snapshot
580
- _ , err = e .snapshotFunc .Call (ctx , uint64 (e .engine ), pmPtr [0 ], uint64 (len (s .payload )))
578
+ // update the engine with the new snapshot while holding the lock
579
+ res , err := snapshotFunc .Call (ctx , uint64 (e .engine ), pmPtr [0 ], uint64 (len (s .payload )))
580
+
581
+ ptr , length := decodePtr (res [0 ])
582
+ // always deallocate the memory after we're done with it
583
+ deallocFunc .Call (ctx , uint64 (pmPtr [0 ]), uint64 (len (s .payload )))
584
+ deallocFunc .Call (ctx , uint64 (ptr ), uint64 (length ))
585
+
586
+ e .mu .Unlock ()
581
587
if err != nil {
582
- e .deallocFunc .Call (ctx , uint64 (pmPtr [0 ]), uint64 (len (s .payload )))
583
588
return fmt .Errorf ("failed to update engine: %w" , err )
584
589
}
585
-
586
- // clean up the memory we allocated for the payload
587
- e .deallocFunc .Call (ctx , uint64 (pmPtr [0 ]), uint64 (len (s .payload )))
588
590
}
589
591
}
590
592
}
@@ -665,7 +667,7 @@ func (e *EvaluationClient) fetch(ctx context.Context, etag string) (snapshot, er
665
667
}
666
668
667
669
if resp .StatusCode == http .StatusNotModified {
668
- return snapshot {}, nil
670
+ return snapshot {etag : etag }, nil
669
671
}
670
672
671
673
if resp .StatusCode != http .StatusOK {
@@ -734,13 +736,13 @@ func (e *EvaluationClient) startStreaming(ctx context.Context) {
734
736
case <- ctx .Done ():
735
737
return
736
738
default :
737
- // Create a channel to receive the read result
739
+ // create a channel to receive the read result
738
740
readChan := make (chan struct {
739
741
line []byte
740
742
err error
741
743
})
742
744
743
- // Start a goroutine to perform the blocking read
745
+ // start a goroutine to perform the blocking read
744
746
go func () {
745
747
line , err := reader .ReadBytes ('\n' )
746
748
readChan <- struct {
@@ -749,7 +751,7 @@ func (e *EvaluationClient) startStreaming(ctx context.Context) {
749
751
}{line , err }
750
752
}()
751
753
752
- // Wait for either the read to complete or context cancellation
754
+ // wait for either the read to complete or context cancellation
753
755
select {
754
756
case <- ctx .Done ():
755
757
return
@@ -784,42 +786,60 @@ func (e *EvaluationClient) evaluateWASM(ctx context.Context, funcName string, re
784
786
return nil , errors .New ("engine not initialized" )
785
787
}
786
788
789
+ var (
790
+ allocFunc = e .mod .ExportedFunction (fAllocate )
791
+ deallocFunc = e .mod .ExportedFunction (fDeallocate )
792
+ )
793
+
787
794
reqBytes , err := json .Marshal (request )
788
795
if err != nil {
789
796
return nil , fmt .Errorf ("failed to marshal request: %w" , err )
790
797
}
791
798
792
- reqPtr , err := e .allocFunc .Call (ctx , uint64 (len (reqBytes )))
799
+ e .mu .Lock ()
800
+ reqPtr , err := allocFunc .Call (ctx , uint64 (len (reqBytes )))
793
801
if err != nil {
802
+ e .mu .Unlock ()
794
803
return nil , fmt .Errorf ("failed to allocate memory for request: %w" , err )
795
804
}
796
- defer e .deallocFunc .Call (ctx , reqPtr [0 ], uint64 (len (reqBytes )))
797
805
798
806
if ! e .mod .Memory ().Write (uint32 (reqPtr [0 ]), reqBytes ) {
807
+ deallocFunc .Call (ctx , reqPtr [0 ], uint64 (len (reqBytes )))
808
+ e .mu .Unlock ()
799
809
return nil , fmt .Errorf ("failed to write request to memory" )
800
810
}
801
811
802
812
evalFunc := e .mod .ExportedFunction (funcName )
803
813
res , err := evalFunc .Call (ctx , uint64 (e .engine ), reqPtr [0 ], uint64 (len (reqBytes )))
804
814
if err != nil {
815
+ deallocFunc .Call (ctx , reqPtr [0 ], uint64 (len (reqBytes )))
816
+ e .mu .Unlock ()
805
817
return nil , fmt .Errorf ("failed to call %s: %w" , funcName , err )
806
818
}
807
819
820
+ // clean up request memory
821
+ deallocFunc .Call (ctx , reqPtr [0 ], uint64 (len (reqBytes )))
822
+
808
823
if len (res ) < 1 {
824
+ e .mu .Unlock ()
809
825
return nil , fmt .Errorf ("failed to call %s: no result returned" , funcName )
810
826
}
811
827
812
828
ptr , length := decodePtr (res [0 ])
813
- defer e .deallocFunc .Call (ctx , uint64 (ptr ), uint64 (length ))
814
-
815
829
b , ok := e .mod .Memory ().Read (ptr , length )
816
830
if ! ok {
831
+ deallocFunc .Call (ctx , uint64 (ptr ), uint64 (length ))
832
+ e .mu .Unlock ()
817
833
return nil , fmt .Errorf ("failed to read result from memory" )
818
834
}
819
835
820
- // Make a copy of the result before deallocating
836
+ // make a copy of the result before deallocating
821
837
result := make ([]byte , len (b ))
822
838
copy (result , b )
823
839
840
+ // clean up result memory
841
+ deallocFunc .Call (ctx , uint64 (ptr ), uint64 (length ))
842
+ e .mu .Unlock ()
843
+
824
844
return result , nil
825
845
}
0 commit comments