
Commit 76cd44d

Auto merge of #716 - Mark-Simulacrum:tmpfile-backing, r=Mark-Simulacrum
Back largest archive by temporary files

This hopefully resolves the OOMs we see in production each time we try to write a report, by keeping the archive on disk rather than in memory while we write it. Mid-to-long term I'd like to avoid both the mmap and the temporary file and instead stream the writing to S3, but this is what I had time to throw together today. I think it should work, though I'm a bit uncertain about the disk backing for temporary files.
2 parents 434a549 + ff33518 commit 76cd44d
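
To make the approach concrete, here is a minimal sketch of the pattern this commit adopts (illustrative only; the function name and the `entries` slice are hypothetical stand-ins, the real code is in src/report/archives.rs below). The gzipped tarball is written through to an unnamed temporary file instead of a `Vec<u8>`, so peak memory no longer scales with the archive size:

use std::fs::File;
use std::io::Seek;

use flate2::{write::GzEncoder, Compression};
use tar::{Builder as TarBuilder, Header as TarHeader};

// Illustrative sketch: stream a gzipped tarball into an unnamed tempfile
// rather than an in-memory Vec<u8>. `entries` stands in for the real
// per-crate log iterator.
fn build_archive_on_disk(entries: &[(&str, &[u8])]) -> std::io::Result<File> {
    // tempfile() creates an unlinked file that the OS reclaims on drop.
    let backing = tempfile::tempfile()?;
    let mut tar = TarBuilder::new(GzEncoder::new(backing, Compression::default()));
    for (path, bytes) in entries {
        let mut header = TarHeader::new_gnu();
        header.set_size(bytes.len() as u64);
        header.set_cksum();
        tar.append_data(&mut header, path, *bytes)?;
    }
    // Close the tar stream, then the gzip stream, recovering the File.
    let mut file = tar.into_inner()?.finish()?;
    file.rewind()?; // rewound and ready to be read back (or mmap'd) for upload
    Ok(file)
}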

9 files changed: +451 −156 lines


Diff for: Cargo.lock

+335-105
(Generated lockfile; diff not rendered.)

Diff for: Cargo.toml

+3-4
@@ -56,11 +56,10 @@ prometheus = "0.13.3"
 cargo_metadata = "0.18.1"
 indexmap = { version = "2.0.2", features = ["serde"] }
 tokio = "1.24"
-aws-types = "0.56.1"
-aws-credential-types = "0.56.1"
-aws-smithy-async = "0.56.1"
-aws-sdk-s3 = "0.34"
+aws-sdk-s3 = "1.7"
+aws-config = { version = "1", features = ["behavior-version-latest"] }
 thiserror = "1.0.38"
+nix = { version = "0.27.1", features = ["mman"] }
 
 [dev-dependencies]
 assert_cmd = "2.0.4"

Diff for: src/report/archives.rs

+79-6
@@ -1,3 +1,7 @@
+use std::fs::File;
+use std::num::NonZeroUsize;
+use std::ptr::NonNull;
+
 use crate::config::Config;
 use crate::crates::Crate;
 use crate::experiments::Experiment;
@@ -7,6 +11,52 @@ use crate::results::{EncodedLog, EncodingType, ReadResults};
 use flate2::{write::GzEncoder, Compression};
 use indexmap::IndexMap;
 use tar::{Builder as TarBuilder, Header as TarHeader};
+use tempfile::tempfile;
+
+#[cfg(unix)]
+struct TempfileBackedBuffer {
+    _file: File,
+    mmap: NonNull<[u8]>,
+}
+
+#[cfg(unix)]
+impl TempfileBackedBuffer {
+    fn new(file: File) -> Fallible<TempfileBackedBuffer> {
+        let len = file.metadata()?.len().try_into().unwrap();
+        unsafe {
+            let base = nix::sys::mman::mmap(
+                None,
+                NonZeroUsize::new(len).unwrap(),
+                nix::sys::mman::ProtFlags::PROT_READ,
+                nix::sys::mman::MapFlags::MAP_PRIVATE,
+                Some(&file),
+                0,
+            )?;
+            let Some(base) = NonNull::new(base as *mut u8) else {
+                panic!("Failed to map file");
+            };
+            Ok(TempfileBackedBuffer {
+                _file: file,
+                mmap: NonNull::slice_from_raw_parts(base, len),
+            })
+        }
+    }
+
+    fn buffer(&self) -> &[u8] {
+        unsafe { self.mmap.as_ref() }
+    }
+}
+
+#[cfg(unix)]
+impl Drop for TempfileBackedBuffer {
+    fn drop(&mut self) {
+        unsafe {
+            if let Err(e) = nix::sys::mman::munmap(self.mmap.as_ptr() as *mut _, self.mmap.len()) {
+                eprintln!("Failed to unmap temporary file: {:?}", e);
+            }
+        }
+    }
+}
 
 #[derive(Serialize)]
 pub struct Archive {
@@ -92,6 +142,7 @@ fn iterate<'a, DB: ReadResults + 'a>(
     })
 }
 
+#[allow(unused_mut)]
 fn write_all_archive<DB: ReadResults, W: ReportWriter>(
     db: &DB,
     ex: &Experiment,
@@ -100,18 +151,37 @@ fn write_all_archive<DB: ReadResults, W: ReportWriter>(
     config: &Config,
 ) -> Fallible<Archive> {
     for i in 1..=RETRIES {
-        let mut all = TarBuilder::new(GzEncoder::new(Vec::new(), Compression::default()));
+        // We write this large-ish tarball into a tempfile, which moves the I/O to disk operations
+        // rather than keeping it in memory. This avoids complicating the code by doing incremental
+        // writes to S3 (requiring buffer management etc) while avoiding keeping the blob entirely
+        // in memory.
+        let backing = tempfile()?;
+        let mut all = TarBuilder::new(GzEncoder::new(backing, Compression::default()));
         for entry in iterate(db, ex, crates, config) {
             let entry = entry?;
             let mut header = entry.header();
             all.append_data(&mut header, &entry.path, &entry.log_bytes[..])?;
         }
 
-        let data = all.into_inner()?.finish()?;
-        let len = data.len();
+        let mut data = all.into_inner()?.finish()?;
+        let mut buffer;
+        let view;
+        #[cfg(unix)]
+        {
+            buffer = TempfileBackedBuffer::new(data)?;
+            view = buffer.buffer();
+        }
+        #[cfg(not(unix))]
+        {
+            use std::io::{Read, Seek};
+            data.rewind()?;
+            buffer = Vec::new();
+            data.read_to_end(&mut buffer)?;
+            view = &buffer[..];
+        }
         match dest.write_bytes(
             "logs-archives/all.tar.gz",
-            data,
+            view,
             &"application/gzip".parse().unwrap(),
             EncodingType::Plain,
         ) {
@@ -123,7 +193,10 @@ fn write_all_archive<DB: ReadResults, W: ReportWriter>(
                 std::thread::sleep(std::time::Duration::from_secs(2));
                 warn!(
                     "retry ({}/{}) writing logs-archives/all.tar.gz ({} bytes) (error: {:?})",
-                    i, RETRIES, len, e,
+                    i,
+                    RETRIES,
+                    view.len(),
+                    e,
                 );
                 continue;
             }
@@ -164,7 +237,7 @@ pub fn write_logs_archives<DB: ReadResults, W: ReportWriter>(
         let data = archive.into_inner()?.finish()?;
         dest.write_bytes(
             format!("logs-archives/{comparison}.tar.gz"),
-            data,
+            &data,
             &"application/gzip".parse().unwrap(),
             EncodingType::Plain,
         )?;
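
The hand-rolled mmap above keeps the dependency footprint small, at the cost of manual munmap bookkeeping in Drop. For comparison, the same read-only mapping can be written with the memmap2 crate, which unmaps on Drop; this is an alternative sketch, not what the commit uses:

use std::fs::File;

// Alternative sketch using the memmap2 crate (not what this commit does):
// Mmap unmaps on Drop, so no manual munmap or NonNull bookkeeping is needed.
fn map_archive(file: &File) -> std::io::Result<memmap2::Mmap> {
    // Safety: the mapping is only sound if the file is not truncated or
    // mutated while mapped; an unlinked tempfile private to this process
    // satisfies that.
    unsafe { memmap2::Mmap::map(file) }
}

The returned `Mmap` derefs to `&[u8]`, so it could feed `write_bytes` the same way `TempfileBackedBuffer::buffer()` does.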

Diff for: src/report/html.rs

+2-2
@@ -298,13 +298,13 @@ pub fn write_html_report<W: ReportWriter>(
     info!("copying static assets");
     dest.write_bytes(
         "report.js",
-        js_in.content()?.into_owned(),
+        &js_in.content()?,
         js_in.mime(),
         EncodingType::Plain,
     )?;
     dest.write_bytes(
         "report.css",
-        css_in.content()?.into_owned(),
+        &css_in.content()?,
         css_in.mime(),
         EncodingType::Plain,
     )?;
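
The `&js_in.content()?` form works because `write_bytes` now takes `&[u8]` and `&Cow<'_, [u8]>` deref-coerces to a byte slice, so borrowed assets no longer pay the `.into_owned()` copy. A self-contained illustration (the `content` helper here is hypothetical):

use std::borrow::Cow;

// Hypothetical stand-ins for the asset accessor and the writer.
fn content() -> Cow<'static, [u8]> {
    Cow::Borrowed(b"static asset bytes")
}

fn write_bytes(b: &[u8]) {
    println!("writing {} bytes", b.len());
}

fn main() {
    // &Cow<[u8]> deref-coerces to &[u8]: no allocation, no copy.
    write_bytes(&content());
}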

Diff for: src/report/mod.rs

+5-5
@@ -293,7 +293,7 @@ fn write_logs<DB: ReadResults, W: ReportWriter>(
         s.spawn(move || {
             while let Ok((log_path, data, encoding)) = rx.recv() {
                 if let Err(e) =
-                    dest.write_bytes(log_path, data, &mime::TEXT_PLAIN_UTF_8, encoding)
+                    dest.write_bytes(log_path, &data, &mime::TEXT_PLAIN_UTF_8, encoding)
                 {
                     errors.lock().unwrap().push(e);
                 }
@@ -548,7 +548,7 @@ pub trait ReportWriter: Send + Sync {
     fn write_bytes<P: AsRef<Path>>(
         &self,
         path: P,
-        b: Vec<u8>,
+        b: &[u8],
         mime: &Mime,
         encoding_type: EncodingType,
     ) -> Fallible<()>;
@@ -574,7 +574,7 @@ impl ReportWriter for FileWriter {
     fn write_bytes<P: AsRef<Path>>(
         &self,
         path: P,
-        b: Vec<u8>,
+        b: &[u8],
         _: &Mime,
         _: EncodingType,
     ) -> Fallible<()> {
@@ -619,14 +619,14 @@ impl ReportWriter for DummyWriter {
     fn write_bytes<P: AsRef<Path>>(
         &self,
         path: P,
-        b: Vec<u8>,
+        b: &[u8],
         mime: &Mime,
         _: EncodingType,
     ) -> Fallible<()> {
         self.results
             .lock()
             .unwrap()
-            .insert((path.as_ref().to_path_buf(), mime.clone()), b);
+            .insert((path.as_ref().to_path_buf(), mime.clone()), b.to_vec());
         Ok(())
     }
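
Switching the trait to a borrowed slice pushes any copying to the one sink that actually needs ownership. A simplified sketch of the reshaped API (names and shape reduced from the real `ReportWriter`; `Mime`, encodings, and error handling omitted):

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Mutex;

trait Writer: Send + Sync {
    fn write_bytes<P: AsRef<Path>>(&self, path: P, b: &[u8]);
}

#[derive(Default)]
struct Dummy {
    results: Mutex<HashMap<PathBuf, Vec<u8>>>,
}

impl Writer for Dummy {
    fn write_bytes<P: AsRef<Path>>(&self, path: P, b: &[u8]) {
        // Only this sink needs ownership, so the copy happens here,
        // not in every caller.
        self.results
            .lock()
            .unwrap()
            .insert(path.as_ref().to_path_buf(), b.to_vec());
    }
}

fn main() {
    let w = Dummy::default();
    w.write_bytes("logs/a.txt", b"hello");
    assert_eq!(w.results.lock().unwrap().len(), 1);
}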

Diff for: src/report/s3.rs

+9-7
@@ -77,12 +77,12 @@ impl ReportWriter for S3Writer {
     fn write_bytes<P: AsRef<Path>>(
         &self,
         path: P,
-        s: Vec<u8>,
+        body: &[u8],
         mime: &Mime,
         encoding_type: EncodingType,
     ) -> Fallible<()> {
         // At least 50 MB, then use a multipart upload...
-        if s.len() >= 50 * 1024 * 1024 {
+        if body.len() >= 50 * 1024 * 1024 {
             let mut request = self
                 .client
                 .create_multipart_upload()
@@ -108,12 +108,12 @@ impl ReportWriter for S3Writer {
             };
 
             let chunk_size = 20 * 1024 * 1024;
-            let bytes = bytes::Bytes::from(s);
             let mut part = 1;
             let mut start = 0;
             let mut parts = aws_sdk_s3::types::CompletedMultipartUpload::builder();
-            while start < bytes.len() {
-                let chunk = bytes.slice(start..std::cmp::min(start + chunk_size, bytes.len()));
+            while start < body.len() {
+                let chunk = &body[start..std::cmp::min(start + chunk_size, body.len())];
+                let chunk = bytes::Bytes::copy_from_slice(chunk);
 
                 let request = self
                     .client
@@ -160,7 +160,9 @@ impl ReportWriter for S3Writer {
             let mut request = self
                 .client
                 .put_object()
-                .body(aws_sdk_s3::primitives::ByteStream::from(s))
+                .body(aws_sdk_s3::primitives::ByteStream::from(
+                    bytes::Bytes::copy_from_slice(body),
+                ))
                 .acl(aws_sdk_s3::types::ObjectCannedAcl::PublicRead)
                 .key(format!(
                     "{}/{}",
@@ -185,7 +187,7 @@ impl ReportWriter for S3Writer {
     }
 
     fn write_string<P: AsRef<Path>>(&self, path: P, s: Cow<str>, mime: &Mime) -> Fallible<()> {
-        self.write_bytes(path, s.into_owned().into_bytes(), mime, EncodingType::Plain)
+        self.write_bytes(path, s.as_bytes(), mime, EncodingType::Plain)
     }
 }
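
With a borrowed body, the multipart path now copies at most one 20 MiB chunk at a time into a `Bytes` buffer as each part is handed to the SDK, instead of moving the whole archive into a single `Bytes` up front. A standalone sketch of the chunking loop (the actual `upload_part` call is stubbed out with a print):

// Sketch of the chunking logic from write_bytes (upload itself stubbed).
// Each part borrows from `body`; the copy into Bytes happens per chunk,
// so peak extra memory is one chunk (20 MiB), not the whole archive.
fn upload_in_parts(body: &[u8]) {
    const CHUNK_SIZE: usize = 20 * 1024 * 1024;
    let mut part = 1;
    let mut start = 0;
    while start < body.len() {
        let end = std::cmp::min(start + CHUNK_SIZE, body.len());
        let chunk = bytes::Bytes::copy_from_slice(&body[start..end]);
        // ... client.upload_part().part_number(part).body(chunk.into()) ...
        println!("part {part}: {} bytes", chunk.len());
        start = end;
        part += 1;
    }
}

fn main() {
    // 45 MiB splits into three parts: 20 + 20 + 5.
    let body = vec![0u8; 45 * 1024 * 1024];
    upload_in_parts(&body);
}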

Diff for: src/results/dummy.rs

+2-2
@@ -27,7 +27,7 @@ impl DummyDB {
     pub fn add_dummy_log(&mut self, ex: &Experiment, krate: Crate, tc: Toolchain, log: EncodedLog) {
         self.experiments
             .entry(ex.name.to_string())
-            .or_insert_with(DummyData::default)
+            .or_default()
             .logs
             .insert((krate, tc), log);
     }
@@ -41,7 +41,7 @@
     ) {
         self.experiments
             .entry(ex.name.to_string())
-            .or_insert_with(DummyData::default)
+            .or_default()
             .results
             .insert((krate, tc), res);
     }

Diff for: src/server/reports.rs

+9-14
@@ -1,5 +1,3 @@
-use aws_sdk_s3::config::retry::RetryConfig;
-
 use crate::experiments::{Experiment, Status};
 use crate::prelude::*;
 use crate::report::{self, Comparison, TestResults};
@@ -17,24 +15,21 @@ use super::tokens::BucketRegion;
 const AUTOMATIC_THREAD_WAKEUP: u64 = 600;
 
 fn generate_report(data: &Data, ex: &Experiment, results: &DatabaseDB) -> Fallible<TestResults> {
-    let mut config = aws_types::SdkConfig::builder();
+    let mut config = aws_config::from_env();
     match &data.tokens.reports_bucket.region {
         BucketRegion::S3 { region } => {
-            config.set_region(Some(aws_types::region::Region::new(region.to_owned())));
+            config = config.region(aws_sdk_s3::config::Region::new(region.to_owned()));
         }
         BucketRegion::Custom { url } => {
-            config.set_region(Some(aws_types::region::Region::from_static("us-east-1")));
-            config.set_endpoint_url(Some(url.clone()));
+            config = config.region(aws_sdk_s3::config::Region::from_static("us-east-1"));
+            config = config.endpoint_url(url.clone());
         }
     }
-    config.set_credentials_provider(Some(data.tokens.reports_bucket.to_aws_credentials()));
-    // https://github.com/awslabs/aws-sdk-rust/issues/586 -- without this, the
-    // SDK will just completely not retry requests.
-    config.set_sleep_impl(Some(aws_sdk_s3::config::SharedAsyncSleep::new(
-        aws_smithy_async::rt::sleep::TokioSleep::new(),
-    )));
-    config.set_retry_config(Some(RetryConfig::standard()));
-    let config = config.build();
+    config = config.credentials_provider(data.tokens.reports_bucket.to_aws_credentials());
+    let config = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()?
+        .block_on(config.load());
     let client = aws_sdk_s3::Client::new(&config);
     let writer = report::S3Writer::create(
         client,
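
The v1 SDK builds its shared config asynchronously, so this synchronous report path drives `ConfigLoader::load()` on a throwaway current-thread Tokio runtime. A standalone sketch of the pattern (the region and endpoint URL here are placeholders, not real crater settings):

// Sketch: load an AWS SDK v1 config from synchronous code by blocking on
// the async loader with a one-off current-thread runtime.
fn make_client() -> Result<aws_sdk_s3::Client, Box<dyn std::error::Error>> {
    let loader = aws_config::from_env()
        .region(aws_sdk_s3::config::Region::from_static("us-east-1"))
        .endpoint_url("http://localhost:9000"); // placeholder custom bucket
    let config = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?
        .block_on(loader.load());
    Ok(aws_sdk_s3::Client::new(&config))
}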

Diff for: src/server/tokens.rs

+7-11
@@ -30,17 +30,13 @@ pub struct ReportsBucket {
 }
 
 impl ReportsBucket {
-    pub(crate) fn to_aws_credentials(
-        &self,
-    ) -> aws_credential_types::provider::SharedCredentialsProvider {
-        aws_credential_types::provider::SharedCredentialsProvider::new(
-            aws_sdk_s3::config::Credentials::new(
-                self.access_key.clone(),
-                self.secret_key.clone(),
-                None,
-                None,
-                "crater-credentials",
-            ),
+    pub(crate) fn to_aws_credentials(&self) -> aws_sdk_s3::config::Credentials {
+        aws_sdk_s3::config::Credentials::new(
+            self.access_key.clone(),
+            self.secret_key.clone(),
+            None,
+            None,
+            "crater-credentials",
         )
     }
 }
