Skip to content

Commit 999f8e2

Browse files
committed
Add Support for Iceberg table sort orders
1 parent 16b447b commit 999f8e2

35 files changed

+1330
-82
lines changed

presto-docs/src/main/sphinx/connector/iceberg.rst

+76-1
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ Property Name Description
258258

259259
Example: ``hdfs://nn:8020/warehouse/path``
260260
This property is required if the ``iceberg.catalog.type`` is
261-
``hadoop``.
261+
``hadoop``. Otherwise, it will be ignored.
262262

263263
``iceberg.catalog.cached-catalog-num`` The number of Iceberg catalogs to cache. This property is ``10``
264264
required if the ``iceberg.catalog.type`` is ``hadoop``.
@@ -1837,3 +1837,78 @@ Map of PrestoDB types to the relevant Iceberg types:
18371837

18381838

18391839
No other types are supported.
1840+
1841+
1842+
Sorted Tables
1843+
^^^^^^^^^^^^^
1844+
1845+
The Iceberg connector supports the creation of sorted tables.
1846+
Data in the Iceberg table is sorted as each file is written.
1847+
1848+
Sorted Iceberg tables can decrease query execution time in many cases; but query times can also depend on the query shape and cluster configuration.
1849+
Sorting is particularly beneficial when the sorted columns have a
1850+
high cardinality and are used as a filter for selective reads.
1851+
1852+
Configure sort order with the ``sorted_by`` table property to specify an array of
1853+
one or more columns to use for sorting.
1854+
The following example creates the table with the ``sorted_by`` property, and sorts the file based
1855+
on the field ``join_date``. The default sort direction is ASC, with null values ordered as NULLS FIRST.
1856+
1857+
.. code-block:: text
1858+
1859+
CREATE TABLE emp.employees.employee (
1860+
emp_id BIGINT,
1861+
emp_name VARCHAR,
1862+
join_date DATE,
1863+
country VARCHAR)
1864+
WITH (
1865+
sorted_by = ARRAY['join_date']
1866+
)
1867+
1868+
Explicitly configure sort directions or null ordering using the following example::
1869+
1870+
CREATE TABLE emp.employees.employee (
1871+
emp_id BIGINT,
1872+
emp_name VARCHAR,
1873+
join_date DATE,
1874+
country VARCHAR)
1875+
WITH (
1876+
sorted_by = ARRAY['join_date DESC NULLS FIRST', 'emp_id ASC NULLS LAST']
1877+
)
1878+
1879+
Sorting can be combined with partitioning on the same column. For example::
1880+
1881+
CREATE TABLE emp.employees.employee (
1882+
emp_id BIGINT,
1883+
emp_name VARCHAR,
1884+
join_date DATE,
1885+
country VARCHAR)
1886+
WITH (
1887+
partitioning = ARRAY['month(join_date)'],
1888+
sorted_by = ARRAY['join_date']
1889+
)
1890+
1891+
The Iceberg connector does not support sort order transforms. The following sort order transformations are not supported:
1892+
1893+
.. code-block:: text
1894+
1895+
bucket(n, column)
1896+
truncate(column, n)
1897+
year(column)
1898+
month(column)
1899+
day(column)
1900+
hour(column)
1901+
1902+
For example::
1903+
1904+
CREATE TABLE emp.employees.employee (
1905+
emp_id BIGINT,
1906+
emp_name VARCHAR,
1907+
join_date DATE,
1908+
country VARCHAR)
1909+
WITH (
1910+
sorted_by = ARRAY['month(join_date)']
1911+
)
1912+
1913+
If a user creates a table externally with non-identity sort columns and then inserts data, the following warning message will be shown.
1914+
``Iceberg table sort order has sort fields of <X>, <Y>, ... which are not currently supported by Presto``
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package com.facebook.presto.hive;
16+
17+
import com.facebook.airlift.configuration.Config;
18+
import com.facebook.airlift.configuration.ConfigDescription;
19+
import io.airlift.units.DataSize;
20+
import io.airlift.units.MaxDataSize;
21+
import io.airlift.units.MinDataSize;
22+
23+
import javax.validation.constraints.Max;
24+
import javax.validation.constraints.Min;
25+
26+
import static io.airlift.units.DataSize.Unit.MEGABYTE;
27+
28+
public class SortingFileWriterConfig
29+
{
30+
private DataSize writerSortBufferSize = new DataSize(64, MEGABYTE);
31+
private int maxOpenSortFiles = 50;
32+
33+
@MinDataSize("1MB")
34+
@MaxDataSize("1GB")
35+
public DataSize getWriterSortBufferSize()
36+
{
37+
return writerSortBufferSize;
38+
}
39+
40+
@Config("writer-sort-buffer-size")
41+
@ConfigDescription("Defines how much memory is used for this in-memory sorting process.")
42+
public SortingFileWriterConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
43+
{
44+
this.writerSortBufferSize = writerSortBufferSize;
45+
return this;
46+
}
47+
48+
@Min(2)
49+
@Max(1000)
50+
public int getMaxOpenSortFiles()
51+
{
52+
return maxOpenSortFiles;
53+
}
54+
55+
@Config("max-open-sort-files")
56+
@ConfigDescription("When writing, the maximum number of temporary files opened at one time to write sorted data.")
57+
public SortingFileWriterConfig setMaxOpenSortFiles(int maxOpenSortFiles)
58+
{
59+
this.maxOpenSortFiles = maxOpenSortFiles;
60+
return this;
61+
}
62+
}

presto-hive/src/main/java/com/facebook/presto/hive/HiveClientConfig.java

-31
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
import javax.validation.constraints.DecimalMax;
3232
import javax.validation.constraints.DecimalMin;
33-
import javax.validation.constraints.Max;
3433
import javax.validation.constraints.Min;
3534
import javax.validation.constraints.NotNull;
3635

@@ -279,20 +278,6 @@ public HiveClientConfig setDomainCompactionThreshold(int domainCompactionThresho
279278
return this;
280279
}
281280

282-
@MinDataSize("1MB")
283-
@MaxDataSize("1GB")
284-
public DataSize getWriterSortBufferSize()
285-
{
286-
return writerSortBufferSize;
287-
}
288-
289-
@Config("hive.writer-sort-buffer-size")
290-
public HiveClientConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
291-
{
292-
this.writerSortBufferSize = writerSortBufferSize;
293-
return this;
294-
}
295-
296281
@Min(1)
297282
public int getMaxConcurrentFileRenames()
298283
{
@@ -695,22 +680,6 @@ public HiveClientConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter)
695680
this.maxPartitionsPerWriter = maxPartitionsPerWriter;
696681
return this;
697682
}
698-
699-
@Min(2)
700-
@Max(1000)
701-
public int getMaxOpenSortFiles()
702-
{
703-
return maxOpenSortFiles;
704-
}
705-
706-
@Config("hive.max-open-sort-files")
707-
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
708-
public HiveClientConfig setMaxOpenSortFiles(int maxOpenSortFiles)
709-
{
710-
this.maxOpenSortFiles = maxOpenSortFiles;
711-
return this;
712-
}
713-
714683
public int getWriteValidationThreads()
715684
{
716685
return writeValidationThreads;

presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void configure(Binder binder)
130130
binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON);
131131
newSetBinder(binder, DynamicConfigurationProvider.class);
132132
configBinder(binder).bindConfig(HiveClientConfig.class);
133-
133+
configBinder(binder).bindConfig(SortingFileWriterConfig.class);
134134
binder.bind(HiveSessionProperties.class).in(Scopes.SINGLETON);
135135
binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON);
136136
binder.bind(HiveAnalyzeProperties.class).in(Scopes.SINGLETON);

presto-hive/src/main/java/com/facebook/presto/hive/HivePageSinkProvider.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public HivePageSinkProvider(
9191
TypeManager typeManager,
9292
HiveClientConfig hiveClientConfig,
9393
MetastoreClientConfig metastoreClientConfig,
94+
SortingFileWriterConfig sortingFileWriterConfig,
9495
LocationService locationService,
9596
JsonCodec<PartitionUpdate> partitionUpdateCodec,
9697
SmileCodec<PartitionUpdate> partitionUpdateSmileCodec,
@@ -110,8 +111,8 @@ public HivePageSinkProvider(
110111
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
111112
this.typeManager = requireNonNull(typeManager, "typeManager is null");
112113
this.maxOpenPartitions = hiveClientConfig.getMaxPartitionsPerWriter();
113-
this.maxOpenSortFiles = hiveClientConfig.getMaxOpenSortFiles();
114-
this.writerSortBufferSize = requireNonNull(hiveClientConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
114+
this.maxOpenSortFiles = sortingFileWriterConfig.getMaxOpenSortFiles();
115+
this.writerSortBufferSize = requireNonNull(sortingFileWriterConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
115116
this.immutablePartitions = hiveClientConfig.isImmutablePartitions();
116117
this.locationService = requireNonNull(locationService, "locationService is null");
117118
this.writeVerificationExecutor = listeningDecorator(newFixedThreadPool(hiveClientConfig.getWriteValidationThreads(), daemonThreadsNamed("hive-write-validation-%s")));

presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -1073,6 +1073,7 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
10731073
FUNCTION_AND_TYPE_MANAGER,
10741074
getHiveClientConfig(),
10751075
getMetastoreClientConfig(),
1076+
getSortingFileWriterConfig(),
10761077
locationService,
10771078
HiveTestUtils.PARTITION_UPDATE_CODEC,
10781079
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
@@ -1099,12 +1100,17 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
10991100
protected HiveClientConfig getHiveClientConfig()
11001101
{
11011102
return new HiveClientConfig()
1102-
.setMaxOpenSortFiles(10)
1103-
.setWriterSortBufferSize(new DataSize(100, KILOBYTE))
11041103
.setTemporaryTableSchema(database)
11051104
.setCreateEmptyBucketFilesForTemporaryTable(false);
11061105
}
11071106

1107+
protected SortingFileWriterConfig getSortingFileWriterConfig()
1108+
{
1109+
return new SortingFileWriterConfig()
1110+
.setMaxOpenSortFiles(10)
1111+
.setWriterSortBufferSize(new DataSize(100, KILOBYTE));
1112+
}
1113+
11081114
protected HiveCommonClientConfig getHiveCommonClientConfig()
11091115
{
11101116
return new HiveCommonClientConfig();
@@ -3109,7 +3115,7 @@ private void doTestBucketSortedTables(SchemaTableName table, boolean useTempPath
31093115
true);
31103116
assertThat(listAllDataFiles(context, path))
31113117
.filteredOn(file -> file.contains(".tmp-sort"))
3112-
.size().isGreaterThan(bucketCount * getHiveClientConfig().getMaxOpenSortFiles() * 2);
3118+
.size().isGreaterThan(bucketCount * getSortingFileWriterConfig().getMaxOpenSortFiles() * 2);
31133119

31143120
// finish the write
31153121
Collection<Slice> fragments = getFutureValue(sink.finish());

presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileSystem.java

+1
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ protected void setup(String host, int port, String databaseName, BiFunction<Hive
254254
FUNCTION_AND_TYPE_MANAGER,
255255
config,
256256
metastoreClientConfig,
257+
new SortingFileWriterConfig(),
257258
locationService,
258259
HiveTestUtils.PARTITION_UPDATE_CODEC,
259260
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,

presto-hive/src/test/java/com/facebook/presto/hive/TestHiveClientConfig.java

-6
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ public void testDefaults()
6060
.setMaxInitialSplitSize(new DataSize(32, Unit.MEGABYTE))
6161
.setSplitLoaderConcurrency(4)
6262
.setDomainCompactionThreshold(100)
63-
.setWriterSortBufferSize(new DataSize(64, Unit.MEGABYTE))
6463
.setMaxConcurrentFileRenames(20)
6564
.setMaxConcurrentZeroRowFileCreations(20)
6665
.setRecursiveDirWalkerEnabled(false)
@@ -82,7 +81,6 @@ public void testDefaults()
8281
.setFailFastOnInsertIntoImmutablePartitionsEnabled(true)
8382
.setSortedWritingEnabled(true)
8483
.setMaxPartitionsPerWriter(100)
85-
.setMaxOpenSortFiles(50)
8684
.setWriteValidationThreads(16)
8785
.setTextMaxLineLength(new DataSize(100, Unit.MEGABYTE))
8886
.setUseOrcColumnNames(false)
@@ -195,7 +193,6 @@ public void testExplicitPropertyMappings()
195193
.put("hive.max-initial-split-size", "16MB")
196194
.put("hive.split-loader-concurrency", "1")
197195
.put("hive.domain-compaction-threshold", "42")
198-
.put("hive.writer-sort-buffer-size", "13MB")
199196
.put("hive.recursive-directories", "true")
200197
.put("hive.storage-format", "SEQUENCEFILE")
201198
.put("hive.compression-codec", "NONE")
@@ -207,7 +204,6 @@ public void testExplicitPropertyMappings()
207204
.put("hive.insert-overwrite-immutable-partitions-enabled", "true")
208205
.put("hive.fail-fast-on-insert-into-immutable-partitions-enabled", "false")
209206
.put("hive.max-partitions-per-writers", "222")
210-
.put("hive.max-open-sort-files", "333")
211207
.put("hive.write-validation-threads", "11")
212208
.put("hive.max-concurrent-file-renames", "100")
213209
.put("hive.max-concurrent-zero-row-file-creations", "100")
@@ -313,7 +309,6 @@ public void testExplicitPropertyMappings()
313309
.setMaxInitialSplitSize(new DataSize(16, Unit.MEGABYTE))
314310
.setSplitLoaderConcurrency(1)
315311
.setDomainCompactionThreshold(42)
316-
.setWriterSortBufferSize(new DataSize(13, Unit.MEGABYTE))
317312
.setMaxConcurrentFileRenames(100)
318313
.setMaxConcurrentZeroRowFileCreations(100)
319314
.setRecursiveDirWalkerEnabled(true)
@@ -331,7 +326,6 @@ public void testExplicitPropertyMappings()
331326
.setInsertOverwriteImmutablePartitionEnabled(true)
332327
.setFailFastOnInsertIntoImmutablePartitionsEnabled(false)
333328
.setMaxPartitionsPerWriter(222)
334-
.setMaxOpenSortFiles(333)
335329
.setWriteValidationThreads(11)
336330
.setDomainSocketPath("/foo")
337331
.setS3FileSystemType(S3FileSystemType.EMRFS)

presto-hive/src/test/java/com/facebook/presto/hive/TestHivePageSink.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -114,22 +114,23 @@ public void testAllFormats()
114114
throws Exception
115115
{
116116
HiveClientConfig config = new HiveClientConfig();
117+
SortingFileWriterConfig sortingFileWriterConfig = new SortingFileWriterConfig();
117118
MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
118119
File tempDir = Files.createTempDir();
119120
try {
120121
ExtendedHiveMetastore metastore = createTestingFileHiveMetastore(new File(tempDir, "metastore"));
121122
for (HiveStorageFormat format : getSupportedHiveStorageFormats()) {
122123
config.setHiveStorageFormat(format);
123124
config.setCompressionCodec(NONE);
124-
long uncompressedLength = writeTestFile(config, metastoreClientConfig, metastore, makeFileName(tempDir, config));
125+
long uncompressedLength = writeTestFile(config, metastoreClientConfig, metastore, makeFileName(tempDir, config), sortingFileWriterConfig);
125126
assertGreaterThan(uncompressedLength, 0L);
126127

127128
for (HiveCompressionCodec codec : HiveCompressionCodec.values()) {
128129
if (codec == NONE || !codec.isSupportedStorageFormat(format)) {
129130
continue;
130131
}
131132
config.setCompressionCodec(codec);
132-
long length = writeTestFile(config, metastoreClientConfig, metastore, makeFileName(tempDir, config));
133+
long length = writeTestFile(config, metastoreClientConfig, metastore, makeFileName(tempDir, config), sortingFileWriterConfig);
133134
assertTrue(uncompressedLength > length, format("%s with %s compressed to %s which is not less than %s", format, codec, length, uncompressedLength));
134135
}
135136
}
@@ -152,11 +153,11 @@ private static String makeFileName(File tempDir, HiveClientConfig config)
152153
return tempDir.getAbsolutePath() + "/" + config.getHiveStorageFormat().name() + "." + config.getCompressionCodec().name();
153154
}
154155

155-
private static long writeTestFile(HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, String outputPath)
156+
private static long writeTestFile(HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, String outputPath, SortingFileWriterConfig sortingFileWriterConfig)
156157
{
157158
HiveTransactionHandle transaction = new HiveTransactionHandle();
158159
HiveWriterStats stats = new HiveWriterStats();
159-
ConnectorPageSink pageSink = createPageSink(transaction, config, metastoreClientConfig, metastore, new Path("file:///" + outputPath), stats);
160+
ConnectorPageSink pageSink = createPageSink(transaction, config, metastoreClientConfig, metastore, new Path("file:///" + outputPath), stats, sortingFileWriterConfig);
160161
List<LineItemColumn> columns = getTestColumns();
161162
List<Type> columnTypes = columns.stream()
162163
.map(LineItemColumn::getType)
@@ -308,7 +309,7 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa
308309
return provider.createPageSource(transaction, getSession(config, new HiveCommonClientConfig()), split, tableHandle.getLayout().get(), ImmutableList.copyOf(getColumnHandles()), NON_CACHEABLE, new RuntimeStats());
309310
}
310311

311-
private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, Path outputPath, HiveWriterStats stats)
312+
private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, Path outputPath, HiveWriterStats stats, SortingFileWriterConfig sortingFileWriterConfig)
312313
{
313314
LocationHandle locationHandle = new LocationHandle(outputPath, outputPath, Optional.empty(), NEW, DIRECT_TO_TARGET_NEW_DIRECTORY);
314315
HiveOutputTableHandle handle = new HiveOutputTableHandle(
@@ -337,6 +338,7 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio
337338
FUNCTION_AND_TYPE_MANAGER,
338339
config,
339340
metastoreClientConfig,
341+
sortingFileWriterConfig,
340342
new HiveLocationService(hdfsEnvironment),
341343
HiveTestUtils.PARTITION_UPDATE_CODEC,
342344
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,

0 commit comments

Comments
 (0)