Skip to content

Commit

Permalink
fix: disable auto compression, which break stream load (#568)
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li authored Jan 8, 2025
1 parent 73d765c commit e266f44
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 26 deletions.
1 change: 0 additions & 1 deletion driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ databend-driver-macros = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true, optional = true }

async-compression = { version = "0.4", features = ["tokio", "zstd"] }
async-trait = "0.1"
chrono = { version = "0.4.35", default-features = false, features = ["clock"] }
csv = "1.3"
Expand Down
27 changes: 2 additions & 25 deletions driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,16 @@

use std::collections::{BTreeMap, VecDeque};
use std::future::Future;
use std::io::Cursor;
use std::marker::PhantomData;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use async_compression::tokio::write::ZstdEncoder;
use async_trait::async_trait;
use log::info;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::io::BufReader;
use tokio_stream::Stream;

use databend_client::PresignedResponse;
Expand Down Expand Up @@ -145,31 +143,10 @@ impl Connection for RestAPIConnection {
.ok_or_else(|| Error::IO("Failed to get current timestamp".to_string()))?;
let stage = format!("@~/client/load/{}", now);

let mut file_format_options =
let file_format_options =
file_format_options.unwrap_or_else(Self::default_file_format_options);
let copy_options = copy_options.unwrap_or_else(Self::default_copy_options);

let mut data = data;
let mut size = size;

if !file_format_options.contains_key("compression") {
let mut buffer = Vec::new();
let real_size = data.read_to_end(&mut buffer).await?;
if real_size != size as usize && size != 0 {
return Err(Error::IO(format!(
"Failed to read all data, expected: {}, read: {}",
size, real_size
)));
}
let mut encoder = ZstdEncoder::new(Vec::new());
encoder.write_all(&buffer).await?;
encoder.shutdown().await?;
file_format_options.insert("compression", "ZSTD");
let output = encoder.into_inner();
size = output.len() as u64;
data = Box::new(Cursor::new(output))
}

self.upload_to_stage(&stage, data, size).await?;
let resp = self
.client
Expand Down

0 comments on commit e266f44

Please sign in to comment.