Skip to content

Commit d266898

Browse files
committed
feat(settlement/api): Upgrade the message service
This service is responsible for catching any messages sent to the Settlement ILP Address and forward them to the engine
1 parent 31b8574 commit d266898

File tree

1 file changed

+93
-75
lines changed

1 file changed

+93
-75
lines changed

crates/interledger-settlement/src/api/message_service.rs

Lines changed: 93 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
use crate::core::types::{SettlementAccount, SE_ILP_ADDRESS};
2-
use futures::{
3-
future::{err, Either},
4-
Future, Stream,
5-
};
2+
use async_trait::async_trait;
3+
use futures::{compat::Future01CompatExt, TryFutureExt};
64
use interledger_packet::{ErrorCode, FulfillBuilder, RejectBuilder};
7-
use interledger_service::{Account, BoxedIlpFuture, IncomingRequest, IncomingService};
5+
use interledger_service::{Account, IlpResult, IncomingRequest, IncomingService};
86
use log::error;
9-
use reqwest::r#async::Client;
7+
use reqwest::Client;
108
use std::marker::PhantomData;
119
use tokio_retry::{strategy::ExponentialBackoff, Retry};
1210

@@ -33,14 +31,13 @@ where
3331
}
3432
}
3533

34+
#[async_trait]
3635
impl<I, A> IncomingService<A> for SettlementMessageService<I, A>
3736
where
3837
I: IncomingService<A> + Send,
39-
A: SettlementAccount + Account,
38+
A: SettlementAccount + Account + Send + Sync,
4039
{
41-
type Future = BoxedIlpFuture;
42-
43-
fn handle_request(&mut self, request: IncomingRequest<A>) -> Self::Future {
40+
async fn handle_request(&mut self, request: IncomingRequest<A>) -> IlpResult {
4441
// Only handle the request if the destination address matches the ILP address
4542
// of the settlement engine being used for this account
4643
if let Some(settlement_engine_details) = request.from.settlement_engine_details() {
@@ -67,73 +64,94 @@ where
6764
.header("Idempotency-Key", idempotency_uuid.clone())
6865
.body(message.clone())
6966
.send()
67+
.compat() // Wrap to a 0.1 future
7068
};
69+
// TODO: futures-retry is still not on futures 0.3. As a result, we wrap our action in a
70+
// 0.1 future, and then wrap the Retry future in a 0.3 future to use async/await.
7171

72-
return Box::new(Retry::spawn(ExponentialBackoff::from_millis(10).take(10), action)
73-
.map_err(move |error| {
74-
error!("Error sending message to settlement engine: {:?}", error);
75-
RejectBuilder {
76-
code: ErrorCode::T00_INTERNAL_ERROR,
77-
message: b"Error sending message to settlement engine",
78-
data: &[],
79-
triggered_by: Some(&SE_ILP_ADDRESS),
80-
}.build()
81-
})
82-
.and_then(move |response| {
83-
let status = response.status();
84-
if status.is_success() {
85-
Either::A(response.into_body().concat2().map_err(move |err| {
86-
error!("Error concatenating settlement engine response body: {:?}", err);
72+
let response = Retry::spawn(ExponentialBackoff::from_millis(10).take(10), action)
73+
.compat()
74+
.map_err(move |error| {
75+
error!("Error sending message to settlement engine: {:?}", error);
76+
RejectBuilder {
77+
code: ErrorCode::T00_INTERNAL_ERROR,
78+
message: b"Error sending message to settlement engine",
79+
data: &[],
80+
triggered_by: Some(&SE_ILP_ADDRESS),
81+
}
82+
.build()
83+
})
84+
.await?;
85+
let status = response.status();
86+
if status.is_success() {
87+
let body = response
88+
.bytes()
89+
.map_err(|err| {
90+
error!(
91+
"Error concatenating settlement engine response body: {:?}",
92+
err
93+
);
8794
RejectBuilder {
8895
code: ErrorCode::T00_INTERNAL_ERROR,
8996
message: b"Error getting settlement engine response",
9097
data: &[],
9198
triggered_by: Some(&SE_ILP_ADDRESS),
92-
}.build()
99+
}
100+
.build()
93101
})
94-
.and_then(|body| {
95-
Ok(FulfillBuilder {
96-
fulfillment: &PEER_FULFILLMENT,
97-
data: body.as_ref(),
98-
}.build())
99-
}))
102+
.await?;
103+
104+
return Ok(FulfillBuilder {
105+
fulfillment: &PEER_FULFILLMENT,
106+
data: body.as_ref(),
107+
}
108+
.build());
109+
} else {
110+
error!(
111+
"Settlement engine rejected message with HTTP error code: {}",
112+
response.status()
113+
);
114+
let code = if status.is_client_error() {
115+
ErrorCode::F00_BAD_REQUEST
100116
} else {
101-
error!("Settlement engine rejected message with HTTP error code: {}", response.status());
102-
let code = if status.is_client_error() {
103-
ErrorCode::F00_BAD_REQUEST
104-
} else {
105-
ErrorCode::T00_INTERNAL_ERROR
106-
};
107-
Either::B(err(RejectBuilder {
108-
code,
109-
message: format!("Settlement engine rejected request with error code: {}", response.status()).as_str().as_ref(),
110-
data: &[],
111-
triggered_by: Some(&SE_ILP_ADDRESS),
112-
}.build()))
117+
ErrorCode::T00_INTERNAL_ERROR
118+
};
119+
120+
return Err(RejectBuilder {
121+
code,
122+
message: format!(
123+
"Settlement engine rejected request with error code: {}",
124+
response.status()
125+
)
126+
.as_str()
127+
.as_ref(),
128+
data: &[],
129+
triggered_by: Some(&SE_ILP_ADDRESS),
113130
}
114-
}));
131+
.build());
132+
}
115133
}
116134
}
117-
Box::new(self.next.handle_request(request))
135+
self.next.handle_request(request).await
118136
}
119137
}
120138

121139
#[cfg(test)]
122140
mod tests {
123141
use super::*;
124142
use crate::api::fixtures::{BODY, DATA, SERVICE_ADDRESS, TEST_ACCOUNT_0};
125-
use crate::api::test_helpers::{block_on, mock_message, test_service};
143+
use crate::api::test_helpers::{mock_message, test_service};
126144
use interledger_packet::{Address, Fulfill, PrepareBuilder, Reject};
127145
use std::str::FromStr;
128146
use std::time::SystemTime;
129147

130-
#[test]
131-
fn settlement_ok() {
148+
#[tokio::test]
149+
async fn settlement_ok() {
132150
// happy case
133151
let m = mock_message(200).create();
134152
let mut settlement = test_service();
135-
let fulfill: Fulfill = block_on(
136-
settlement.handle_request(IncomingRequest {
153+
let fulfill: Fulfill = settlement
154+
.handle_request(IncomingRequest {
137155
from: TEST_ACCOUNT_0.clone(),
138156
prepare: PrepareBuilder {
139157
amount: 0,
@@ -143,22 +161,22 @@ mod tests {
143161
execution_condition: &[0; 32],
144162
}
145163
.build(),
146-
}),
147-
)
148-
.unwrap();
164+
})
165+
.await
166+
.unwrap();
149167

150168
m.assert();
151169
assert_eq!(fulfill.data(), BODY.as_bytes());
152170
assert_eq!(fulfill.fulfillment(), &[0; 32]);
153171
}
154172

155-
#[test]
156-
fn gets_forwarded_if_destination_not_engine_() {
173+
#[tokio::test]
174+
async fn gets_forwarded_if_destination_not_engine_() {
157175
let m = mock_message(200).create().expect(0);
158176
let mut settlement = test_service();
159177
let destination = Address::from_str("example.some.address").unwrap();
160-
let reject: Reject = block_on(
161-
settlement.handle_request(IncomingRequest {
178+
let reject: Reject = settlement
179+
.handle_request(IncomingRequest {
162180
from: TEST_ACCOUNT_0.clone(),
163181
prepare: PrepareBuilder {
164182
amount: 0,
@@ -168,24 +186,24 @@ mod tests {
168186
execution_condition: &[0; 32],
169187
}
170188
.build(),
171-
}),
172-
)
173-
.unwrap_err();
189+
})
190+
.await
191+
.unwrap_err();
174192

175193
m.assert();
176194
assert_eq!(reject.code(), ErrorCode::F02_UNREACHABLE);
177195
assert_eq!(reject.triggered_by().unwrap(), SERVICE_ADDRESS.clone());
178196
assert_eq!(reject.message(), b"No other incoming handler!" as &[u8],);
179197
}
180198

181-
#[test]
182-
fn account_does_not_have_settlement_engine() {
199+
#[tokio::test]
200+
async fn account_does_not_have_settlement_engine() {
183201
let m = mock_message(200).create().expect(0);
184202
let mut settlement = test_service();
185203
let mut acc = TEST_ACCOUNT_0.clone();
186204
acc.no_details = true; // Hide the settlement engine data from the account
187-
let reject: Reject = block_on(
188-
settlement.handle_request(IncomingRequest {
205+
let reject: Reject = settlement
206+
.handle_request(IncomingRequest {
189207
from: acc.clone(),
190208
prepare: PrepareBuilder {
191209
amount: 0,
@@ -195,25 +213,25 @@ mod tests {
195213
execution_condition: &[0; 32],
196214
}
197215
.build(),
198-
}),
199-
)
200-
.unwrap_err();
216+
})
217+
.await
218+
.unwrap_err();
201219

202220
m.assert();
203221
assert_eq!(reject.code(), ErrorCode::F02_UNREACHABLE);
204222
assert_eq!(reject.triggered_by().unwrap(), SERVICE_ADDRESS.clone());
205223
assert_eq!(reject.message(), b"No other incoming handler!");
206224
}
207225

208-
#[test]
209-
fn settlement_engine_rejects() {
226+
#[tokio::test]
227+
async fn settlement_engine_rejects() {
210228
// for whatever reason the engine rejects our request with a 500 code
211229
let error_code = 500;
212230
let error_str = "Internal Server Error";
213231
let m = mock_message(error_code).create();
214232
let mut settlement = test_service();
215-
let reject: Reject = block_on(
216-
settlement.handle_request(IncomingRequest {
233+
let reject: Reject = settlement
234+
.handle_request(IncomingRequest {
217235
from: TEST_ACCOUNT_0.clone(),
218236
prepare: PrepareBuilder {
219237
amount: 0,
@@ -223,9 +241,9 @@ mod tests {
223241
execution_condition: &[0; 32],
224242
}
225243
.build(),
226-
}),
227-
)
228-
.unwrap_err();
244+
})
245+
.await
246+
.unwrap_err();
229247

230248
m.assert();
231249
assert_eq!(reject.code(), ErrorCode::T00_INTERNAL_ERROR);

0 commit comments

Comments
 (0)