Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,16 @@ public void execute(Namespace ns, String command) throws Exception
{
for (Column column : columns)
{
String sql = "SELECT COUNT(DISTINCT(" + column.getName() + ")) AS cardinality, " +
"SUM(CASE WHEN " + column.getName() + " IS NULL THEN 1 ELSE 0 END) AS null_count " +
"FROM " + schemaName + "." + tableName;
// ANSI SQL uses double quotes for identifiers.
String sql = String.format(
"SELECT COUNT(DISTINCT(\"%s\")) AS cardinality, " +
"SUM(CASE WHEN \"%s\" IS NULL THEN 1 ELSE 0 END) AS null_count " +
"FROM \"%s\".\"%s\"",
column.getName(),
column.getName(),
schemaName,
tableName
);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
if (resultSet.next())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.common.utils.DateUtil;
import io.pixelsdb.pixels.common.utils.IndexUtils;
import io.pixelsdb.pixels.common.utils.RetinaUtils;
import io.pixelsdb.pixels.core.PixelsWriter;
import io.pixelsdb.pixels.core.TypeDescription;
Expand Down Expand Up @@ -101,14 +102,14 @@ protected void processSourceFile(String originalFilePath) throws IOException, Me

// 1. Calculate Primary Key and Bucket ID
ByteString pkByteString = calculatePrimaryKeyBytes(colsInLine);
// Assume BucketCache has the necessary method and configuration
int bucketId = RetinaUtils.getBucketIdFromByteBuffer(pkByteString);
VnodeIdentifier vnodeIdentifier = RetinaUtils.getInstance().getVnodeIdentifierFromBucketId(bucketId);
int retinaBucketId = RetinaUtils.getBucketIdFromByteBuffer(pkByteString);
VnodeIdentifier vnodeIdentifier = RetinaUtils.getInstance().getVnodeIdentifierFromBucketId(retinaBucketId);
int indexBucketId = IndexUtils.getBucketIdFromByteBuffer(pkByteString);
PerVirtualNodeWriter retinaNodeWriter = retinaWriters.computeIfAbsent(vnodeIdentifier, id ->
{
try
{
return initializeRetinaWriter(bucketId);
return initializeRetinaWriter(retinaBucketId);
} catch (Exception e)
{
throw new RuntimeException("Failed to initialize writer for bucket " + id, e);
Expand All @@ -122,7 +123,7 @@ protected void processSourceFile(String originalFilePath) throws IOException, Me
try
{
// 4. Update Index Entry
updateIndexEntry(retinaNodeWriter, pkByteString);
updateIndexEntry(retinaNodeWriter, pkByteString, indexBucketId);

// 5. Check and Flush Row Batch
if (retinaNodeWriter.rowBatch.size >= retinaNodeWriter.rowBatch.getMaxSize())
Expand Down Expand Up @@ -222,7 +223,7 @@ private ByteString calculatePrimaryKeyBytes(String[] colsInLine)
return ByteString.copyFrom((ByteBuffer) indexKeyBuffer.rewind());
}

private void updateIndexEntry(PerVirtualNodeWriter bucketWriter, ByteString pkByteString) throws IndexException
private void updateIndexEntry(PerVirtualNodeWriter bucketWriter, ByteString pkByteString, int indexBucketId) throws IndexException
{
IndexProto.PrimaryIndexEntry.Builder builder = IndexProto.PrimaryIndexEntry.newBuilder();
builder.getIndexKeyBuilder()
Expand All @@ -237,7 +238,7 @@ private void updateIndexEntry(PerVirtualNodeWriter bucketWriter, ByteString pkBy
.setFileId(bucketWriter.currFile.getId())
.setRgRowOffset(bucketWriter.rgRowOffset++);

bucketWriter.indexEntries.add(builder.build());
bucketWriter.indexBucketQueues[indexBucketId].add(builder.build());
}

private void flushRowBatch(PerVirtualNodeWriter bucketWriter) throws IOException, IndexException
Expand All @@ -252,10 +253,18 @@ private void flushRowBatch(PerVirtualNodeWriter bucketWriter) throws IOException
bucketWriter.prevRgId = bucketWriter.rgId;
}

// Push index entries to the corresponding IndexService (determined by targetNode address)
bucketWriter.indexService.putPrimaryIndexEntries(index.getTableId(), index.getId(), bucketWriter.indexEntries, bucketWriter.option);
bucketWriter.indexService.flushIndexEntriesOfFile(index.getTableId(), index.getId(),bucketWriter.currFile.getId(), true, bucketWriter.option);
bucketWriter.indexEntries.clear();
for (int i = 0; i < bucketWriter.totalBuckets; i++)
{
List<IndexProto.PrimaryIndexEntry> queue = bucketWriter.indexBucketQueues[i];
if (!queue.isEmpty())
{
IndexOption option = bucketWriter.indexOptions[i];
bucketWriter.indexService.putPrimaryIndexEntries(index.getTableId(), index.getId(), queue, option);
queue.clear();
}
}
bucketWriter.indexService.flushIndexEntriesOfFile(index.getTableId(), index.getId(),
bucketWriter.currFile.getId(), true, bucketWriter.defaultIndexOption);
}

private void closePixelsFile(PerVirtualNodeWriter bucketWriter) throws IOException, IndexException
Expand All @@ -279,13 +288,16 @@ private class PerVirtualNodeWriter
int prevRgId;
int rowCounter;
int vNodeId;
IndexOption option;
NodeProto.NodeInfo targetNode;
List<IndexProto.PrimaryIndexEntry> indexEntries = new ArrayList<>();
VectorizedRowBatch rowBatch;
IndexService indexService;
RowIdAllocator rowIdAllocator;

private final IndexOption defaultIndexOption;
private final int totalBuckets;
private final List<IndexProto.PrimaryIndexEntry>[] indexBucketQueues;
private final IndexOption[] indexOptions;

public PerVirtualNodeWriter(PixelsWriter writer, File file, Path path, NodeProto.NodeInfo node, int vNodeId)
{
this.pixelsWriter = writer;
Expand All @@ -301,15 +313,17 @@ public PerVirtualNodeWriter(PixelsWriter writer, File file, Path path, NodeProto
this.indexService = indexServices.computeIfAbsent(node.getAddress(), nodeInfo ->
RPCIndexService.CreateInstance(nodeInfo, indexServerPort));
this.rowIdAllocator = new RowIdAllocator(index.getTableId(), maxRowNum, this.indexService);
initIndexOption();
}

private void initIndexOption()
{
this.option = IndexOption.builder()
.vNodeId(this.vNodeId)
.build();
this.totalBuckets = IndexUtils.getInstance().getBucketNum();
this.indexBucketQueues = new List[totalBuckets];
this.indexOptions = new IndexOption[totalBuckets];
this.defaultIndexOption = new IndexOption();
for (int i = 0; i < totalBuckets; i++)
{
this.indexBucketQueues[i] = new ArrayList<>();
this.indexOptions[i] = IndexOption.builder()
.vNodeId(i)
.build();
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

package io.pixelsdb.pixels.common.utils;

import com.google.common.hash.Hashing;
import com.google.protobuf.ByteString;
import io.pixelsdb.pixels.common.exception.MetadataException;
import io.pixelsdb.pixels.common.metadata.MetadataService;
import io.pixelsdb.pixels.common.metadata.domain.Column;
Expand All @@ -34,6 +36,29 @@
public class IndexUtils
{
private static final MetadataService metadataService = MetadataService.Instance();
private static volatile IndexUtils instance;
private final int bucketNum;

private IndexUtils()
{
ConfigFactory config = ConfigFactory.Instance();
this.bucketNum = Integer.parseInt(config.getProperty("index.bucket.num"));
}

public static IndexUtils getInstance()
{
if (instance == null)
{
synchronized (IndexUtils.class)
{
if (instance == null)
{
instance = new IndexUtils();
}
}
}
return instance;
}

public static List<Column> extractInfoFromIndex(long tableId, long indexId) throws MetadataException
{
Expand Down Expand Up @@ -65,4 +90,22 @@ public static List<Column> extractInfoFromIndex(String dbName, String tableName,
}
return orderedKeyCols;
}

public static int getBucketIdFromByteBuffer(ByteString byteString)
{
IndexUtils indexUtils = IndexUtils.getInstance();

int hash = Hashing.sha256()
.hashBytes(byteString.toByteArray())
.asInt();

int absHash = Math.abs(hash);

return absHash % indexUtils.getBucketNum();
}

public int getBucketNum()
{
return bucketNum;
}
}
1 change: 1 addition & 0 deletions pixels-common/src/main/resources/pixels.properties
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ index.cache.capacity=10000000
index.cache.expiration.seconds=3600
# whether each index corresponds to its own column family
index.rocksdb.multicf=false
index.bucket.num=128
# the directory where the sqlite files of main index are stored, each main index is stored as a sqlite file
index.sqlite.path=/tmp/sqlite
index.main.cache.bucket.num=7
Expand Down
Loading