Skip to content

Commit 90bd8aa

Browse files
Align Bytesable messages to u64 (#614)
* comment out failing doctest * Align Bytesable writes to u64 * Align ContainerBytes implementations with padding * Respond to comments
1 parent ef268e8 commit 90bd8aa

File tree

2 files changed

+83
-14
lines changed

2 files changed

+83
-14
lines changed

timely/src/dataflow/channels/mod.rs

Lines changed: 73 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ impl<T, C: Container> Message<T, C> {
5757
}
5858

5959
// Instructions for serialization of `Message`.
60-
// Intended to swap out the constraint on `C` for `C: Bytesable`.
60+
//
61+
// Serialization of each field is meant to be `u64` aligned, so that each has the ability
62+
// to be decoded using safe transmutation, e.g. `bytemuck`.
6163
impl<T, C> crate::communication::Bytesable for Message<T, C>
6264
where
6365
T: Serialize + for<'a> Deserialize<'a>,
@@ -69,24 +71,28 @@ where
6971
let from: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
7072
let seq: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
7173
let time: T = ::bincode::deserialize_from(&mut slice).expect("bincode::deserialize() failed");
72-
let bytes_read = bytes.len() - slice.len();
74+
let time_size = ::bincode::serialized_size(&time).expect("bincode::serialized_size() failed") as usize;
75+
// We expect to find the `data` payload at `8 + 8 + round_up(time_size)`;
76+
let bytes_read = 8 + 8 + ((time_size + 7) & !7);
7377
bytes.extract_to(bytes_read);
7478
let data: C = ContainerBytes::from_bytes(bytes);
7579
Self { time, data, from, seq }
7680
}
7781

7882
fn length_in_bytes(&self) -> usize {
83+
let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
7984
// 16 comes from the two `u64` fields: `from` and `seq`.
80-
16 +
81-
::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize +
82-
self.data.length_in_bytes()
85+
16 + ((time_size + 7) & !7) + self.data.length_in_bytes()
8386
}
8487

8588
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
8689
use byteorder::WriteBytesExt;
8790
writer.write_u64::<byteorder::LittleEndian>(self.from.try_into().unwrap()).unwrap();
8891
writer.write_u64::<byteorder::LittleEndian>(self.seq.try_into().unwrap()).unwrap();
8992
::bincode::serialize_into(&mut *writer, &self.time).expect("bincode::serialize_into() failed");
93+
let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
94+
let time_slop = ((time_size + 7) & !7) - time_size;
95+
writer.write(&[0u8; 8][..time_slop]).unwrap();
9096
self.data.into_bytes(&mut *writer);
9197
}
9298
}
@@ -106,6 +112,8 @@ pub trait ContainerBytes {
106112

107113
mod implementations {
108114

115+
use std::io::Write;
116+
109117
use serde::{Serialize, Deserialize};
110118
use crate::dataflow::channels::ContainerBytes;
111119

@@ -115,11 +123,16 @@ mod implementations {
115123
}
116124

117125
fn length_in_bytes(&self) -> usize {
118-
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
126+
let length = ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize;
127+
(length + 7) & !7
119128
}
120129

121-
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
122-
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
130+
fn into_bytes<W: Write>(&self, writer: &mut W) {
131+
let mut counter = WriteCounter::new(writer);
132+
::bincode::serialize_into(&mut counter, &self).expect("bincode::serialize_into() failed");
133+
let written = counter.count;
134+
let written_slop = ((written + 7) & !7) - written;
135+
counter.write(&[0u8; 8][..written_slop]).unwrap();
123136
}
124137
}
125138

@@ -130,11 +143,60 @@ mod implementations {
130143
}
131144

132145
fn length_in_bytes(&self) -> usize {
133-
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
146+
let length = ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize;
147+
(length + 7) & !7
148+
}
149+
150+
fn into_bytes<W: Write>(&self, writer: &mut W) {
151+
let mut counter = WriteCounter::new(writer);
152+
::bincode::serialize_into(&mut counter, &self).expect("bincode::serialize_into() failed");
153+
let written = counter.count;
154+
let written_slop = ((written + 7) & !7) - written;
155+
counter.write(&[0u8; 8][..written_slop]).unwrap();
156+
}
157+
}
158+
159+
use write_counter::WriteCounter;
160+
/// A `Write` wrapper that counts the bytes written.
161+
mod write_counter {
162+
163+
use ::std::io::{Write, IoSlice, Result};
164+
use std::fmt::Arguments;
165+
166+
/// A write wrapper that tracks the bytes written.
167+
pub struct WriteCounter<W> {
168+
inner: W,
169+
pub count: usize,
170+
}
171+
172+
impl<W> WriteCounter<W> {
173+
/// Creates a new counter wrapper from a writer.
174+
pub fn new(inner: W) -> Self {
175+
Self { inner, count: 0 }
176+
}
134177
}
135178

136-
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
137-
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
179+
impl<W: Write> Write for WriteCounter<W> {
180+
fn write(&mut self, buf: &[u8]) -> Result<usize> {
181+
let written = self.inner.write(buf)?;
182+
self.count += written;
183+
Ok(written)
184+
}
185+
fn flush(&mut self) -> Result<()> {
186+
self.inner.flush()
187+
}
188+
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
189+
let written = self.inner.write_vectored(bufs)?;
190+
self.count += written;
191+
Ok(written)
192+
}
193+
fn write_all(&mut self, buf: &[u8]) -> Result<()> {
194+
self.count += buf.len();
195+
self.inner.write_all(buf)
196+
}
197+
fn write_fmt(&mut self, _fmt: Arguments<'_>) -> Result<()> {
198+
unimplemented!()
199+
}
138200
}
139201
}
140202
}

timely/src/lib.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,18 +141,25 @@ mod encoding {
141141
}
142142
}
143143

144+
// We will pad out anything we write to make the result `u64` aligned.
144145
impl<T: Data> Bytesable for Bincode<T> {
145146
fn from_bytes(bytes: Bytes) -> Self {
146147
let typed = ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed");
148+
let typed_size = ::bincode::serialized_size(&typed).expect("bincode::serialized_size() failed") as usize;
149+
assert_eq!(bytes.len(), (typed_size + 7) & !7);
147150
Bincode { payload: typed }
148151
}
149152

150153
fn length_in_bytes(&self) -> usize {
151-
::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize
154+
let typed_size = ::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize;
155+
(typed_size + 7) & !7
152156
}
153157

154-
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
155-
::bincode::serialize_into(writer, &self.payload).expect("bincode::serialize_into() failed");
158+
fn into_bytes<W: ::std::io::Write>(&self, mut writer: &mut W) {
159+
let typed_size = ::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize;
160+
let typed_slop = ((typed_size + 7) & !7) - typed_size;
161+
::bincode::serialize_into(&mut writer, &self.payload).expect("bincode::serialize_into() failed");
162+
writer.write(&[0u8; 8][..typed_slop]).unwrap();
156163
}
157164
}
158165

0 commit comments

Comments
 (0)