Skip to content

Commit c006bfc

Browse files
authored
STAR-1353 Change Verifier to allow no column family store (apache#469)
Change Verifier to not require a column family store if the option mutateRepairStatus is false. Also SSTableReader can be created without column family store by passing metadata directly.
1 parent 4499e5e commit c006bfc

File tree

5 files changed

+184
-31
lines changed

5 files changed

+184
-31
lines changed

src/java/org/apache/cassandra/db/compaction/Verifier.java

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -59,24 +59,24 @@
5959
import java.io.IOException;
6060
import java.nio.ByteBuffer;
6161
import java.nio.file.Files;
62-
import java.nio.file.Path;
63-
import java.nio.file.Paths;
6462
import java.util.*;
6563
import java.util.concurrent.locks.Lock;
6664
import java.util.concurrent.locks.ReadWriteLock;
6765
import java.util.concurrent.locks.ReentrantReadWriteLock;
6866
import java.util.function.Function;
69-
import java.util.function.LongPredicate;
67+
68+
import javax.annotation.Nullable;
7069

7170
import org.apache.cassandra.io.util.File;
7271

72+
import static com.google.common.base.Preconditions.checkArgument;
73+
import static com.google.common.base.Preconditions.checkState;
74+
7375
public class Verifier implements Closeable
7476
{
75-
private final CompactionRealm realm;
77+
private final @Nullable CompactionRealm realm;
7678
private final SSTableReader sstable;
7779

78-
private final CompactionController controller;
79-
8080
private final ReadWriteLock fileAccessLock;
8181
private final RandomAccessReader dataFile;
8282
private final VerifyInfo verifyInfo;
@@ -94,19 +94,45 @@ public class Verifier implements Closeable
9494
private final OutputHandler outputHandler;
9595
private FileDigestValidator validator;
9696

97-
public Verifier(CompactionRealm realm, SSTableReader sstable, boolean isOffline, Options options)
97+
/**
98+
* Creates an instance of Verifier without providing a CompactionRealm. This is only
99+
* allowed if the provided options have {@link Options#mutateRepairStatus} set to false.
100+
* @param sstable The SSTable reader used to verify the SSTable files
101+
* @param isOffline if set to true reading the SSTable data file is not rate limited
102+
* @param options the verification options
103+
*/
104+
public Verifier(SSTableReader sstable, boolean isOffline, Options options)
105+
{
106+
this(sstable, new OutputHandler.LogOutput(), isOffline, options);
107+
}
108+
109+
/**
110+
* Creates an instance of Verifier without providing a CompactionRealm. This is only
111+
* allowed if the provided options have {@link Options#mutateRepairStatus} set to false.
112+
* @param sstable The SSTable reader used to verify the SSTable files
113+
* @param outputHandler The output handler used for logging
114+
* @param isOffline if set to true reading the SSTable data file is not rate limited
115+
* @param options the verification options
116+
*/
117+
public Verifier(SSTableReader sstable, OutputHandler outputHandler, boolean isOffline, Options options)
118+
{
119+
this(null, sstable, outputHandler, isOffline, options);
120+
}
121+
122+
public Verifier(@Nullable CompactionRealm realm, SSTableReader sstable, boolean isOffline, Options options)
98123
{
99124
this(realm, sstable, new OutputHandler.LogOutput(), isOffline, options);
100125
}
101126

102-
public Verifier(CompactionRealm realm, SSTableReader sstable, OutputHandler outputHandler, boolean isOffline, Options options)
127+
public Verifier(@Nullable CompactionRealm realm, SSTableReader sstable, OutputHandler outputHandler, boolean isOffline, Options options)
103128
{
129+
checkArgument(!options.mutateRepairStatus || realm != null,
130+
"Compaction realm must be provided with option mutateRepairStatus=true");
131+
104132
this.realm = realm;
105133
this.sstable = sstable;
106134
this.outputHandler = outputHandler;
107135

108-
this.controller = new VerifyController(realm);
109-
110136
this.fileAccessLock = new ReentrantReadWriteLock();
111137
this.dataFile = isOffline
112138
? sstable.openDataReader()
@@ -342,10 +368,6 @@ public void verify()
342368
{
343369
throw Throwables.propagate(t);
344370
}
345-
finally
346-
{
347-
controller.close();
348-
}
349371

350372
outputHandler.output("Verify of " + sstable + " succeeded. All " + goodRows + " rows read successfully");
351373
}
@@ -426,11 +448,11 @@ private void deserializeIndex(SSTableReader sstable) throws IOException
426448
private void deserializeIndexSummary(SSTableReader sstable) throws IOException
427449
{
428450
File file = sstable.descriptor.fileFor(Component.SUMMARY);
429-
TableMetadata metadata = realm.metadata();
451+
TableMetadata metadata = sstable.metadata();
430452
try (DataInputStream iStream = new DataInputStream(Files.newInputStream(file.toPath())))
431453
{
432454
try (IndexSummary indexSummary = IndexSummary.serializer.deserialize(iStream,
433-
realm.getPartitioner(),
455+
sstable.getPartitioner(),
434456
metadata.params.minIndexInterval,
435457
metadata.params.maxIndexInterval))
436458
{
@@ -480,6 +502,7 @@ private void markAndThrow(Throwable cause, boolean mutateRepaired)
480502
{
481503
if (mutateRepaired && options.mutateRepairStatus) // if we are able to mutate repaired flag, an incremental repair should be enough
482504
{
505+
checkState(realm != null, "Cannot mutate repair status as compaction realm is null");
483506
try
484507
{
485508
realm.mutateRepairedWithLock(ImmutableList.of(sstable), ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getPendingRepair(), sstable.isTransient());
@@ -544,20 +567,6 @@ public boolean isGlobal()
544567
}
545568
}
546569

547-
private static class VerifyController extends CompactionController
548-
{
549-
public VerifyController(CompactionRealm cfs)
550-
{
551-
super(cfs, Integer.MAX_VALUE);
552-
}
553-
554-
@Override
555-
public LongPredicate getPurgeEvaluator(DecoratedKey key)
556-
{
557-
return time -> false;
558-
}
559-
}
560-
561570
public static Options.Builder options()
562571
{
563572
return new Options.Builder();

src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,12 @@ private static SSTableReader openNoValidation(Descriptor descriptor, Set<Compone
466466
return open(descriptor, components, cfs.metadata, false, true);
467467
}
468468

469+
// use only for offline or "Standalone" operations
470+
private static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, TableMetadataRef metadata)
471+
{
472+
return open(descriptor, components, metadata, false, true);
473+
}
474+
469475
// use only for offline or "Standalone" operations
470476
private static SSTableReader openNoValidation(Descriptor descriptor, TableMetadataRef metadata)
471477
{
@@ -2510,6 +2516,8 @@ public interface Factory
25102516

25112517
SSTableReader openNoValidation(Descriptor desc, TableMetadataRef tableMetadataRef);
25122518

2519+
SSTableReader openNoValidation(Descriptor desc, Set<Component> components, TableMetadataRef metadata);
2520+
25132521
SSTableReader openNoValidation(Descriptor desc, Set<Component> components, ColumnFamilyStore cfs);
25142522

25152523
SSTableReader moveAndOpenSSTable(ColumnFamilyStore cfs, Descriptor oldDescriptor, Descriptor newDescriptor, Set<Component> components, boolean copyData);
@@ -2561,6 +2569,12 @@ public SSTableReader openNoValidation(Descriptor desc, TableMetadataRef tableMet
25612569
return SSTableReader.openNoValidation(desc, tableMetadataRef);
25622570
}
25632571

2572+
@Override
2573+
public SSTableReader openNoValidation(Descriptor desc, Set<Component> components, TableMetadataRef metadata)
2574+
{
2575+
return SSTableReader.openNoValidation(desc, components, metadata);
2576+
}
2577+
25642578
@Override
25652579
public SSTableReader openNoValidation(Descriptor desc, Set<Component> components, ColumnFamilyStore cfs)
25662580
{

src/java/org/apache/cassandra/io/sstable/format/trieindex/TrieIndexFormat.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,12 @@ public SSTableReader openNoValidation(Descriptor desc, TableMetadataRef tableMet
264264
return TrieIndexSSTableReader.open(desc, componentsFor(desc), tableMetadataRef, false, true);
265265
}
266266

267+
@Override
268+
public SSTableReader openNoValidation(Descriptor desc, Set<Component> components, TableMetadataRef metadata)
269+
{
270+
return TrieIndexSSTableReader.open(desc, components, metadata, false, true);
271+
}
272+
267273
@Override
268274
public SSTableReader openNoValidation(Descriptor desc, Set<Component> components, ColumnFamilyStore cfs)
269275
{

src/java/org/apache/cassandra/utils/OutputHandler.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,19 @@ public default void warn(Throwable th)
4141

4242
public static class LogOutput implements OutputHandler
4343
{
44-
private static Logger logger = LoggerFactory.getLogger(LogOutput.class);
44+
private static final Logger LOGGER_LOGOUTPUT = LoggerFactory.getLogger(LogOutput.class);
45+
46+
private final Logger logger;
47+
48+
public LogOutput(Logger logger)
49+
{
50+
this.logger = logger;
51+
}
52+
53+
public LogOutput()
54+
{
55+
this(LOGGER_LOGOUTPUT);
56+
}
4557

4658
public void output(String msg)
4759
{
@@ -64,6 +76,15 @@ public void warn(String msg, Throwable th)
6476
}
6577
}
6678

79+
@DseLegacy
80+
public static class CustomLogOutput extends LogOutput
81+
{
82+
public CustomLogOutput(Logger customLogger)
83+
{
84+
super(customLogger);
85+
}
86+
}
87+
6788
public static class SystemOutput implements OutputHandler
6889
{
6990
public final boolean debug;

test/unit/org/apache/cassandra/db/VerifyTest.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,21 @@
2626
import java.util.ArrayList;
2727
import java.util.Collections;
2828
import java.util.List;
29+
import java.util.Set;
2930
import java.util.concurrent.ExecutionException;
3031
import java.util.zip.CRC32;
3132
import java.util.zip.CheckedInputStream;
3233

3334
import com.google.common.base.Charsets;
3435
import org.apache.commons.lang3.StringUtils;
36+
import org.junit.After;
3537
import org.junit.Assume;
38+
import org.junit.Before;
3639
import org.junit.BeforeClass;
3740
import org.junit.Test;
3841

42+
import org.slf4j.LoggerFactory;
43+
3944
import org.apache.cassandra.UpdateBuilder;
4045
import org.apache.cassandra.Util;
4146
import org.apache.cassandra.batchlog.Batch;
@@ -54,8 +59,11 @@
5459
import org.apache.cassandra.io.FSWriteError;
5560
import org.apache.cassandra.io.sstable.Component;
5661
import org.apache.cassandra.io.sstable.CorruptSSTableException;
62+
import org.apache.cassandra.io.sstable.Descriptor;
5763
import org.apache.cassandra.io.sstable.format.SSTableFormat;
5864
import org.apache.cassandra.io.sstable.format.SSTableReader;
65+
import org.apache.cassandra.io.sstable.format.big.BigFormat;
66+
import org.apache.cassandra.io.sstable.format.trieindex.TrieIndexFormat;
5967
import org.apache.cassandra.io.util.File;
6068
import org.apache.cassandra.io.util.FileInputStreamPlus;
6169
import org.apache.cassandra.io.util.FileUtils;
@@ -65,6 +73,7 @@
6573
import org.apache.cassandra.schema.KeyspaceParams;
6674
import org.apache.cassandra.service.StorageService;
6775
import org.apache.cassandra.utils.ByteBufferUtil;
76+
import org.apache.cassandra.utils.OutputHandler;
6877
import org.apache.cassandra.utils.UUIDGen;
6978

7079
import static org.apache.cassandra.SchemaLoader.counterCFMD;
@@ -75,6 +84,7 @@
7584
import static org.hamcrest.Matchers.is;
7685
import static org.junit.Assert.assertEquals;
7786
import static org.junit.Assert.assertFalse;
87+
import static org.junit.Assert.assertThrows;
7888
import static org.junit.Assert.assertTrue;
7989
import static org.junit.Assert.fail;
8090

@@ -106,6 +116,8 @@ public class VerifyTest
106116
public static final String CF_UUID = "UUIDKeys";
107117
public static final String BF_ALWAYS_PRESENT = "BfAlwaysPresent";
108118

119+
private String savedProp;
120+
109121
@BeforeClass
110122
public static void defineSchema() throws ConfigurationException
111123
{
@@ -133,6 +145,20 @@ public static void defineSchema() throws ConfigurationException
133145
standardCFMD(KEYSPACE, BF_ALWAYS_PRESENT).bloomFilterFpChance(1.0));
134146
}
135147

148+
@Before
149+
public void before()
150+
{
151+
savedProp = System.getProperty(SSTableFormat.FORMAT_DEFAULT_PROP);
152+
}
153+
154+
@After
155+
public void after()
156+
{
157+
if (savedProp == null)
158+
System.getProperties().remove(SSTableFormat.FORMAT_DEFAULT_PROP);
159+
else
160+
System.setProperty(SSTableFormat.FORMAT_DEFAULT_PROP, savedProp);
161+
}
136162

137163
@Test
138164
public void testVerifyCorrect()
@@ -758,6 +784,83 @@ public void testNoFilterFile()
758784
}
759785
}
760786

787+
@Test
788+
public void testVerifyWithoutRealmBTI()
789+
{
790+
CompactionManager.instance.disableAutoCompaction();
791+
Keyspace keyspace = Keyspace.open(KEYSPACE);
792+
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
793+
794+
fillCF(cfs, 2);
795+
796+
Descriptor descriptor = cfs.getLiveSSTables().iterator().next().getDescriptor();
797+
Set<Component> components = SSTableReader.componentsFor(descriptor);
798+
SSTableFormat trieFormat = descriptor.getFormat();
799+
SSTableReader.Factory readerFactory = trieFormat.getReaderFactory();
800+
801+
assertTrue(trieFormat instanceof TrieIndexFormat);
802+
803+
SSTableReader sstable = readerFactory.openNoValidation(descriptor, components, cfs.metadata);
804+
Verifier.Options options = Verifier.options().invokeDiskFailurePolicy(true).build();
805+
806+
try (Verifier verifier = new Verifier(sstable, true, options))
807+
{
808+
verifier.verify();
809+
}
810+
catch (CorruptSSTableException err)
811+
{
812+
fail("Unexpected CorruptSSTableException");
813+
}
814+
}
815+
816+
@Test
817+
public void testVerifyWithoutRealmBIG()
818+
{
819+
System.setProperty(SSTableFormat.FORMAT_DEFAULT_PROP, SSTableFormat.Type.BIG.name);
820+
CompactionManager.instance.disableAutoCompaction();
821+
Keyspace keyspace = Keyspace.open(KEYSPACE);
822+
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
823+
824+
fillCF(cfs, 2);
825+
826+
Descriptor descriptor = cfs.getLiveSSTables().iterator().next().getDescriptor();
827+
Set<Component> components = SSTableReader.componentsFor(descriptor);
828+
SSTableFormat bigFormat = descriptor.getFormat();
829+
SSTableReader.Factory readerFactory = bigFormat.getReaderFactory();
830+
831+
assertTrue(bigFormat instanceof BigFormat);
832+
833+
SSTableReader sstable = readerFactory.openNoValidation(descriptor, components, cfs.metadata);
834+
OutputHandler.CustomLogOutput handler = new OutputHandler.CustomLogOutput(LoggerFactory.getLogger(Verifier.class));
835+
Verifier.Options options = Verifier.options().invokeDiskFailurePolicy(true).build();
836+
837+
try (Verifier verifier = new Verifier(sstable, handler, true, options))
838+
{
839+
verifier.verify();
840+
}
841+
catch (CorruptSSTableException err)
842+
{
843+
fail("Unexpected CorruptSSTableException");
844+
}
845+
}
846+
847+
@Test
848+
public void testVerifierIllegalArgument()
849+
{
850+
CompactionManager.instance.disableAutoCompaction();
851+
Keyspace keyspace = Keyspace.open(KEYSPACE);
852+
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
853+
854+
fillCF(cfs, 2);
855+
856+
SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
857+
858+
// Check that is not possible to create a Verifier without passing a ColumnFamilyStore
859+
// if mutateRepairStatus is true.
860+
Verifier.Options optionsRepairTrue = Verifier.options().mutateRepairStatus(true).build();
861+
assertThrows(IllegalArgumentException.class,
862+
() -> new Verifier(sstable, false, optionsRepairTrue));
863+
}
761864

762865

763866
private DecoratedKey dk(long l)

0 commit comments

Comments
 (0)