Skip to content

Commit

Permalink
orc_retain_size_flush_policy
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyangfb committed Sep 24, 2024
1 parent 2c1426b commit 6147f01
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.facebook.presto.orc.FlushReason.DICTIONARY_FULL;
import static com.facebook.presto.orc.FlushReason.MAX_BYTES;
import static com.facebook.presto.orc.FlushReason.MAX_RETAIN_BYTES;
import static com.facebook.presto.orc.FlushReason.MAX_ROWS;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -33,28 +34,34 @@ public class DefaultOrcWriterFlushPolicy
{
public static final DataSize DEFAULT_STRIPE_MIN_SIZE = new DataSize(32, MEGABYTE);
public static final DataSize DEFAULT_STRIPE_MAX_SIZE = new DataSize(64, MEGABYTE);
public static final DataSize DEFAULT_STRIPE_MAX_RETAIN_SIZE = new DataSize(1024, MEGABYTE);
public static final int DEFAULT_STRIPE_MAX_ROW_COUNT = 10_000_000;

private final int stripeMaxRowCount;
private final int stripeMinBytes;
private final int stripeMaxBytes;
private final int stripeMaxRetainSize;

private DefaultOrcWriterFlushPolicy(int stripeMaxRowCount, int stripeMinBytes, int stripeMaxBytes)
private DefaultOrcWriterFlushPolicy(int stripeMaxRowCount, int stripeMinBytes, int stripeMaxBytes, int stripeMaxRetainSize)
{
this.stripeMaxRowCount = stripeMaxRowCount;
this.stripeMinBytes = stripeMinBytes;
this.stripeMaxBytes = stripeMaxBytes;
this.stripeMaxRetainSize = stripeMaxRetainSize;
}

@Override
public Optional<FlushReason> shouldFlushStripe(int stripeRowCount, int bufferedBytes, boolean dictionaryIsFull)
public Optional<FlushReason> shouldFlushStripe(int stripeRowCount, int bufferedBytes, int retainedBytes, boolean dictionaryIsFull)
{
if (stripeRowCount == stripeMaxRowCount) {
return Optional.of(MAX_ROWS);
}
else if (bufferedBytes > stripeMaxBytes) {
return Optional.of(MAX_BYTES);
}
else if (retainedBytes > stripeMaxRetainSize && bufferedBytes > stripeMinBytes) {
return Optional.of(MAX_RETAIN_BYTES);
}
else if (dictionaryIsFull) {
return Optional.of(DICTIONARY_FULL);
}
Expand Down Expand Up @@ -82,6 +89,12 @@ public int getStripeMaxBytes()
return stripeMaxBytes;
}

@Override
public int getRetainMaxBytes()
{
return stripeMaxRetainSize;
}

@Override
public int getStripeMaxRowCount()
{
Expand All @@ -108,6 +121,7 @@ public static class Builder
private int stripeMaxRowCount = DEFAULT_STRIPE_MAX_ROW_COUNT;
private DataSize stripeMinSize = DEFAULT_STRIPE_MIN_SIZE;
private DataSize stripeMaxSize = DEFAULT_STRIPE_MAX_SIZE;
private DataSize stripeMaxRetainSize = DEFAULT_STRIPE_MAX_RETAIN_SIZE;

private Builder() {}

Expand All @@ -130,13 +144,20 @@ public Builder withStripeMaxSize(DataSize stripeMaxSize)
return this;
}

public Builder withStripeMaxRetainSize(DataSize stripeMaxRetainSize)
{
this.stripeMaxRetainSize = requireNonNull(stripeMaxRetainSize, "stripeMaxRetainSize is null");
return this;
}

public DefaultOrcWriterFlushPolicy build()
{
checkArgument(stripeMaxSize.compareTo(stripeMinSize) >= 0, "stripeMaxSize must be greater than or equal to stripeMinSize");
return new DefaultOrcWriterFlushPolicy(
stripeMaxRowCount,
toIntExact(stripeMinSize.toBytes()),
toIntExact(stripeMaxSize.toBytes()));
toIntExact(stripeMaxSize.toBytes()),
toIntExact(stripeMaxRetainSize.toBytes()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ public enum FlushReason
/** Stripe accumulated enough rows. */
MAX_ROWS,

/** tripe binary size reached size limit. */
/** Stripe binary size reached size limit. */
MAX_BYTES,

/** Retain size reached size limit. */
MAX_RETAIN_BYTES,
/** Dictionary binary size reached size limit. */
DICTIONARY_FULL,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ private void writeChunk(Page chunk)
// flush stripe if necessary
bufferedBytes = toIntExact(columnWriters.stream().mapToLong(ColumnWriter::getBufferedBytes).sum());
boolean dictionaryIsFull = dictionaryCompressionOptimizer.isFull(bufferedBytes);
Optional<FlushReason> flushReason = flushPolicy.shouldFlushStripe(stripeRowCount, bufferedBytes, dictionaryIsFull);
Optional<FlushReason> flushReason = flushPolicy.shouldFlushStripe(stripeRowCount, bufferedBytes, toIntExact(getRetainedBytes()), dictionaryIsFull);
if (flushReason.isPresent()) {
flushStripe(flushReason.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public interface OrcWriterFlushPolicy
Optional<FlushReason> shouldFlushStripe(
int stripeRowCount,
int bufferedBytes,
int retainedBytes,
boolean dictionaryIsFull);

/**
Expand All @@ -38,5 +39,7 @@ Optional<FlushReason> shouldFlushStripe(

int getStripeMaxBytes();

int getRetainMaxBytes();

int getStripeMaxRowCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ public void testFlushMaxStripeRowCount()

assertEquals(flushPolicy.getStripeMaxRowCount(), 10);

Optional<FlushReason> actual = flushPolicy.shouldFlushStripe(5, 0, false);
Optional<FlushReason> actual = flushPolicy.shouldFlushStripe(5, 0, 0, false);
assertFalse(actual.isPresent());

actual = flushPolicy.shouldFlushStripe(10, 0, false);
actual = flushPolicy.shouldFlushStripe(10, 0, 0, false);
assertTrue(actual.isPresent());
assertEquals(actual.get(), MAX_ROWS);

actual = flushPolicy.shouldFlushStripe(20, 0, false);
actual = flushPolicy.shouldFlushStripe(20, 0, 0, false);
assertFalse(actual.isPresent());
}

Expand All @@ -59,13 +59,13 @@ public void testFlushMaxStripeSize()
assertEquals(flushPolicy.getStripeMinBytes(), 50);
assertEquals(flushPolicy.getStripeMaxBytes(), 100);

Optional<FlushReason> actual = flushPolicy.shouldFlushStripe(1, 90, false);
Optional<FlushReason> actual = flushPolicy.shouldFlushStripe(1, 90, 0, false);
assertFalse(actual.isPresent());

actual = flushPolicy.shouldFlushStripe(1, 100, false);
actual = flushPolicy.shouldFlushStripe(1, 100, 0, false);
assertFalse(actual.isPresent());

actual = flushPolicy.shouldFlushStripe(1, 200, false);
actual = flushPolicy.shouldFlushStripe(1, 200, 0, false);
assertTrue(actual.isPresent());
assertEquals(actual.get(), MAX_BYTES);
}
Expand All @@ -75,10 +75,10 @@ public void testFlushDictionaryFull()
{
DefaultOrcWriterFlushPolicy flushPolicy = DefaultOrcWriterFlushPolicy.builder().build();

Optional<FlushReason> actual = flushPolicy.shouldFlushStripe(1, 1, false);
Optional<FlushReason> actual = flushPolicy.shouldFlushStripe(1, 1, 0, false);
assertFalse(actual.isPresent());

actual = flushPolicy.shouldFlushStripe(1, 1, true);
actual = flushPolicy.shouldFlushStripe(1, 1, 0, true);
assertTrue(actual.isPresent());
assertEquals(actual.get(), DICTIONARY_FULL);
}
Expand Down

0 comments on commit 6147f01

Please sign in to comment.