Skip to content

Commit 0bbad36

Browse files
authored
Minor: move FallibleRequestStream and FallibleTonicResponseStream to a module (#6258)
* Minor: move FallibleRequestStream and FallibleTonicResponseStream to their own modules * Improve documentation and add links
1 parent 23b6ff9 commit 0bbad36

File tree

4 files changed

+138
-106
lines changed

4 files changed

+138
-106
lines changed

arrow-flight/src/client.rs

Lines changed: 2 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{pin::Pin, task::Poll};
19-
2018
use crate::{
2119
decode::FlightRecordBatchStream,
2220
flight_service_client::FlightServiceClient,
@@ -28,16 +26,15 @@ use crate::{
2826
use arrow_schema::Schema;
2927
use bytes::Bytes;
3028
use futures::{
31-
channel::oneshot::{Receiver, Sender},
3229
future::ready,
33-
ready,
3430
stream::{self, BoxStream},
35-
FutureExt, Stream, StreamExt, TryStreamExt,
31+
Stream, StreamExt, TryStreamExt,
3632
};
3733
use prost::Message;
3834
use tonic::{metadata::MetadataMap, transport::Channel};
3935

4036
use crate::error::{FlightError, Result};
37+
use crate::streams::{FallibleRequestStream, FallibleTonicResponseStream};
4138

4239
/// A "Mid level" [Apache Arrow Flight](https://arrow.apache.org/docs/format/Flight.html) client.
4340
///
@@ -674,103 +671,3 @@ impl FlightClient {
674671
request
675672
}
676673
}
677-
678-
/// Wrapper around fallible stream such that when
679-
/// it encounters an error it uses the oneshot sender to
680-
/// notify the error and stop any further streaming. See `do_put` or
681-
/// `do_exchange` for it's uses.
682-
pub(crate) struct FallibleRequestStream<T, E> {
683-
/// sender to notify error
684-
sender: Option<Sender<E>>,
685-
/// fallible stream
686-
fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> + Send + 'static>>,
687-
}
688-
689-
impl<T, E> FallibleRequestStream<T, E> {
690-
pub(crate) fn new(
691-
sender: Sender<E>,
692-
fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> + Send + 'static>>,
693-
) -> Self {
694-
Self {
695-
sender: Some(sender),
696-
fallible_stream,
697-
}
698-
}
699-
}
700-
701-
impl<T, E> Stream for FallibleRequestStream<T, E> {
702-
type Item = T;
703-
704-
fn poll_next(
705-
self: std::pin::Pin<&mut Self>,
706-
cx: &mut std::task::Context<'_>,
707-
) -> std::task::Poll<Option<Self::Item>> {
708-
let pinned = self.get_mut();
709-
let mut request_streams = pinned.fallible_stream.as_mut();
710-
match ready!(request_streams.poll_next_unpin(cx)) {
711-
Some(Ok(data)) => Poll::Ready(Some(data)),
712-
Some(Err(e)) => {
713-
// in theory this should only ever be called once
714-
// as this stream should not be polled again after returning
715-
// None, however we still check for None to be safe
716-
if let Some(sender) = pinned.sender.take() {
717-
// an error means the other end of the channel is not around
718-
// to receive the error, so ignore it
719-
let _ = sender.send(e);
720-
}
721-
Poll::Ready(None)
722-
}
723-
None => Poll::Ready(None),
724-
}
725-
}
726-
}
727-
728-
/// Wrapper for a tonic response stream that can produce a tonic
729-
/// error. This is tied to a oneshot receiver which can be notified
730-
/// of other errors. When it receives an error through receiver
731-
/// end, it prioritises that error to be sent back. See `do_put` or
732-
/// `do_exchange` for it's uses
733-
struct FallibleTonicResponseStream<T> {
734-
/// Receiver for FlightError
735-
receiver: Receiver<FlightError>,
736-
/// Tonic response stream
737-
response_stream:
738-
Pin<Box<dyn Stream<Item = std::result::Result<T, tonic::Status>> + Send + 'static>>,
739-
}
740-
741-
impl<T> FallibleTonicResponseStream<T> {
742-
fn new(
743-
receiver: Receiver<FlightError>,
744-
response_stream: Pin<
745-
Box<dyn Stream<Item = std::result::Result<T, tonic::Status>> + Send + 'static>,
746-
>,
747-
) -> Self {
748-
Self {
749-
receiver,
750-
response_stream,
751-
}
752-
}
753-
}
754-
755-
impl<T> Stream for FallibleTonicResponseStream<T> {
756-
type Item = Result<T>;
757-
758-
fn poll_next(
759-
self: Pin<&mut Self>,
760-
cx: &mut std::task::Context<'_>,
761-
) -> Poll<Option<Self::Item>> {
762-
let pinned = self.get_mut();
763-
let receiver = &mut pinned.receiver;
764-
// Prioritise sending the error that's been notified over
765-
// polling the response_stream
766-
if let Poll::Ready(Ok(err)) = receiver.poll_unpin(cx) {
767-
return Poll::Ready(Some(Err(err)));
768-
};
769-
770-
match ready!(pinned.response_stream.poll_next_unpin(cx)) {
771-
Some(Ok(res)) => Poll::Ready(Some(Ok(res))),
772-
Some(Err(status)) => Poll::Ready(Some(Err(FlightError::Tonic(status)))),
773-
None => Poll::Ready(None),
774-
}
775-
}
776-
}

arrow-flight/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ pub mod utils;
120120

121121
#[cfg(feature = "flight-sql-experimental")]
122122
pub mod sql;
123+
mod streams;
123124

124125
use flight_descriptor::DescriptorType;
125126

arrow-flight/src/sql/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use std::collections::HashMap;
2424
use std::str::FromStr;
2525
use tonic::metadata::AsciiMetadataKey;
2626

27-
use crate::client::FallibleRequestStream;
2827
use crate::decode::FlightRecordBatchStream;
2928
use crate::encode::FlightDataEncoderBuilder;
3029
use crate::error::FlightError;
@@ -43,6 +42,7 @@ use crate::sql::{
4342
CommandStatementIngest, CommandStatementQuery, CommandStatementUpdate,
4443
DoPutPreparedStatementResult, DoPutUpdateResult, ProstMessageExt, SqlInfo,
4544
};
45+
use crate::streams::FallibleRequestStream;
4646
use crate::trailers::extract_lazy_trailers;
4747
use crate::{
4848
Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,

arrow-flight/src/streams.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! [`FallibleRequestStream`] and [`FallibleTonicResponseStream`] adapters
19+
20+
use crate::error::FlightError;
21+
use futures::{
22+
channel::oneshot::{Receiver, Sender},
23+
FutureExt, Stream, StreamExt,
24+
};
25+
use std::pin::Pin;
26+
use std::task::{ready, Poll};
27+
28+
/// Wrapper around a fallible stream (one that returns errors) that makes it infallible.
29+
///
30+
/// Any errors encountered in the stream are ignored are sent to the provided
31+
/// oneshot sender.
32+
///
33+
/// This can be used to accept a stream of `Result<_>` from a client API and send
34+
/// them to the remote server that wants only the successful results.
35+
pub(crate) struct FallibleRequestStream<T, E> {
36+
/// sender to notify error
37+
sender: Option<Sender<E>>,
38+
/// fallible stream
39+
fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> + Send + 'static>>,
40+
}
41+
42+
impl<T, E> FallibleRequestStream<T, E> {
43+
pub(crate) fn new(
44+
sender: Sender<E>,
45+
fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> + Send + 'static>>,
46+
) -> Self {
47+
Self {
48+
sender: Some(sender),
49+
fallible_stream,
50+
}
51+
}
52+
}
53+
54+
impl<T, E> Stream for FallibleRequestStream<T, E> {
55+
type Item = T;
56+
57+
fn poll_next(
58+
self: std::pin::Pin<&mut Self>,
59+
cx: &mut std::task::Context<'_>,
60+
) -> std::task::Poll<Option<Self::Item>> {
61+
let pinned = self.get_mut();
62+
let mut request_streams = pinned.fallible_stream.as_mut();
63+
match ready!(request_streams.poll_next_unpin(cx)) {
64+
Some(Ok(data)) => Poll::Ready(Some(data)),
65+
Some(Err(e)) => {
66+
// in theory this should only ever be called once
67+
// as this stream should not be polled again after returning
68+
// None, however we still check for None to be safe
69+
if let Some(sender) = pinned.sender.take() {
70+
// an error means the other end of the channel is not around
71+
// to receive the error, so ignore it
72+
let _ = sender.send(e);
73+
}
74+
Poll::Ready(None)
75+
}
76+
None => Poll::Ready(None),
77+
}
78+
}
79+
}
80+
81+
/// Wrapper for a tonic response stream that maps errors to `FlightError` and
82+
/// returns errors from a oneshot channel into the stream.
83+
///
84+
/// The user of this stream can inject an error into the response stream using
85+
/// the one shot receiver. This is used to propagate errors in
86+
/// [`FlightClient::do_put`] and [`FlightClient::do_exchange`] from the client
87+
/// provided input stream to the response stream.
88+
///
89+
/// # Error Priority
90+
/// Error from the receiver are prioritised over the response stream.
91+
///
92+
/// [`FlightClient::do_put`]: crate::FlightClient::do_put
93+
/// [`FlightClient::do_exchange`]: crate::FlightClient::do_exchange
94+
pub(crate) struct FallibleTonicResponseStream<T> {
95+
/// Receiver for FlightError
96+
receiver: Receiver<FlightError>,
97+
/// Tonic response stream
98+
response_stream: Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + 'static>>,
99+
}
100+
101+
impl<T> FallibleTonicResponseStream<T> {
102+
pub(crate) fn new(
103+
receiver: Receiver<FlightError>,
104+
response_stream: Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + 'static>>,
105+
) -> Self {
106+
Self {
107+
receiver,
108+
response_stream,
109+
}
110+
}
111+
}
112+
113+
impl<T> Stream for FallibleTonicResponseStream<T> {
114+
type Item = Result<T, FlightError>;
115+
116+
fn poll_next(
117+
self: Pin<&mut Self>,
118+
cx: &mut std::task::Context<'_>,
119+
) -> Poll<Option<Self::Item>> {
120+
let pinned = self.get_mut();
121+
let receiver = &mut pinned.receiver;
122+
// Prioritise sending the error that's been notified over
123+
// polling the response_stream
124+
if let Poll::Ready(Ok(err)) = receiver.poll_unpin(cx) {
125+
return Poll::Ready(Some(Err(err)));
126+
};
127+
128+
match ready!(pinned.response_stream.poll_next_unpin(cx)) {
129+
Some(Ok(res)) => Poll::Ready(Some(Ok(res))),
130+
Some(Err(status)) => Poll::Ready(Some(Err(FlightError::Tonic(status)))),
131+
None => Poll::Ready(None),
132+
}
133+
}
134+
}

0 commit comments

Comments
 (0)