@@ -155,7 +155,7 @@ impl ObjectStorageProvider for GcsConfig {
155
155
}
156
156
157
157
fn register_store_metrics ( & self , handler : & actix_web_prometheus:: PrometheusMetrics ) {
158
- self . register_metrics ( handler)
158
+ self . register_metrics ( handler) ;
159
159
}
160
160
161
161
fn get_object_store ( & self ) -> Arc < dyn ObjectStorage > {
@@ -323,29 +323,16 @@ impl Gcs {
323
323
async fn _upload_file ( & self , key : & str , path : & Path ) -> Result < ( ) , ObjectStorageError > {
324
324
let instant = Instant :: now ( ) ;
325
325
326
- // // TODO: Uncomment this when multipart is fixed
327
- // let should_multipart = std::fs::metadata(path)?.len() > MULTIPART_UPLOAD_SIZE as u64;
326
+ let bytes = tokio:: fs:: read ( path) . await ?;
327
+ let result = self . client . put ( & key. into ( ) , bytes. into ( ) ) . await ?;
328
+ info ! ( "Uploaded file to GCS: {:?}" , result) ;
328
329
329
- let should_multipart = false ;
330
-
331
- let res = if should_multipart {
332
- // self._upload_multipart(key, path).await
333
- // this branch will never get executed
334
- Ok ( ( ) )
335
- } else {
336
- let bytes = tokio:: fs:: read ( path) . await ?;
337
- let result = self . client . put ( & key. into ( ) , bytes. into ( ) ) . await ?;
338
- info ! ( "Uploaded file to GCS: {:?}" , result) ;
339
- Ok ( ( ) )
340
- } ;
341
-
342
- let status = if res. is_ok ( ) { "200" } else { "400" } ;
343
330
let time = instant. elapsed ( ) . as_secs_f64 ( ) ;
344
331
REQUEST_RESPONSE_TIME
345
- . with_label_values ( & [ "UPLOAD_PARQUET" , status ] )
332
+ . with_label_values ( & [ "UPLOAD_PARQUET" , "200" ] )
346
333
. observe ( time) ;
347
334
348
- res
335
+ Ok ( ( ) )
349
336
}
350
337
351
338
async fn _upload_multipart (
@@ -364,15 +351,11 @@ impl Gcs {
364
351
let mut data = Vec :: new ( ) ;
365
352
file. read_to_end ( & mut data) . await ?;
366
353
self . client . put ( location, data. into ( ) ) . await ?;
367
- // async_writer.put_part(data.into()).await?;
368
- // async_writer.complete().await?;
369
354
return Ok ( ( ) ) ;
370
355
} else {
371
356
let mut data = Vec :: new ( ) ;
372
357
file. read_to_end ( & mut data) . await ?;
373
358
374
- // let mut upload_parts = Vec::new();
375
-
376
359
let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0 ;
377
360
let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE ;
378
361
let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 } ;
@@ -393,8 +376,6 @@ impl Gcs {
393
376
394
377
// Upload the part
395
378
async_writer. put_part ( part_data. into ( ) ) . await ?;
396
-
397
- // upload_parts.push(part_number as u64 + 1);
398
379
}
399
380
if let Err ( err) = async_writer. complete ( ) . await {
400
381
error ! ( "Failed to complete multipart upload. {:?}" , err) ;
0 commit comments