Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/java/org/apache/cassandra/dht/Range.java
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,21 @@ public static <T extends RingPosition<T>> Set<Range<T>> subtract(Collection<Rang
return result;
}

public static <T extends RingPosition<T>> List<Range<T>> intersect(Collection<Range<T>> ranges1, Collection<Range<T>> ranges2)
{
Set<Range<T>> result = new HashSet<>();
// note: O(n^2), simple but not very efficient
for (Range<T> range1 : ranges1)
{
for (Range<T> range2 : ranges2)
{
result.addAll(range1.intersectionWith(range2));
}
}
return normalize(result);
}


/**
* Calculate set of the difference ranges of given two ranges
* (as current (A, B] and rhs is (C, D])
Expand Down
30 changes: 28 additions & 2 deletions src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.cassandra.repair.messages.StatusRequest;
import org.apache.cassandra.repair.messages.StatusResponse;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.repair.NoSuchRepairSessionException;
import org.apache.cassandra.service.StorageService;
Expand Down Expand Up @@ -241,21 +242,46 @@ private boolean shouldStoreSession(LocalSession session)
*/
private boolean isSuperseded(LocalSession session)
{
// to reduce overheads of intersect calculation for tables within the same keyspace
Map<String, Collection<Range<Token>>> rangesPerKeyspaceCache = new HashMap<>();
for (TableId tid : session.tableIds)
{
RepairedState state = repairedStates.get(tid);
TableMetadata tableMetadata = getTableMetadata(tid);
if (tableMetadata == null) // if a table was removed - ignore it
continue;

RepairedState state = repairedStates.get(tid);
if (state == null)
return false;

long minRepaired = state.minRepairedAt(session.ranges);
Collection<Range<Token>> actualRanges = rangesPerKeyspaceCache.computeIfAbsent(tableMetadata.keyspace, (keyspace) -> {
List<Range<Token>> localRanges = getLocalRanges(tableMetadata.keyspace);
if (localRanges.isEmpty()) // to handle the case when we run before the information about owned ranges is properly populated
return session.ranges;

// ignore token ranges which were moved to other nodes and not owned by the current one anymore
return Range.intersect(session.ranges, localRanges);
});
long minRepaired = state.minRepairedAt(actualRanges);
if (minRepaired <= session.repairedAt)
return false;
}

return true;
}

@VisibleForTesting
protected TableMetadata getTableMetadata(TableId tableId)
{
return Schema.instance.getTableMetadata(tableId);
}

@VisibleForTesting
protected List<Range<Token>> getLocalRanges(String keyspace)
{
return StorageService.instance.getLocalAndPendingRanges(keyspace);
}

public RepairedState.Stats getRepairedStats(TableId tid, Collection<Range<Token>> ranges)
{
RepairedState state = repairedStates.get(tid);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test;

import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;

import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.shared.WithProperties;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.concurrent.Condition;
import org.apache.cassandra.utils.progress.ProgressEventType;

import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
import static org.apache.cassandra.repair.messages.RepairOption.INCREMENTAL_KEY;
import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;

public class IncrementalRepairCleanupAfterNodeAddingTest extends TestBaseImpl
{
@Test
public void test() throws Exception
{
int originalNodeCount = 3;
try (WithProperties withProperties = new WithProperties())
{
withProperties.setProperty("cassandra.repair_delete_timeout_seconds", "0");
try (Cluster cluster = builder().withNodes(originalNodeCount)
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(originalNodeCount + 1, 1))
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
.withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
.start())
{
populate(cluster, 0, 100);

repair(cluster, KEYSPACE, ImmutableMap.of(INCREMENTAL_KEY, "true"));

Thread.sleep(1); // to ensure that we crossed LocalSessions.AUTO_DELETE_TIMEOUT

// to check that the session is still here (it is not superseded yet)
cluster.get(1).runOnInstance(rethrow(() -> {
ActiveRepairService.instance.consistent.local.cleanup();
List<Map<String, String>> sessions = ActiveRepairService.instance.getSessions(true, null);
Assert.assertThat(sessions, hasSize(1));
}));

addNode(cluster);

repair(cluster, KEYSPACE, ImmutableMap.of(INCREMENTAL_KEY, "true"));

Thread.sleep(1); // to ensure that we crossed LocalSessions.AUTO_DELETE_TIMEOUT

cluster.get(1).runOnInstance(rethrow(() -> {
ActiveRepairService.instance.consistent.local.cleanup();
List<Map<String, String>> sessions = ActiveRepairService.instance.getSessions(true, null);
Assert.assertThat(sessions, hasSize(1));
}));
}
}
}

protected void addNode(Cluster cluster)
{
IInstanceConfig config = cluster.newInstanceConfig();
config.set("auto_bootstrap", true);
IInvokableInstance newInstance = cluster.bootstrap(config);
newInstance.startup(cluster);
}

public static void populate(ICluster cluster, int from, int to)
{
populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
}

public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl)
{
cluster.schemaChange(withKeyspace("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};"));
cluster.schemaChange(withKeyspace("CREATE TABLE IF NOT EXISTS %s.repair_add_node_test (pk int, ck int, v int, PRIMARY KEY (pk, ck))"));
for (int i = from; i < to; i++)
{
cluster.coordinator(coord).execute(withKeyspace("INSERT INTO %s.repair_add_node_test (pk, ck, v) VALUES (?, ?, ?)"),
cl, i, i, i);
}
}

static void repair(ICluster<IInvokableInstance> cluster, String keyspace, Map<String, String> options)
{
cluster.get(1).runOnInstance(rethrow(() -> {
Condition await = newOneTimeCondition();
StorageService.instance.repair(keyspace, options, ImmutableList.of((tag, event) -> {
if (event.getType() == ProgressEventType.COMPLETE)
await.signalAll();
})).right.get();
await.await(1L, MINUTES);
}));
}
}
44 changes: 44 additions & 0 deletions test/unit/org/apache/cassandra/dht/RangeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -736,4 +736,48 @@ public void testGroupSubtract()
assertEquals(ranges, Range.subtract(ranges, asList(r(6, 7), r(20, 25))));
assertEquals(Sets.newHashSet(r(1, 4), r(11, 15)), Range.subtract(ranges, asList(r(4, 7), r(8, 11))));
}

@Test
public void testGroupIntersection()
{
assertEquals(Collections.emptyList(),
Range.intersect(asList(r(1, 5), r(10, 15)),
asList(r(6, 7), r(20, 25))
));

assertEquals(asList(r(5, 6)),
Range.intersect(asList(r(1, 6), r(10, 15)),
asList(r(5, 10))
));

assertEquals(asList(r(5, 6), r(10, 11)),
Range.intersect(asList(r(1, 6), r(10, 15)),
asList(r(5, 11))
));

assertEquals(asList(r(5, 6), r(10, 11)),
Range.intersect(asList(r(1, 6), r(10, 15)),
asList(r(5, 11))
));

assertEquals(asList(r(5, 6), r(10, 11), r(12, 15)),
Range.intersect(asList(r(1, 6), r(10, 15)),
asList(r(5, 11), r(12, 20))
));

assertEquals(asList(r(5, 6), r(10, 15)),
Range.intersect(asList(r(1, 6), r(10, 15)),
asList(r(5, 11), r(11, 20))
));

assertEquals(Collections.emptyList(),
Range.intersect(Collections.emptyList(),
asList(r(5, 11), r(11, 20))
));

assertEquals(Collections.emptyList(),
Range.intersect(asList(r(1, 6), r(10, 15)),
Collections.emptyList()
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.cassandra.repair.consistent;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -42,11 +43,14 @@
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.repair.AbstractRepairTest;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.KeyspaceRepairManager;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
Expand Down Expand Up @@ -208,6 +212,18 @@ protected boolean sessionHasData(LocalSession session)
{
return sessionHasData;
}

@Override
protected TableMetadata getTableMetadata(TableId tableId)
{
return cfm;
}

@Override
protected List<Range<Token>> getLocalRanges(String keyspace)
{
return Arrays.asList(RANGE1, RANGE2, RANGE3);
}
}

private static TableMetadata cfm;
Expand Down