14 changes: 14 additions & 0 deletions paimon-core/pom.xml
@@ -255,6 +255,20 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.13.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
@@ -488,20 +488,28 @@ private RestoreFiles scanExistingFileMetas(BinaryRow partition, int bucket) {
totalBuckets = restoredTotalBuckets;
}
if (!ignoreNumBucketCheck && totalBuckets != numBuckets) {
String partInfo =
partitionType.getFieldCount() > 0
? "partition "
+ getPartitionComputer(
partitionType,
PARTITION_DEFAULT_NAME.defaultValue(),
legacyPartitionName)
.generatePartValues(partition)
: "table";
throw new RuntimeException(
String.format(
"Try to write %s with a new bucket num %d, but the previous bucket num is %d. "
+ "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
partInfo, numBuckets, totalBuckets));
if (partitionType.getFieldCount() > 0) {
// For partitioned tables, allow per-partition bucket counts.
// The partition's existing bucket count takes precedence over the
// table-level default. This supports rescale operations where different
// partitions may have different bucket counts.
LOG.info(
"Partition {} uses {} buckets (table default: {}). "
+ "Accepting per-partition bucket count.",
getPartitionComputer(
partitionType,
PARTITION_DEFAULT_NAME.defaultValue(),
legacyPartitionName)
.generatePartValues(partition),
totalBuckets,
numBuckets);
} else {
throw new RuntimeException(
String.format(
"Try to write table with a new bucket num %d, but the previous bucket num is %d. "
+ "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
numBuckets, totalBuckets));
}
}
return restored;
}
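
In short, the hunk above replaces a hard failure with a partition-scoped rule: only unpartitioned tables still reject a bucket-count mismatch. A minimal standalone sketch of that rule follows; resolveNumBuckets is a hypothetical helper name for illustration, not Paimon API.

// Sketch of the bucket-count rule from the hunk above; the helper name and
// its parameters are illustrative, not part of this PR.
static int resolveNumBuckets(boolean partitioned, int tableDefault, int restored) {
    if (restored == tableDefault) {
        return tableDefault;
    }
    if (!partitioned) {
        // Unpartitioned tables must be rescaled via INSERT OVERWRITE first.
        throw new RuntimeException(
                String.format(
                        "Try to write table with a new bucket num %d, but the previous bucket num is %d.",
                        tableDefault, restored));
    }
    // Partitioned tables keep the bucket count their existing files were written with.
    return restored;
}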
@@ -27,6 +27,7 @@
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.sink.WriteSelector;
@@ -240,6 +241,13 @@ public TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer writeId)
return wrapped.newWrite(commitUser, writeId);
}

@Override
public TableWriteImpl<?> newWrite(
String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
privilegeChecker.assertCanInsert(identifier);
return wrapped.newWrite(commitUser, writeId, rowKeyExtractor);
}

@Override
public TableCommitImpl newCommit(String commitUser) {
privilegeChecker.assertCanInsert(identifier);
@@ -39,6 +39,7 @@
import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketWriteSelector;
import org.apache.paimon.table.sink.PartitionBucketMapping;
import org.apache.paimon.table.sink.PostponeBucketRowKeyExtractor;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.RowKindGenerator;
@@ -218,7 +219,9 @@ public Optional<Statistics> statistics() {
public Optional<WriteSelector> newWriteSelector() {
switch (bucketMode()) {
case HASH_FIXED:
return Optional.of(new FixedBucketWriteSelector(schema()));
return Optional.of(
new FixedBucketWriteSelector(
schema(), PartitionBucketMapping.lazyLoadFromTable(this)));
case BUCKET_UNAWARE:
case POSTPONE_MODE:
return Optional.empty();
@@ -236,7 +239,8 @@ public CatalogEnvironment catalogEnvironment() {
public RowKeyExtractor createRowKeyExtractor() {
switch (bucketMode()) {
case HASH_FIXED:
return new FixedBucketRowKeyExtractor(schema());
return new FixedBucketRowKeyExtractor(
schema(), PartitionBucketMapping.lazyLoadFromTable(this));
case HASH_DYNAMIC:
case KEY_DYNAMIC:
return new DynamicBucketRowKeyExtractor(schema());
@@ -29,6 +29,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.AppendOnlySplitGenerator;
import org.apache.paimon.table.source.AppendTableRead;
@@ -127,11 +128,17 @@ public TableWriteImpl<InternalRow> newWrite(String commitUser) {

@Override
public TableWriteImpl<InternalRow> newWrite(String commitUser, @Nullable Integer writeId) {
return newWrite(commitUser, writeId, createRowKeyExtractor());
}

@Override
public TableWriteImpl<InternalRow> newWrite(
String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
BaseAppendFileStoreWrite writer = store().newWrite(commitUser, writeId);
return new TableWriteImpl<>(
rowType(),
writer,
createRowKeyExtractor(),
rowKeyExtractor,
(record, rowKind) -> {
Preconditions.checkState(
rowKind.isAdd(),
@@ -309,6 +309,12 @@ public TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer writeId)
return wrapped.newWrite(commitUser, writeId);
}

@Override
public TableWriteImpl<?> newWrite(
String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
return wrapped.newWrite(commitUser, writeId, rowKeyExtractor);
}

@Override
public TableCommitImpl newCommit(String commitUser) {
return wrapped.newCommit(commitUser);
@@ -118,6 +118,13 @@ default Optional<String> comment() {

TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer writeId);

/**
* Create a new write with a custom {@link RowKeyExtractor}. This is useful for scenarios like
* rescaling where the bucket assignment logic needs to be overridden.
*/
TableWriteImpl<?> newWrite(
String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor);

@Override
TableCommitImpl newCommit(String commitUser);
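
A hedged usage sketch of the new three-argument overload: a rescale job can pass its own extractor so rows are re-bucketed with the target layout. The helper name below is illustrative, not part of this PR.

// Illustrative only: open a write whose bucket assignment is driven by a
// caller-supplied RowKeyExtractor instead of the table's default one.
static TableWriteImpl<?> newWriteWithCustomBuckets(
        FileStoreTable table, String commitUser, RowKeyExtractor extractor) {
    // writeId is nullable; pass null when there is no restored writer state.
    return table.newWrite(commitUser, null, extractor);
}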

@@ -32,6 +32,7 @@
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.KeyValueTableRead;
@@ -159,11 +160,17 @@ public TableWriteImpl<KeyValue> newWrite(String commitUser) {

@Override
public TableWriteImpl<KeyValue> newWrite(String commitUser, @Nullable Integer writeId) {
return newWrite(commitUser, writeId, createRowKeyExtractor());
}

@Override
public TableWriteImpl<KeyValue> newWrite(
String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
rowType(),
store().newWrite(commitUser, writeId),
createRowKeyExtractor(),
rowKeyExtractor,
(record, rowKind) ->
kv.replace(
record.primaryKey(),
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketWriteSelector;
import org.apache.paimon.table.sink.PartitionBucketMapping;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.sink.WriteSelector;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.Optional;

/**
* A {@link FileStoreTable} wrapper for rescale operations that overrides bucket-related behavior to
* use the new target bucket count instead of loading per-partition bucket mappings from the
* manifest.
*/
public class RescaleFileStoreTable extends DelegatedFileStoreTable {

public RescaleFileStoreTable(FileStoreTable wrapped) {
super(wrapped);
}

@Override
public Optional<WriteSelector> newWriteSelector() {
return Optional.of(
new FixedBucketWriteSelector(
schema(), new PartitionBucketMapping(schema().numBuckets())));
}

@Override
public RowKeyExtractor createRowKeyExtractor() {
return new FixedBucketRowKeyExtractor(
schema(), new PartitionBucketMapping(schema().numBuckets()));
}

@Override
public TableWriteImpl<?> newWrite(String commitUser) {
return newWrite(commitUser, null);
}

@Override
public TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer writeId) {
return wrapped().newWrite(commitUser, writeId, createRowKeyExtractor());
}

@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
return new RescaleFileStoreTable(wrapped().copy(dynamicOptions));
}

@Override
public FileStoreTable copy(TableSchema newTableSchema) {
return new RescaleFileStoreTable(wrapped().copy(newTableSchema));
}

@Override
public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
return new RescaleFileStoreTable(wrapped().copyWithoutTimeTravel(dynamicOptions));
}

@Override
public FileStoreTable copyWithLatestSchema() {
return new RescaleFileStoreTable(wrapped().copyWithLatestSchema());
}

@Override
public FileStoreTable switchToBranch(String branchName) {
return new RescaleFileStoreTable(wrapped().switchToBranch(branchName));
}
}
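
A short usage sketch, assuming the rescale job already holds a FileStoreTable loaded from the catalog; the helper name is illustrative.

// Illustrative: wrap the table so every write path buckets rows by the
// schema's current (target) bucket count rather than per-partition mappings.
static TableWriteImpl<?> newRescaleWrite(FileStoreTable base, String commitUser) {
    return new RescaleFileStoreTable(base).newWrite(commitUser);
}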
@@ -29,24 +29,23 @@
/** {@link KeyAndBucketExtractor} for {@link InternalRow}. */
public class FixedBucketRowKeyExtractor extends RowKeyExtractor {

private final int numBuckets;
private transient Projection bucketKeyProjection;

private final boolean sameBucketKeyAndTrimmedPrimaryKey;
private final Projection bucketKeyProjection;
private final PartitionBucketMapping partitionBucketMapping;

private BinaryRow reuseBucketKey;
private Integer reuseBucket;
private final BucketFunction bucketFunction;

public FixedBucketRowKeyExtractor(TableSchema schema) {
public FixedBucketRowKeyExtractor(
TableSchema schema, PartitionBucketMapping partitionBucketMapping) {
super(schema);
numBuckets = new CoreOptions(schema.options()).bucket();
bucketFunction =
BucketFunction.create(
new CoreOptions(schema.options()), schema.logicalBucketKeyType());
sameBucketKeyAndTrimmedPrimaryKey = schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
bucketKeyProjection =
CodeGenUtils.newProjection(
schema.logicalRowType(), schema.projection(schema.bucketKeys()));
this.partitionBucketMapping = partitionBucketMapping;
}

@Override
@@ -62,7 +61,7 @@ private BinaryRow bucketKey() {
}

if (reuseBucketKey == null) {
reuseBucketKey = bucketKeyProjection.apply(record);
reuseBucketKey = bucketKeyProjection().apply(record);
}
return reuseBucketKey;
}
@@ -71,8 +70,18 @@ private BinaryRow bucketKey() {
public int bucket() {
BinaryRow bucketKey = bucketKey();
if (reuseBucket == null) {
int numBuckets = partitionBucketMapping.resolveNumBuckets(partition());
reuseBucket = bucketFunction.bucket(bucketKey, numBuckets);
}
return reuseBucket;
}

private Projection bucketKeyProjection() {
if (bucketKeyProjection == null) {
bucketKeyProjection =
CodeGenUtils.newProjection(
schema.logicalRowType(), schema.projection(schema.bucketKeys()));
}
return bucketKeyProjection;
}
}
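
PartitionBucketMapping itself is not shown in this diff. Below is a minimal sketch consistent with its visible call sites (an int constructor in RescaleFileStoreTable, resolveNumBuckets(partition()) here, and a lazyLoadFromTable factory in AbstractFileStoreTable); every internal detail is an assumption, and the real PR class may differ.

import org.apache.paimon.data.BinaryRow;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Map;

// Hypothetical reconstruction of PartitionBucketMapping from its call sites
// in this diff only; not the actual PR code.
public class PartitionBucketMapping implements Serializable {

    private final int defaultNumBuckets;
    // Per-partition bucket counts; null means "same fixed count for every
    // partition", as used by RescaleFileStoreTable.
    @Nullable private final Map<BinaryRow, Integer> perPartition;

    public PartitionBucketMapping(int defaultNumBuckets) {
        this(defaultNumBuckets, null);
    }

    public PartitionBucketMapping(
            int defaultNumBuckets, @Nullable Map<BinaryRow, Integer> perPartition) {
        this.defaultNumBuckets = defaultNumBuckets;
        this.perPartition = perPartition;
    }

    public int resolveNumBuckets(BinaryRow partition) {
        if (perPartition == null) {
            return defaultNumBuckets;
        }
        // Partitions without existing files fall back to the table default.
        return perPartition.getOrDefault(partition, defaultNumBuckets);
    }
}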
@@ -28,17 +28,20 @@ public class FixedBucketWriteSelector implements WriteSelector {
private static final long serialVersionUID = 1L;

private final TableSchema schema;
private final PartitionBucketMapping partitionBucketMapping;

private transient KeyAndBucketExtractor<InternalRow> extractor;

public FixedBucketWriteSelector(TableSchema schema) {
public FixedBucketWriteSelector(
TableSchema schema, PartitionBucketMapping partitionBucketMapping) {
this.schema = schema;
this.partitionBucketMapping = partitionBucketMapping;
}

@Override
public int select(InternalRow row, int numWriters) {
if (extractor == null) {
extractor = new FixedBucketRowKeyExtractor(schema);
extractor = new FixedBucketRowKeyExtractor(schema, partitionBucketMapping);
}
extractor.setRecord(row);
return ChannelComputer.select(extractor.partition(), extractor.bucket(), numWriters);
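
For context, ChannelComputer.select used above deterministically maps a (partition, bucket) pair to one of numWriters channels, so that rows for the same bucket always reach the same writer. A simplified stand-in, not Paimon's actual implementation:

// Simplified stand-in for ChannelComputer.select: a stable per-partition
// offset plus the bucket id, wrapped to the writer count. Not Paimon's code.
static int select(BinaryRow partition, int bucket, int numWriters) {
    int startChannel = Math.floorMod(partition.hashCode(), numWriters);
    return (startChannel + bucket) % numWriters;
}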