diff --git a/cli/src/session.rs b/cli/src/session.rs index ebfceaf25..7ea90a88e 100644 --- a/cli/src/session.rs +++ b/cli/src/session.rs @@ -12,6 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; +use std::io::BufRead; +use std::path::Path; +use std::sync::Arc; + use anyhow::anyhow; use anyhow::Result; use async_recursion::async_recursion; @@ -26,12 +31,8 @@ use rustyline::config::Builder; use rustyline::error::ReadlineError; use rustyline::history::DefaultHistory; use rustyline::{CompletionType, Editor}; -use std::collections::BTreeMap; -use std::io::BufRead; -use std::path::Path; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; -use std::sync::Arc; use tokio::fs::{remove_file, File}; use tokio::io::AsyncWriteExt; use tokio::task::JoinHandle; diff --git a/core/src/presign.rs b/core/src/presign.rs index b3b6e4c74..72be5ea24 100644 --- a/core/src/presign.rs +++ b/core/src/presign.rs @@ -14,6 +14,7 @@ use std::{collections::BTreeMap, path::Path}; +use log::info; use reqwest::{Body, Client as HttpClient, StatusCode}; use tokio::io::AsyncRead; use tokio::io::AsyncWriteExt; @@ -43,6 +44,7 @@ pub async fn presign_upload_to_stage( data: Reader, size: u64, ) -> Result<()> { + info!("upload to stage with presigned url, size: {}", size); let client = HttpClient::new(); let mut builder = client.put(presigned.url); for (k, v) in presigned.headers { diff --git a/core/tests/core/stage.rs b/core/tests/core/stage.rs index ec1e5330a..7b3c5d0ab 100644 --- a/core/tests/core/stage.rs +++ b/core/tests/core/stage.rs @@ -13,6 +13,7 @@ // limitations under the License. use tokio::fs::File; +use tokio::io::BufReader; use databend_client::APIClient; @@ -32,6 +33,7 @@ async fn insert_with_stage(presign: bool) { let file = File::open("tests/core/data/sample.csv").await.unwrap(); let metadata = file.metadata().await.unwrap(); + let data = BufReader::new(file); let path = chrono::Utc::now().format("%Y%m%d%H%M%S%9f").to_string(); let stage_location = format!("@~/{}/sample.csv", path); @@ -42,7 +44,7 @@ async fn insert_with_stage(presign: bool) { }; client - .upload_to_stage(&stage_location, Box::new(file), metadata.len()) + .upload_to_stage(&stage_location, Box::new(data), metadata.len()) .await .unwrap(); let sql = format!( diff --git a/driver/src/conn.rs b/driver/src/conn.rs index 4eedbedb5..fef14b73e 100644 --- a/driver/src/conn.rs +++ b/driver/src/conn.rs @@ -17,9 +17,10 @@ use std::path::Path; use std::sync::Arc; use async_trait::async_trait; -use databend_driver_core::raw_rows::{RawRow, RawRowIterator}; use once_cell::sync::Lazy; +use tokio::fs::File; use tokio::io::AsyncRead; +use tokio::io::BufReader; use tokio_stream::StreamExt; use url::Url; @@ -29,6 +30,7 @@ use crate::flight_sql::FlightSQLConnection; use databend_client::StageLocation; use databend_client::{presign_download_from_stage, PresignedResponse}; use databend_driver_core::error::{Error, Result}; +use databend_driver_core::raw_rows::{RawRow, RawRowIterator}; use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats}; use databend_driver_core::schema::{DataType, Field, NumberDataType, Schema}; use databend_driver_core::value::{NumberValue, Value}; @@ -152,6 +154,7 @@ pub trait Connection: Send + Sync { file_format_options: Option>, copy_options: Option>, ) -> Result; + async fn load_file( &self, sql: &str, @@ -159,6 +162,7 @@ pub trait Connection: Send + Sync { format_options: Option>, copy_options: Option>, ) -> Result; + async fn stream_load(&self, sql: &str, data: Vec>) -> Result; // PUT file:/// internalStage|externalStage @@ -180,8 +184,9 @@ pub trait Connection: Send + Sync { Error::BadArgument(format!("Invalid local file path: {:?}", entry)) })?; let stage_file = stage_location.file_path(filename); - let data = tokio::fs::File::open(&entry).await?; - let size = data.metadata().await?.len(); + let file = File::open(&entry).await?; + let size = file.metadata().await?.len(); + let data = BufReader::new(file); let (fname, status) = match self .upload_to_stage(&stage_file, Box::new(data), size) .await diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index 0be0ef201..be0bb1fae 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -23,16 +23,16 @@ use std::task::{Context, Poll}; use async_compression::tokio::write::ZstdEncoder; use async_trait::async_trait; -use databend_driver_core::raw_rows::{RawRow, RawRowIterator, RawRowWithStats}; use log::info; use tokio::fs::File; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio_stream::Stream; use databend_client::PresignedResponse; use databend_client::QueryResponse; use databend_client::{APIClient, SchemaField}; use databend_driver_core::error::{Error, Result}; +use databend_driver_core::raw_rows::{RawRow, RawRowIterator, RawRowWithStats}; use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats}; use databend_driver_core::schema::{Schema, SchemaRef}; @@ -191,8 +191,8 @@ impl Connection for RestAPIConnection { ); let file = File::open(fp).await?; let metadata = file.metadata().await?; - let data = Box::new(file); let size = metadata.len(); + let data = BufReader::new(file); let mut format_options = format_options.unwrap_or_else(Self::default_file_format_options); if !format_options.contains_key("type") { let file_type = fp @@ -202,8 +202,14 @@ impl Connection for RestAPIConnection { .ok_or_else(|| Error::BadArgument("file type empty".to_string()))?; format_options.insert("type", file_type); } - self.load_data(sql, data, size, Some(format_options), copy_options) - .await + self.load_data( + sql, + Box::new(data), + size, + Some(format_options), + copy_options, + ) + .await } async fn stream_load(&self, sql: &str, data: Vec>) -> Result { diff --git a/ttc/Cargo.toml b/ttc/Cargo.toml index a2c859151..a9244be47 100644 --- a/ttc/Cargo.toml +++ b/ttc/Cargo.toml @@ -11,9 +11,10 @@ authors = { workspace = true } repository = { workspace = true } [dependencies] +databend-driver = { workspace = true } + bytes = "1" clap = { version = "4.4", features = ["derive", "env"] } -databend-driver = { workspace = true } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", default-features = false, features = ["std"] } tokio = { version = "1.34", features = [