Skip to content

Commit ec3ddd2

Browse files
authored
Add support for nested gRPC callouts. (#240)
Signed-off-by: andytesti <[email protected]>
1 parent 176e12d commit ec3ddd2

File tree

1 file changed

+42
-35
lines changed

1 file changed

+42
-35
lines changed

src/dispatcher.rs

+42-35
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,8 @@ impl Dispatcher {
453453
}
454454

455455
fn on_grpc_receive(&self, token_id: u32, response_size: usize) {
456-
if let Some(context_id) = self.grpc_callouts.borrow_mut().remove(&token_id) {
456+
let context_id = self.grpc_callouts.borrow_mut().remove(&token_id);
457+
if let Some(context_id) = context_id {
457458
if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
458459
self.active_id.set(context_id);
459460
hostcalls::set_effective_context(context_id).unwrap();
@@ -467,24 +468,26 @@ impl Dispatcher {
467468
hostcalls::set_effective_context(context_id).unwrap();
468469
root.on_grpc_call_response(token_id, 0, response_size);
469470
}
470-
} else if let Some(context_id) = self.grpc_streams.borrow_mut().get(&token_id) {
471-
let context_id = *context_id;
472-
if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
473-
self.active_id.set(context_id);
474-
hostcalls::set_effective_context(context_id).unwrap();
475-
http_stream.on_grpc_stream_message(token_id, response_size);
476-
} else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
477-
self.active_id.set(context_id);
478-
hostcalls::set_effective_context(context_id).unwrap();
479-
stream.on_grpc_stream_message(token_id, response_size);
480-
} else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
481-
self.active_id.set(context_id);
482-
hostcalls::set_effective_context(context_id).unwrap();
483-
root.on_grpc_stream_message(token_id, response_size);
484-
}
485471
} else {
486-
// TODO: change back to a panic once underlying issue is fixed.
487-
trace!("on_grpc_receive_initial_metadata: invalid token_id");
472+
let context_id = self.grpc_streams.borrow().get(&token_id).cloned();
473+
if let Some(context_id) = context_id {
474+
if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
475+
self.active_id.set(context_id);
476+
hostcalls::set_effective_context(context_id).unwrap();
477+
http_stream.on_grpc_stream_message(token_id, response_size);
478+
} else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
479+
self.active_id.set(context_id);
480+
hostcalls::set_effective_context(context_id).unwrap();
481+
stream.on_grpc_stream_message(token_id, response_size);
482+
} else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
483+
self.active_id.set(context_id);
484+
hostcalls::set_effective_context(context_id).unwrap();
485+
root.on_grpc_stream_message(token_id, response_size);
486+
}
487+
} else {
488+
// TODO: change back to a panic once underlying issue is fixed.
489+
trace!("on_grpc_receive_initial_metadata: invalid token_id");
490+
}
488491
}
489492
}
490493

@@ -514,7 +517,8 @@ impl Dispatcher {
514517
}
515518

516519
fn on_grpc_close(&self, token_id: u32, status_code: u32) {
517-
if let Some(context_id) = self.grpc_callouts.borrow_mut().remove(&token_id) {
520+
let context_id = self.grpc_callouts.borrow_mut().remove(&token_id);
521+
if let Some(context_id) = context_id {
518522
if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
519523
self.active_id.set(context_id);
520524
hostcalls::set_effective_context(context_id).unwrap();
@@ -528,23 +532,26 @@ impl Dispatcher {
528532
hostcalls::set_effective_context(context_id).unwrap();
529533
root.on_grpc_call_response(token_id, status_code, 0);
530534
}
531-
} else if let Some(context_id) = self.grpc_streams.borrow_mut().remove(&token_id) {
532-
if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
533-
self.active_id.set(context_id);
534-
hostcalls::set_effective_context(context_id).unwrap();
535-
http_stream.on_grpc_stream_close(token_id, status_code)
536-
} else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
537-
self.active_id.set(context_id);
538-
hostcalls::set_effective_context(context_id).unwrap();
539-
stream.on_grpc_stream_close(token_id, status_code)
540-
} else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
541-
self.active_id.set(context_id);
542-
hostcalls::set_effective_context(context_id).unwrap();
543-
root.on_grpc_stream_close(token_id, status_code)
544-
}
545535
} else {
546-
// TODO: change back to a panic once underlying issue is fixed.
547-
trace!("on_grpc_close: invalid token_id, a non-connected stream has closed");
536+
let context_id = self.grpc_streams.borrow_mut().remove(&token_id);
537+
if let Some(context_id) = context_id {
538+
if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
539+
self.active_id.set(context_id);
540+
hostcalls::set_effective_context(context_id).unwrap();
541+
http_stream.on_grpc_stream_close(token_id, status_code)
542+
} else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
543+
self.active_id.set(context_id);
544+
hostcalls::set_effective_context(context_id).unwrap();
545+
stream.on_grpc_stream_close(token_id, status_code)
546+
} else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
547+
self.active_id.set(context_id);
548+
hostcalls::set_effective_context(context_id).unwrap();
549+
root.on_grpc_stream_close(token_id, status_code)
550+
}
551+
} else {
552+
// TODO: change back to a panic once underlying issue is fixed.
553+
trace!("on_grpc_close: invalid token_id, a non-connected stream has closed");
554+
}
548555
}
549556
}
550557
}

0 commit comments

Comments
 (0)