Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Copyright 2026 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.utils;

import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.StorageFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

/**
* Unified checkpoint file read/write utility class.
* <p>
* Checkpoint file format:
* <pre>
* [4B] rgCount -- total number of entries
* Repeated rgCount times:
* [8B] fileId -- file ID
* [4B] rgId -- Row Group ID
* [4B] recordNum -- number of records
* [4B] bitmapLen -- length of bitmap array
* [8B x bitmapLen] bitmap -- visibility bitmap data
* </pre>
* <p>
* This class is shared by the server-side (RetinaResourceManager) and the client-side
* (VisibilityCheckpointCache) to eliminate duplicated read/write logic and ensure
* consistent file format across both sides.
*/
public class CheckpointFileIO
{
    private static final Logger logger = LogManager.getLogger(CheckpointFileIO.class);

    /** Fixed-size prefix of every entry: fileId(8B) + rgId(4B) + recordNum(4B). */
    private static final int ENTRY_HEADER_BYTES = 8 + 4 + 4;

    /** Write buffer size used when creating checkpoint files (8 MiB). */
    private static final int WRITE_BUFFER_SIZE = 8 * 1024 * 1024;

    /** Minimum number of entries assigned to one parallel parsing task. */
    private static final int MIN_ENTRIES_PER_TASK = 64;

    /**
     * A checkpoint entry containing the visibility information of a single Row Group.
     * Immutable value holder; note the bitmap array itself is shared, not copied.
     */
    public static class CheckpointEntry
    {
        public final long fileId;
        public final int rgId;
        public final int recordNum;
        public final long[] bitmap;

        public CheckpointEntry(long fileId, int rgId, int recordNum, long[] bitmap)
        {
            this.fileId = fileId;
            this.rgId = rgId;
            this.recordNum = recordNum;
            this.bitmap = bitmap;
        }
    }

    /**
     * Functional interface for processing each CheckpointEntry during parallel reading.
     * Implementations must be thread-safe: they are invoked concurrently from
     * multiple executor threads.
     */
    @FunctionalInterface
    public interface EntryConsumer
    {
        void accept(CheckpointEntry entry);
    }

    /** Utility class; not instantiable. */
    private CheckpointFileIO()
    {
    }

    /**
     * Write checkpoint entries to a file.
     * <p>
     * Uses a producer-consumer pattern: the caller puts CheckpointEntry objects into the queue,
     * and this method consumes exactly {@code totalRgs} entries from the queue and writes them
     * sequentially. The method blocks on {@link BlockingQueue#take()} until each entry arrives.
     *
     * @param path the file path
     * @param totalRgs total number of entries to write; must be non-negative
     * @param queue blocking queue containing CheckpointEntry objects
     * @throws IllegalArgumentException if totalRgs is negative
     * @throws Exception if writing fails or the thread is interrupted while waiting
     */
    public static void writeCheckpoint(String path, int totalRgs, BlockingQueue<CheckpointEntry> queue) throws Exception
    {
        if (totalRgs < 0)
        {
            throw new IllegalArgumentException("totalRgs must be non-negative, got " + totalRgs);
        }
        Storage storage = StorageFactory.Instance().getStorage(path);
        try (DataOutputStream out = storage.create(path, true, WRITE_BUFFER_SIZE))
        {
            out.writeInt(totalRgs);
            for (int i = 0; i < totalRgs; i++)
            {
                CheckpointEntry entry = queue.take();
                out.writeLong(entry.fileId);
                out.writeInt(entry.rgId);
                out.writeInt(entry.recordNum);
                out.writeInt(entry.bitmap.length);
                for (long word : entry.bitmap)
                {
                    out.writeLong(word);
                }
            }
            out.flush();
        }
    }

    /**
     * Read and parse a checkpoint file in parallel using the specified executor.
     * <p>
     * The file is read fully into memory, a sequential pass builds an offset index of the
     * variable-length entries, and the entries are then decoded in parallel batches.
     *
     * @param path the file path
     * @param consumer thread-safe callback invoked once per parsed CheckpointEntry
     * @param executor the executor service for parallel parsing
     * @return the rgCount (total number of entries) in the file
     * @throws IOException if reading fails, the file is too large to buffer in memory,
     *         the file is truncated/corrupted, or a parsing task fails
     */
    public static int readCheckpointParallel(String path, EntryConsumer consumer, ExecutorService executor) throws IOException
    {
        Storage storage = StorageFactory.Instance().getStorage(path);
        long fileLength = storage.getStatus(path).getLength();
        // The whole file is buffered in one byte[]; guard the narrowing cast.
        if (fileLength > Integer.MAX_VALUE)
        {
            throw new IOException("checkpoint file too large to buffer in memory (" + fileLength
                    + " bytes): " + path);
        }
        if (fileLength < Integer.BYTES)
        {
            throw new IOException("checkpoint file is truncated (missing rgCount header): " + path);
        }

        // Step 1: Read the entire file into byte[] at once.
        byte[] content = readAll(storage, path, (int) fileLength);

        ByteBuffer buf = ByteBuffer.wrap(content);
        int rgCount = buf.getInt();
        if (rgCount < 0)
        {
            throw new IOException("corrupted checkpoint file, negative rgCount " + rgCount + ": " + path);
        }

        if (rgCount > 0)
        {
            // Step 2: Sequential scan to build the offset index (lightweight, pointer jumps only).
            int[] offsets = new int[rgCount];
            for (int i = 0; i < rgCount; i++)
            {
                offsets[i] = buf.position();
                buf.position(buf.position() + ENTRY_HEADER_BYTES); // skip fileId + rgId + recordNum
                int bitmapLen = buf.getInt();
                if (bitmapLen < 0)
                {
                    throw new IOException("corrupted checkpoint file, negative bitmap length "
                            + bitmapLen + " at entry " + i + ": " + path);
                }
                buf.position(buf.position() + bitmapLen * Long.BYTES); // skip bitmap data
            }

            // Step 3: Parallel decoding. Batch size is chosen so each task handles at
            // least MIN_ENTRIES_PER_TASK entries, bounded by the number of cores.
            int parallelism = Math.min(Runtime.getRuntime().availableProcessors(),
                    Math.max(1, rgCount / MIN_ENTRIES_PER_TASK));
            int batchSize = (rgCount + parallelism - 1) / parallelism;
            List<CompletableFuture<Void>> futures = new ArrayList<>(parallelism);

            for (int batchStart = 0; batchStart < rgCount; batchStart += batchSize)
            {
                int start = batchStart;
                int end = Math.min(batchStart + batchSize, rgCount);
                futures.add(CompletableFuture.runAsync(() -> {
                    // One buffer view per task (not per entry); views share the same
                    // byte[] read-only, so concurrent use is safe.
                    ByteBuffer slice = ByteBuffer.wrap(content);
                    for (int i = start; i < end; i++)
                    {
                        slice.position(offsets[i]);
                        long fileId = slice.getLong();
                        int rgId = slice.getInt();
                        int recordNum = slice.getInt();
                        int len = slice.getInt();
                        long[] bitmap = new long[len];
                        for (int j = 0; j < len; j++)
                        {
                            bitmap[j] = slice.getLong();
                        }
                        consumer.accept(new CheckpointEntry(fileId, rgId, recordNum, bitmap));
                    }
                }, executor));
            }

            try
            {
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            }
            catch (CompletionException e)
            {
                // Honor the declared contract: surface task failures as IOException
                // instead of letting the unchecked CompletionException escape.
                Throwable cause = e.getCause();
                if (cause instanceof IOException)
                {
                    throw (IOException) cause;
                }
                throw new IOException("failed to parse checkpoint file: " + path,
                        cause != null ? cause : e);
            }
        }

        return rgCount;
    }

    /**
     * Read and parse a checkpoint file in parallel using the default ForkJoinPool.commonPool().
     *
     * @param path the file path
     * @param consumer thread-safe callback invoked once per parsed CheckpointEntry
     * @return the rgCount (total number of entries) in the file
     * @throws IOException if reading fails
     */
    public static int readCheckpointParallel(String path, EntryConsumer consumer) throws IOException
    {
        return readCheckpointParallel(path, consumer, ForkJoinPool.commonPool());
    }

    /**
     * Read exactly {@code length} bytes of the file at {@code path} into a byte array.
     * <p>
     * The backing array of the returned buffer is reused only when it exactly matches the
     * requested region: {@code hasArray()} alone is not sufficient, because a sliced or
     * offset buffer's {@code array()} would expose the wrong bytes (wrong offset/length).
     */
    private static byte[] readAll(Storage storage, String path, int length) throws IOException
    {
        try (PhysicalReader reader = PhysicalReaderUtil.newPhysicalReader(storage, path))
        {
            ByteBuffer buffer = reader.readFully(length);
            if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.array().length == length)
            {
                return buffer.array();
            }
            byte[] content = new byte[length];
            buffer.get(content);
            return content;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,12 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.common.utils.CheckpointFileIO;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -88,47 +82,18 @@ public long[] getVisibilityBitmap(long timestamp, String checkpointPath, long ta

private Map<String, long[]> loadCheckpointFile(String path) throws IOException
{
long start = System.currentTimeMillis();
Storage storage = StorageFactory.Instance().getStorage(path);
long fileLength = storage.getStatus(path).getLength();
long startTime = System.currentTimeMillis();

byte[] content;
try (PhysicalReader reader = PhysicalReaderUtil.newPhysicalReader(storage, path))
{
ByteBuffer buffer = reader.readFully((int) fileLength);
if (buffer.hasArray())
{
content = buffer.array();
}
else
{
content = new byte[(int) fileLength];
buffer.get(content);
}
}
// Use ConcurrentHashMap to support concurrent writes from parallel parsing
Map<String, long[]> timestampCache = new ConcurrentHashMap<>();

Map<String, long[]> timestampCache;
try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content)))
{
int rgCount = in.readInt();
// Initial capacity: count / 0.75 + 1 to avoid rehash
timestampCache = new ConcurrentHashMap<>((int) (rgCount / 0.75) + 1);

for (int i = 0; i < rgCount; i++)
{
long fileId = in.readLong();
int rgId = in.readInt();
int len = in.readInt();
long[] bitmap = new long[len];
for (int j = 0; j < len; j++)
{
bitmap[j] = in.readLong();
}
timestampCache.put(fileId + "_" + rgId, bitmap);
}
}
long end = System.currentTimeMillis();
logger.info("Loaded visibility checkpoint from {} in {} ms, RG count: {}", path, (end - start), timestampCache.size());
// Use CheckpointFileIO for unified read + parallel parsing logic
int rgCount = CheckpointFileIO.readCheckpointParallel(path, entry -> {
timestampCache.put(entry.fileId + "_" + entry.rgId, entry.bitmap);
});

long endTime = System.currentTimeMillis();
logger.info("Loaded visibility checkpoint from {} in {} ms, RG count: {}", path, (endTime - startTime), timestampCache.size());
return timestampCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@

import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.common.utils.CheckpointFileIO;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import org.junit.Before;
import org.junit.Test;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;

import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -65,30 +62,28 @@ private String resolve(String dir, String filename) {
/**
* Helper to create a dummy checkpoint file.
*/
private void createDummyCheckpoint(String path, int numFiles, int rgsPerFile, long[] bitmap) throws IOException
private void createDummyCheckpoint(String path, int numFiles, int rgsPerFile, long[] bitmap) throws Exception
{
try (DataOutputStream out = storage.create(path, true, 8 * 1024 * 1024))
// Default recordNum = bitmap.length * 64 (each long represents 64 rows)
createDummyCheckpoint(path, numFiles, rgsPerFile, bitmap, bitmap.length * 64);
}

private void createDummyCheckpoint(String path, int numFiles, int rgsPerFile, long[] bitmap, int recordNum) throws Exception
{
int totalRgs = numFiles * rgsPerFile;
BlockingQueue<CheckpointFileIO.CheckpointEntry> queue = new LinkedBlockingQueue<>(totalRgs);
for (int f = 0; f < numFiles; f++)
{
out.writeInt(numFiles * rgsPerFile);
for (int f = 0; f < numFiles; f++)
for (int r = 0; r < rgsPerFile; r++)
{
for (int r = 0; r < rgsPerFile; r++)
{
out.writeLong((long)f);
out.writeInt(r);
out.writeInt(bitmap.length);
for (long l : bitmap)
{
out.writeLong(l);
}
}
queue.put(new CheckpointFileIO.CheckpointEntry((long) f, r, recordNum, bitmap));
}
out.flush();
}
CheckpointFileIO.writeCheckpoint(path, totalRgs, queue);
}

@Test
public void testCacheLoading() throws IOException
public void testCacheLoading() throws Exception
{
long timestamp = 1000L;
String checkpointPath = resolve(testCheckpointDir, "vis_gc_tencent_100.bin");
Expand All @@ -108,7 +103,7 @@ public void testCacheLoading() throws IOException
* Migrated and adapted performance test.
*/
@Test
public void testCheckpointPerformance() throws InterruptedException, IOException
public void testCheckpointPerformance() throws Exception
{
// 1. Configuration parameters
int numFiles = 5000;
Expand Down
Loading