Skip to content

Commit 0f40172

Browse files
committed
Fix range queries on early-open BTI files
patch by Branimir Lambov; reviewed by TBD for CASSANDRA-20976
1 parent 829109f commit 0f40172

File tree

8 files changed

+246
-7
lines changed

8 files changed

+246
-7
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.0.6
2+
* Fix range queries on early-open BTI files (CASSANDRA-20976)
23
* Avoid re-initializing underlying iterator in LazilyInitializedUnfilteredRowIterator after closing (CASSANDRA-20972)
34
* Flush SAI segment builder when current SSTable writer is switched (CASSANDRA-20752)
45
* Throw RTE instead of FSError when RTE is thrown from FileUtils.write in TOCComponent (CASSANDRA-20917)

src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,8 @@ void complete() throws FSWriteError
262262
PartitionIndex completedPartitionIndex()
263263
{
264264
complete();
265-
rowIndexFHBuilder.withLengthOverride(0);
266-
partitionIndexFHBuilder.withLengthOverride(0);
265+
rowIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE);
266+
partitionIndexFHBuilder.withLengthOverride(NO_LENGTH_OVERRIDE);
267267
try
268268
{
269269
return PartitionIndex.load(partitionIndexFHBuilder, metadata.getLocal().partitioner, false);

src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndex.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public PartitionIndex(FileHandle fh, long trieRoot, long keyCount, DecoratedKey
8888
this.root = trieRoot;
8989
}
9090

91-
private PartitionIndex(PartitionIndex src)
91+
protected PartitionIndex(PartitionIndex src)
9292
{
9393
this(src.fh, src.root, src.keyCount, src.first, src.last);
9494
}
@@ -439,7 +439,7 @@ public void dumpTrie(String fileName)
439439
}
440440
catch (Throwable t)
441441
{
442-
logger.warn("Failed to dump trie to {} due to exception {}", fileName, t);
442+
logger.warn("Failed to dump trie to {} due to exception", fileName, t);
443443
}
444444
}
445445

src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndexBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ private void refreshReadableBoundary()
116116
}
117117
finally
118118
{
119-
fhBuilder.withLengthOverride(-1);
119+
fhBuilder.withLengthOverride(FileHandle.Builder.NO_LENGTH_OVERRIDE);
120120
}
121121

122122
}

src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIndexEarly.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,19 @@ public PartitionIndexEarly(FileHandle fh, long trieRoot, long keyCount, Decorate
4242
this.tail = tail;
4343
}
4444

45+
/**
 * Copy constructor. Delegates to the superclass copy constructor and then copies the
 * {@code cutoff} and {@code tail} fields from the given instance, so the new object refers
 * to the same values as the source.
 *
 * @param partitionIndexEarly the early-open index to copy from
 */
protected PartitionIndexEarly(PartitionIndexEarly partitionIndexEarly)
46+
{
47+
super(partitionIndexEarly);
48+
this.cutoff = partitionIndexEarly.cutoff;
49+
this.tail = partitionIndexEarly.tail;
50+
}
51+
52+
/**
 * Returns a new {@code PartitionIndexEarly} created from this instance through the copy
 * constructor, so the copy keeps this class's type and its {@code cutoff} / {@code tail} state.
 * NOTE(review): presumably the inherited implementation would produce a plain
 * {@code PartitionIndex} without that state -- confirm against the superclass.
 *
 * @return a copy of this index as a {@code PartitionIndexEarly}
 */
@Override
53+
public PartitionIndex sharedCopy()
54+
{
55+
return new PartitionIndexEarly(this);
56+
}
57+
4558
@Override
4659
protected Rebufferer instantiateRebufferer()
4760
{

src/java/org/apache/cassandra/io/sstable/format/bti/PartitionIterator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ static PartitionIterator create(PartitionIndex partitionIndex, IPartitioner part
8383
partitionIterator.advance();
8484
return partitionIterator;
8585
}
86-
catch (IOException | RuntimeException ex)
86+
catch (Throwable ex)
8787
{
8888
if (partitionIterator != null)
8989
{

src/java/org/apache/cassandra/io/tries/Walker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public Walker(Rebufferer source, long root)
7575
bh = source.rebuffer(root);
7676
buf = bh.buffer();
7777
}
78-
catch (RuntimeException ex)
78+
catch (Throwable ex)
7979
{
8080
if (bh != null) bh.release();
8181
source.closeReader();
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.cql3;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.Random;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.Future;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
31+
import org.junit.After;
32+
import org.junit.Assert;
33+
import org.junit.Test;
34+
35+
import org.apache.cassandra.config.DatabaseDescriptor;
36+
import org.apache.cassandra.db.ColumnFamilyStore;
37+
import org.hamcrest.Matchers;
38+
39+
import static org.junit.Assert.assertEquals;
40+
import static org.junit.Assert.assertTrue;
41+
42+
public class EarlyOpenCompactionTest extends CQLTester
43+
{
44+
private static final int NUM_PARTITIONS = 1000;
45+
private static final int NUM_ROWS_PER_PARTITION = 100;
46+
private static final int VALUE_SIZE = 1000; // ~1KB per row
47+
private static final int VERIFICATION_THREADS = 4;
48+
49+
private final AtomicBoolean stopVerification = new AtomicBoolean(false);
50+
private final AtomicInteger verificationErrors = new AtomicInteger(0);
51+
private final Random random = new Random();
52+
private ExecutorService executor;
53+
54+
@After
55+
public void cleanupAfter() throws Throwable
56+
{
57+
stopVerification.set(true);
58+
if (executor != null)
59+
{
60+
executor.shutdownNow();
61+
executor.awaitTermination(1, TimeUnit.MINUTES);
62+
}
63+
DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(50);
64+
}
65+
66+
@Test
67+
public void testEarlyOpenDuringCompaction() throws Throwable
68+
{
69+
// Create a table with a simple schema
70+
createTable("CREATE TABLE %s (" +
71+
"pk int, " +
72+
"ck int, " +
73+
"data text, " +
74+
"PRIMARY KEY (pk, ck)" +
75+
")");
76+
77+
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
78+
disableCompaction();
79+
80+
// Insert data to create multiple SSTables
81+
System.out.println("Inserting test data...");
82+
for (int i = 0; i < NUM_PARTITIONS; i++)
83+
{
84+
for (int j = 0; j < NUM_ROWS_PER_PARTITION; j++)
85+
{
86+
String value = randomString(VALUE_SIZE);
87+
execute("INSERT INTO %s (pk, ck, data) VALUES (?, ?, ?)", i, j, value);
88+
}
89+
90+
// Flush from time to time to get 10 sstables
91+
if (i > 0 && i % Math.max(1, NUM_PARTITIONS / 10) == 0)
92+
{
93+
flush();
94+
}
95+
}
96+
97+
// Final flush to ensure all data is written
98+
flush();
99+
100+
// Verify we have multiple SSTables
101+
int sstableCount = cfs.getLiveSSTables().size();
102+
assertTrue("Expected multiple SSTables, got: " + sstableCount, sstableCount > 1);
103+
104+
// Start verification threads
105+
System.out.println("Starting verification threads...");
106+
executor = Executors.newFixedThreadPool(VERIFICATION_THREADS);
107+
List<Future<?>> futures = new ArrayList<>();
108+
109+
for (int i = 0; i < VERIFICATION_THREADS; i++)
110+
{
111+
futures.add(executor.submit(new VerificationTask()));
112+
}
113+
114+
// Wait a bit to ensure verification is running
115+
Thread.sleep(1000);
116+
117+
// Set early open interval to 1MiB to trigger early open during compaction
118+
System.out.println("Setting early open interval to 1MiB...");
119+
DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(1);
120+
// Slow down compaction to give the verifier time to fail.
121+
DatabaseDescriptor.setCompactionThroughputMebibytesPerSec(10);
122+
123+
// Trigger compaction and await its completion
124+
System.out.println("Starting compaction...");
125+
cfs.enableAutoCompaction(true);
126+
127+
// Let verification run for a while during and after compaction
128+
System.out.println("Verifying data during and after compaction...");
129+
Thread.sleep(1000);
130+
131+
// Stop verification
132+
stopVerification.set(true);
133+
134+
// Wait for verification to complete
135+
for (Future<?> future : futures)
136+
{
137+
try
138+
{
139+
future.get(10, TimeUnit.SECONDS);
140+
}
141+
catch (Exception e)
142+
{
143+
System.err.println("Verification task failed: " + e);
144+
e.printStackTrace();
145+
}
146+
}
147+
148+
// Verify no errors occurred during verification
149+
int errors = verificationErrors.get();
150+
assertEquals("Found " + errors + " verification errors. Check logs for details.", 0, errors);
151+
152+
System.out.println("Test completed successfully");
153+
}
154+
155+
private class VerificationTask implements Runnable
156+
{
157+
@Override
158+
public void run()
159+
{
160+
try
161+
{
162+
Random localRandom = new Random(Thread.currentThread().getId());
163+
164+
while (!stopVerification.get() && !Thread.currentThread().isInterrupted())
165+
{
166+
// Randomly choose between point query and partition range query
167+
if (localRandom.nextBoolean())
168+
{
169+
// Point query
170+
int pk = localRandom.nextInt(NUM_PARTITIONS * 110 / 100); // 10% chance outside
171+
int ck = localRandom.nextInt(NUM_ROWS_PER_PARTITION * 110 / 100); // 10% chance outside
172+
173+
try
174+
{
175+
Assert.assertEquals(pk < NUM_PARTITIONS && ck < NUM_ROWS_PER_PARTITION ? 1 : 0,
176+
execute("SELECT data FROM %s WHERE pk = ? AND ck = ?", pk, ck).size());
177+
}
178+
catch (Throwable t)
179+
{
180+
verificationErrors.incrementAndGet();
181+
System.err.println("Point query failed for pk=" + pk + ", ck=" + ck + ": " + t);
182+
t.printStackTrace();
183+
}
184+
}
185+
else
186+
{
187+
// Partition range query
188+
int pk = localRandom.nextInt(NUM_PARTITIONS);
189+
190+
try
191+
{
192+
Assert.assertThat(execute("SELECT data FROM %s WHERE token(pk) <= token(?) AND token(pk) >= token(?)", pk, pk).size(),
193+
Matchers.greaterThanOrEqualTo(NUM_ROWS_PER_PARTITION));
194+
}
195+
catch (Throwable t)
196+
{
197+
verificationErrors.incrementAndGet();
198+
System.err.println("Range query failed for pk in (" + pk + ", " + (pk + 1) + ", " + (pk + 2) + "): " + t);
199+
t.printStackTrace();
200+
}
201+
}
202+
203+
// Add a small delay to prevent overwhelming the system
204+
Thread.yield();
205+
}
206+
}
207+
catch (Throwable t)
208+
{
209+
verificationErrors.incrementAndGet();
210+
System.err.println("Verification task failed: " + t);
211+
t.printStackTrace();
212+
}
213+
}
214+
}
215+
216+
private String randomString(int length)
217+
{
218+
StringBuilder sb = new StringBuilder(length);
219+
for (int i = 0; i < length; i++)
220+
{
221+
sb.append((char)('a' + random.nextInt(26)));
222+
}
223+
return sb.toString();
224+
}
225+
}

0 commit comments

Comments
 (0)