Skip to content

Commit cc16287

Browse files
committed
Complete integrations code
1 parent 6efddcc commit cc16287

File tree

3 files changed

+92
-142
lines changed

3 files changed

+92
-142
lines changed

datafusion-cli/src/catalog.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use std::any::Any;
1919
use std::sync::{Arc, Weak};
2020

21+
use crate::hf_store::HFOptions;
2122
use crate::object_storage::{get_object_store, AwsOptions, GcpOptions};
2223

2324
use datafusion::catalog::schema::SchemaProvider;
@@ -183,6 +184,9 @@ impl SchemaProvider for DynamicFileSchemaProvider {
183184
"gs" | "gcs" => {
184185
state = state.add_table_options_extension(GcpOptions::default())
185186
}
187+
"hf" => {
188+
state = state.add_table_options_extension(HFOptions::default());
189+
}
186190
_ => {}
187191
};
188192
let store = get_object_store(

datafusion-cli/src/hf_store.rs

Lines changed: 72 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,19 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use async_trait::async_trait;
19-
use bytes::Bytes;
2018
use datafusion::common::{config_err, Result};
2119
use datafusion::config::{
2220
ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, Visit,
2321
};
2422
use datafusion::error::DataFusionError;
25-
use futures::stream::BoxStream;
2623
use http::{header, HeaderMap};
2724
use object_store::http::{HttpBuilder, HttpStore};
28-
use object_store::path::Path;
29-
use object_store::{
30-
ClientOptions, Error as ObjectStoreError, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult
31-
};
25+
use object_store::ClientOptions;
26+
use url::Url;
3227
use std::any::Any;
3328
use std::env;
3429
use std::fmt::Display;
3530
use std::str::FromStr;
36-
use std::sync::Arc;
37-
use tokio::io::AsyncWrite;
3831

3932
pub const DEFAULT_ENDPOINT: &str = "https://huggingface.co";
4033

@@ -66,7 +59,6 @@ impl FromStr for HFConfigKey {
6659

6760
#[derive(Debug, Clone)]
6861
pub struct ParsedHFUrl {
69-
endpoint: Option<String>,
7062
path: Option<String>,
7163
repository: Option<String>,
7264
revision: Option<String>,
@@ -76,7 +68,6 @@ pub struct ParsedHFUrl {
7668
impl Default for ParsedHFUrl {
7769
fn default() -> Self {
7870
Self {
79-
endpoint: Some(DEFAULT_ENDPOINT.to_string()),
8071
path: None,
8172
repository: None,
8273
revision: Some("main".to_string()),
@@ -95,7 +86,7 @@ impl ParsedHFUrl {
9586
/// If the endpoint is not provided, it defaults to `https://huggingface.co`.
9687
///
9788
/// url: The HuggingFace URL to parse.
98-
pub fn parse(url: String, hf_options: HFOptions) -> Result<Self> {
89+
pub fn parse(url: String) -> Result<Self> {
9990
if !url.starts_with(Self::SCHEMA) {
10091
return config_err!(
10192
"Invalid HuggingFace URL: {}, only 'hf://' URLs are supported",
@@ -104,10 +95,6 @@ impl ParsedHFUrl {
10495
}
10596

10697
let mut parsed_url = Self::default();
107-
if let Some(endpoint) = hf_options.endpoint {
108-
parsed_url.endpoint = Some(endpoint);
109-
}
110-
11198
let mut last_delim = 5;
11299

113100
// parse repository type.
@@ -176,32 +163,29 @@ impl ParsedHFUrl {
176163
Ok(parsed_url)
177164
}
178165

179-
pub fn file_url(&self) -> Result<String> {
180-
let mut url = self.endpoint.clone().unwrap();
181-
url.push_str("/");
182-
url.push_str(self.repo_type.as_deref().unwrap());
183-
url.push_str("/");
166+
pub fn file_path(&self) -> String {
167+
let mut url = self.repo_type.clone().unwrap();
168+
url.push('/');
184169
url.push_str(self.repository.as_deref().unwrap());
185170
url.push_str("/resolve/");
186171
url.push_str(self.revision.as_deref().unwrap());
187-
url.push_str("/");
172+
url.push('/');
188173
url.push_str(self.path.as_deref().unwrap());
189174

190-
Ok(url)
175+
url
191176
}
192177

193-
pub fn tree_url(&self) -> Result<String> {
194-
let mut url = self.endpoint.clone().unwrap();
195-
url.push_str("/api/");
178+
pub fn tree_path(&self) -> String {
179+
let mut url = "api/".to_string();
196180
url.push_str(self.repo_type.as_deref().unwrap());
197-
url.push_str("/");
181+
url.push('/');
198182
url.push_str(self.repository.as_deref().unwrap());
199183
url.push_str("/tree/");
200184
url.push_str(self.revision.as_deref().unwrap());
201-
url.push_str("/");
185+
url.push('/');
202186
url.push_str(self.path.as_deref().unwrap());
203187

204-
Ok(url)
188+
url
205189
}
206190
}
207191

@@ -290,20 +274,27 @@ impl ExtensionOptions for HFOptions {
290274
pub struct HFStoreBuilder {
291275
endpoint: Option<String>,
292276
user_access_token: Option<String>,
277+
parsed_url: Option<ParsedHFUrl>,
293278
}
294279

295280
impl HFStoreBuilder {
296281
pub fn new() -> Self {
297282
Self::default()
298283
}
299284

300-
pub fn with_endpoint(mut self, endpoint: String) -> Self {
301-
self.endpoint = Some(endpoint);
285+
pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
286+
self.endpoint = Some(endpoint.into());
287+
302288
self
303289
}
304290

305-
pub fn with_user_access_token(mut self, user_access_token: String) -> Self {
306-
self.user_access_token = Some(user_access_token);
291+
pub fn with_user_access_token(mut self, user_access_token: impl Into<String>) -> Self {
292+
self.user_access_token = Some(user_access_token.into());
293+
self
294+
}
295+
296+
pub fn with_parsed_url(mut self, parsed_url: ParsedHFUrl) -> Self {
297+
self.parsed_url = Some(parsed_url);
307298
self
308299
}
309300

@@ -320,15 +311,25 @@ impl HFStoreBuilder {
320311
builder
321312
}
322313

323-
pub fn build(&self) -> Result<HFStore> {
324-
let mut inner_builder = HttpBuilder::new();
314+
pub fn build(&self) -> Result<HttpStore> {
315+
let mut builder = HttpBuilder::new();
316+
317+
if self.parsed_url.is_none() {
318+
return config_err!("Parsed URL is required to build HFStore");
319+
}
325320

326-
if let Some(ep) = &self.endpoint {
327-
inner_builder = inner_builder.with_url(ep);
321+
let ep;
322+
if let Some(endpoint) = &self.endpoint {
323+
ep = endpoint.to_string();
328324
} else {
329-
inner_builder = inner_builder.with_url(DEFAULT_ENDPOINT);
325+
ep = DEFAULT_ENDPOINT.to_string();
330326
}
331327

328+
let url = format!("{}/{}", ep, self.parsed_url.as_ref().unwrap().file_path());
329+
println!("URL: {}", url);
330+
331+
builder = builder.with_url(url);
332+
332333
if let Some(user_access_token) = &self.user_access_token {
333334
if let Ok(token) = format!("Bearer {}", user_access_token).parse() {
334335
let mut header_map = HeaderMap::new();
@@ -338,113 +339,47 @@ impl HFStoreBuilder {
338339
);
339340
let options = ClientOptions::new().with_default_headers(header_map);
340341

341-
inner_builder = inner_builder.with_client_options(options);
342+
builder = builder.with_client_options(options);
342343
}
343344
}
344-
let inner_store = inner_builder.build()?;
345-
346-
return Ok(HFStore {
347-
inner: Arc::new(inner_store),
348-
});
349-
}
350-
}
351345

352-
#[derive(Debug, Clone)]
353-
pub struct HFStore {
354-
inner: Arc<HttpStore>,
355-
}
356-
357-
impl Display for HFStore {
358-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
359-
write!(f, "HFStore")
346+
builder.build().map_err(|e| DataFusionError::Execution(format!("Unable to build HFStore: {}", e)))
360347
}
361348
}
362349

363-
#[async_trait]
364-
impl ObjectStore for HFStore {
365-
async fn put_opts(
366-
&self,
367-
_location: &Path,
368-
_bytes: Bytes,
369-
_opts: PutOptions,
370-
) -> object_store::Result<PutResult> {
371-
Err(ObjectStoreError::NotSupported {source: "HFStore::put_opts".to_string().into()})
372-
}
350+
pub fn get_hf_object_store_builder(
351+
url: &Url,
352+
options: &HFOptions,
353+
) -> Result<HFStoreBuilder>
354+
{
355+
let parsed_url = ParsedHFUrl::parse(url.to_string())?;
356+
let mut builder = HFStoreBuilder::from_env();
357+
builder = builder.with_parsed_url(parsed_url);
373358

374-
async fn put_multipart(
375-
&self,
376-
_location: &Path,
377-
) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
378-
Err(ObjectStoreError::NotSupported {source: "HFStore::put_multipart".to_string().into()})
359+
if let Some(endpoint) = &options.endpoint {
360+
builder = builder.with_endpoint(endpoint);
379361
}
380362

381-
async fn abort_multipart(
382-
&self,
383-
_location: &Path,
384-
_multipart_id: &MultipartId,
385-
) -> object_store::Result<()> {
386-
Err(ObjectStoreError::NotSupported {source: "HFStore::abort_multipart".to_string().into()})
363+
if let Some(user_access_token) = &options.user_access_token {
364+
builder = builder.with_user_access_token(user_access_token);
387365
}
388366

389-
async fn get_opts(
390-
&self,
391-
location: &Path,
392-
options: GetOptions,
393-
) -> object_store::Result<GetResult> {
394-
println!("HFStore::get_opts: {:?}", location);
395-
396-
self.inner.get_opts(location, options).await
397-
}
398-
399-
async fn delete(&self, _location: &Path) -> object_store::Result<()> {
400-
Err(ObjectStoreError::NotSupported {source: "HFStore::delete".to_string().into()})
401-
}
402-
403-
fn list(
404-
&self,
405-
_prefix: Option<&Path>,
406-
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
407-
Box::pin(futures::stream::empty())
408-
}
409-
410-
async fn list_with_delimiter(
411-
&self,
412-
_prefix: Option<&Path>,
413-
) -> object_store::Result<ListResult> {
414-
Err(ObjectStoreError::NotSupported {source: "HFStore::list_with_delimiter".to_string().into()})
415-
}
416-
417-
async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
418-
Err(ObjectStoreError::NotSupported {source: "HFStore::copy".to_string().into()})
419-
}
420-
421-
async fn copy_if_not_exists(
422-
&self,
423-
_from: &Path,
424-
_to: &Path,
425-
) -> object_store::Result<()> {
426-
Err(ObjectStoreError::NotSupported {source: "HFStore::copy_if_not_exists".to_string().into()})
427-
}
367+
Ok(builder)
428368
}
429369

430370
#[cfg(test)]
431371
mod tests {
432372
use datafusion::error::DataFusionError;
433373

434-
use crate::hf_store::{HFOptions, ParsedHFUrl};
374+
use crate::hf_store::ParsedHFUrl;
435375

436376
#[test]
437377
fn test_parse_hf_url() {
438378
let url =
439379
"hf://datasets/datasets-examples/doc-formats-csv-1/data.csv".to_string();
440-
let options = HFOptions::default();
441380

442-
let parsed_url = ParsedHFUrl::parse(url, options).unwrap();
381+
let parsed_url = ParsedHFUrl::parse(url).unwrap();
443382

444-
assert_eq!(
445-
parsed_url.endpoint,
446-
Some("https://huggingface.co".to_string())
447-
);
448383
assert_eq!(parsed_url.repo_type, Some("datasets".to_string()));
449384
assert_eq!(
450385
parsed_url.repository,
@@ -458,14 +393,9 @@ mod tests {
458393
fn test_parse_hf_url_with_revision() {
459394
let url =
460395
"hf://datasets/datasets-examples/doc-formats-csv-1@~csv/data.csv".to_string();
461-
let options = HFOptions::default();
462396

463-
let parsed_url = ParsedHFUrl::parse(url, options).unwrap();
397+
let parsed_url = ParsedHFUrl::parse(url).unwrap();
464398

465-
assert_eq!(
466-
parsed_url.endpoint,
467-
Some("https://huggingface.co".to_string())
468-
);
469399
assert_eq!(parsed_url.repo_type, Some("datasets".to_string()));
470400
assert_eq!(
471401
parsed_url.repository,
@@ -504,41 +434,41 @@ mod tests {
504434
}
505435

506436
#[test]
507-
fn test_file_url() {
437+
fn test_file_path() {
508438
let url =
509439
"hf://datasets/datasets-examples/doc-formats-csv-1/data.csv".to_string();
510-
let options = HFOptions::default();
511440

512-
let parsed_url = ParsedHFUrl::parse(url, options).unwrap();
441+
let parsed_url = ParsedHFUrl::parse(url);
442+
443+
assert!(parsed_url.is_ok());
513444

514-
let file_url = parsed_url.file_url().unwrap();
445+
let file_path = parsed_url.unwrap().file_path();
515446

516447
assert_eq!(
517-
file_url,
518-
"https://huggingface.co/datasets/datasets-examples/doc-formats-csv-1/resolve/main/data.csv"
448+
file_path,
449+
"datasets/datasets-examples/doc-formats-csv-1/resolve/main/data.csv"
519450
);
520451
}
521452

522453
#[test]
523-
fn test_tree_url() {
454+
fn test_tree_path() {
524455
let url =
525456
"hf://datasets/datasets-examples/doc-formats-csv-1/data.csv".to_string();
526-
let options = HFOptions::default();
527457

528-
let parsed_url = ParsedHFUrl::parse(url, options).unwrap();
458+
let parsed_url = ParsedHFUrl::parse(url);
529459

530-
let tree_url = parsed_url.tree_url().unwrap();
460+
assert!(parsed_url.is_ok());
461+
462+
let tree_path = parsed_url.unwrap().tree_path();
531463

532464
assert_eq!(
533-
tree_url,
534-
"https://huggingface.co/api/datasets/datasets-examples/doc-formats-csv-1/tree/main/data.csv"
465+
tree_path,
466+
"api/datasets/datasets-examples/doc-formats-csv-1/tree/main/data.csv"
535467
);
536468
}
537469

538470
fn test_error(url: &str, expected: &str) {
539-
let options = HFOptions::default();
540-
541-
let parsed_url_result = ParsedHFUrl::parse(url.to_string(), options);
471+
let parsed_url_result = ParsedHFUrl::parse(url.to_string());
542472

543473
match parsed_url_result {
544474
Ok(_) => panic!("Expected error, but got success"),

0 commit comments

Comments
 (0)