@@ -38,6 +38,10 @@ pub(crate) fn register_callout(token_id: u32) {
38
38
DISPATCHER . with ( |dispatcher| dispatcher. register_callout ( token_id) ) ;
39
39
}
40
40
41
+ pub ( crate ) fn register_grpc_callout ( token_id : u32 ) {
42
+ DISPATCHER . with ( |dispatcher| dispatcher. register_grpc_callout ( token_id) ) ;
43
+ }
44
+
41
45
struct NoopRoot ;
42
46
43
47
impl Context for NoopRoot { }
@@ -52,6 +56,7 @@ struct Dispatcher {
52
56
http_streams : RefCell < HashMap < u32 , Box < dyn HttpContext > > > ,
53
57
active_id : Cell < u32 > ,
54
58
callouts : RefCell < HashMap < u32 , u32 > > ,
59
+ grpc_callouts : RefCell < HashMap < u32 , u32 > > ,
55
60
}
56
61
57
62
impl Dispatcher {
@@ -65,6 +70,7 @@ impl Dispatcher {
65
70
http_streams : RefCell :: new ( HashMap :: new ( ) ) ,
66
71
active_id : Cell :: new ( 0 ) ,
67
72
callouts : RefCell :: new ( HashMap :: new ( ) ) ,
73
+ grpc_callouts : RefCell :: new ( HashMap :: new ( ) ) ,
68
74
}
69
75
}
70
76
@@ -91,6 +97,17 @@ impl Dispatcher {
91
97
}
92
98
}
93
99
100
+ fn register_grpc_callout ( & self , token_id : u32 ) {
101
+ if self
102
+ . grpc_callouts
103
+ . borrow_mut ( )
104
+ . insert ( token_id, self . active_id . get ( ) )
105
+ . is_some ( )
106
+ {
107
+ panic ! ( "duplicate token_id" )
108
+ }
109
+ }
110
+
94
111
fn create_root_context ( & self , context_id : u32 ) {
95
112
let new_context = match self . new_root . get ( ) {
96
113
Some ( f) => f ( context_id) ,
@@ -381,6 +398,50 @@ impl Dispatcher {
381
398
root. on_http_call_response ( token_id, num_headers, body_size, num_trailers)
382
399
}
383
400
}
401
+
402
+ fn on_grpc_receive ( & self , token_id : u32 , response_size : usize ) {
403
+ let context_id = self
404
+ . grpc_callouts
405
+ . borrow_mut ( )
406
+ . remove ( & token_id)
407
+ . expect ( "invalid token_id" ) ;
408
+
409
+ if let Some ( http_stream) = self . http_streams . borrow_mut ( ) . get_mut ( & context_id) {
410
+ self . active_id . set ( context_id) ;
411
+ hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
412
+ http_stream. on_grpc_call_response ( token_id, 0 , response_size) ;
413
+ } else if let Some ( stream) = self . streams . borrow_mut ( ) . get_mut ( & context_id) {
414
+ self . active_id . set ( context_id) ;
415
+ hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
416
+ stream. on_grpc_call_response ( token_id, 0 , response_size) ;
417
+ } else if let Some ( root) = self . roots . borrow_mut ( ) . get_mut ( & context_id) {
418
+ self . active_id . set ( context_id) ;
419
+ hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
420
+ root. on_grpc_call_response ( token_id, 0 , response_size) ;
421
+ }
422
+ }
423
+
424
+ fn on_grpc_close ( & self , token_id : u32 , status_code : u32 ) {
425
+ let context_id = self
426
+ . grpc_callouts
427
+ . borrow_mut ( )
428
+ . remove ( & token_id)
429
+ . expect ( "invalid token_id" ) ;
430
+
431
+ if let Some ( http_stream) = self . http_streams . borrow_mut ( ) . get_mut ( & context_id) {
432
+ self . active_id . set ( context_id) ;
433
+ hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
434
+ http_stream. on_grpc_call_response ( token_id, status_code, 0 ) ;
435
+ } else if let Some ( stream) = self . streams . borrow_mut ( ) . get_mut ( & context_id) {
436
+ self . active_id . set ( context_id) ;
437
+ hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
438
+ stream. on_grpc_call_response ( token_id, status_code, 0 ) ;
439
+ } else if let Some ( root) = self . roots . borrow_mut ( ) . get_mut ( & context_id) {
440
+ self . active_id . set ( context_id) ;
441
+ hostcalls:: set_effective_context ( context_id) . unwrap ( ) ;
442
+ root. on_grpc_call_response ( token_id, status_code, 0 ) ;
443
+ }
444
+ }
384
445
}
385
446
386
447
#[ no_mangle]
@@ -509,3 +570,13 @@ pub extern "C" fn proxy_on_http_call_response(
509
570
dispatcher. on_http_call_response ( token_id, num_headers, body_size, num_trailers)
510
571
} )
511
572
}
573
+
574
+ #[ no_mangle]
575
+ pub extern "C" fn proxy_on_grpc_receive ( _context_id : u32 , token_id : u32 , response_size : usize ) {
576
+ DISPATCHER . with ( |dispatcher| dispatcher. on_grpc_receive ( token_id, response_size) )
577
+ }
578
+
579
+ #[ no_mangle]
580
+ pub extern "C" fn proxy_on_grpc_close ( _context_id : u32 , token_id : u32 , status_code : u32 ) {
581
+ DISPATCHER . with ( |dispatcher| dispatcher. on_grpc_close ( token_id, status_code) )
582
+ }
0 commit comments