Commit 2ea4f50
GH-3358: Add Configurable Thrift Max Message Size for Parquet Metadata Reading
1 parent 8e740f0 commit 2ea4f50

File tree

3 files changed: +254 -10 lines changed
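
The commit threads a new read-side property, parquet.thrift.string.size.limit, through to Thrift's TConfiguration so that footers whose serialized metadata exceeds Thrift's 100 MB default message size can still be deserialized. As a minimal caller-side sketch of opting into a larger limit once this commit is applied (the class name and the 256 MB figure are illustrative, not part of the commit; the reader API used here is the same one exercised by the new test below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class WideFooterReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Raise the Thrift max message size to 256 MB; left unset, it defaults to 100 MB.
    conf.setInt("parquet.thrift.string.size.limit", 256 * 1024 * 1024);

    ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path(args[0]), conf), options)) {
      ParquetMetadata footer = reader.getFooter();
      System.out.println("row groups: " + footer.getBlocks().size());
    }
  }
}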

parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java

Lines changed: 68 additions & 5 deletions

@@ -45,6 +45,7 @@
 import org.apache.parquet.format.event.TypedConsumer.I64Consumer;
 import org.apache.parquet.format.event.TypedConsumer.StringConsumer;
 import org.apache.thrift.TBase;
+import org.apache.thrift.TConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -156,6 +157,15 @@ public static FileMetaData readFileMetaData(InputStream from, BlockCipher.Decryp
     return read(from, new FileMetaData(), decryptor, AAD);
   }
 
+  public static FileMetaData readFileMetaData(InputStream from, int maxMessageSize) throws IOException {
+    return readFileMetaData(from, null, null, maxMessageSize);
+  }
+
+  public static FileMetaData readFileMetaData(
+      InputStream from, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize) throws IOException {
+    return read(from, new FileMetaData(), decryptor, AAD, maxMessageSize);
+  }
+
   public static void writeColumnMetaData(
       ColumnMetaData columnMetaData, OutputStream to, BlockCipher.Encryptor encryptor, byte[] AAD)
       throws IOException {
@@ -190,6 +200,18 @@ public static FileMetaData readFileMetaData(
     return md;
   }
 
+  public static FileMetaData readFileMetaData(
+      InputStream from, boolean skipRowGroups, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+      throws IOException {
+    FileMetaData md = new FileMetaData();
+    if (skipRowGroups) {
+      readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups, decryptor, AAD, maxMessageSize);
+    } else {
+      read(from, md, decryptor, AAD, maxMessageSize);
+    }
+    return md;
+  }
+
   public static void writeFileCryptoMetaData(
       org.apache.parquet.format.FileCryptoMetaData cryptoMetadata, OutputStream to) throws IOException {
     write(cryptoMetadata, to, null, null);
@@ -293,6 +315,17 @@ public static void readFileMetaData(
       BlockCipher.Decryptor decryptor,
       byte[] AAD)
       throws IOException {
+    readFileMetaData(input, consumer, skipRowGroups, decryptor, AAD, -1);
+  }
+
+  public static void readFileMetaData(
+      final InputStream input,
+      final FileMetaDataConsumer consumer,
+      boolean skipRowGroups,
+      BlockCipher.Decryptor decryptor,
+      byte[] AAD,
+      int maxMessageSize)
+      throws IOException {
     try {
       DelegatingFieldConsumer eventConsumer = fieldConsumer()
           .onField(VERSION, new I32Consumer() {
@@ -358,26 +391,56 @@ public void consume(RowGroup rowGroup) {
         byte[] plainText = decryptor.decrypt(input, AAD);
         from = new ByteArrayInputStream(plainText);
       }
-      new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
+      new EventBasedThriftReader(protocol(from, maxMessageSize)).readStruct(eventConsumer);
     } catch (TException e) {
       throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
     }
   }
 
   private static TProtocol protocol(OutputStream to) throws TTransportException {
-    return protocol(new TIOStreamTransport(to));
+    return protocol(new TIOStreamTransport(to), -1);
   }
 
   private static TProtocol protocol(InputStream from) throws TTransportException {
-    return protocol(new TIOStreamTransport(from));
+    return protocol(new TIOStreamTransport(from), -1);
+  }
+
+  private static TProtocol protocol(InputStream from, int maxMessageSize) throws TTransportException {
+    return protocol(new TIOStreamTransport(from), maxMessageSize);
   }
 
-  private static InterningProtocol protocol(TIOStreamTransport t) {
+  private static InterningProtocol protocol(TIOStreamTransport t, int configuredMaxMessageSize)
+      throws TTransportException, NumberFormatException {
+
+    int maxMessageSize = 104857600; // Default to 100 MB
+    if (configuredMaxMessageSize > 0) {
+      maxMessageSize = configuredMaxMessageSize;
+    } else if (configuredMaxMessageSize == -1) {
+      // Set to default 100 MB
+      maxMessageSize = 104857600;
+    } else {
+      throw new NumberFormatException("Invalid max message size: " + configuredMaxMessageSize);
+    }
+
+    TConfiguration config = t.getConfiguration();
+    config.setMaxMessageSize(maxMessageSize);
+    /*
+     Reset known message size to 0 to force checking against the max message size.
+     This is necessary when reusing the same transport for multiple reads/writes,
+     as the known message size may be larger than the max message size.
+    */
+    t.updateKnownMessageSize(0);
     return new InterningProtocol(new TCompactProtocol(t));
   }
 
   private static <T extends TBase<?, ?>> T read(
       final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+    return read(input, tbase, decryptor, AAD, -1);
+  }
+
+  private static <T extends TBase<?, ?>> T read(
+      final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+      throws IOException {
     final InputStream from;
     if (null == decryptor) {
       from = input;
@@ -387,7 +450,7 @@ private static InterningProtocol protocol(TIOStreamTransport t) {
     }
 
     try {
-      tbase.read(protocol(from));
+      tbase.read(protocol(from, maxMessageSize));
       return tbase;
     } catch (TException e) {
       throw new IOException("can not read " + tbase.getClass() + ": " + e.getMessage(), e);

parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java

Lines changed: 42 additions & 5 deletions

@@ -147,13 +147,23 @@ public class ParquetMetadataConverter {
   public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
   public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k
 
+  /**
+   * Configuration property to control the Thrift max message size when reading Parquet metadata.
+   * This is useful for files with very large metadata.
+   * Default value is 100 MB.
+   */
+  public static final String PARQUET_THRIFT_STRING_SIZE_LIMIT = "parquet.thrift.string.size.limit";
+
+  private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB
+
   private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class);
   private static final LogicalTypeConverterVisitor LOGICAL_TYPE_ANNOTATION_VISITOR =
       new LogicalTypeConverterVisitor();
   private static final ConvertedTypeConverterVisitor CONVERTED_TYPE_CONVERTER_VISITOR =
       new ConvertedTypeConverterVisitor();
   private final int statisticsTruncateLength;
   private final boolean useSignedStringMinMax;
+  private final ParquetReadOptions options;
 
   public ParquetMetadataConverter() {
     this(false);
@@ -173,19 +183,38 @@ public ParquetMetadataConverter(Configuration conf) {
   }
 
   public ParquetMetadataConverter(ParquetReadOptions options) {
-    this(options.useSignedStringMinMax());
+    this(options.useSignedStringMinMax(), ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, options);
   }
 
   private ParquetMetadataConverter(boolean useSignedStringMinMax) {
     this(useSignedStringMinMax, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
   }
 
   private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) {
+    this(useSignedStringMinMax, statisticsTruncateLength, null);
+  }
+
+  private ParquetMetadataConverter(
+      boolean useSignedStringMinMax, int statisticsTruncateLength, ParquetReadOptions options) {
     if (statisticsTruncateLength <= 0) {
       throw new IllegalArgumentException("Truncate length should be greater than 0");
     }
     this.useSignedStringMinMax = useSignedStringMinMax;
     this.statisticsTruncateLength = statisticsTruncateLength;
+    this.options = options;
+  }
+
+  /**
+   * Gets the configured max message size for Thrift deserialization.
+   * Reads from the ParquetReadOptions configuration, or returns -1 if not available.
+   *
+   * @return the max message size in bytes, or -1 to use the default
+   */
+  private int getMaxMessageSize() {
+    if (options != null && options.getConfiguration() != null) {
+      return options.getConfiguration().getInt(PARQUET_THRIFT_STRING_SIZE_LIMIT, DEFAULT_MAX_MESSAGE_SIZE);
+    }
+    return -1;
   }
 
   // NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
@@ -1694,21 +1723,27 @@ public ParquetMetadata readParquetMetadata(
     filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
       @Override
       public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
-        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        int maxMessageSize = getMaxMessageSize();
+        FileMetaData fileMetadata =
+            readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
         return new FileMetaDataAndRowGroupOffsetInfo(
             fileMetadata, generateRowGroupOffsets(fileMetadata));
       }
 
       @Override
       public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
-        FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+        int maxMessageSize = getMaxMessageSize();
+        FileMetaData fileMetadata =
+            readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD, maxMessageSize);
         return new FileMetaDataAndRowGroupOffsetInfo(
             fileMetadata, generateRowGroupOffsets(fileMetadata));
       }
 
       @Override
       public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
-        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        int maxMessageSize = getMaxMessageSize();
+        FileMetaData fileMetadata =
+            readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
         // We must generate the map *before* filtering because it modifies `fileMetadata`.
         Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
         FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
@@ -1717,7 +1752,9 @@ public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) thro
 
       @Override
       public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
-        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        int maxMessageSize = getMaxMessageSize();
+        FileMetaData fileMetadata =
+            readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
         // We must generate the map *before* filtering because it modifies `fileMetadata`.
         Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
         FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
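
One subtlety worth tracing: getMaxMessageSize() returns -1 when the converter was built without ParquetReadOptions, and Util maps -1 back to the same 100 MB default, so the two fallbacks agree. A sketch of wiring a custom limit through the public constructor (the wrapper class is illustrative; the property constant and the constructor are from this diff, and 512 MB is an arbitrary example value):

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.format.converter.ParquetMetadataConverter;

class ConverterWiringSketch {
  static ParquetMetadataConverter withCustomLimit() {
    Configuration conf = new Configuration();
    // Allow footers up to 512 MB, e.g. for files with very wide schemas.
    conf.setInt(ParquetMetadataConverter.PARQUET_THRIFT_STRING_SIZE_LIMIT, 512 * 1024 * 1024);
    ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
    // readParquetMetadata(...) on this converter resolves the limit from the options.
    return new ParquetMetadataConverter(options);
  }
}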
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java

Lines changed: 144 additions & 0 deletions

@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestParquetFileReaderMaxMessageSize {
+
+  public static Path TEST_FILE;
+  public MessageType schema;
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void testSetup() throws IOException {
+
+    File testParquetFile = temp.newFile();
+    testParquetFile.delete();
+
+    TEST_FILE = new Path(testParquetFile.toURI());
+    // Create a file with many columns so the footer metadata is non-trivially large
+    StringBuilder schemaBuilder = new StringBuilder("message test_schema {");
+    for (int i = 0; i < 2000; i++) {
+      schemaBuilder.append("required int64 col_").append(i).append(";");
+    }
+    schemaBuilder.append("}");
+
+    schema = MessageTypeParser.parseMessageType(schemaBuilder.toString());
+
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(HadoopOutputFile.fromPath(TEST_FILE, conf))
+        .withConf(conf)
+        .withType(schema)
+        .build()) {
+
+      SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+      Group group = factory.newGroup();
+      for (int col = 0; col < 2000; col++) {
+        group.append("col_" + col, 1L);
+      }
+      writer.write(group);
+    }
+  }
+
+  /**
+   * Test reading a file with many columns using a custom max message size
+   */
+  @Test
+  public void testReadFileWithManyColumns() throws IOException {
+    Configuration readConf = new Configuration();
+    readConf.setInt("parquet.thrift.string.size.limit", 200 * 1024 * 1024);
+
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+      ParquetMetadata metadata = reader.getFooter();
+      assertNotNull(metadata);
+      assertEquals(schema, metadata.getFileMetaData().getSchema());
+      assertTrue(metadata.getBlocks().size() > 0);
+    }
+  }
+
+  /**
+   * Test that the default configuration works for normal files
+   */
+  @Test
+  public void testReadNormalFileWithDefaultConfig() throws IOException {
+    // Read with the default configuration (no custom max message size)
+    Configuration readConf = new Configuration();
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+      ParquetMetadata metadata = reader.getFooter();
+      assertNotNull(metadata);
+      assertEquals(1, metadata.getBlocks().get(0).getRowCount());
+    }
+  }
+
+  /**
+   * Test that an insufficient max message size produces an error
+   */
+  @Test
+  public void testInsufficientMaxMessageSizeError() throws IOException {
+    // Try to read with a very small max message size
+    Configuration readConf = new Configuration();
+    readConf.setInt("parquet.thrift.string.size.limit", 1); // Only 1 byte
+
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+      fail("Should have failed because the footer exceeds the configured max message size");
+    } catch (IOException e) {
+      assertTrue(
+          "Error should mention the Thrift message size limit",
+          e.getMessage().contains("Message size exceeds limit")
+              || e.getCause().getMessage().contains("Message size exceeds limit"));
+    }
+  }
+}
