Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
5.1
* Add configuration to disk usage guardrails to stop writes across all replicas of a keyspace when any node replicating that keyspace exceeds the disk usage failure threshold. (CASSANDRA-21024)
* Add configuration for sorted imports in source files (CASSANDRA-17925)
* Change the eager reference counting of compression dictionaries to lazy (CASSANDRA-21074)
* Add cursor based optimized compaction path (CASSANDRA-20918)
Expand Down
4 changes: 4 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2517,6 +2517,10 @@ drop_compact_storage_enabled: false
# Min unit: B
# data_disk_usage_max_disk_size:
#
# Configures the disk usage guardrails to block all writes to a keyspace if any node that replicates that keyspace
# exceeds the disk usage failure threshold. By default, this is disabled.
# data_disk_usage_stop_writes_for_keyspace_on_fail: false
#
# Guardrail to warn or fail when the minimum replication factor is less than the threshold.
# This would also apply to system keyspaces.
# Suggested value for use in production: 2 or higher
Expand Down
4 changes: 4 additions & 0 deletions conf/cassandra_latest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2295,6 +2295,10 @@ drop_compact_storage_enabled: false
# Min unit: B
# data_disk_usage_max_disk_size:
#
# Configures the disk usage guardrails to block all writes to a keyspace if any node that replicates that keyspace
# exceeds the disk usage failure threshold. By default, this is disabled.
# data_disk_usage_stop_writes_for_keyspace_on_fail: false
#
# Guardrail to warn or fail when the minimum replication factor is less than the threshold.
# This would also apply to system keyspaces.
# Suggested value for use in production: 2 or higher
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,7 @@ public static void setClientMode(boolean clientMode)
public volatile int data_disk_usage_percentage_warn_threshold = -1;
public volatile int data_disk_usage_percentage_fail_threshold = -1;
public volatile DataStorageSpec.LongBytesBound data_disk_usage_max_disk_size = null;
public volatile boolean data_disk_usage_stop_writes_for_keyspace_on_fail = false;
public volatile int minimum_replication_factor_warn_threshold = -1;
public volatile int minimum_replication_factor_fail_threshold = -1;
public volatile int maximum_replication_factor_warn_threshold = -1;
Expand Down
14 changes: 14 additions & 0 deletions src/java/org/apache/cassandra/config/GuardrailsOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,20 @@ public void setDataDiskUsagePercentageThreshold(int warn, int fail)
x -> config.data_disk_usage_percentage_fail_threshold = x);
}


/**
 * @return whether all writes to a keyspace should be blocked when any node replicating it is
 *         reported as full; reads the live {@code data_disk_usage_stop_writes_for_keyspace_on_fail}
 *         config value
 */
// NOTE(review): neighboring accessors in this class carry @Override — confirm whether this one
// is declared on the guardrails config interface and should too.
public boolean getDataDiskUsageStopWritesForKeyspaceOnFail()
{
    return config.data_disk_usage_stop_writes_for_keyspace_on_fail;
}

/**
 * Enables or disables blocking all writes to a keyspace when a node replicating it is full.
 * The update goes through {@code updatePropertyWithLogging} so the old and new values are logged.
 *
 * @param enabled the new value for the guardrail
 */
public void setDataDiskUsageStopWritesForKeyspaceOnFail(boolean enabled)
{
    updatePropertyWithLogging("data_disk_usage_stop_writes_for_keyspace_on_fail",
                              enabled,
                              () -> config.data_disk_usage_stop_writes_for_keyspace_on_fail,
                              x -> config.data_disk_usage_stop_writes_for_keyspace_on_fail = x);
}

@Override
public DataStorageSpec.LongBytesBound getDataDiskUsageMaxDiskSize()
{
Expand Down
17 changes: 17 additions & 0 deletions src/java/org/apache/cassandra/db/guardrails/Guardrails.java
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,23 @@ public void setDataDiskUsageMaxDiskSize(@Nullable String size)
DEFAULT_CONFIG.setDataDiskUsageMaxDiskSize(sizeFromString(size));
}

@Override
public boolean getDataDiskUsageStopWritesForKeyspaceOnFail()
{
    // MBean accessor: delegates to the default guardrails configuration.
    return DEFAULT_CONFIG.getDataDiskUsageStopWritesForKeyspaceOnFail();
}

/**
 * Static convenience accessor for non-MBean call sites (e.g. disk usage checks); returns the same
 * value as {@link #getDataDiskUsageStopWritesForKeyspaceOnFail()}.
 */
public static boolean getDiskUsageStopWritesForKeyspaceOnFail()
{
    return DEFAULT_CONFIG.getDataDiskUsageStopWritesForKeyspaceOnFail();
}

@Override
public void setDataDiskUsageStopWritesForKeyspaceOnFail(boolean enabled)
{
    // MBean mutator: delegates to the default guardrails configuration, which handles logging.
    DEFAULT_CONFIG.setDataDiskUsageStopWritesForKeyspaceOnFail(enabled);
}

@Override
public int getMinimumReplicationFactorWarnThreshold()
{
Expand Down
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,17 @@ public interface GuardrailsMBean
@Nullable
String getDataDiskUsageMaxDiskSize();

/**
 * @return whether a single full node replicating a given keyspace should block writes for the
 *         entire keyspace: {@code true} if this behavior is enabled, {@code false} otherwise
 */
boolean getDataDiskUsageStopWritesForKeyspaceOnFail();

/**
 * @param enabled Enables or disables blocking writes for a keyspace if a node replicating that keyspace is full.
 */
void setDataDiskUsageStopWritesForKeyspaceOnFail(boolean enabled);

/**
* @param size The max disk size of the data directories when calculating disk usage thresholds, as a string
* formatted as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.cassandra.service.disk.usage;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,13 +28,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Locator;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.utils.NoSpamLogger;

/**
Expand All @@ -50,6 +55,7 @@ public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber
private final DiskUsageMonitor monitor;
private final ConcurrentMap<InetAddressAndPort, DiskUsageState> usageInfo = new ConcurrentHashMap<>();
private volatile boolean hasStuffedOrFullNode = false;
private final ConcurrentMap<String, Set<InetAddressAndPort>> fullNodesByDatacenter = new ConcurrentHashMap<>();

@VisibleForTesting
public DiskUsageBroadcaster(DiskUsageMonitor monitor)
Expand All @@ -67,11 +73,39 @@ public boolean hasStuffedOrFullNode()
return hasStuffedOrFullNode;
}

/**
 * @return {@code true} when at least one node in the given datacenter is currently tracked as FULL
 */
public boolean hasFullNodeInDataCenter(String datacenter)
{
    final Set<InetAddressAndPort> fullNodes = fullNodesByDatacenter.get(datacenter);
    if (fullNodes == null)
        return false;
    return !fullNodes.isEmpty();
}

/**
 * @return {@code true} if the given node's disk usage is FULL, or — when the
 * data_disk_usage_stop_writes_for_keyspace_on_fail guardrail is enabled — if the node is in a
 * datacenter that contains a full node.
 */
public boolean isFull(InetAddressAndPort endpoint)
{
    boolean endpointIsFull = state(endpoint).isFull();

    if (!Guardrails.getDiskUsageStopWritesForKeyspaceOnFail())
        return endpointIsFull;

    // If no node in the cluster is full or stuffed then we exit early: no datacenter can
    // contain a full node.
    if (!hasStuffedOrFullNode())
        return false;

    Locator locator = DatabaseDescriptor.getLocator();
    if (locator == null)
    {
        logger.debug("Unable to block writes for keyspace because we cannot determine the datacenter of {}.", endpoint);
        return endpointIsFull;
    }

    Location location = locator.location(endpoint);
    if (location == null || location.equals(Location.UNKNOWN))
    {
        logger.debug("Unable to block writes for keyspace because location of {} is null or UNKNOWN", endpoint);
        return endpointIsFull;
    }

    // Also honour the endpoint's own FULL state: if the endpoint never made it into the
    // per-datacenter tracking (e.g. the locator was unavailable when its state changed),
    // a genuinely full endpoint must still report full.
    return endpointIsFull || hasFullNodeInDataCenter(location.datacenter);
}

Expand Down Expand Up @@ -114,8 +148,9 @@ public void onChange(InetAddressAndPort endpoint, ApplicationState state, Versio
noSpamLogger.warn(String.format("Found unknown DiskUsageState: %s. Using default state %s instead.",
value.value, usageState));
}
usageInfo.put(endpoint, usageState);

computeUsageStateForEpDatacenter(endpoint, usageState);
usageInfo.put(endpoint, usageState);
hasStuffedOrFullNode = usageState.isStuffedOrFull() || computeHasStuffedOrFullNode();
}

Expand All @@ -131,6 +166,51 @@ private boolean computeHasStuffedOrFullNode()
return false;
}

/**
 * Update the set of full nodes by datacenter based on the disk usage state for the given endpoint.
 * The endpoint is first purged from every datacenter set, then re-added to its current
 * datacenter's set only if it is FULL. Purging from all sets (rather than only the currently
 * resolved datacenter) guarantees that a node which recovered while its location was
 * unresolvable, or which moved to a different datacenter, never leaves a stale entry behind
 * that would keep blocking writes. This method is idempotent.
 *
 * @param endpoint The endpoint whose state has changed.
 * @param usageState The new disk usage state value.
 */
private void computeUsageStateForEpDatacenter(InetAddressAndPort endpoint, DiskUsageState usageState)
{
    // Purge unconditionally: removal must not depend on resolving the endpoint's current
    // location, otherwise a stale FULL entry could persist forever.
    for (Set<InetAddressAndPort> fullNodes : fullNodesByDatacenter.values())
    {
        if (fullNodes.remove(endpoint))
            logger.debug("Endpoint {} removed from a datacenter full nodes set", endpoint);
    }

    if (!usageState.isFull())
        return;

    Locator locator = DatabaseDescriptor.getLocator();
    if (locator == null)
    {
        logger.debug("Unable to track disk usage by datacenter for endpoint {} because locator is null", endpoint);
        return;
    }

    Location location = locator.location(endpoint);
    if (location == null || location.equals(Location.UNKNOWN))
    {
        logger.debug("Unable to track disk usage by datacenter for endpoint {} because null or UNKNOWN location was returned",
                     endpoint);
        return;
    }

    String datacenter = location.datacenter;

    // Add this node to the set of full nodes for its datacenter
    fullNodesByDatacenter.computeIfAbsent(datacenter, dc -> ConcurrentHashMap.newKeySet())
                         .add(endpoint);
    logger.debug("Endpoint {} is full, added to full nodes set for datacenter {}", endpoint, datacenter);
}

@Override
public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
{
Expand Down Expand Up @@ -164,10 +244,41 @@ public void onRestart(InetAddressAndPort endpoint, EndpointState state)
/**
 * Stops tracking the removed endpoint: clears it from the per-datacenter full-node sets and from
 * the usage map, then recomputes the cluster-wide stuffed-or-full flag.
 */
@Override
public void onRemove(InetAddressAndPort endpoint)
{
    // Purge from every datacenter set rather than resolving the endpoint's location:
    // 1) the tracking map is maintained regardless of the guardrail setting (onChange updates it
    //    unconditionally), so cleanup must also be unconditional — otherwise a removed node could
    //    leave a stale entry that wrongly blocks writes if the guardrail is enabled later;
    // 2) a removed endpoint's location may no longer be resolvable.
    // Iterating all sets is cheap (one set per datacenter) and cannot leave a stale entry behind.
    for (Set<InetAddressAndPort> fullNodes : fullNodesByDatacenter.values())
    {
        if (fullNodes.remove(endpoint))
            logger.debug("Removed endpoint {} from a datacenter full nodes set on node removal", endpoint);
    }

    usageInfo.remove(endpoint);
    hasStuffedOrFullNode = usageInfo.values().stream().anyMatch(DiskUsageState::isStuffedOrFull);
}

private void updateDiskUsage(InetAddressAndPort endpoint, EndpointState state)
{
VersionedValue localValue = state.getApplicationState(ApplicationState.DISK_USAGE);
Expand Down
Loading