Skip to content

Commit 862280a

Browse files
committed
Merge #160: fix: fix batch ordering issue
a3488f4 ci: bump actions/cache to v4 (valued mammal) 0ee0a6a test: add batch response ordering test (marshallyale) 6721929 fix: fix batch ordering issue (marshallyale) Pull request description: Fixes issue #75 This is my first open source pull request so I apologize for any formatting issues. Additionally, I don't know the repository as well as others so there may be a better way to implement the fix. I believe I found the root cause of this. I added a pull request to fix, but I'm going to copy/paste what I believe is causing the error. The main issue in the code is inside raw_client.rs inside the `recv` method implementation (snippet below): https://github.com/bitcoindevkit/rust-electrum-client/blob/805ea0af307e6465f23c2d7f25a32d7ff61fe7ec/src/raw_client.rs#L671-L685 When this is first called, the `self._reader_thread` will run. Inside the `self._reader_thread`, if the request id matches the response id, everything works fine. However, if the request id does not match the response id, we run the following code: https://github.com/bitcoindevkit/rust-electrum-client/blob/805ea0af307e6465f23c2d7f25a32d7ff61fe7ec/src/raw_client.rs#L602-L612 The channel that the response is sent back into is not unique, but rather all the channels share the same sender.clone() and receiver. The only validation that is done is to check that the request id is still being searched for inside `self.waiting_map`. This means that the receiver channel receives whatever the next response is into the channel without any validation that it matches the request id which happens here `match receiver.recv()?`. This is fixed by implementing unique channels for every request id. This fix can be verified with the code johnzweng used to show the issue If you run this with the initial code, it will error out after 1-10 cycles normally. However, after the fix this runs indefinitely. ACKs for top commit: ValuedMammal: reACK a3488f4 Tree-SHA512: c56d572c0d9e709352fde0c0438103fe4c0338e4b591d5290468b1658d6d73dbc818044e1b7ea6307e449a8d4380d9deba6adf2b89eb1dcbc119cec277fd721c
2 parents 372eda9 + a3488f4 commit 862280a

File tree

2 files changed

+36
-16
lines changed

2 files changed

+36
-16
lines changed

.github/workflows/cont_integration.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ jobs:
1717
- name: Checkout
1818
uses: actions/checkout@v4
1919
- name: Cache
20-
uses: actions/cache@v2
20+
uses: actions/cache@v4
2121
with:
2222
path: |
2323
~/.cargo/registry

src/raw_client.rs

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//! This module contains the definition of the raw client that wraps the transport method
44
55
use std::borrow::Borrow;
6-
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
6+
use std::collections::{BTreeMap, HashMap, VecDeque};
77
use std::io::{BufRead, BufReader, Read, Write};
88
use std::mem::drop;
99
use std::net::{TcpStream, ToSocketAddrs};
@@ -539,11 +539,10 @@ impl<S: Read + Write> RawClient<S> {
539539

540540
if let Some(until_message) = until_message {
541541
// If we are trying to start a reader thread but the corresponding sender is
542-
// missing from the map, exit immediately. This can happen with batch calls,
543-
// since the sender is shared for all the individual queries in a call. We
544-
// might have already received a response for that id, but we don't know it
545-
// yet. Exiting here forces the calling code to fallback to the sender-receiver
546-
// method, and it should find a message there waiting for it.
542+
// missing from the map, exit immediately. We might have already received a
543+
// response for that id, but we don't know it yet. Exiting here forces the
544+
// calling code to fallback to the sender-receiver method, and it should find
545+
// a message there waiting for it.
547546
if self.waiting_map.lock()?.get(&until_message).is_none() {
548547
return Err(Error::CouldntLockReader);
549548
}
@@ -762,22 +761,23 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
762761
fn batch_call(&self, batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
763762
let mut raw = Vec::new();
764763

765-
let mut missing_responses = BTreeSet::new();
764+
let mut missing_responses = Vec::new();
766765
let mut answers = BTreeMap::new();
767766

768-
// Add our listener to the map before we send the request, Here we will clone the sender
769-
// for every request id, so that we only have to monitor one receiver.
770-
let (sender, receiver) = channel();
767+
// Add our listener to the map before we send the request
771768

772769
for (method, params) in batch.iter() {
773770
let req = Request::new_id(
774771
self.last_id.fetch_add(1, Ordering::SeqCst),
775772
method,
776773
params.to_vec(),
777774
);
778-
missing_responses.insert(req.id);
775+
// Add distinct channel to each request so when we remove our request id (and sender) from the waiting_map
776+
// we can be sure that the response gets sent to the correct channel in self.recv
777+
let (sender, receiver) = channel();
778+
missing_responses.push((req.id, receiver));
779779

780-
self.waiting_map.lock()?.insert(req.id, sender.clone());
780+
self.waiting_map.lock()?.insert(req.id, sender);
781781

782782
raw.append(&mut serde_json::to_vec(&req)?);
783783
raw.extend_from_slice(b"\n");
@@ -796,16 +796,16 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
796796

797797
self.increment_calls();
798798

799-
for req_id in missing_responses.iter() {
800-
match self.recv(&receiver, *req_id) {
799+
for (req_id, receiver) in missing_responses.iter() {
800+
match self.recv(receiver, *req_id) {
801801
Ok(mut resp) => answers.insert(req_id, resp["result"].take()),
802802
Err(e) => {
803803
// In case of error our sender could still be left in the map, depending on where
804804
// the error happened. Just in case, try to remove it here
805805
warn!("got error for req_id {}: {:?}", req_id, e);
806806
warn!("removing all waiting req of this batch");
807807
let mut guard = self.waiting_map.lock()?;
808-
for req_id in missing_responses.iter() {
808+
for (req_id, _) in missing_responses.iter() {
809809
guard.remove(req_id);
810810
}
811811
return Err(e);
@@ -1190,6 +1190,26 @@ mod test {
11901190
assert_eq!(resp.hash_function, Some("sha256".into()));
11911191
assert_eq!(resp.pruning, None);
11921192
}
1193+
1194+
#[test]
1195+
#[ignore = "depends on a live server"]
1196+
fn test_batch_response_ordering() {
1197+
// The electrum.blockstream.info:50001 node always sends back ordered responses which will make this always pass.
1198+
// However, many servers do not, so we use one of those servers for this test.
1199+
let client = RawClient::new("exs.dyshek.org:50001", None).unwrap();
1200+
let heights: Vec<u32> = vec![1, 4, 8, 12, 222, 6666, 12];
1201+
let result_times = [
1202+
1231469665, 1231470988, 1231472743, 1231474888, 1231770653, 1236456633, 1231474888,
1203+
];
1204+
// Check ordering 10 times. This usually fails within 5 if ordering is incorrect.
1205+
for _ in 0..10 {
1206+
let results = client.batch_block_header(&heights).unwrap();
1207+
for (index, result) in results.iter().enumerate() {
1208+
assert_eq!(result_times[index], result.time);
1209+
}
1210+
}
1211+
}
1212+
11931213
#[test]
11941214
fn test_relay_fee() {
11951215
let client = RawClient::new(get_test_server(), None).unwrap();

0 commit comments

Comments
 (0)