-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy paththread_queue.h
188 lines (173 loc) · 5.19 KB
/
thread_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// The core library - copyright GarageGames. The core library is released under the MIT Open Source software license. See /license.txt in this distribution for specific license terms.
/// Managing object for a queue of worker threads that pass
/// messages back and forth to the main thread. A request to a thread_queue will return a token that can be used to track the status of the call as well as cancel operations in flight.
class thread_queue : public ref_object
{
public:
/// thread_queue constructor. threadCount specifies the number of worker threads that will be created.
thread_queue(uint32 threadCount)
{
_current_index = 0;
_storage.set((void *) 1);
for(uint32 i = 0; i < threadCount; i++)
{
thread *theThread = new thread_queue_thread(this);
_threads.push_back(theThread);
theThread->start();
}
}
~thread_queue()
{
}
/// Dispatches all thread_queue calls queued by worker threads. This should
/// be called periodically from a main loop.
bool get_next_result(byte_buffer_ptr &result_buffer, uint32 &request_index)
{
bool found = false;
lock();
for(uint32 i = 0; i < _process_list.size();)
{
if(_process_list[i]->state == process_record::process_complete)
{
process_record *rec = _process_list[i];
if(rec->cancelled)
{
delete rec;
_process_list.erase(i);
continue;
}
result_buffer = rec->response_buffer;
request_index = rec->request_index;
found = true;
delete rec;
_process_list.erase(i);
break;
}
i++;
}
unlock();
return found;
}
/// Posts a request to be handled.
uint32 post_request(const byte_buffer_ptr &the_request)
{
lock();
uint32 index = _current_index++;
process_record *record = new process_record;
record->request_buffer = the_request;
record->state = process_record::not_yet_processed;
record->request_index = index;
record->cancelled = false;
record->progress = 0;
_process_list.push_back(record);
_semaphore.increment();
unlock();
return index;
}
/// Cancels a request in flight -- worker process must periodically check cancellation flag to actually stop the process.
bool cancel_request(uint32 request_index)
{
bool found = false;
lock();
for(uint32 i = 0; i < _process_list.size(); i++)
{
process_record *rec = _process_list[i];
if(rec->request_index == request_index)
{
if(rec->state == process_record::not_yet_processed || rec->state == process_record::process_complete)
{
delete rec;
_process_list.erase(i);
}
else
rec->cancelled = true;
found = true;
break;
}
}
unlock();
return found;
}
/// Process one request on this queue -- should periodically check the value of *cancelled and early out if it's set to true.
virtual void process_request(const byte_buffer_ptr &the_request, byte_buffer_ptr &the_response, bool *cancelled, float *progress) = 0;
protected:
struct process_record
{
enum {
not_yet_processed,
in_process,
process_complete,
} state;
uint32 request_index;
byte_buffer_ptr request_buffer;
byte_buffer_ptr response_buffer;
float progress;
bool cancelled;
};
class thread_queue_thread : public thread
{
thread_queue *_thread_queue;
public:
thread_queue_thread(thread_queue *q)
{
_thread_queue = q;
}
uint32 run()
{
_thread_queue->thread_start();
_thread_queue->lock();
thread_storage &sto = _thread_queue->get_storage();
sto.set((void *) 0);
_thread_queue->unlock();
for(;;)
_thread_queue->dispatch_next_call();
return 0;
}
};
uint32 _current_index;
friend class thread_queue_thread;
/// list of worker threads on this thread_queue
array<thread *> _threads;
/// list of elements in process
array<process_record *> _process_list;
/// Synchronization variable that manages worker threads
semaphore _semaphore;
/// Internal mutex for synchronizing access to thread call vectors.
mutex _lock;
/// Storage variable that tracks whether this is the main thread or a worker thread.
thread_storage _storage;
protected:
/// Locks the thread_queue for access to member variables.
void lock() { _lock.lock(); }
/// Unlocks the thread_queue.
void unlock() { _lock.unlock(); }
/// Dispatches the next available worker thread call. Called internally by the worker threads when they awaken from the semaphore.
void dispatch_next_call()
{
_semaphore.wait();
lock();
process_record *the_record = 0;
for(uint32 i = 0; i < _process_list.size(); i++)
{
if(_process_list[i]->state == process_record::not_yet_processed)
{
the_record = _process_list[i];
break;
}
}
if(!the_record)
{
unlock();
return;
}
the_record->state = process_record::in_process;
unlock();
process_request(the_record->request_buffer, the_record->response_buffer, &(the_record->cancelled), &(the_record->progress));
the_record->state = process_record::process_complete;
}
/// helper function to determine if the currently executing thread is a worker thread or the main thread.
bool is_main_thread() { return (bool) _storage.get(); }
thread_storage &get_storage() { return _storage; }
/// called by each worker thread when it starts for subclass initialization of worker threads.
virtual void thread_start() { }
};