feat: add monitor for local storage (#364)
caojiajun committed Jan 24, 2025
1 parent ff91a0f commit bb256c1
Showing 20 changed files with 451 additions and 109 deletions.
@@ -0,0 +1,208 @@
package com.netease.nim.camellia.redis.proxy.monitor;

import com.netease.nim.camellia.redis.proxy.util.ExecutorUtils;
import com.netease.nim.camellia.redis.proxy.util.Utils;
import com.netease.nim.camellia.tools.utils.CamelliaMapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/**
 * Created by caojiajun on 2025/1/23
 */
public class LocalStorageMonitor {

    private static final Logger logger = LoggerFactory.getLogger(LocalStorageMonitor.class);

    private static final Time compactTime = new Time();
    private static final Time flushTime = new Time();
    private static final Time keyFlushTime = new Time();
    private static final Time valueFlushTime = new Time();
    private static final Time walFlushTime = new Time();
    private static final Time walAppendTime = new Time();
    private static final Time valueWaitFlushTime = new Time();
    private static final Time keyWaitFlushTime = new Time();

    private static final Time fileReadTime = new Time();
    private static final Time fileWriteTime = new Time();

    private static final ConcurrentHashMap<String, LongAdder> fileReadMap = new ConcurrentHashMap<>();
    private static final ConcurrentHashMap<String, LongAdder> fileWriteMap = new ConcurrentHashMap<>();

    private static final LongAdder slotInit = new LongAdder();
    private static final LongAdder slotExpand = new LongAdder();

    //time is passed in nanoseconds and stored in units of 10 microseconds (applies to all the xxxTime methods below)
    public static void compactTime(long time) {
        if (time < 0) return;
        time = time / 10000;
        compactTime.update(time);
    }

    public static void flushTime(long time) {
        if (time < 0) return;
        time = time / 10000;
        flushTime.update(time);
    }

    public static void keyFlushTime(long time) {
        if (time < 0) return;
        time = time / 10000;
        keyFlushTime.update(time);
    }

    public static void valueFlushTime(long time) {
        if (time < 0) return;
        time = time / 10000;
        valueFlushTime.update(time);
    }

    public static void walFlushTime(long time) {
        if (time < 0) return;
        time = time / 10000;
        walFlushTime.update(time);
    }

    public static void walAppendTime(long time) {
        if (time < 0) return;
        time = time / 10000;
        walAppendTime.update(time);
    }

    public static void valueWaitFlushTime(long time) {
        if (time < 0) return;
        time = time / 10000;
        valueWaitFlushTime.update(time);
    }

    public static void keyWaitFlushTime(long time) {
        if (time < 0) return;
        time = time / 10000;
        keyWaitFlushTime.update(time);
    }

    public static void fileReadTime(long time) {
        if (time < 0) return;
        time = time / 10000;
        fileReadTime.update(time);
    }

    public static void fileWriteTime(long time) {
        if (time < 0) return;
        time = time / 10000;
        fileWriteTime.update(time);
    }

    public static void fileRead(String file, long size) {
        CamelliaMapUtils.computeIfAbsent(fileReadMap, file, k -> new LongAdder()).add(size);
    }

    public static void fileWrite(String file, long size) {
        CamelliaMapUtils.computeIfAbsent(fileWriteMap, file, k -> new LongAdder()).add(size);
    }

    public static void slotInit() {
        slotInit.increment();
    }

    public static void slotExpand() {
        slotExpand.increment();
    }

    static {
        ExecutorUtils.scheduleAtFixedRate(() -> {
            {
                Stats stats = compactTime.getStats();
                logger.info("compact stats, count = {}, avg = {}", stats.count, stats.avg);
            }
            {
                Stats stats = flushTime.getStats();
                logger.info("flush stats, count = {}, avg = {}", stats.count, stats.avg);
            }
            {
                Stats stats = keyFlushTime.getStats();
                logger.info("key flush stats, count = {}, avg = {}", stats.count, stats.avg);
            }
            {
                Stats stats = valueFlushTime.getStats();
                logger.info("value flush stats, count = {}, avg = {}", stats.count, stats.avg);
            }
            {
                Stats stats = walFlushTime.getStats();
                logger.info("wal flush stats, count = {}, avg = {}", stats.count, stats.avg);
            }
            {
                Stats stats = walAppendTime.getStats();
                logger.info("wal append stats, count = {}, avg = {}", stats.count, stats.avg);
            }
            {
                Stats stats = keyWaitFlushTime.getStats();
                logger.info("key wait flush stats, count = {}, avg = {}", stats.count, stats.avg);
            }
            {
                Stats stats = valueWaitFlushTime.getStats();
                logger.info("value wait flush stats, count = {}, avg = {}", stats.count, stats.avg);
            }
            {
                Stats stats = fileReadTime.getStats();
                logger.info("file read stats, count = {}, avg = {}", stats.count, stats.avg);
            }
            {
                Stats stats = fileWriteTime.getStats();
                logger.info("file write stats, count = {}, avg = {}", stats.count, stats.avg);
            }
            {
                for (Map.Entry<String, LongAdder> entry : fileWriteMap.entrySet()) {
                    long size = entry.getValue().sumThenReset();
                    if (size == 0) {
                        continue;
                    }
                    logger.info("file write, file = {}, size = {}", entry.getKey(), Utils.humanReadableByteCountBin(size));
                }
            }
            {
                for (Map.Entry<String, LongAdder> entry : fileReadMap.entrySet()) {
                    long size = entry.getValue().sumThenReset();
                    if (size == 0) {
                        continue;
                    }
                    logger.info("file read, file = {}, size = {}", entry.getKey(), Utils.humanReadableByteCountBin(size));
                }
            }
            {
                logger.info("slot init, count = {}", slotInit.sumThenReset());
                logger.info("slot expand, count = {}", slotExpand.sumThenReset());
            }
        }, 10, 10, TimeUnit.SECONDS);
    }

    private static class Time {
        LongAdder time = new LongAdder();
        LongAdder count = new LongAdder();

        void update(long time) {
            this.time.add(time);
            this.count.increment();
        }

        Stats getStats() {
            long time = this.time.sumThenReset();
            long count = this.count.sumThenReset();
            if (count == 0) {
                return new Stats();
            }
            Stats stats = new Stats();
            //time is accumulated in units of 10 microseconds, so the reported avg is in milliseconds
            stats.avg = ((double) time / count) / 100;
            stats.count = count;
            return stats;
        }
    }

    private static class Stats {
        long count;
        double avg;
    }
}
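Note on units and intended usage: the xxxTime methods above take elapsed time in nanoseconds (callers in this commit pass System.nanoTime() deltas); the value is stored in 10-microsecond units and divided by a further 100 in getStats, so the avg printed every 10 seconds is in milliseconds. The sketch below is illustrative only; the flush call and file name are placeholders, not code from this commit.

```java
import com.netease.nim.camellia.redis.proxy.monitor.LocalStorageMonitor;

public class LocalStorageMonitorUsageSketch {

    //illustrative caller: record how long one flush took and how many bytes were written
    public static void flushAndRecord(short slot) {
        long startTime = System.nanoTime();
        try {
            doFlush(slot); //placeholder for the real key/value flush
        } finally {
            //nanoseconds in; the periodic report shows the average in milliseconds
            LocalStorageMonitor.flushTime(System.nanoTime() - startTime);
        }
        //bytes written to a data file; the file name here is made up
        LocalStorageMonitor.fileWrite("string.block.1.data", 4096);
    }

    private static void doFlush(short slot) {
        //no-op stand-in for the actual flush logic
    }
}
```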
@@ -17,8 +17,8 @@ public static long parse(String configKey, String defaultValue) {
             long bytes = bytes(string);
             if (bytes < 0) {
                 logger.warn("illegal config = {} for config-key = {}, use default config = {}", string, configKey, defaultValue);
+                bytes = bytes(defaultValue);
             }
-            bytes = bytes(defaultValue);
             return bytes;
         } catch (Exception e) {
             logger.warn("error config = {} parse for config-key = {}, use default config = {}", string, configKey, defaultValue);
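The hunk above is a behavior fix: the fallback to the default now happens only in the illegal-value branch instead of overwriting every parsed value. A minimal standalone sketch of the corrected flow, with a toy bytes() parser standing in for the real one (which presumably also accepts human-readable sizes such as "32M"):

```java
public class CapacityParseSketch {

    //mirrors the fixed control flow: the default applies only when the configured value is invalid
    public static long parse(String configuredValue, String defaultValue) {
        try {
            long bytes = bytes(configuredValue);
            if (bytes < 0) {
                bytes = bytes(defaultValue);
            }
            return bytes;
        } catch (Exception e) {
            return bytes(defaultValue);
        }
    }

    //toy parser: plain byte counts only; non-positive means illegal
    private static long bytes(String value) {
        long parsed = Long.parseLong(value.trim());
        return parsed > 0 ? parsed : -1;
    }
}
```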
@@ -1,6 +1,7 @@
 package com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache;

 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
 import com.netease.nim.camellia.redis.proxy.util.Utils;
 import com.netease.nim.camellia.tools.executor.CamelliaThreadFactory;
 import org.slf4j.Logger;
@@ -26,6 +27,7 @@ public class LRUCache<K, V> {

     private String config;

+    private boolean enable;
     private final ConcurrentLinkedHashMap<K, V> cache;
     private long capacity;

@@ -46,23 +48,37 @@ public LRUCache(LRUCacheName name, int estimateSizePerKV,
                 .maximumWeightedCapacity(maxKeyCount)
                 .build();
         scheduler.scheduleAtFixedRate(this::schedule, 10, 10, TimeUnit.SECONDS);
-        logger.info("lru-cache init, name = {}, capacity = {}, key.max.count = {}", name, CacheCapacityConfigParser.toString(capacity), maxKeyCount);
+        enable = ProxyDynamicConf.getBoolean(name.getConfigKey() + ".enable", true);
+        logger.info("lru-cache init, name = {}, capacity = {}, key.max.count = {}, enable = {}", name, CacheCapacityConfigParser.toString(capacity), maxKeyCount, enable);
     }

     public void put(K key, V value) {
+        if (!enable) {
+            return;
+        }
         cache.put(key, value);
     }

     public void delete(K key) {
+        if (!enable) {
+            return;
+        }
         cache.remove(key);
     }

     public V get(K key) {
+        if (!enable) {
+            return null;
+        }
         return cache.get(key);
     }

     private void schedule() {
         try {
+            enable = ProxyDynamicConf.getBoolean(name.getConfigKey() + ".enable", true);
+            if (!enable) {
+                cache.clear();
+            }
             this.capacity = name.getCapacity(capacity);
             this.config = CacheCapacityConfigParser.toString(capacity);
             long estimateSize = 0;
@@ -85,8 +101,8 @@ private void schedule() {
             }
             loop ++;
             if (loop == 6) {//print log every 60s
-                logger.info("lru-cache, name = {}, target.capacity = {}, current.estimate.size = {}, current.key.count = {}, current.key.max.count = {}",
-                        name, config, Utils.humanReadableByteCountBin(estimateSize), keyCount, maxKeyCount);
+                logger.info("lru-cache, name = {}, enable = {}, target.capacity = {}, current.estimate.size = {}, current.key.count = {}, current.key.max.count = {}",
+                        name, enable, config, Utils.humanReadableByteCountBin(estimateSize), keyCount, maxKeyCount);
                 loop = 0;
             }
         } catch (Exception e) {
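The LRUCache changes above add a per-cache on/off switch: the flag is read from ProxyDynamicConf under `<configKey>.enable` (for example local.storage.key.cache.enable, given the enum values in the next file), defaults to true, and is re-read by the 10-second schedule, which also clears the map once the cache is switched off; while disabled, gets behave as misses and puts are dropped. A minimal, self-contained sketch of that pattern, using plain JDK types in place of ProxyDynamicConf and ConcurrentLinkedHashMap:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

//illustrative stand-in for the toggleable cache pattern introduced in this commit
public class ToggleableCacheSketch<K, V> {

    private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
    private final BooleanSupplier enableSupplier; //e.g. backed by "local.storage.key.cache.enable"
    private volatile boolean enable;

    public ToggleableCacheSketch(BooleanSupplier enableSupplier) {
        this.enableSupplier = enableSupplier;
        this.enable = enableSupplier.getAsBoolean();
    }

    public void put(K key, V value) {
        if (!enable) {
            return; //writes are dropped while disabled
        }
        cache.put(key, value);
    }

    public V get(K key) {
        if (!enable) {
            return null; //reads behave like a miss while disabled
        }
        return cache.get(key);
    }

    //would be called from a scheduler, as LRUCache does every 10 seconds
    public void refreshEnableFlag() {
        enable = enableSupplier.getAsBoolean();
        if (!enable) {
            cache.clear(); //free the memory once the cache is turned off
        }
    }
}
```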
@@ -7,13 +7,13 @@
  */
 public enum LRUCacheName {

-    key_cache("local.storage.key.cache.capacity"),
-    key_read_block_cache("local.storage.key.block.read.cache.capacity"),
-    key_write_block_cache("local.storage.key.block.write.cache.capacity"),
-    string_read_cache("local.storage.string.read.cache.capacity"),
-    string_write_cache("local.storage.string.write.cache.capacity"),
-    string_read_block_cache("local.storage.string.block.read.cache.capacity"),
-    string_write_block_cache("local.storage.string.block.write.cache.capacity"),
+    key_cache("local.storage.key.cache"),
+    key_read_block_cache("local.storage.key.block.read.cache"),
+    key_write_block_cache("local.storage.key.block.write.cache"),
+    string_read_cache("local.storage.string.read.cache"),
+    string_write_cache("local.storage.string.write.cache"),
+    string_read_block_cache("local.storage.string.block.read.cache"),
+    string_write_block_cache("local.storage.string.block.write.cache"),
     ;

     private final String configKey;
@@ -22,15 +22,19 @@ public enum LRUCacheName {
         this.configKey = configKey;
     }

+    public String getConfigKey() {
+        return configKey;
+    }
+
     public long getCapacity(long defaultSize) {
         if (defaultSize < 0) {
             long heapMemoryMax = MemoryInfoCollector.getMemoryInfo().getHeapMemoryMax();
-            long size = heapMemoryMax * 3 / 5 / LRUCacheName.values().length;
+            long size = heapMemoryMax * 2 / 5 / LRUCacheName.values().length;
             if (size <= 0) {
                 size = 32 * 1024 * 1024L;
             }
             defaultSize = size;
         }
-        return CacheCapacityConfigParser.parse(configKey, CacheCapacityConfigParser.toString(defaultSize));
+        return CacheCapacityConfigParser.parse(configKey + ".capacity", CacheCapacityConfigParser.toString(defaultSize));
     }
 }
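Two things change in LRUCacheName: the default per-cache capacity drops from 3/5 to 2/5 of the max heap divided across all caches (still with a 32 MB floor), and the capacity key is now derived as `<configKey>.capacity`, so existing property names such as local.storage.key.cache.capacity keep working while the bare prefix feeds the new .enable switch. A small worked example of the new default, assuming the seven caches listed above:

```java
public class DefaultCapacitySketch {

    //mirrors the new default: 2/5 of the max heap split across all caches, with a 32 MB floor
    public static long defaultCapacity(long heapMemoryMax, int cacheCount) {
        long size = heapMemoryMax * 2 / 5 / cacheCount;
        if (size <= 0) {
            size = 32 * 1024 * 1024L;
        }
        return size;
    }

    public static void main(String[] args) {
        //example: a 4 GiB max heap and the 7 caches defined in LRUCacheName
        long heapMax = 4L * 1024 * 1024 * 1024;
        System.out.println(defaultCapacity(heapMax, 7)); //245426702 bytes, roughly 234 MiB per cache
    }
}
```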
@@ -98,15 +98,21 @@ public static byte[] encodeBucket(Map<Key, KeyInfo> keys) {
         short compressLen;
         if (compressType == CompressType.none) {
             compressed = array;
-            compressLen = (short) compressed.length;
         } else {
             ICompressor compressor = CompressUtils.get(compressType);
             compressed = compressor.compress(array, 0, array.length);
-            compressLen = (short) compressed.length;
-            if (compressed.length + 9 > _4k) {
-                return null;
+            if (compressed.length >= array.length) {
+                compressed = array;
+                compressType = CompressType.none;
             }
         }
+        if (compressed.length > Short.MAX_VALUE) {
+            return null;
+        }
+        compressLen = (short) compressed.length;
+        if (compressed.length + 9 > _4k) {
+            return null;
+        }
         ByteBuffer buffer = ByteBuffer.allocate(_4k);
         buffer.putInt(0);//0,1,2,3
         buffer.putShort(decompressLen);//4,5
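The encodeBucket change adds three guards before the 4 KB block is written: fall back to the uncompressed bytes when compression does not actually shrink the data, give up when the payload length cannot be stored in a short, and (as before) give up when payload plus the 9-byte header will not fit the block. A self-contained sketch of that size handling; the use of java.util.zip.Deflater and the header layout beyond the visible putInt/putShort calls are assumptions for illustration:

```java
import java.util.zip.Deflater;

public class BucketEncodeSketch {

    private static final int _4k = 4 * 1024;
    private static final int HEADER_BYTES = 9; //per the visible code: 4-byte int, 2-byte decompressLen, plus (assumed) compressLen and compress type

    //returns the payload that would go into one 4k bucket, or null if it cannot fit
    public static byte[] choosePayload(byte[] array) {
        byte[] compressed = compress(array);
        if (compressed.length >= array.length) {
            compressed = array; //compression did not help, store the raw bytes
        }
        if (compressed.length > Short.MAX_VALUE) {
            return null; //length is stored as a short
        }
        if (compressed.length + HEADER_BYTES > _4k) {
            return null; //header + payload must fit one bucket
        }
        return compressed;
    }

    //stand-in for ICompressor/CompressUtils
    private static byte[] compress(byte[] array) {
        Deflater deflater = new Deflater();
        deflater.setInput(array);
        deflater.finish();
        byte[] buf = new byte[array.length + 64];
        int n = deflater.deflate(buf);
        deflater.end();
        byte[] out = new byte[n];
        System.arraycopy(buf, 0, out, 0, n);
        return out;
    }
}
```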
@@ -2,6 +2,7 @@

 import com.netease.nim.camellia.redis.proxy.command.Command;
 import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
+import com.netease.nim.camellia.redis.proxy.monitor.LocalStorageMonitor;
 import com.netease.nim.camellia.redis.proxy.reply.ErrorReply;
 import com.netease.nim.camellia.redis.proxy.reply.Reply;
 import com.netease.nim.camellia.redis.proxy.upstream.kv.command.Commanders;
@@ -98,6 +99,7 @@ private void afterWrite(short slot) throws IOException {
         compactExecutor.compact(slot);
         //check need flush
         if (keyReadWrite.needFlush(slot) || stringReadWrite.needFlush(slot)) {
+            long startTime = System.nanoTime();
             //get the wal offset this slot has currently been written up to
             SlotWalOffset slotWalOffset = wal.getSlotWalOffsetEnd(slot);
             //key flush prepare
@@ -108,7 +110,13 @@ private void afterWrite(short slot) throws IOException {
                 //flush key
                 CompletableFuture<FlushResult> future2 = keyReadWrite.flush(slot);
                 //flush wal: wal entries of this slot before this offset are no longer needed
-                future2.thenAccept(flushResult2 -> wal.flush(slot, slotWalOffset));
+                future2.thenAccept(flushResult2 -> {
+                    try {
+                        wal.flush(slot, slotWalOffset);
+                    } finally {
+                        LocalStorageMonitor.flushTime(System.nanoTime() - startTime);
+                    }
+                });
             });
         }
     }
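One detail worth noting in afterWrite: startTime is taken before the flush is prepared, and the monitor call sits in a finally inside the WAL-flush continuation, so the recorded duration covers the whole key/value/WAL flush chain and is recorded even if wal.flush throws. A generic sketch of that measure-across-an-async-chain pattern (the names here are illustrative, not the proxy's API):

```java
import java.util.concurrent.CompletableFuture;

public class AsyncFlushTimingSketch {

    //record the elapsed time when the whole async chain finishes, not when it is submitted
    public static CompletableFuture<Void> flushWithTiming(Runnable flush, Runnable walFlush) {
        long startTime = System.nanoTime();
        return CompletableFuture.runAsync(flush)
                .thenRun(() -> {
                    try {
                        walFlush.run();
                    } finally {
                        //the real code calls LocalStorageMonitor.flushTime(...) here
                        System.out.println("flush took " + (System.nanoTime() - startTime) + " ns");
                    }
                });
    }
}
```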