Skip to content

Commit 4526e87

Browse files
committed
DISTINCT queries failing with range tombstones
1 parent ed04f46 commit 4526e87

File tree

7 files changed

+94
-31
lines changed

7 files changed

+94
-31
lines changed

src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
public abstract class LazilyInitializedUnfilteredRowIterator extends AbstractIterator<Unfiltered> implements UnfilteredRowIterator
3333
{
3434
private final DecoratedKey partitionKey;
35-
3635
private UnfilteredRowIterator iterator;
36+
private boolean closed = false;
3737

3838
public LazilyInitializedUnfilteredRowIterator(DecoratedKey partitionKey)
3939
{
@@ -97,15 +97,16 @@ protected Unfiltered computeNext()
9797

9898
public void close()
9999
{
100+
// don't use iterator == null as indicator if this is closed since we might re-initialize after closing it in that case.
101+
closed = true;
100102
if (iterator != null)
101-
{
102103
iterator.close();
103-
iterator = null;
104-
}
105104
}
106105

107106
public boolean isOpen()
108107
{
109-
return iterator != null;
108+
if (closed)
109+
return false;
110+
return iterator != null; // for backwards compatibility - if `maybeInit` has not been run on this class, consider it not open, for example SSTableExport seems to rely on this
110111
}
111112
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.distributed.test;
20+
21+
import java.io.IOException;
22+
23+
import org.junit.Test;
24+
25+
import org.apache.cassandra.distributed.Cluster;
26+
import org.apache.cassandra.distributed.api.ConsistencyLevel;
27+
28+
public class DistinctReadTest extends TestBaseImpl
29+
{
30+
@Test
31+
public void test() throws IOException
32+
{
33+
try (Cluster cluster = init(Cluster.build()
34+
.withNodes(1)
35+
.start()))
36+
{
37+
cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int, ck int, x int, PRIMARY KEY (id, ck))"));
38+
cluster.coordinator(1).execute(withKeyspace("DELETE FROM %s.tbl USING TIMESTAMP 100 WHERE id = 1 AND ck < 10 "), ConsistencyLevel.ONE);
39+
cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (id, ck, x) VALUES (1, 5, 7) USING TIMESTAMP 101"), ConsistencyLevel.ONE);
40+
cluster.get(1).flush(KEYSPACE);
41+
// all these failed before fix;
42+
cluster.coordinator(1).execute(withKeyspace("select distinct id from %s.tbl where token(id) > " + Long.MIN_VALUE), ConsistencyLevel.ONE);
43+
cluster.coordinator(1).execute(withKeyspace("select distinct id from %s.tbl where id > 0 allow filtering"), ConsistencyLevel.ONE);
44+
cluster.coordinator(1).execute(withKeyspace("select id from %s.tbl where token(id) > " + Long.MIN_VALUE +" PER PARTITION LIMIT 1"), ConsistencyLevel.ONE);
45+
}
46+
}
47+
}

test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -169,26 +169,28 @@ private SSTableStats antiCompactRanges(ColumnFamilyStore store, RangesAtEndpoint
169169
{
170170
while (scanner.hasNext())
171171
{
172-
UnfilteredRowIterator row = scanner.next();
173-
Token token = row.partitionKey().getToken();
174-
if (sstable.isPendingRepair() && !sstable.isTransient())
172+
try (UnfilteredRowIterator row = scanner.next())
175173
{
176-
assertTrue(fullContains.test(token));
177-
assertFalse(transContains.test(token));
178-
stats.pendingKeys++;
179-
}
180-
else if (sstable.isPendingRepair() && sstable.isTransient())
181-
{
182-
183-
assertTrue(transContains.test(token));
184-
assertFalse(fullContains.test(token));
185-
stats.transKeys++;
186-
}
187-
else
188-
{
189-
assertFalse(fullContains.test(token));
190-
assertFalse(transContains.test(token));
191-
stats.unrepairedKeys++;
174+
Token token = row.partitionKey().getToken();
175+
if (sstable.isPendingRepair() && !sstable.isTransient())
176+
{
177+
assertTrue(fullContains.test(token));
178+
assertFalse(transContains.test(token));
179+
stats.pendingKeys++;
180+
}
181+
else if (sstable.isPendingRepair() && sstable.isTransient())
182+
{
183+
184+
assertTrue(transContains.test(token));
185+
assertFalse(fullContains.test(token));
186+
stats.transKeys++;
187+
}
188+
else
189+
{
190+
assertFalse(fullContains.test(token));
191+
assertFalse(transContains.test(token));
192+
stats.unrepairedKeys++;
193+
}
192194
}
193195
}
194196
}

test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.cassandra.db.DecoratedKey;
5252
import org.apache.cassandra.db.Keyspace;
5353
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
54+
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
5455
import org.apache.cassandra.dht.Range;
5556
import org.apache.cassandra.dht.Token;
5657
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -286,7 +287,12 @@ public void testCompactionProgress() throws Exception
286287
ISSTableScanner scanner = scanners.get(0);
287288
// scan through to the end
288289
while (scanner.hasNext())
289-
scanner.next();
290+
{
291+
try (UnfilteredRowIterator ignored = scanner.next())
292+
{
293+
// just close the iterator
294+
}
295+
}
290296

291297
// scanner.getCurrentPosition should be equal to total bytes of L1 sstables
292298
assertEquals(scanner.getCurrentPosition(), SSTableReader.getTotalUncompressedBytes(sstables));

test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,10 @@ public void testNoExpire() throws InterruptedException, IOException
258258
assertTrue(scanner.hasNext());
259259
while(scanner.hasNext())
260260
{
261-
UnfilteredRowIterator iter = scanner.next();
262-
assertEquals(Util.dk(noTTLKey), iter.partitionKey());
261+
try (UnfilteredRowIterator iter = scanner.next())
262+
{
263+
assertEquals(Util.dk(noTTLKey), iter.partitionKey());
264+
}
263265
}
264266
scanner.close();
265267
}

test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,6 @@ else if (ck1 == ck2 - 1) // cell tombstone
214214
{
215215
try (UnfilteredRowIterator rowIterator = scanner.next())
216216
{
217-
// only 1 partition data
218-
assertFalse(scanner.hasNext());
219217
List<Unfiltered> expectedUnfiltereds = new ArrayList<>();
220218
rowIterator.forEachRemaining(expectedUnfiltereds::add);
221219

@@ -227,15 +225,17 @@ else if (ck1 == ck2 - 1) // cell tombstone
227225
assertTrue(scannerForThrottle.hasNext());
228226
try (UnfilteredRowIterator rowIteratorForThrottle = scannerForThrottle.next())
229227
{
230-
assertFalse(scannerForThrottle.hasNext());
231228
verifyThrottleIterator(expectedUnfiltereds,
232229
rowIteratorForThrottle,
233230
new ThrottledUnfilteredIterator(rowIteratorForThrottle, throttle),
234231
throttle);
235232
}
233+
assertFalse(scannerForThrottle.hasNext());
236234
}
237235
}
238236
}
237+
// only 1 partition data
238+
assertFalse(scanner.hasNext());
239239
}
240240
}
241241

test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,12 @@ public void testSingleDataRangeWithMovedStart() throws IOException
345345
// full range scan
346346
ISSTableScanner scanner = sstable.getScanner();
347347
for (int i = 4; i < 10; i++)
348-
assertEquals(toKey(i), new String(scanner.next().partitionKey().getKey().array()));
348+
{
349+
try (UnfilteredRowIterator row = scanner.next())
350+
{
351+
assertEquals(toKey(i), new String(row.partitionKey().getKey().array()));
352+
}
353+
}
349354

350355
scanner.close();
351356

0 commit comments

Comments
 (0)