53 changes: 43 additions & 10 deletions src/main/perf/IndexThreads.java
@@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
@@ -48,14 +49,30 @@ public enum Mode { UPDATE, ADD, NDV_UPDATE, BDV_UPDATE }
final Thread[] threads;
final AtomicBoolean refreshing;
final AtomicLong lastRefreshNS;
final boolean enableUpdateStorms;

/**
* Constructor with default enableUpdateStorms=false for backward compatibility
* @param docsPerSecPerThreadRef AtomicReference for thread-safe rate updates across multiple threads.
*/
public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, LineFileDocs lineFileDocs, int numThreads, int docCountLimit,
boolean addGroupingFields, boolean printDPS, Mode mode, float docsPerSecPerThread, UpdatesListener updatesListener,
boolean addGroupingFields, boolean printDPS, Mode mode, AtomicReference<Double> docsPerSecPerThreadRef, UpdatesListener updatesListener,
double nrtEverySec, int randomDocIDMax)
throws IOException, InterruptedException {
this(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, docsPerSecPerThreadRef, updatesListener, nrtEverySec, randomDocIDMax, false);
}

/**
* @param docsPerSecPerThreadRef AtomicReference for thread-safe rate updates across multiple threads.
*/
public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, LineFileDocs lineFileDocs, int numThreads, int docCountLimit,
boolean addGroupingFields, boolean printDPS, Mode mode, AtomicReference<Double> docsPerSecPerThreadRef, UpdatesListener updatesListener,
double nrtEverySec, int randomDocIDMax, boolean enableUpdateStorms)
throws IOException, InterruptedException {
final AtomicInteger groupBlockIndex;

this.docs = lineFileDocs;
this.enableUpdateStorms = enableUpdateStorms;
if (addGroupingFields) {
IndexThread.group100 = randomStrings(100, random);
IndexThread.group10K = randomStrings(10000, random);
@@ -77,8 +94,8 @@ public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed,

for(int thread=0;thread<numThreads;thread++) {
threads[thread] = new IndexThread(random, startLatch, stopLatch, w, docs, docCountLimit, count, mode,
groupBlockIndex, stop, refreshing, lastRefreshNS, docsPerSecPerThread, failed, updatesListener,
nrtEverySec, randomDocIDMax);
groupBlockIndex, stop, refreshing, lastRefreshNS, docsPerSecPerThreadRef, failed, updatesListener,
nrtEverySec, randomDocIDMax, enableUpdateStorms);
threads[thread].setName("Index #" + thread);
threads[thread].start();
}
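// A minimal sketch of the sharing pattern this constructor change enables
// (names hypothetical): a controller thread publishes a new per-thread rate
// through the AtomicReference, and each IndexThread re-reads it on every
// iteration, so rate changes take effect without restarting the threads:
//
//   AtomicReference<Double> rateRef = new AtomicReference<>(100.0);
//   rateRef.updateAndGet(rate -> rate * 2);  // controller: double the rate
//   double docsPerSec = rateRef.get();       // indexer: read before throttling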
@@ -141,18 +158,19 @@ private static class IndexThread extends Thread {
private final Mode mode;
private final CountDownLatch startLatch;
private final CountDownLatch stopLatch;
private final float docsPerSec;
private final AtomicReference<Double> docsPerSecRef;
private final Random random;
private final AtomicBoolean failed;
private final UpdatesListener updatesListener;
private final AtomicBoolean refreshing;
private final AtomicLong lastRefreshNS;
private final double nrtEverySec;
final int randomDocIDMax;
private final boolean enableUpdateStorms;
public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stopLatch, IndexWriter w,
LineFileDocs docs, int numTotalDocs, AtomicInteger count, Mode mode, AtomicInteger groupBlockIndex,
AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, float docsPerSec,
AtomicBoolean failed, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax) {
AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, AtomicReference<Double> docsPerSecRef,
AtomicBoolean failed, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax, boolean enableUpdateStorms) {
this.startLatch = startLatch;
this.stopLatch = stopLatch;
this.w = w;
@@ -162,14 +180,15 @@ public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stop
this.mode = mode;
this.groupBlockIndex = groupBlockIndex;
this.stop = stop;
this.docsPerSec = docsPerSec;
this.docsPerSecRef = docsPerSecRef;
this.random = random;
this.failed = failed;
this.updatesListener = updatesListener;
this.refreshing = refreshing;
this.lastRefreshNS = lastRefreshNS;
this.nrtEverySec = nrtEverySec;
this.randomDocIDMax = randomDocIDMax;
this.enableUpdateStorms = enableUpdateStorms;
}

private static Field getStringIDField(Document doc) {
@@ -329,7 +348,7 @@ public void remove() {

docState.doc.removeField("groupend");
}
} else if (docsPerSec > 0 && mode != null) {
} else if (docsPerSecRef.get() > 0 && mode != null) {
System.out.println("Indexing single docs (add/updateDocument)");
final long startNS = System.nanoTime();
int threadCount = 0;
@@ -382,10 +401,24 @@ public void remove() {
System.out.println("Indexer: " + docCount + " docs... (" + (System.currentTimeMillis() - tStart) + " msec)");
}

double docsPerSec = docsPerSecRef.get();
final long sleepNS = startNS + (long) (1000000000*(threadCount/docsPerSec)) - System.nanoTime();
if (sleepNS > 0) {
final long sleepMS = sleepNS/1000000;
final int sleepNS2 = (int) (sleepNS - sleepMS*1000000);
final long sleepMS;
final int sleepNS2;

if (enableUpdateStorms) {
// Cap sleep at 100ms for update storms to maintain responsiveness
final long maxSleepNS = 100000000L; // 100 ms in nanoseconds
final long actualSleepNS = Math.min(sleepNS, maxSleepNS);
sleepMS = actualSleepNS/1000000;
sleepNS2 = (int) (actualSleepNS - sleepMS*1000000);
System.out.println("Update storms: capped indexer sleep at " + sleepMS + " ms, " + sleepNS2 + " ns (requested: " + (sleepNS/1000000) + " ms)");
} else {
// Normal operation: no sleep cap, use original behavior
sleepMS = sleepNS/1000000;
sleepNS2 = (int) (sleepNS - sleepMS*1000000);
}
Thread.sleep(sleepMS, sleepNS2);
}
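// Worked example of the throttle math above (illustrative numbers): with
// docsPerSec = 200.0 and threadCount = 500 docs indexed, the target elapsed
// time is 500/200 = 2.5 sec; if only 2.1 sec have passed since startNS,
// sleepNS is 0.4e9, so the thread sleeps 400 ms, 0 ns. With
// enableUpdateStorms the same request is capped at 100 ms, so a new rate
// published through docsPerSecRef takes effect within ~100 ms instead of
// after a long pre-computed sleep.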

5 changes: 3 additions & 2 deletions src/main/perf/Indexer.java
@@ -35,6 +35,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.CharArraySet;
@@ -573,8 +574,8 @@ public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
float docsPerSecPerThread = -1f;
//float docsPerSecPerThread = 100f;

IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, docsPerSecPerThread, null, nrtEverySec,
randomDocIDMax);
IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, new AtomicReference<>((double)docsPerSecPerThread), null, nrtEverySec,
randomDocIDMax, false);
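// Note: docsPerSecPerThread defaults to -1f above, so the reference carries
// -1.0 and IndexThread's docsPerSecRef.get() > 0 check leaves the Indexer
// unthrottled, matching the old float behavior.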

System.out.println("\nIndexer: start");
final long t0 = System.currentTimeMillis();
77 changes: 72 additions & 5 deletions src/main/perf/NRTPerfTest.java
@@ -21,6 +21,8 @@
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
import java.io.FileOutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
@@ -44,6 +46,7 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NoDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.VectorEncoding;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
@@ -69,6 +72,22 @@
// - hmm: we really should have a separate line file, shuffled, that holds the IDs for each line; this way we can update doc w/ same original doc and then we can assert hit counts match
// - share *Task code from SearchPerfTest

// CONFIGURATION SETTINGS:
//
// Normal Operation Settings:
// - RAM Buffer Size: 256MB
// - TieredMergePolicy deletesPctAllowed: default (10.0)
// - Max merged segment size: 1000000MB (effectively unlimited)
// - ConcurrentMergeScheduler: 4 max merges, 1 thread
//
// Update Storms Settings (-updateStorms true, see nrtPerf.py for example):
// - RAM Buffer Size: 4GB
// - TieredMergePolicy deletesPctAllowed: 2.0
// - Max merged segment size: use Lucene defaults
// - ConcurrentMergeScheduler: use Lucene defaults for max merges & threads
// - Vector indexing enabled for testing heavy update workloads:
// docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, "/path/to/vectors_file", 768, VectorEncoding.FLOAT32);
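//
// A hedged sketch of wiring up the two profiles (illustrative only, assuming a
// boolean enableUpdateStorms is in scope; the actual code below applies the
// normal-operation values unconditionally, with deletesPctAllowed set to 2.0):
//
//   IndexWriterConfig conf = new IndexWriterConfig(analyzer);
//   TieredMergePolicy tmp = new TieredMergePolicy();
//   tmp.setNoCFSRatio(0.0);
//   if (enableUpdateStorms) {
//     conf.setRAMBufferSizeMB(4096.0);       // 4GB buffer absorbs update bursts
//     tmp.setDeletesPctAllowed(2.0);         // merge away deletes aggressively
//     // max merged segment size and merge scheduler: Lucene defaults
//   } else {
//     conf.setRAMBufferSizeMB(256.0);
//     tmp.setMaxMergedSegmentMB(1000000.0);  // effectively unlimited
//     ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) conf.getMergeScheduler();
//     cms.setMaxMergesAndThreads(4, 1);
//   }
//   conf.setMergePolicy(tmp);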

public class NRTPerfTest {

static final class MergedReaderWarmer implements IndexWriter.IndexReaderWarmer {
Expand Down Expand Up @@ -243,6 +262,9 @@ public static void main(String[] args) throws Exception {
throw new FileNotFoundException("tasks file not found " + tasksFile);
}

// By default, disable update storms
final boolean enableUpdateStorms = args.length > 15 ? Boolean.parseBoolean(args[15]) : false;

final boolean hasProcMemInfo = Files.exists(Paths.get("/proc/meminfo"));

System.out.println("DIR=" + dirImpl);
@@ -256,6 +278,7 @@
System.out.println("Reopen/sec=" + reopenPerSec);
System.out.println("Mode=" + mode);
System.out.println("tasksFile=" + tasksFile);
System.out.println("EnableUpdateStorms=" + enableUpdateStorms);

System.out.println("Record stats every " + statsEverySec + " seconds");
final int count = (int) ((runTimeSec / statsEverySec) + 2);
@@ -273,7 +296,8 @@
System.out.println("Max merge MB/sec = " + (mergeMaxWriteMBPerSec <= 0.0 ? "unlimited" : mergeMaxWriteMBPerSec));
final Random random = new Random(seed);

final LineFileDocs docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, null, 0, null);
final LineFileDocs docs;
docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, null, 0, null);

final Directory dir0;
if (dirImpl.equals("MMapDirectory")) {
@@ -298,7 +322,8 @@
StandardAnalyzer analyzer = new StandardAnalyzer(CharArraySet.EMPTY_SET);
final IndexWriterConfig conf = new IndexWriterConfig(analyzer);
conf.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
conf.setRAMBufferSizeMB(256.0);
conf.setRAMBufferSizeMB(256.0); // 256MB RAM buffer for normal operation

//iwc.setMergeScheduler(ms);

/*
@@ -328,9 +353,12 @@ public DocValuesFormat getDocValuesFormatForField(String field) {

TieredMergePolicy tmp = new TieredMergePolicy();
tmp.setNoCFSRatio(0.0);
tmp.setMaxMergedSegmentMB(1000000.0);
tmp.setMaxMergedSegmentMB(1000000.0); // effectively unlimited
//tmp.setReclaimDeletesWeight(3.0);
//tmp.setMaxMergedSegmentMB(7000.0);

tmp.setDeletesPctAllowed(2.0);

conf.setMergePolicy(tmp);

if (!commit.equals("none")) {
@@ -339,10 +367,14 @@

// Make sure merges run @ higher prio than indexing:
final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) conf.getMergeScheduler();
// Can swap to your own MergeScheduler impl
cms.setMaxMergesAndThreads(4, 1);

conf.setMergedSegmentWarmer(new MergedReaderWarmer(field));

// Set infoStream to log to file
// PrintStream infoStream = new PrintStream(new FileOutputStream("lucene-infostream.log", true), true, "UTF-8");
// conf.setInfoStream(infoStream);
final IndexWriter w = new IndexWriter(dir, conf);
// w.setInfoStream(System.out);

@@ -360,8 +392,9 @@ public void afterUpdate() {
}
};
IndexWriter.DocStats stats = w.getDocStats();
final AtomicReference<Double> docsPerSecRef = new AtomicReference<>(docsPerSec / numIndexThreads);
IndexThreads indexThreads = new IndexThreads(random, w, new AtomicBoolean(false), docs, numIndexThreads, -1, false, false, mode,
(float) (docsPerSec / numIndexThreads), updatesListener, -1.0, stats.maxDoc);
docsPerSecRef, updatesListener, -1.0, stats.maxDoc, enableUpdateStorms);
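// docsPerSecRef is seeded with the peace-time per-thread rate; when update
// storms are enabled, the storm thread below mutates it at runtime.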

// NativePosixUtil.mlockTermsDict(startR, "id");
final SearcherManager manager = new SearcherManager(w, null);
@@ -376,12 +409,46 @@ public void afterUpdate() {
final IndexState indexState = new IndexState(null, manager, null, field, spellChecker, "FastVectorHighlighter", null, null);
TaskParserFactory taskParserFactory =
new TaskParserFactory(indexState, field, analyzer, field, 10, random, null, null, -1, true, TestContext.parse(""));
final TaskSource tasks = new RandomTaskSource(tasksFile, random, taskParserFactory.getTaskParser()) {
final double peaceTimeRate = docsPerSec / numIndexThreads;
// Conditionally create update storm thread
if (enableUpdateStorms) {
System.out.println("Starting update storms thread...");
// Periodically increase docsPerSec
Thread docsPerSecIncreaser = new Thread(() -> {
// Loop of update storm followed by peace time
while (true) {
try {
int increaseCount = 0;
int maxIncreases = 6;
Thread.sleep(10000); // 10 seconds peace time at the beginning
while (increaseCount < maxIncreases) {
double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2);
System.out.println("Increased docsPerSec per thread to " + newRate);
Thread.sleep(20000); // every 20 seconds
increaseCount++;
}
System.out.println("Reached max increases (" + maxIncreases + "), now peace time mode");
docsPerSecRef.set(peaceTimeRate);
System.out.println("Decreased docsPerSec per thread to " + (peaceTimeRate));
Thread.sleep(900000); // 15 minutes peace time
} catch (InterruptedException e) {
// exit thread
return;
}
}
});
docsPerSecIncreaser.setDaemon(true);
docsPerSecIncreaser.start();
} else {
System.out.println("Update storms disabled - maintaining constant indexing rate");
}
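// Net effect of the constants above: each cycle is 10 sec at the peace-time
// rate, then six doublings 20 sec apart (peaking at 2^6 = 64x the peace-time
// per-thread rate), then a reset and 15 minutes of peace before the next storm.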
// Wrap the task source to count searches per stats interval
final TaskSource baseTasks = new RandomTaskSource(tasksFile, random, taskParserFactory.getTaskParser()) {
@Override
public void taskDone(Task task, long queueTimeNS, TotalHits totalHitCount) {
searchesByTime[currentQT.get()].incrementAndGet();
}
};
final TaskSource tasks = baseTasks;
System.out.println("Task repeat count 1");
System.out.println("Tasks file " + tasksFile);
System.out.println("Num task per cat 20");
2 changes: 1 addition & 1 deletion src/main/perf/SearchPerfTest.java
@@ -416,7 +416,7 @@ public void warm(LeafReader reader) throws IOException {
// hardwired false:
boolean addDVFields = mode == Mode.BDV_UPDATE || mode == Mode.NDV_UPDATE;
LineFileDocs lineFileDocs = new LineFileDocs(lineDocsFile, false, storeBody, tvsBody, false, cloneDocs, null, null, null, addDVFields, null, 0, null);
IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, docsPerSecPerThread, null, -1.0, -1);
IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, new AtomicReference<>((double)docsPerSecPerThread), null, -1.0, -1, false);
threads.start();

mgr = new SearcherManager(writer, new SearcherFactory() {
Expand Down
7 changes: 5 additions & 2 deletions src/main/perf/TaskThreads.java
@@ -103,14 +103,17 @@ public void run() {

try {
while (stop.get() == false) {
final Task task = tasks.nextTask();
if (task == null) {
final Task originalTask = tasks.nextTask();
if (originalTask == null) {
// Done
this.tasksStopNanos = System.nanoTime();
// The first thread that finishes snapshots all threads; this way we do not include "wind-down" time in our measurement.
endThreadDetails.compareAndSet(null, new SearchPerfTest.ThreadDetails());
break;
}

// Clone the task to avoid reuse issues
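// (RandomTaskSource may hand out the same Task instance more than once; a
// per-dispatch copy keeps per-run mutable state such as timings and hit
// counts thread-private.)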
final Task task = originalTask.clone();

// Run the task in the IndexSearcher's executor. This is important because IndexSearcher#search also uses the current thread to
// search, so not running #search from the executor would artificially use one more thread than configured via luceneutil.
13 changes: 13 additions & 0 deletions src/python/nrtPerf.py
@@ -17,6 +17,14 @@

# Measures NRT performance under different document update modes
# (add, update, ndv_update, bdv_update)
#
# EXAMPLE USAGE:
#
# Normal operation:
# python nrtPerf.py -source /path/to/source -dps 100 -rps 1.0 -rts 3000
#
# With update storms enabled (aggressive indexing bursts):
# python nrtPerf.py -source /path/to/source -dps 200 -rps 0.06 -nst 8 -nit 8 -rts 3000 -updateStorms true

import os
import re
@@ -53,6 +61,7 @@ def runOne(
numIndexThreads=constants.INDEX_NUM_THREADS,
statsEverySec=1,
commit="no",
enableUpdateStorms=False,
):
logFileName = "%s/%s_dps%s_reopen%s.txt" % (constants.LOGS_DIR, mode, docsPerSec, reopensPerSec)
print("log: %s" % logFileName)
@@ -75,6 +84,8 @@
command += " %s" % commit
command += " 0.0"
command += " %s" % data.tasksFile
if enableUpdateStorms:
command += " true"
command += " > %s 2>&1" % logFileName

if VERBOSE:
@@ -190,6 +201,7 @@ def toString(self):
for rps in reopenPerSec.split(","):
print()
print("params: mode=%s docs/sec=%s reopen/sec=%s runTime(s)=%s searchThreads=%s indexThreads=%s" % (mode, dps, rps, runTimeSec, numSearchThreads, numIndexThreads))
enableUpdateStorms = benchUtil.getArg("-updateStorms", "false", True).lower() == "true"
reopenStats = runOne(
classpath=cp,
mode=mode,
Expand All @@ -200,6 +212,7 @@ def toString(self):
runTimeSec=runTimeSec,
numSearchThreads=numSearchThreads,
numIndexThreads=numIndexThreads,
enableUpdateStorms=enableUpdateStorms,
)
allStats.append((dps, rps, runTimeSec, reopenStats))
