Skip to content

Commit 674736f

Browse files
committed
Revamp ArrowFile
1 parent 4812d7c commit 674736f

File tree

3 files changed

+37
-55
lines changed

3 files changed

+37
-55
lines changed

arrow-integration-testing/src/bin/arrow-json-integration-test.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use arrow::error::{ArrowError, Result};
1919
use arrow::ipc::reader::FileReader;
2020
use arrow::ipc::writer::FileWriter;
2121
use arrow_integration_test::*;
22-
use arrow_integration_testing::{canonicalize_schema, read_json_file};
22+
use arrow_integration_testing::{canonicalize_schema, open_json_file};
2323
use clap::Parser;
2424
use std::fs::File;
2525

@@ -63,12 +63,12 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()>
6363
eprintln!("Converting {json_name} to {arrow_name}");
6464
}
6565

66-
let json_file = read_json_file(json_name)?;
66+
let json_file = open_json_file(json_name)?;
6767

6868
let arrow_file = File::create(arrow_name)?;
6969
let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?;
7070

71-
for b in json_file.batches {
71+
for b in json_file.read_batches()? {
7272
writer.write(&b)?;
7373
}
7474

@@ -116,7 +116,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
116116
}
117117

118118
// open JSON file
119-
let json_file = read_json_file(json_name)?;
119+
let json_file = open_json_file(json_name)?;
120120

121121
// open Arrow file
122122
let arrow_file = File::open(arrow_name)?;
@@ -131,7 +131,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
131131
)));
132132
}
133133

134-
let json_batches = &json_file.batches;
134+
let json_batches = json_file.read_batches()?;
135135

136136
// compare number of batches
137137
assert!(

arrow-integration-testing/src/flight_client_scenarios/integration_test.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::{read_json_file, ArrowFile};
18+
use crate::open_json_file;
1919
use std::collections::HashMap;
2020

2121
use arrow::{
@@ -45,23 +45,16 @@ pub async fn run_scenario(host: &str, port: u16, path: &str) -> Result {
4545

4646
let client = FlightServiceClient::connect(url).await?;
4747

48-
let ArrowFile {
49-
schema, batches, ..
50-
} = read_json_file(path)?;
48+
let json_file = open_json_file(path)?;
5149

52-
let schema = Arc::new(schema);
50+
let batches = json_file.read_batches()?;
51+
let schema = Arc::new(json_file.schema);
5352

5453
let mut descriptor = FlightDescriptor::default();
5554
descriptor.set_type(DescriptorType::Path);
5655
descriptor.path = vec![path.to_string()];
5756

58-
upload_data(
59-
client.clone(),
60-
schema.clone(),
61-
descriptor.clone(),
62-
batches.clone(),
63-
)
64-
.await?;
57+
upload_data(client.clone(), schema, descriptor.clone(), batches.clone()).await?;
6558
verify_data(client, descriptor, &batches).await?;
6659

6760
Ok(())

arrow-integration-testing/src/lib.rs

Lines changed: 27 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,27 @@ pub struct ArrowFile {
4646
pub schema: Schema,
4747
// we can evolve this into a concrete Arrow type
4848
// this is temporarily not being read from
49-
pub _dictionaries: HashMap<i64, ArrowJsonDictionaryBatch>,
50-
pub batches: Vec<RecordBatch>,
49+
dictionaries: HashMap<i64, ArrowJsonDictionaryBatch>,
50+
arrow_json: Value,
51+
}
52+
53+
impl ArrowFile {
54+
pub fn read_batch(&self, batch_num: usize) -> Result<RecordBatch> {
55+
let b = self.arrow_json["batches"].get(batch_num).unwrap();
56+
let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
57+
record_batch_from_json(&self.schema, json_batch, Some(&self.dictionaries))
58+
}
59+
60+
pub fn read_batches(&self) -> Result<Vec<RecordBatch>> {
61+
let mut batches = vec![];
62+
// XXX collect?
63+
for b in self.arrow_json["batches"].as_array().unwrap() {
64+
let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
65+
let batch = record_batch_from_json(&self.schema, json_batch, Some(&self.dictionaries))?;
66+
batches.push(batch);
67+
}
68+
Ok(batches)
69+
}
5170
}
5271

5372
// Canonicalize the names of map fields in a schema
@@ -87,13 +106,7 @@ pub fn canonicalize_schema(schema: &Schema) -> Schema {
87106
Schema::new(fields).with_metadata(schema.metadata().clone())
88107
}
89108

90-
struct LazyArrowFile {
91-
schema: Schema,
92-
dictionaries: HashMap<i64, ArrowJsonDictionaryBatch>,
93-
arrow_json: Value,
94-
}
95-
96-
fn read_json_file_metadata(json_name: &str) -> Result<LazyArrowFile> {
109+
pub fn open_json_file(json_name: &str) -> Result<ArrowFile> {
97110
let json_file = File::open(json_name)?;
98111
let reader = BufReader::new(json_file);
99112
let arrow_json: Value = serde_json::from_reader(reader).unwrap();
@@ -111,37 +124,13 @@ fn read_json_file_metadata(json_name: &str) -> Result<LazyArrowFile> {
111124
dictionaries.insert(json_dict.id, json_dict);
112125
}
113126
}
114-
Ok(LazyArrowFile {
127+
Ok(ArrowFile {
115128
schema,
116129
dictionaries,
117130
arrow_json,
118131
})
119132
}
120133

121-
pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
122-
let f = read_json_file_metadata(json_name)?;
123-
124-
let mut batches = vec![];
125-
for b in f.arrow_json["batches"].as_array().unwrap() {
126-
let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
127-
let batch = record_batch_from_json(&f.schema, json_batch, Some(&f.dictionaries))?;
128-
batches.push(batch);
129-
}
130-
Ok(ArrowFile {
131-
schema: f.schema,
132-
_dictionaries: f.dictionaries,
133-
batches,
134-
})
135-
}
136-
137-
pub fn read_single_batch_from_json_file(json_name: &str, batch_num: usize) -> Result<RecordBatch> {
138-
let f = read_json_file_metadata(json_name)?;
139-
let b = f.arrow_json["batches"].get(batch_num).unwrap();
140-
let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
141-
let batch = record_batch_from_json(&f.schema, json_batch, Some(&f.dictionaries))?;
142-
Ok(batch)
143-
}
144-
145134
/// Read gzipped JSON test file
146135
///
147136
/// For example given the input:
@@ -176,7 +165,7 @@ fn cdata_integration_export_schema_from_json(
176165
out: *mut FFI_ArrowSchema,
177166
) -> Result<()> {
178167
let json_name = unsafe { CStr::from_ptr(c_json_name) };
179-
let f = read_json_file_metadata(json_name.to_str()?)?;
168+
let f = open_json_file(json_name.to_str()?)?;
180169
let c_schema = FFI_ArrowSchema::try_from(&f.schema)?;
181170
// Move exported schema into output struct
182171
unsafe { ptr::write(out, c_schema) };
@@ -189,7 +178,7 @@ fn cdata_integration_export_batch_from_json(
189178
out: *mut FFI_ArrowArray,
190179
) -> Result<()> {
191180
let json_name = unsafe { CStr::from_ptr(c_json_name) };
192-
let b = read_single_batch_from_json_file(json_name.to_str()?, batch_num.try_into().unwrap())?;
181+
let b = open_json_file(json_name.to_str()?)?.read_batch(batch_num.try_into().unwrap())?;
193182
let a = StructArray::from(b).into_data();
194183
let c_array = FFI_ArrowArray::new(&a);
195184
// Move exported array into output struct
@@ -202,7 +191,7 @@ fn cdata_integration_import_schema_and_compare_to_json(
202191
c_schema: *mut FFI_ArrowSchema,
203192
) -> Result<()> {
204193
let json_name = unsafe { CStr::from_ptr(c_json_name) };
205-
let json_schema = read_json_file_metadata(json_name.to_str()?)?.schema;
194+
let json_schema = open_json_file(json_name.to_str()?)?.schema;
206195

207196
// The source ArrowSchema will be released when this is dropped
208197
let imported_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema) };
@@ -241,7 +230,7 @@ fn cdata_integration_import_batch_and_compare_to_json(
241230
) -> Result<()> {
242231
let json_name = unsafe { CStr::from_ptr(c_json_name) };
243232
let json_batch =
244-
read_single_batch_from_json_file(json_name.to_str()?, batch_num.try_into().unwrap())?;
233+
open_json_file(json_name.to_str()?)?.read_batch(batch_num.try_into().unwrap())?;
245234
let schema = json_batch.schema();
246235

247236
let data_type_for_import = DataType::Struct(schema.fields.clone());

0 commit comments

Comments (0)