Description
RecordBatchStreamWriter.write() silently discards RecordBatch payloads whose schema does not match the writer's current schema. No error is thrown, no warning is logged, and write() returns undefined — identical to a successful write. This makes it very easy to lose data without any indication that something went wrong.
Current behavior
With the default `autoDestroy: true`:

- The first `write(batch)` call establishes the writer's schema.
- If a subsequent `write(batch)` is called with a batch whose schema differs, the writer silently calls `this.close()` and returns — the batch is never written.
- The caller receives no indication that data was dropped.
The relevant code path is in src/ipc/writer.ts:
```ts
if (schema && !compareSchemas(schema, this._schema)) {
    if (this._started && this._autoDestroy) {
        return this.close(); // ← batch silently dropped, no error
    }
    this.reset(this._sink, schema);
}
```

Additionally, the `autoDestroy` option in `RecordBatchStreamWriterOptions` has an empty JSDoc comment, so this behavior is undocumented.
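In the meantime, callers can guard against silent drops themselves by comparing the incoming batch's fields against the writer's before calling `write()`. A minimal sketch of such a guard, using a simplified structural comparison — `FieldDesc` and `schemasMatch` are illustrative names for this sketch, not apache-arrow API:

```ts
// Simplified field descriptor for the sketch (the real library has
// Field/Schema classes; this stands in for schema.fields).
interface FieldDesc {
    name: string;
    type: string;
}

// Structural comparison: same field count, names, and type names, in order.
function schemasMatch(a: FieldDesc[], b: FieldDesc[]): boolean {
    return a.length === b.length &&
        a.every((f, i) => f.name === b[i].name && f.type === b[i].type);
}

const writerSchema: FieldDesc[] = [
    { name: 'id', type: 'Int32' },
    { name: 'name', type: 'Utf8' },
];
const batchSchema: FieldDesc[] = [
    { name: 'x', type: 'Float64' },
    { name: 'y', type: 'Float64' },
];

if (!schemasMatch(writerSchema, batchSchema)) {
    console.log('schema mismatch: refusing to write batch');
}
```

This is only a workaround; the point of the issue is that the writer itself should surface the mismatch.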
Expected behavior
`write()` should throw an error when a batch's schema does not match the writer's schema, e.g.:

```
Error: RecordBatch schema does not match the writer's schema.
Expected: [id: Int32, name: Utf8]
Received: [x: Float64, y: Float64]
```
This is consistent with how other Arrow implementations handle this case — for example, PyArrow raises ArrowInvalid if you attempt to write a batch with a mismatched schema.
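A sketch of what the throwing check could look like — the `FieldSpec` shape and the `formatSchema`/`checkSchema` helpers are hypothetical names for illustration, not the actual `writer.ts` implementation:

```ts
// Simplified stand-in for a schema's field list.
interface FieldSpec { name: string; type: string; }

// Renders fields as "[id: Int32, name: Utf8]" for the error message.
function formatSchema(fields: FieldSpec[]): string {
    return `[${fields.map(f => `${f.name}: ${f.type}`).join(', ')}]`;
}

// Throws (instead of silently closing) when the schemas differ.
function checkSchema(expected: FieldSpec[], received: FieldSpec[]): void {
    const same = expected.length === received.length &&
        expected.every((f, i) =>
            f.name === received[i].name && f.type === received[i].type);
    if (!same) {
        throw new Error(
            `RecordBatch schema does not match the writer's schema.\n` +
            `Expected: ${formatSchema(expected)}\n` +
            `Received: ${formatSchema(received)}`);
    }
}

try {
    checkSchema(
        [{ name: 'id', type: 'Int32' }, { name: 'name', type: 'Utf8' }],
        [{ name: 'x', type: 'Float64' }, { name: 'y', type: 'Float64' }],
    );
} catch (e) {
    console.log((e as Error).message);
}
```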
Reproducer
```ts
import {
  Field, Float64, Int32, makeData,
  RecordBatch, RecordBatchStreamWriter,
  Schema, Struct, tableFromIPC, Utf8,
} from 'apache-arrow';

const schemaA = new Schema([
  new Field('id', new Int32()),
  new Field('name', new Utf8()),
]);
const schemaB = new Schema([
  new Field('x', new Float64()),
  new Field('y', new Float64()),
]);

// Build batch A (3 rows)
const batchA = new RecordBatch(schemaA, makeData({
  type: new Struct(schemaA.fields), length: 3, nullCount: 0,
  children: [
    makeData({ type: new Int32(), data: new Int32Array([1, 2, 3]) }),
    makeData({ type: new Utf8(), data: Buffer.from('foobarbaz'), valueOffsets: new Int32Array([0, 3, 6, 9]) }),
  ],
}));

// Build batch B (2 rows, different schema)
const batchB = new RecordBatch(schemaB, makeData({
  type: new Struct(schemaB.fields), length: 2, nullCount: 0,
  children: [
    makeData({ type: new Float64(), data: new Float64Array([1.1, 2.2]) }),
    makeData({ type: new Float64(), data: new Float64Array([3.3, 4.4]) }),
  ],
}));

const writer = new RecordBatchStreamWriter(); // autoDestroy defaults to true
writer.write(batchA); // establishes schema
writer.write(batchB); // silently dropped — no error thrown

const table = tableFromIPC(writer.toUint8Array(true));
console.log(table.numRows);        // 3 — only batchA was written
console.log(table.batches.length); // 1 — batchB was silently lost
```

Additional notes
- The `autoDestroy: false` path has different but also surprising behavior: instead of dropping the batch, it calls `reset()`, which silently switches the writer to the new schema. This may be the intended behavior for the multi-stream use case (see `stream-writer-tests.ts` line 109), but it would benefit from documentation.
- All existing tests that use `autoDestroy` explicitly set it to `false`, so the default `true` path was effectively untested for schema mismatches.
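For illustration, the two mismatch paths can be contrasted with a toy model — a hypothetical `MiniWriter`, not the real `RecordBatchStreamWriter`; with `autoDestroy` the writer closes and the batch is dropped, while without it a reset starts a new stream in the same sink:

```ts
// Toy model of the two code paths taken on a schema mismatch.
class MiniWriter {
    private schema: string | null = null;
    streams: string[][] = [[]]; // each inner array models one IPC stream
    closed = false;
    constructor(private readonly autoDestroy: boolean) {}
    write(schema: string, payload: string): void {
        if (this.closed) return;
        if (this.schema !== null && this.schema !== schema) {
            if (this.autoDestroy) {
                this.closed = true; // close(): payload silently dropped
                return;
            }
            this.streams.push([]); // reset(): new stream for the new schema
        }
        this.schema = schema;
        this.streams[this.streams.length - 1].push(payload);
    }
}

const dropper = new MiniWriter(true);
dropper.write('A', 'a1');
dropper.write('B', 'b1'); // dropped, writer closed
console.log(dropper.streams.length, dropper.closed); // 1 true

const switcher = new MiniWriter(false);
switcher.write('A', 'a1');
switcher.write('B', 'b1'); // lands in a second stream
console.log(switcher.streams.length, switcher.closed); // 2 false
```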
Thank you,
Rusty