Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 58 additions & 18 deletions cpp/src/arrow/util/compression_lz4.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,30 +426,59 @@ class Lz4HadoopCodec : public Lz4Codec {

int64_t MaxCompressedLen(int64_t input_len,
                         const uint8_t* ARROW_ARG_UNUSED(input)) override {
  // Upper bound on the Hadoop-framed compressed size of `input_len` bytes.
  //
  // Each block gets its own 8-byte [decompressed_size, compressed_size]
  // prefix and is compressed independently, so we sum LZ4_compressBound
  // per block (not for the whole input at once, since LZ4_compressBound
  // has a small per-call overhead that must be paid once per block).
  const int64_t num_full_blocks = input_len / kBlockSize;
  const int64_t tail = input_len % kBlockSize;
  // All full blocks have identical size, so bound one and multiply.
  int64_t max_len = num_full_blocks *
      (kPrefixLength + Lz4Codec::MaxCompressedLen(kBlockSize, nullptr));
  if (tail > 0) {
    // Final partial block, if any, carries its own prefix too.
    max_len += kPrefixLength + Lz4Codec::MaxCompressedLen(tail, nullptr);
  }
  // NOTE: for input_len == 0 this returns 0 (no blocks are emitted),
  // matching Compress(), which produces no output for empty input.
  return max_len;
}

Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
                         int64_t output_buffer_len, uint8_t* output_buffer) override {
  // Compress `input` into Hadoop's block-framed LZ4 format.
  //
  // Hadoop's BlockCompressorStream splits data into blocks of at most
  // kBlockSize uncompressed bytes, each prefixed with [decompressed_size,
  // compressed_size] in big-endian uint32. Hadoop's Lz4Decompressor
  // allocates a fixed output buffer of the same size, so a single block
  // that decompresses to more than kBlockSize would overflow that buffer
  // and fail on JVM readers. We must split large inputs the same way
  // Hadoop does.
  //
  // Returns the total number of bytes written (framing prefixes included),
  // or Status::Invalid if the output buffer cannot hold a block header.
  int64_t total_output_len = 0;

  while (input_len > 0) {
    // Each iteration emits one framed block of at most kBlockSize
    // uncompressed bytes.
    const int64_t block_input_len = input_len < kBlockSize ? input_len : kBlockSize;

    if (output_buffer_len < kPrefixLength) {
      return Status::Invalid("Output buffer too small for Lz4HadoopCodec compression");
    }

    // Compress the block payload after the 8-byte header slot; the base
    // codec reports its own error if the remaining space is insufficient.
    ARROW_ASSIGN_OR_RAISE(
        int64_t block_compressed_len,
        Lz4Codec::Compress(block_input_len, input, output_buffer_len - kPrefixLength,
                           output_buffer + kPrefixLength));

    // Prepend decompressed size in bytes and compressed size in bytes
    // to be compatible with Hadoop Lz4Codec
    const uint32_t decompressed_size =
        bit_util::ToBigEndian(static_cast<uint32_t>(block_input_len));
    const uint32_t compressed_size =
        bit_util::ToBigEndian(static_cast<uint32_t>(block_compressed_len));
    SafeStore(output_buffer, decompressed_size);
    SafeStore(output_buffer + sizeof(uint32_t), compressed_size);

    // Advance past the block we just wrote and past the input we consumed.
    const int64_t block_total_len = kPrefixLength + block_compressed_len;
    total_output_len += block_total_len;
    output_buffer += block_total_len;
    output_buffer_len -= block_total_len;
    input += block_input_len;
    input_len -= block_input_len;
  }

  // Empty input produces zero output bytes (no framed blocks).
  return total_output_len;
}

Result<std::shared_ptr<Compressor>> MakeCompressor() override {
Expand All @@ -470,6 +499,17 @@ class Lz4HadoopCodec : public Lz4Codec {
// Offset starting at which page data can be read/written.
// Each Hadoop-framed block starts with two big-endian uint32 fields:
// [decompressed_size, compressed_size], hence 8 bytes.
static const int64_t kPrefixLength = sizeof(uint32_t) * 2;

// Maximum uncompressed block size per Hadoop-framed LZ4 block.
// Hadoop's IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE is configurable at
// runtime (default 256 KiB). Both its BlockCompressorStream and
// Lz4Decompressor use that value: the compressor splits data into
// blocks of this size, and the decompressor allocates a fixed output
// buffer of the same size. We use the default here since we cannot
// read Hadoop's runtime configuration from C++.
static constexpr int64_t kBlockSize = 256 * 1024;
// The block header stores the uncompressed size in a uint32, so the
// block size must be representable there.
static_assert(kBlockSize <= std::numeric_limits<uint32_t>::max(),
              "kBlockSize must fit in uint32_t for Hadoop framing prefix");

// Sentinel value — presumably returned by TryDecompressHadoop (declared
// below) when the input is not valid Hadoop-framed data, so the caller
// can fall back to raw LZ4. TODO(review): confirm against the full
// definition, which is outside this view.
static const int64_t kNotHadoop = -1;

int64_t TryDecompressHadoop(int64_t input_len, const uint8_t* input,
Expand Down
60 changes: 60 additions & 0 deletions cpp/src/arrow/util/compression_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,66 @@ TEST(TestCodecLZ4Hadoop, Compatibility) {
std::vector<uint8_t> data = MakeRandomData(100);
CheckCodecRoundtrip(c1, c2, data, /*check_reverse=*/false);
}

TEST(TestCodecLZ4Hadoop, MultiBlockRoundtrip) {
  // Exercises multi-block Hadoop-framed LZ4 compression on inputs around
  // and beyond the 256 KiB block boundary, checking that:
  //   1. MaxCompressedLen provides a sufficient buffer (even for
  //      incompressible random data);
  //   2. Compress -> Decompress round-trips correctly;
  //   3. no block exceeds Hadoop's 256 KiB decompressed limit (Hadoop's
  //      Lz4Decompressor allocates a fixed output buffer of that size;
  //      exceeding it causes LZ4Exception on JVM readers).
  // Check (3) fails without the multi-block splitting fix.
  ASSERT_OK_AND_ASSIGN(auto codec, Codec::Create(Compression::LZ4_HADOOP));
  constexpr int kHadoopBlockSize = 256 * 1024;

  // Decode a big-endian uint32 header field from the framed stream.
  const auto read_be32 = [](const uint8_t* p) -> uint32_t {
    return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
           (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
  };

  for (int data_size :
       {0, 1, 10000, 256 * 1024, 256 * 1024 + 1, 320000, 512 * 1024, 1024 * 1024}) {
    ARROW_SCOPED_TRACE("data_size = ", data_size);
    std::vector<uint8_t> original = MakeRandomData(data_size);

    // (1) MaxCompressedLen must be sufficient — Compress fails if not.
    const int64_t capacity = codec->MaxCompressedLen(original.size(), original.data());
    std::vector<uint8_t> framed(capacity);

    ASSERT_OK_AND_ASSIGN(
        int64_t framed_len,
        codec->Compress(original.size(), original.data(), capacity, framed.data()));
    ASSERT_LE(framed_len, capacity);

    // (2) Round-trip: decompress and verify data integrity.
    std::vector<uint8_t> restored(original.size());
    ASSERT_OK_AND_ASSIGN(int64_t restored_len,
                         codec->Decompress(framed_len, framed.data(), original.size(),
                                           restored.data()));
    ASSERT_EQ(restored_len, static_cast<int64_t>(original.size()));
    ASSERT_EQ(original, restored);

    // (3) Walk Hadoop-framed blocks and verify each decompressed_size <= 256 KiB.
    const uint8_t* cursor = framed.data();
    int64_t bytes_left = framed_len;
    int num_blocks = 0;
    while (bytes_left > 0) {
      ASSERT_GE(bytes_left, 8) << "truncated block header";
      const uint32_t header_decompressed_size = read_be32(cursor);
      const uint32_t header_compressed_size = read_be32(cursor + 4);
      ASSERT_LE(static_cast<int>(header_decompressed_size), kHadoopBlockSize)
          << "block " << num_blocks << " exceeds Hadoop's 256 KiB limit";
      ASSERT_GE(bytes_left, 8 + static_cast<int64_t>(header_compressed_size));
      cursor += 8 + header_compressed_size;
      bytes_left -= 8 + header_compressed_size;
      ++num_blocks;
    }
    ASSERT_EQ(bytes_left, 0);
    if (data_size > kHadoopBlockSize) {
      ASSERT_GE(num_blocks, 2) << "expected multiple blocks for data > 256 KiB";
    }
  }
}
#endif

} // namespace util
Expand Down
Loading