Skip to content

Commit 2fa4c47

Browse files
committed
Deprecate RecordBatch::concat (apache#2594)
1 parent 566ef3d commit 2fa4c47

File tree

2 files changed

+105
-99
lines changed

2 files changed

+105
-99
lines changed

arrow/src/compute/kernels/concat.rs

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@
3131
//! ```
3232
3333
use crate::array::*;
34-
use crate::datatypes::DataType;
34+
use crate::datatypes::{DataType, SchemaRef};
3535
use crate::error::{ArrowError, Result};
36+
use crate::record_batch::RecordBatch;
3637

3738
fn compute_str_values_length<Offset: OffsetSizeTrait>(arrays: &[&ArrayData]) -> usize {
3839
arrays
@@ -102,6 +103,35 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef> {
102103
Ok(make_array(mutable.freeze()))
103104
}
104105

106+
/// Concatenates `batches` together into a single record batch.
107+
pub fn concat_batches(schema: &SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
108+
if batches.is_empty() {
109+
return Ok(RecordBatch::new_empty(schema.clone()));
110+
}
111+
if let Some((i, _)) = batches
112+
.iter()
113+
.enumerate()
114+
.find(|&(_, batch)| batch.schema() != *schema)
115+
{
116+
return Err(ArrowError::InvalidArgumentError(format!(
117+
"batches[{}] schema is different with argument schema.",
118+
i
119+
)));
120+
}
121+
let field_num = schema.fields().len();
122+
let mut arrays = Vec::with_capacity(field_num);
123+
for i in 0..field_num {
124+
let array = concat(
125+
&batches
126+
.iter()
127+
.map(|batch| batch.column(i).as_ref())
128+
.collect::<Vec<_>>(),
129+
)?;
130+
arrays.push(array);
131+
}
132+
RecordBatch::try_new(schema.clone(), arrays)
133+
}
134+
105135
#[cfg(test)]
106136
mod tests {
107137
use super::*;
@@ -569,4 +599,76 @@ mod tests {
569599
assert!(!copy.data().child_data()[0].ptr_eq(&combined.data().child_data()[0]));
570600
assert!(!new.data().child_data()[0].ptr_eq(&combined.data().child_data()[0]));
571601
}
602+
603+
#[test]
604+
fn concat_record_batches() {
605+
let schema = Arc::new(Schema::new(vec![
606+
Field::new("a", DataType::Int32, false),
607+
Field::new("b", DataType::Utf8, false),
608+
]));
609+
let batch1 = RecordBatch::try_new(
610+
schema.clone(),
611+
vec![
612+
Arc::new(Int32Array::from(vec![1, 2])),
613+
Arc::new(StringArray::from(vec!["a", "b"])),
614+
],
615+
)
616+
.unwrap();
617+
let batch2 = RecordBatch::try_new(
618+
schema.clone(),
619+
vec![
620+
Arc::new(Int32Array::from(vec![3, 4])),
621+
Arc::new(StringArray::from(vec!["c", "d"])),
622+
],
623+
)
624+
.unwrap();
625+
let new_batch = RecordBatch::concat(&schema, &[batch1, batch2]).unwrap();
626+
assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
627+
assert_eq!(2, new_batch.num_columns());
628+
assert_eq!(4, new_batch.num_rows());
629+
}
630+
631+
#[test]
632+
fn concat_empty_record_batch() {
633+
let schema = Arc::new(Schema::new(vec![
634+
Field::new("a", DataType::Int32, false),
635+
Field::new("b", DataType::Utf8, false),
636+
]));
637+
let batch = RecordBatch::concat(&schema, &[]).unwrap();
638+
assert_eq!(batch.schema().as_ref(), schema.as_ref());
639+
assert_eq!(0, batch.num_rows());
640+
}
641+
642+
#[test]
643+
fn concat_record_batches_of_different_schemas() {
644+
let schema1 = Arc::new(Schema::new(vec![
645+
Field::new("a", DataType::Int32, false),
646+
Field::new("b", DataType::Utf8, false),
647+
]));
648+
let schema2 = Arc::new(Schema::new(vec![
649+
Field::new("c", DataType::Int32, false),
650+
Field::new("d", DataType::Utf8, false),
651+
]));
652+
let batch1 = RecordBatch::try_new(
653+
schema1.clone(),
654+
vec![
655+
Arc::new(Int32Array::from(vec![1, 2])),
656+
Arc::new(StringArray::from(vec!["a", "b"])),
657+
],
658+
)
659+
.unwrap();
660+
let batch2 = RecordBatch::try_new(
661+
schema2,
662+
vec![
663+
Arc::new(Int32Array::from(vec![3, 4])),
664+
Arc::new(StringArray::from(vec!["c", "d"])),
665+
],
666+
)
667+
.unwrap();
668+
let error = RecordBatch::concat(&schema1, &[batch1, batch2]).unwrap_err();
669+
assert_eq!(
670+
error.to_string(),
671+
"Invalid argument error: batches[1] schema is different with argument schema.",
672+
);
673+
}
572674
}

arrow/src/record_batch.rs

Lines changed: 2 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
use std::sync::Arc;
2222

2323
use crate::array::*;
24-
use crate::compute::kernels::concat::concat;
2524
use crate::datatypes::*;
2625
use crate::error::{ArrowError, Result};
2726

@@ -390,32 +389,9 @@ impl RecordBatch {
390389
}
391390

392391
/// Concatenates `batches` together into a single record batch.
392+
#[deprecated(note = "please use arrow::compute::concat_batches")]
393393
pub fn concat(schema: &SchemaRef, batches: &[Self]) -> Result<Self> {
394-
if batches.is_empty() {
395-
return Ok(RecordBatch::new_empty(schema.clone()));
396-
}
397-
if let Some((i, _)) = batches
398-
.iter()
399-
.enumerate()
400-
.find(|&(_, batch)| batch.schema() != *schema)
401-
{
402-
return Err(ArrowError::InvalidArgumentError(format!(
403-
"batches[{}] schema is different with argument schema.",
404-
i
405-
)));
406-
}
407-
let field_num = schema.fields().len();
408-
let mut arrays = Vec::with_capacity(field_num);
409-
for i in 0..field_num {
410-
let array = concat(
411-
&batches
412-
.iter()
413-
.map(|batch| batch.column(i).as_ref())
414-
.collect::<Vec<_>>(),
415-
)?;
416-
arrays.push(array);
417-
}
418-
Self::try_new(schema.clone(), arrays)
394+
crate::compute::concat_batches(schema, batches)
419395
}
420396
}
421397

@@ -713,78 +689,6 @@ mod tests {
713689
assert_eq!(batch.column(1).as_ref(), int.as_ref());
714690
}
715691

716-
#[test]
717-
fn concat_record_batches() {
718-
let schema = Arc::new(Schema::new(vec![
719-
Field::new("a", DataType::Int32, false),
720-
Field::new("b", DataType::Utf8, false),
721-
]));
722-
let batch1 = RecordBatch::try_new(
723-
schema.clone(),
724-
vec![
725-
Arc::new(Int32Array::from(vec![1, 2])),
726-
Arc::new(StringArray::from(vec!["a", "b"])),
727-
],
728-
)
729-
.unwrap();
730-
let batch2 = RecordBatch::try_new(
731-
schema.clone(),
732-
vec![
733-
Arc::new(Int32Array::from(vec![3, 4])),
734-
Arc::new(StringArray::from(vec!["c", "d"])),
735-
],
736-
)
737-
.unwrap();
738-
let new_batch = RecordBatch::concat(&schema, &[batch1, batch2]).unwrap();
739-
assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
740-
assert_eq!(2, new_batch.num_columns());
741-
assert_eq!(4, new_batch.num_rows());
742-
}
743-
744-
#[test]
745-
fn concat_empty_record_batch() {
746-
let schema = Arc::new(Schema::new(vec![
747-
Field::new("a", DataType::Int32, false),
748-
Field::new("b", DataType::Utf8, false),
749-
]));
750-
let batch = RecordBatch::concat(&schema, &[]).unwrap();
751-
assert_eq!(batch.schema().as_ref(), schema.as_ref());
752-
assert_eq!(0, batch.num_rows());
753-
}
754-
755-
#[test]
756-
fn concat_record_batches_of_different_schemas() {
757-
let schema1 = Arc::new(Schema::new(vec![
758-
Field::new("a", DataType::Int32, false),
759-
Field::new("b", DataType::Utf8, false),
760-
]));
761-
let schema2 = Arc::new(Schema::new(vec![
762-
Field::new("c", DataType::Int32, false),
763-
Field::new("d", DataType::Utf8, false),
764-
]));
765-
let batch1 = RecordBatch::try_new(
766-
schema1.clone(),
767-
vec![
768-
Arc::new(Int32Array::from(vec![1, 2])),
769-
Arc::new(StringArray::from(vec!["a", "b"])),
770-
],
771-
)
772-
.unwrap();
773-
let batch2 = RecordBatch::try_new(
774-
schema2,
775-
vec![
776-
Arc::new(Int32Array::from(vec![3, 4])),
777-
Arc::new(StringArray::from(vec!["c", "d"])),
778-
],
779-
)
780-
.unwrap();
781-
let error = RecordBatch::concat(&schema1, &[batch1, batch2]).unwrap_err();
782-
assert_eq!(
783-
error.to_string(),
784-
"Invalid argument error: batches[1] schema is different with argument schema.",
785-
);
786-
}
787-
788692
#[test]
789693
fn record_batch_equality() {
790694
let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);

0 commit comments

Comments
 (0)