39 | 39 | #include "core/typedefs.h" |
40 | 40 |
41 | 41 | class CommandQueueMT { |
| 42 | + static const size_t MAX_COMMAND_SIZE = 1024; |
| 43 | + |
42 | 44 | struct CommandBase { |
43 | 45 | bool sync = false; |
44 | 46 | virtual void call() = 0; |
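Each queued call is stored as a small object derived from CommandBase, whose virtual call() replays the captured member-function invocation when the queue is flushed. The concrete Command template is outside this hunk; the sketch below only illustrates the idea, and its name, storage layout, and constructor are assumptions rather than the actual Godot template.

#include <tuple>
#include <utility>

// Illustrative only: a type-erased command capturing an instance, a member
// function pointer and its arguments, replayed later through call().
template <typename T, typename M, bool IsSync, typename... Args>
struct ExampleCommand : public CommandBase {
	T *instance;
	M method;
	std::tuple<Args...> args; // Stored by value; _flush() memcpys the whole object to a stack buffer.

	ExampleCommand(T *p_instance, M p_method, Args... p_args) :
			instance(p_instance), method(p_method), args(std::move(p_args)...) {
		sync = IsSync;
	}

	virtual void call() override {
		std::apply([this](auto &...a) { (instance->*method)(a...); }, args);
	}
};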
@@ -105,6 +107,7 @@ class CommandQueueMT { |
105 | 107 |
106 | 108 | static const uint32_t DEFAULT_COMMAND_MEM_SIZE_KB = 64; |
107 | 109 |
| 110 | + bool unique_flusher = false; |
108 | 111 | BinaryMutex mutex; |
109 | 112 | LocalVector<uint8_t> command_mem; |
110 | 113 | ConditionVariable sync_cond_var; |
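command_mem is a flat byte buffer: each pushed command is written as an 8-byte size header followed by the command object constructed in place, which is why _flush() below reads a uint64_t and then treats the following bytes as a CommandBase. The real _push_internal() is not part of this diff; the sketch below is a rough approximation of the write side (alignment padding and sync bookkeeping are omitted, and it assumes the surrounding header's LocalVector include).

#include <cstdint>
#include <new>
#include <utility>

// Rough sketch of the write side, matching the layout _flush() consumes:
// [uint64_t size][command bytes][uint64_t size][command bytes]...
// Not the actual _push_internal(); padding/alignment handling is omitted.
template <typename CommandType, typename... Args>
void example_push(LocalVector<uint8_t> &command_mem, Args &&...p_args) {
	uint64_t size = sizeof(CommandType);
	uint32_t offset = command_mem.size();
	command_mem.resize(offset + sizeof(uint64_t) + size); // May realloc and move the buffer.
	*(uint64_t *)&command_mem[offset] = size;             // Size header read back by _flush().
	new (&command_mem[offset + sizeof(uint64_t)]) CommandType(std::forward<Args>(p_args)...); // Construct in place.
}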
@@ -154,29 +157,46 @@ class CommandQueueMT { |
154 | 157 | } |
155 | 158 |
156 | 159 | void _flush() { |
| 160 | + MutexLock lock(mutex); |
| 161 | + |
157 | 162 | if (unlikely(flush_read_ptr)) { |
158 | 163 | // Re-entrant call. |
159 | 164 | return; |
160 | 165 | } |
161 | 166 |
162 | | - MutexLock lock(mutex); |
| 167 | + char cmd_backup[MAX_COMMAND_SIZE]; |
163 | 168 |
164 | 169 | while (flush_read_ptr < command_mem.size()) { |
165 | 170 | uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr]; |
166 | | - flush_read_ptr += 8; |
| 171 | + flush_read_ptr += sizeof(uint64_t); |
| 172 | + |
167 | 173 | CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]); |
168 | | - uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock); |
169 | | - cmd->call(); |
170 | | - WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id); |
| 174 | + |
| 175 | + // Copy the command out so that, while this thread runs it, |
| 176 | + // other threads pushing commands (and reallocating command_mem) |
| 177 | + // cannot invalidate the pointer from under us. |
| 178 | + memcpy(cmd_backup, (char *)cmd, size); |
| 179 | + |
| 180 | + if (unique_flusher) { |
| 181 | + // A single thread will pump; the lock is only needed for the command queue itself. |
| 182 | + lock.temp_unlock(); |
| 183 | + ((CommandBase *)cmd_backup)->call(); |
| 184 | + lock.temp_relock(); |
| 185 | + } else { |
| 186 | + // Other threads may also flush; at least unlock during WorkerThreadPool (WTP) operations. |
| 187 | + uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock); |
| 188 | + ((CommandBase *)cmd_backup)->call(); |
| 189 | + WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id); |
| 190 | + } |
171 | 191 |
172 | 192 | // Handle potential realloc due to the command and unlock allowance. |
173 | 193 | cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]); |
174 | 194 |
175 | 195 | if (unlikely(cmd->sync)) { |
176 | 196 | sync_head++; |
177 | | - lock.~MutexLock(); // Give an opportunity to awaiters right away. |
| 197 | + lock.temp_unlock(); // Give an opportunity to awaiters right away. |
178 | 198 | sync_cond_var.notify_all(); |
179 | | - new (&lock) MutexLock(mutex); |
| 199 | + lock.temp_relock(); |
180 | 200 | // Handle potential realloc happened during unlock. |
181 | 201 | cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]); |
182 | 202 | } |
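The reworked _flush() now holds the lock for the whole loop and, before invoking each command, memcpys it into the fixed-size cmd_backup stack buffer: once the mutex is (temporarily) released, another thread may push and grow command_mem, moving the whole buffer and invalidating any pointer into it. temp_unlock()/temp_relock() replace the previous destroy-and-placement-new dance on the MutexLock. Below is a standalone sketch of the same copy-then-call idea using only the standard library; it is not the Godot API, just the pattern under those assumptions.

#include <cstring>
#include <mutex>
#include <vector>

struct Job {
	void (*fn)(void *) = nullptr;
	void *arg = nullptr;
};

// Drain a shared vector of jobs: snapshot each element onto the stack, run it
// with the lock released, and never reuse a pointer into the vector after the
// lock has been dropped (a concurrent push may have reallocated the storage).
void drain(std::vector<Job> &jobs, std::mutex &m) {
	std::unique_lock<std::mutex> lock(m);
	size_t read = 0;
	while (read < jobs.size()) {
		Job backup;
		std::memcpy(&backup, &jobs[read], sizeof(Job)); // Stack copy survives reallocs.
		lock.unlock();                                  // Other threads may push (and realloc) here.
		backup.fn(backup.arg);                          // Run the copy, not the element in the vector.
		lock.lock();
		read++;                                         // &jobs[read] must be recomputed after relocking.
	}
	jobs.clear();
}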
@@ -210,20 +230,23 @@ class CommandQueueMT { |
210 | 230 | void push(T *p_instance, M p_method, Args &&...p_args) { |
211 | 231 | // Standard command, no sync. |
212 | 232 | using CommandType = Command<T, M, false, Args...>; |
| 233 | + static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE); |
213 | 234 | _push_internal<CommandType, false>(p_instance, p_method, std::forward<Args>(p_args)...); |
214 | 235 | } |
215 | 236 |
216 | 237 | template <typename T, typename M, typename... Args> |
217 | 238 | void push_and_sync(T *p_instance, M p_method, Args... p_args) { |
218 | 239 | // Standard command, sync. |
219 | 240 | using CommandType = Command<T, M, true, Args...>; |
| 241 | + static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE); |
220 | 242 | _push_internal<CommandType, true>(p_instance, p_method, std::forward<Args>(p_args)...); |
221 | 243 | } |
222 | 244 |
223 | 245 | template <typename T, typename M, typename R, typename... Args> |
224 | 246 | void push_and_ret(T *p_instance, M p_method, R *r_ret, Args... p_args) { |
225 | 247 | // Command with return value, sync. |
226 | 248 | using CommandType = CommandRet<T, M, R, Args...>; |
| 249 | + static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE); |
227 | 250 | _push_internal<CommandType, true>(p_instance, p_method, r_ret, std::forward<Args>(p_args)...); |
228 | 251 | } |
229 | 252 |
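Each push specialization now checks at compile time that the generated command object fits into the MAX_COMMAND_SIZE (1024-byte) stack buffer that _flush() copies into, so an oversized argument pack becomes a build error rather than a runtime overflow of cmd_backup. A standalone illustration of the failure mode the guard catches follows; the types and the constant name are examples, not the real Command template.

#include <cstddef>
#include <cstdint>

static const size_t EXAMPLE_MAX_COMMAND_SIZE = 1024;

struct SmallCommand { uint8_t payload[64]; };  // Fits comfortably in the backup buffer.
struct HugeCommand { uint8_t payload[4096]; }; // Would overflow a 1024-byte stack copy.

static_assert(sizeof(SmallCommand) <= EXAMPLE_MAX_COMMAND_SIZE, "fits in the flush backup buffer");
// Enabling the next line fails the build, which is the intended failure mode:
// static_assert(sizeof(HugeCommand) <= EXAMPLE_MAX_COMMAND_SIZE, "too large for the flush backup buffer");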
@@ -252,7 +275,8 @@ class CommandQueueMT { |
252 | 275 | pump_task_id = p_task_id; |
253 | 276 | } |
254 | 277 |
255 | | - CommandQueueMT() { |
| 278 | + CommandQueueMT(bool p_unique_flusher = false) : |
| 279 | + unique_flusher(p_unique_flusher) { |
256 | 280 | command_mem.reserve(DEFAULT_COMMAND_MEM_SIZE_KB * 1024); |
257 | 281 | } |
258 | 282 | }; |
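Finally, the new constructor parameter lets a queue whose flush path is only ever pumped by a single thread be created with unique_flusher = true, in which case commands run with the queue mutex fully released instead of going through the WorkerThreadPool unlock-allowance zone. A hedged usage sketch, assuming flush_all() as the public flush entry point; the renderer type and thread functions are purely illustrative.

// Usage sketch (illustrative names): one dedicated thread pumps the queue,
// so it is safe to construct it with p_unique_flusher = true.
struct RendererExample {
	bool running = true;
	void draw_frame() { /* ... render work ... */ }
	bool keep_running() const { return running; }
};

CommandQueueMT command_queue(true); // true: only one thread will ever call the flush path.
RendererExample renderer;

void render_thread_loop() {
	while (renderer.keep_running()) {
		command_queue.flush_all(); // Commands run with the queue mutex released (temp_unlock path).
	}
}

void main_thread_push() {
	command_queue.push(&renderer, &RendererExample::draw_frame); // Any thread may still push safely.
}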