11
11
#include " historywork/GetAndUnzipRemoteFileWork.h"
12
12
#include " historywork/VerifyBucketWork.h"
13
13
#include " work/WorkWithCallback.h"
14
- #include " xdr/Stellar-contract-config-setting.h"
15
14
#include < Tracy.hpp>
16
15
#include < fmt/format.h>
17
16
@@ -73,13 +72,13 @@ DownloadBucketsWork::resetIter()
73
72
mNextHotBucketIter = mHotHashes .begin ();
74
73
}
75
74
76
- template <typename BucketT>
75
+ template <typename BucketT>
77
76
bool
78
77
DownloadBucketsWork::onSuccessCb (
79
78
Application& app, FileTransferInfo const & ft, std::string const & hash,
80
- int currId,
81
- std::map<std::string , std::shared_ptr< BucketT>>& buckets ,
82
- std::map< int , std::unique_ptr< typename BucketT::IndexT const >>& indexMap )
79
+ int currId, std::map<std::string, std::shared_ptr<BucketT>>& buckets,
80
+ std::map<int , std::unique_ptr< typename BucketT::IndexT const >>& indexMap ,
81
+ std::lock_guard< std::mutex> const & indexMapLock )
83
82
{
84
83
auto bucketPath = ft.localPath_nogz ();
85
84
auto indexIter = indexMap.find (currId);
@@ -140,29 +139,47 @@ DownloadBucketsWork::yieldMoreWork()
140
139
if (isHotHash)
141
140
{
142
141
auto currId = mHotIndexId ++;
142
+ mHotIndexMapMutex .lock ();
143
143
auto [indexIter, inserted] = mHotIndexMap .emplace (currId, nullptr );
144
+ mHotIndexMapMutex .unlock ();
144
145
releaseAssertOrThrow (inserted);
145
146
verifyWork = std::make_shared<VerifyBucketWork<HotArchiveBucket>>(mApp , ft.localPath_nogz (),
146
147
hexToBin256 (hash),
147
148
indexIter->second , failureCb);
148
- adoptBucketCb = [this , &ft, hash, currId](Application& app) {
149
- return onSuccessCb<HotArchiveBucket>(app, ft, hash, currId,
150
- mHotBuckets , mHotIndexMap );
149
+ adoptBucketCb = [weakSelf, ft, hash, currId](Application& app) {
150
+ auto self = weakSelf.lock ();
151
+ if (self)
152
+ {
153
+ std::lock_guard lock (self->mHotIndexMapMutex );
154
+ return onSuccessCb<HotArchiveBucket>(app, ft, hash, currId,
155
+ self->mHotBuckets ,
156
+ self->mHotIndexMap , lock);
157
+ }
158
+ return true ;
151
159
};
152
160
153
161
mNextHotBucketIter ++;
154
162
}
155
163
else
156
164
{
157
165
auto currId = mLiveIndexId ++;
166
+ mLiveIndexMapMutex .lock ();
158
167
auto [indexIter, inserted] = mLiveIndexMap .emplace (currId, nullptr );
168
+ mLiveIndexMapMutex .unlock ();
159
169
releaseAssertOrThrow (inserted);
160
170
verifyWork = std::make_shared<VerifyBucketWork<LiveBucket>>(mApp , ft.localPath_nogz (),
161
171
hexToBin256 (hash),
162
172
indexIter->second , failureCb);
163
- adoptBucketCb = [this , &ft, hash, currId](Application& app) {
164
- return onSuccessCb<LiveBucket>(app, ft, hash, currId,
165
- mLiveBuckets , mLiveIndexMap );
173
+ adoptBucketCb = [weakSelf, ft, hash, currId](Application& app) {
174
+ auto self = weakSelf.lock ();
175
+ if (self)
176
+ {
177
+ std::lock_guard lock (self->mLiveIndexMapMutex );
178
+ return onSuccessCb<LiveBucket>(app, ft, hash, currId,
179
+ self->mLiveBuckets ,
180
+ self->mLiveIndexMap , lock);
181
+ }
182
+ return true ;
166
183
};
167
184
168
185
mNextLiveBucketIter ++;
@@ -181,10 +198,12 @@ DownloadBucketsWork::yieldMoreWork()
181
198
template bool DownloadBucketsWork::onSuccessCb<LiveBucket>(
182
199
Application&, FileTransferInfo const &, std::string const &, int ,
183
200
std::map<std::string, std::shared_ptr<LiveBucket>>&,
184
- std::map<int , std::unique_ptr<LiveBucketIndex const >>&);
201
+ std::map<int , std::unique_ptr<LiveBucketIndex const >>&,
202
+ std::lock_guard<std::mutex> const &);
185
203
186
204
template bool DownloadBucketsWork::onSuccessCb<HotArchiveBucket>(
187
205
Application&, FileTransferInfo const &, std::string const &, int ,
188
206
std::map<std::string, std::shared_ptr<HotArchiveBucket>>&,
189
- std::map<int , std::unique_ptr<HotArchiveBucketIndex const >>&);
207
+ std::map<int , std::unique_ptr<HotArchiveBucketIndex const >>&,
208
+ std::lock_guard<std::mutex> const &);
190
209
}
0 commit comments