Skip to content

Commit 4e94bc4

Browse files
committed
Added medium priority threads and cleanup
1 parent 9fb7a3e commit 4e94bc4

9 files changed

+145
-35
lines changed

src/bucket/BucketManagerImpl.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -972,8 +972,9 @@ BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq)
972972
});
973973

974974
mEvictionFuture = task->get_future();
975-
mApp.postOnBackgroundThread(bind(&task_t::operator(), task),
976-
"SearchableBucketListSnapshot: eviction scan");
975+
mApp.postOnEvictionBackgroundThread(
976+
bind(&task_t::operator(), task),
977+
"SearchableBucketListSnapshot: eviction scan");
977978
}
978979

979980
void
@@ -982,13 +983,13 @@ BucketManagerImpl::resolveBackgroundEvictionScan(
982983
LedgerKeySet const& modifiedKeys)
983984
{
984985
ZoneScoped;
986+
releaseAssert(threadIsMain());
985987

986988
if (!mEvictionFuture.valid())
987989
{
988990
startBackgroundEvictionScan(ledgerSeq);
989991
}
990992

991-
mEvictionFuture.wait();
992993
auto evictionCandidates = mEvictionFuture.get();
993994

994995
auto const& networkConfig =
@@ -1000,7 +1001,6 @@ BucketManagerImpl::resolveBackgroundEvictionScan(
10001001
networkConfig.stateArchivalSettings()))
10011002
{
10021003
startBackgroundEvictionScan(ledgerSeq);
1003-
mEvictionFuture.wait();
10041004
evictionCandidates = mEvictionFuture.get();
10051005
}
10061006

src/bucket/BucketManagerImpl.h

-11
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,6 @@ class BucketManagerImpl : public BucketManager
6262

6363
std::future<EvictionResult> mEvictionFuture{};
6464

65-
// Lock for managing raw Bucket files or the bucket directory. This lock is
66-
// only required for file access, but is not required for logical changes to
67-
// the BucketList (i.e. addBatch).
68-
mutable std::recursive_mutex mBucketFileMutex;
69-
70-
// Lock for logical BucketList changes and snapshots (i.e. addBatch,
71-
// getSearchableSnapshot). This lock is not required for raw Bucket file
72-
// management.
73-
mutable std::recursive_mutex mBucketSnapshotMutex;
74-
75-
7665
bool const mDeleteEntireBucketDirInDtor;
7766

7867
// Records bucket-merges that are currently _live_ in some FutureBucket, in

src/main/Application.h

+6
Original file line numberDiff line numberDiff line change
@@ -226,12 +226,18 @@ class Application
226226
// this io_context will execute in parallel with the calling thread, so use
227227
// with caution.
228228
virtual asio::io_context& getWorkerIOContext() = 0;
229+
virtual asio::io_context& getEvictionIOContext() = 0;
229230

230231
virtual void postOnMainThread(
231232
std::function<void()>&& f, std::string&& name,
232233
Scheduler::ActionType type = Scheduler::ActionType::NORMAL_ACTION) = 0;
234+
235+
// While both are lower priority than the main thread, eviction threads have
236+
// more priority than regular worker background threads
233237
virtual void postOnBackgroundThread(std::function<void()>&& f,
234238
std::string jobName) = 0;
239+
virtual void postOnEvictionBackgroundThread(std::function<void()>&& f,
240+
std::string jobName) = 0;
235241

236242
// Perform actions necessary to transition from BOOTING_STATE to other
237243
// states. In particular: either reload or reinitialize the database, and

src/main/ApplicationImpl.cpp

+72-6
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,20 @@ namespace stellar
8181
ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg)
8282
: mVirtualClock(clock)
8383
, mConfig(cfg)
84-
, mWorkerIOContext(mConfig.WORKER_THREADS)
84+
// Allocate one worker to eviction when background eviction enabled
85+
, mWorkerIOContext(mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN
86+
? mConfig.WORKER_THREADS - 1
87+
: mConfig.WORKER_THREADS)
88+
, mEvictionIOContext(mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN
89+
? std::make_optional<asio::io_context>(1)
90+
: std::nullopt)
8591
, mWork(std::make_unique<asio::io_context::work>(mWorkerIOContext))
92+
, mEvictionWork(
93+
mEvictionIOContext
94+
? std::make_unique<asio::io_context::work>(*mEvictionIOContext)
95+
: nullptr)
8696
, mWorkerThreads()
97+
, mEvictionThread()
8798
, mStopSignals(clock.getIOContext(), SIGINT)
8899
, mStarted(false)
89100
, mStopping(false)
@@ -135,6 +146,21 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg)
135146

136147
auto t = mConfig.WORKER_THREADS;
137148
LOG_DEBUG(DEFAULT_LOG, "Application constructing (worker threads: {})", t);
149+
150+
if (mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN)
151+
{
152+
releaseAssert(mConfig.WORKER_THREADS > 0);
153+
releaseAssert(mEvictionIOContext);
154+
155+
// Allocate one thread for Eviction scan
156+
mEvictionThread = std::thread{[this]() {
157+
runCurrentThreadWithMediumPriority();
158+
mEvictionIOContext->run();
159+
}};
160+
161+
--t;
162+
}
163+
138164
while (t--)
139165
{
140166
auto thread = std::thread{[this]() {
@@ -760,12 +786,21 @@ ApplicationImpl::validateAndLogConfig()
760786
"stellar-core new-db.");
761787
}
762788

763-
if (mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN &&
764-
!mConfig.isUsingBucketListDB())
789+
if (mConfig.EXPERIMENTAL_BACKGROUND_EVICTION_SCAN)
765790
{
766-
throw std::invalid_argument(
767-
"EXPERIMENTAL_BUCKETLIST_DB must be enabled to use "
768-
"EXPERIMENTAL_BACKGROUND_EVICTION_SCAN");
791+
if (!mConfig.isUsingBucketListDB())
792+
{
793+
throw std::invalid_argument(
794+
"EXPERIMENTAL_BUCKETLIST_DB must be enabled to use "
795+
"EXPERIMENTAL_BACKGROUND_EVICTION_SCAN");
796+
}
797+
798+
if (mConfig.WORKER_THREADS < 2)
799+
{
800+
throw std::invalid_argument(
801+
"EXPERIMENTAL_BACKGROUND_EVICTION_SCAN requires "
802+
"WORKER_THREADS > 1");
803+
}
769804
}
770805

771806
if (isNetworkedValidator && mConfig.isInMemoryMode())
@@ -917,6 +952,18 @@ ApplicationImpl::joinAllThreads()
917952
{
918953
w.join();
919954
}
955+
956+
if (mEvictionWork)
957+
{
958+
mEvictionWork.reset();
959+
}
960+
961+
if (mEvictionThread)
962+
{
963+
LOG_DEBUG(DEFAULT_LOG, "Joining eviction thread");
964+
mEvictionThread->join();
965+
}
966+
920967
LOG_DEBUG(DEFAULT_LOG, "Joined all {} threads", mWorkerThreads.size());
921968
}
922969

@@ -1386,6 +1433,13 @@ ApplicationImpl::getWorkerIOContext()
13861433
return mWorkerIOContext;
13871434
}
13881435

1436+
asio::io_context&
1437+
ApplicationImpl::getEvictionIOContext()
1438+
{
1439+
releaseAssert(mEvictionIOContext);
1440+
return *mEvictionIOContext;
1441+
}
1442+
13891443
void
13901444
ApplicationImpl::postOnMainThread(std::function<void()>&& f, std::string&& name,
13911445
Scheduler::ActionType type)
@@ -1418,6 +1472,18 @@ ApplicationImpl::postOnBackgroundThread(std::function<void()>&& f,
14181472
});
14191473
}
14201474

1475+
void
1476+
ApplicationImpl::postOnEvictionBackgroundThread(std::function<void()>&& f,
1477+
std::string jobName)
1478+
{
1479+
LogSlowExecution isSlow{std::move(jobName), LogSlowExecution::Mode::MANUAL,
1480+
"executed after"};
1481+
asio::post(getEvictionIOContext(), [this, f = std::move(f), isSlow]() {
1482+
mPostOnBackgroundThreadDelay.Update(isSlow.checkElapsedTime());
1483+
f();
1484+
});
1485+
}
1486+
14211487
void
14221488
ApplicationImpl::enableInvariantsFromConfig()
14231489
{

src/main/ApplicationImpl.h

+11
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,13 @@ class ApplicationImpl : public Application
7878
virtual StatusManager& getStatusManager() override;
7979

8080
virtual asio::io_context& getWorkerIOContext() override;
81+
virtual asio::io_context& getEvictionIOContext() override;
8182
virtual void postOnMainThread(std::function<void()>&& f, std::string&& name,
8283
Scheduler::ActionType type) override;
8384
virtual void postOnBackgroundThread(std::function<void()>&& f,
8485
std::string jobName) override;
86+
virtual void postOnEvictionBackgroundThread(std::function<void()>&& f,
87+
std::string jobName) override;
8588

8689
virtual void start() override;
8790
void startServices();
@@ -142,7 +145,9 @@ class ApplicationImpl : public Application
142145
// subsystems.
143146

144147
asio::io_context mWorkerIOContext;
148+
std::optional<asio::io_context> mEvictionIOContext;
145149
std::unique_ptr<asio::io_context::work> mWork;
150+
std::unique_ptr<asio::io_context::work> mEvictionWork;
146151

147152
std::unique_ptr<BucketManager> mBucketManager;
148153
std::unique_ptr<Database> mDatabase;
@@ -188,6 +193,12 @@ class ApplicationImpl : public Application
188193

189194
std::vector<std::thread> mWorkerThreads;
190195

196+
// Unlike mWorkerThreads (which are low priority), eviction scans require a
197+
// medium priority thread. In the future, this may become a more general
198+
// higher-priority worker thread type, but for now we only need a single
199+
// thread for eviction scans.
200+
std::optional<std::thread> mEvictionThread;
201+
191202
asio::signal_set mStopSignals;
192203

193204
bool mStarted;

src/main/Config.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -1331,7 +1331,7 @@ Config::processConfig(std::shared_ptr<cpptoml::table> t)
13311331
}
13321332
else if (item.first == "WORKER_THREADS")
13331333
{
1334-
WORKER_THREADS = readInt<int>(item, 1, 1000);
1334+
WORKER_THREADS = readInt<int>(item, 2, 1000);
13351335
}
13361336
else if (item.first == "MAX_CONCURRENT_SUBPROCESSES")
13371337
{

src/test/FuzzerImpl.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -867,7 +867,7 @@ getFuzzConfig(int instanceNumber)
867867
cfg.ARTIFICIALLY_GENERATE_LOAD_FOR_TESTING = false;
868868
cfg.ARTIFICIALLY_SET_CLOSE_TIME_FOR_TESTING = UINT32_MAX;
869869
cfg.HTTP_PORT = 0;
870-
cfg.WORKER_THREADS = 1;
870+
cfg.WORKER_THREADS = 2;
871871
cfg.QUORUM_INTERSECTION_CHECKER = false;
872872
cfg.PREFERRED_PEERS_ONLY = false;
873873
cfg.RUN_STANDALONE = true;

src/util/Thread.cpp

+49-12
Original file line numberDiff line numberDiff line change
@@ -19,64 +19,101 @@ namespace stellar
1919

2020
#if defined(_WIN32)
2121

22-
void
23-
runCurrentThreadWithLowPriority()
22+
static void
23+
runCurrentThreadWithPriority(int priority)
2424
{
2525
HANDLE curThread = ::GetCurrentThread();
26-
BOOL ret = ::SetThreadPriority(curThread, THREAD_PRIORITY_BELOW_NORMAL);
26+
BOOL ret = ::SetThreadPriority(curThread, priority);
2727

2828
if (!ret)
2929
{
3030
LOG_DEBUG(DEFAULT_LOG, "Unable to set priority for thread: {}", ret);
3131
}
3232
}
3333

34-
#elif defined(__linux__)
35-
3634
void
3735
runCurrentThreadWithLowPriority()
3836
{
39-
constexpr auto const LOW_PRIORITY_NICE = 5;
37+
runCurrentThreadWithPriority(THREAD_PRIORITY_LOWEST);
38+
}
39+
40+
void
41+
runCurrentThreadWithMediumPriority()
42+
{
43+
runCurrentThreadWithPriority(THREAD_PRIORITY_BELOW_NORMAL);
44+
}
4045

41-
auto newNice = nice(LOW_PRIORITY_NICE);
42-
if (newNice != LOW_PRIORITY_NICE)
46+
#elif defined(__linux__)
47+
48+
static void
49+
runCurrentThreadWithPriority(int priority)
50+
{
51+
auto newNice = nice(priority);
52+
if (newNice != priority)
4353
{
4454
LOG_DEBUG(DEFAULT_LOG, "Unable to run worker thread with low priority. "
4555
"Normal priority will be used.");
4656
}
4757
}
4858

49-
#elif defined(__APPLE__)
50-
5159
void
5260
runCurrentThreadWithLowPriority()
61+
{
62+
runCurrentThreadWithPriority(/*LOW_PRIORITY_NICE*/ 5);
63+
}
64+
65+
void
66+
runCurrentThreadWithMediumPriority()
67+
{
68+
runCurrentThreadWithPriority(/*MED_PRIORITY_NICE*/ 3);
69+
}
70+
71+
#elif defined(__APPLE__)
72+
73+
static void
74+
runCurrentThreadWithPriority(int priority)
5375
{
5476
// Default MacOS priority is 31 in a user-mode band from 0..63, niceing (or
5577
// other priority-adjustment) usually subtracts from there. Range is +/- 16,
5678
// with lower meaning lower (i.e. UTILITY class is 20). The standard
5779
// pthreads API works for adjusting a single thread's priority.
58-
constexpr auto const LOW_PRIORITY_NICE = 5;
5980
struct sched_param sp;
6081
int policy;
6182
int ret = pthread_getschedparam(pthread_self(), &policy, &sp);
6283
if (ret != 0)
6384
{
6485
LOG_DEBUG(DEFAULT_LOG, "Unable to get priority for thread: {}", ret);
6586
}
66-
sp.sched_priority -= LOW_PRIORITY_NICE;
87+
sp.sched_priority -= priority;
6788
ret = pthread_setschedparam(pthread_self(), policy, &sp);
6889
if (ret != 0)
6990
{
7091
LOG_DEBUG(DEFAULT_LOG, "Unable to set priority for thread: {}", ret);
7192
}
7293
}
7394

95+
void
96+
runCurrentThreadWithLowPriority()
97+
{
98+
runCurrentThreadWithPriority(/*LOW_PRIORITY_NICE*/ 5);
99+
}
100+
101+
void
102+
runCurrentThreadWithMediumPriority()
103+
{
104+
runCurrentThreadWithPriority(/*MED_PRIORITY_NICE*/ 3);
105+
}
74106
#else
75107

76108
void
77109
runCurrentThreadWithLowPriority()
78110
{
79111
}
80112

113+
void
114+
runCurrentThreadWithMediumPriority()
115+
{
116+
}
117+
81118
#endif
82119
}

src/util/Thread.h

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ namespace stellar
1212
{
1313

1414
void runCurrentThreadWithLowPriority();
15+
void runCurrentThreadWithMediumPriority();
1516

1617
template <typename T>
1718
bool

0 commit comments

Comments
 (0)