Skip to content

Commit 4b41f5a

Browse files
refactor(core)!: remove EitherOutput (#3341)
The trick with this one is to use `futures::Either` everywhere where we may wrap something that implements any of the `futures` traits. This includes the output of `EitherFuture` itself. We also need to implement `StreamMuxer` on `future::Either` because `StreamMuxer`s may be the the `Output` of `InboundUpgrade`.
1 parent 8d6a2fc commit 4b41f5a

File tree

15 files changed

+123
-329
lines changed

15 files changed

+123
-329
lines changed

core/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
- Remove `EitherFuture2` in favor of `EitherFuture`. See [PR 3340].
2929

30+
- Remove `EitherOutput` in favor of `future::Either`. See [PR 3341].
31+
3032
[PR 3031]: https://github.com/libp2p/rust-libp2p/pull/3031
3133
[PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058
3234
[PR 3097]: https://github.com/libp2p/rust-libp2p/pull/3097
@@ -36,6 +38,7 @@
3638
[PR 3338]: https://github.com/libp2p/rust-libp2p/pull/3338
3739
[PR 3339]: https://github.com/libp2p/rust-libp2p/pull/3339
3840
[PR 3340]: https://github.com/libp2p/rust-libp2p/pull/3340
41+
[PR 3341]: https://github.com/libp2p/rust-libp2p/pull/3341
3942

4043
# 0.37.0
4144

core/src/either.rs

Lines changed: 46 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -25,167 +25,30 @@ use crate::{
2525
Multiaddr, ProtocolName,
2626
};
2727
use either::Either;
28-
use futures::{
29-
io::{IoSlice, IoSliceMut},
30-
prelude::*,
31-
};
28+
use futures::prelude::*;
3229
use pin_project::pin_project;
33-
use std::{io, pin::Pin, task::Context, task::Poll};
34-
35-
/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to
36-
/// either `First` or `Second`.
37-
#[pin_project(project = EitherOutputProj)]
38-
#[derive(Debug, Copy, Clone)]
39-
pub enum EitherOutput<A, B> {
40-
First(#[pin] A),
41-
Second(#[pin] B),
42-
}
43-
44-
impl<A, B> AsyncRead for EitherOutput<A, B>
45-
where
46-
A: AsyncRead,
47-
B: AsyncRead,
48-
{
49-
fn poll_read(
50-
self: Pin<&mut Self>,
51-
cx: &mut Context<'_>,
52-
buf: &mut [u8],
53-
) -> Poll<io::Result<usize>> {
54-
match self.project() {
55-
EitherOutputProj::First(a) => AsyncRead::poll_read(a, cx, buf),
56-
EitherOutputProj::Second(b) => AsyncRead::poll_read(b, cx, buf),
57-
}
58-
}
30+
use std::{pin::Pin, task::Context, task::Poll};
5931

60-
fn poll_read_vectored(
61-
self: Pin<&mut Self>,
62-
cx: &mut Context<'_>,
63-
bufs: &mut [IoSliceMut<'_>],
64-
) -> Poll<io::Result<usize>> {
65-
match self.project() {
66-
EitherOutputProj::First(a) => AsyncRead::poll_read_vectored(a, cx, bufs),
67-
EitherOutputProj::Second(b) => AsyncRead::poll_read_vectored(b, cx, bufs),
68-
}
69-
}
70-
}
71-
72-
impl<A, B> AsyncWrite for EitherOutput<A, B>
73-
where
74-
A: AsyncWrite,
75-
B: AsyncWrite,
76-
{
77-
fn poll_write(
78-
self: Pin<&mut Self>,
79-
cx: &mut Context<'_>,
80-
buf: &[u8],
81-
) -> Poll<io::Result<usize>> {
82-
match self.project() {
83-
EitherOutputProj::First(a) => AsyncWrite::poll_write(a, cx, buf),
84-
EitherOutputProj::Second(b) => AsyncWrite::poll_write(b, cx, buf),
85-
}
86-
}
87-
88-
fn poll_write_vectored(
89-
self: Pin<&mut Self>,
90-
cx: &mut Context<'_>,
91-
bufs: &[IoSlice<'_>],
92-
) -> Poll<io::Result<usize>> {
93-
match self.project() {
94-
EitherOutputProj::First(a) => AsyncWrite::poll_write_vectored(a, cx, bufs),
95-
EitherOutputProj::Second(b) => AsyncWrite::poll_write_vectored(b, cx, bufs),
96-
}
97-
}
98-
99-
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
100-
match self.project() {
101-
EitherOutputProj::First(a) => AsyncWrite::poll_flush(a, cx),
102-
EitherOutputProj::Second(b) => AsyncWrite::poll_flush(b, cx),
103-
}
104-
}
105-
106-
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
107-
match self.project() {
108-
EitherOutputProj::First(a) => AsyncWrite::poll_close(a, cx),
109-
EitherOutputProj::Second(b) => AsyncWrite::poll_close(b, cx),
110-
}
111-
}
112-
}
113-
114-
impl<A, B, I> Stream for EitherOutput<A, B>
115-
where
116-
A: TryStream<Ok = I>,
117-
B: TryStream<Ok = I>,
118-
{
119-
type Item = Result<I, Either<A::Error, B::Error>>;
120-
121-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
122-
match self.project() {
123-
EitherOutputProj::First(a) => {
124-
TryStream::try_poll_next(a, cx).map(|v| v.map(|r| r.map_err(Either::Left)))
125-
}
126-
EitherOutputProj::Second(b) => {
127-
TryStream::try_poll_next(b, cx).map(|v| v.map(|r| r.map_err(Either::Right)))
128-
}
129-
}
130-
}
131-
}
132-
133-
impl<A, B, I> Sink<I> for EitherOutput<A, B>
134-
where
135-
A: Sink<I>,
136-
B: Sink<I>,
137-
{
138-
type Error = Either<A::Error, B::Error>;
139-
140-
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
141-
match self.project() {
142-
EitherOutputProj::First(a) => Sink::poll_ready(a, cx).map_err(Either::Left),
143-
EitherOutputProj::Second(b) => Sink::poll_ready(b, cx).map_err(Either::Right),
144-
}
145-
}
146-
147-
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
148-
match self.project() {
149-
EitherOutputProj::First(a) => Sink::start_send(a, item).map_err(Either::Left),
150-
EitherOutputProj::Second(b) => Sink::start_send(b, item).map_err(Either::Right),
151-
}
152-
}
153-
154-
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
155-
match self.project() {
156-
EitherOutputProj::First(a) => Sink::poll_flush(a, cx).map_err(Either::Left),
157-
EitherOutputProj::Second(b) => Sink::poll_flush(b, cx).map_err(Either::Right),
158-
}
159-
}
160-
161-
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
162-
match self.project() {
163-
EitherOutputProj::First(a) => Sink::poll_close(a, cx).map_err(Either::Left),
164-
EitherOutputProj::Second(b) => Sink::poll_close(b, cx).map_err(Either::Right),
165-
}
166-
}
167-
}
168-
169-
impl<A, B> StreamMuxer for EitherOutput<A, B>
32+
impl<A, B> StreamMuxer for future::Either<A, B>
17033
where
17134
A: StreamMuxer,
17235
B: StreamMuxer,
17336
{
174-
type Substream = EitherOutput<A::Substream, B::Substream>;
37+
type Substream = future::Either<A::Substream, B::Substream>;
17538
type Error = Either<A::Error, B::Error>;
17639

17740
fn poll_inbound(
17841
self: Pin<&mut Self>,
17942
cx: &mut Context<'_>,
18043
) -> Poll<Result<Self::Substream, Self::Error>> {
181-
match self.project() {
182-
EitherOutputProj::First(inner) => inner
44+
match as_pin_mut(self) {
45+
future::Either::Left(inner) => inner
18346
.poll_inbound(cx)
184-
.map_ok(EitherOutput::First)
47+
.map_ok(future::Either::Left)
18548
.map_err(Either::Left),
186-
EitherOutputProj::Second(inner) => inner
49+
future::Either::Right(inner) => inner
18750
.poll_inbound(cx)
188-
.map_ok(EitherOutput::Second)
51+
.map_ok(future::Either::Right)
18952
.map_err(Either::Right),
19053
}
19154
}
@@ -194,32 +57,54 @@ where
19457
self: Pin<&mut Self>,
19558
cx: &mut Context<'_>,
19659
) -> Poll<Result<Self::Substream, Self::Error>> {
197-
match self.project() {
198-
EitherOutputProj::First(inner) => inner
60+
match as_pin_mut(self) {
61+
future::Either::Left(inner) => inner
19962
.poll_outbound(cx)
200-
.map_ok(EitherOutput::First)
63+
.map_ok(future::Either::Left)
20164
.map_err(Either::Left),
202-
EitherOutputProj::Second(inner) => inner
65+
future::Either::Right(inner) => inner
20366
.poll_outbound(cx)
204-
.map_ok(EitherOutput::Second)
67+
.map_ok(future::Either::Right)
20568
.map_err(Either::Right),
20669
}
20770
}
20871

20972
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
210-
match self.project() {
211-
EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(Either::Left),
212-
EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(Either::Right),
73+
match as_pin_mut(self) {
74+
future::Either::Left(inner) => inner.poll_close(cx).map_err(Either::Left),
75+
future::Either::Right(inner) => inner.poll_close(cx).map_err(Either::Right),
21376
}
21477
}
21578

21679
fn poll(
21780
self: Pin<&mut Self>,
21881
cx: &mut Context<'_>,
21982
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
220-
match self.project() {
221-
EitherOutputProj::First(inner) => inner.poll(cx).map_err(Either::Left),
222-
EitherOutputProj::Second(inner) => inner.poll(cx).map_err(Either::Right),
83+
match as_pin_mut(self) {
84+
future::Either::Left(inner) => inner.poll(cx).map_err(Either::Left),
85+
future::Either::Right(inner) => inner.poll(cx).map_err(Either::Right),
86+
}
87+
}
88+
}
89+
90+
/// Convert `Pin<&mut Either<A, B>>` to `Either<Pin<&mut A>, Pin<&mut B>>`,
91+
/// pinned projections of the inner variants.
92+
///
93+
/// Local function until <https://github.com/rust-lang/futures-rs/pull/2691> is merged.
94+
fn as_pin_mut<A, B>(
95+
either: Pin<&mut future::Either<A, B>>,
96+
) -> future::Either<Pin<&mut A>, Pin<&mut B>> {
97+
// SAFETY: `get_unchecked_mut` is fine because we don't move anything.
98+
// We can use `new_unchecked` because the `inner` parts are guaranteed
99+
// to be pinned, as they come from `self` which is pinned, and we never
100+
// offer an unpinned `&mut L` or `&mut R` through `Pin<&mut Self>`. We
101+
// also don't have an implementation of `Drop`, nor manual `Unpin`.
102+
unsafe {
103+
match *Pin::get_unchecked_mut(either) {
104+
future::Either::Left(ref mut inner) => future::Either::Left(Pin::new_unchecked(inner)),
105+
future::Either::Right(ref mut inner) => {
106+
future::Either::Right(Pin::new_unchecked(inner))
107+
}
223108
}
224109
}
225110
}
@@ -238,15 +123,15 @@ where
238123
AFuture: TryFuture<Ok = AInner>,
239124
BFuture: TryFuture<Ok = BInner>,
240125
{
241-
type Output = Result<EitherOutput<AInner, BInner>, Either<AFuture::Error, BFuture::Error>>;
126+
type Output = Result<future::Either<AInner, BInner>, Either<AFuture::Error, BFuture::Error>>;
242127

243128
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
244129
match self.project() {
245130
EitherFutureProj::First(a) => TryFuture::try_poll(a, cx)
246-
.map_ok(EitherOutput::First)
131+
.map_ok(future::Either::Left)
247132
.map_err(Either::Left),
248133
EitherFutureProj::Second(a) => TryFuture::try_poll(a, cx)
249-
.map_ok(EitherOutput::Second)
134+
.map_ok(future::Either::Right)
250135
.map_err(Either::Right),
251136
}
252137
}
@@ -272,7 +157,7 @@ where
272157
B: Transport,
273158
A: Transport,
274159
{
275-
type Output = EitherOutput<A::Output, B::Output>;
160+
type Output = future::Either<A::Output, B::Output>;
276161
type Error = Either<A::Error, B::Error>;
277162
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
278163
type Dial = EitherFuture<A::Dial, B::Dial>;

core/src/transport/choice.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21-
use crate::either::{EitherFuture, EitherOutput};
21+
use crate::either::EitherFuture;
2222
use crate::transport::{ListenerId, Transport, TransportError, TransportEvent};
2323
use either::Either;
24+
use futures::future;
2425
use multiaddr::Multiaddr;
2526
use std::{pin::Pin, task::Context, task::Poll};
2627

@@ -40,7 +41,7 @@ where
4041
B: Transport,
4142
A: Transport,
4243
{
43-
type Output = EitherOutput<A::Output, B::Output>;
44+
type Output = future::Either<A::Output, B::Output>;
4445
type Error = Either<A::Error, B::Error>;
4546
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
4647
type Dial = EitherFuture<A::Dial, B::Dial>;

core/src/upgrade/either.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919
// DEALINGS IN THE SOFTWARE.
2020

2121
use crate::{
22-
either::{EitherFuture, EitherName, EitherOutput},
22+
either::{EitherFuture, EitherName},
2323
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
2424
};
2525
use either::Either;
26+
use futures::future;
2627

2728
impl<A, B> UpgradeInfo for Either<A, B>
2829
where
@@ -48,7 +49,7 @@ where
4849
A: InboundUpgrade<C, Output = TA, Error = EA>,
4950
B: InboundUpgrade<C, Output = TB, Error = EB>,
5051
{
51-
type Output = EitherOutput<TA, TB>;
52+
type Output = future::Either<TA, TB>;
5253
type Error = Either<EA, EB>;
5354
type Future = EitherFuture<A::Future, B::Future>;
5455

@@ -70,7 +71,7 @@ where
7071
A: OutboundUpgrade<C, Output = TA, Error = EA>,
7172
B: OutboundUpgrade<C, Output = TB, Error = EB>,
7273
{
73-
type Output = EitherOutput<TA, TB>;
74+
type Output = future::Either<TA, TB>;
7475
type Error = Either<EA, EB>;
7576
type Future = EitherFuture<A::Future, B::Future>;
7677

core/src/upgrade/select.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020

2121
use crate::either::EitherFuture;
2222
use crate::{
23-
either::{EitherName, EitherOutput},
23+
either::EitherName,
2424
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
2525
};
2626
use either::Either;
27+
use futures::future;
2728

2829
/// Upgrade that combines two upgrades into one. Supports all the protocols supported by either
2930
/// sub-upgrade.
@@ -65,7 +66,7 @@ where
6566
A: InboundUpgrade<C, Output = TA, Error = EA>,
6667
B: InboundUpgrade<C, Output = TB, Error = EB>,
6768
{
68-
type Output = EitherOutput<TA, TB>;
69+
type Output = future::Either<TA, TB>;
6970
type Error = Either<EA, EB>;
7071
type Future = EitherFuture<A::Future, B::Future>;
7172

@@ -82,7 +83,7 @@ where
8283
A: OutboundUpgrade<C, Output = TA, Error = EA>,
8384
B: OutboundUpgrade<C, Output = TB, Error = EB>,
8485
{
85-
type Output = EitherOutput<TA, TB>;
86+
type Output = future::Either<TA, TB>;
8687
type Error = Either<EA, EB>;
8788
type Future = EitherFuture<A::Future, B::Future>;
8889

protocols/dcutr/src/handler/relayed.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
2323
use crate::protocol;
2424
use either::Either;
25+
use futures::future;
2526
use futures::future::{BoxFuture, FutureExt};
2627
use instant::Instant;
27-
use libp2p_core::either::EitherOutput;
2828
use libp2p_core::multiaddr::Multiaddr;
2929
use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError};
3030
use libp2p_core::ConnectedPoint;
@@ -174,7 +174,7 @@ impl Handler {
174174
>,
175175
) {
176176
match output {
177-
EitherOutput::First(inbound_connect) => {
177+
future::Either::Left(inbound_connect) => {
178178
let remote_addr = match &self.endpoint {
179179
ConnectedPoint::Dialer { address, role_override: _ } => address.clone(),
180180
ConnectedPoint::Listener { ..} => unreachable!("`<Handler as ConnectionHandler>::listen_protocol` denies all incoming substreams as a listener."),
@@ -187,7 +187,7 @@ impl Handler {
187187
));
188188
}
189189
// A connection listener denies all incoming substreams, thus none can ever be fully negotiated.
190-
EitherOutput::Second(output) => void::unreachable(output),
190+
future::Either::Right(output) => void::unreachable(output),
191191
}
192192
}
193193

0 commit comments

Comments
 (0)