Skip to content

Commit 7f4857e

Browse files
committed
Add Queue module
1 parent 85d6bc1 commit 7f4857e

File tree

3 files changed

+571
-0
lines changed

3 files changed

+571
-0
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ STDLIB_TESTS := \
105105
test/test_mutex \
106106
test/test_operator \
107107
test/test_quopri \
108+
test/test_queue \
108109
test/test_rfc822 \
109110
test/test_sched \
110111
test/test_select \

third_party/stdlib/Queue.py

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
"""A multi-producer, multi-consumer queue."""
2+
3+
from time import time as _time
4+
#try:
5+
import threading as _threading
6+
#except ImportError:
7+
# import dummy_threading as _threading
8+
from collections import deque
9+
import heapq
10+
11+
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
12+
13+
class Empty(Exception):
14+
"Exception raised by Queue.get(block=0)/get_nowait()."
15+
pass
16+
17+
class Full(Exception):
18+
"Exception raised by Queue.put(block=0)/put_nowait()."
19+
pass
20+
21+
class Queue(object):
22+
"""Create a queue object with a given maximum size.
23+
24+
If maxsize is <= 0, the queue size is infinite.
25+
"""
26+
def __init__(self, maxsize=0):
27+
self.maxsize = maxsize
28+
self._init(maxsize)
29+
# mutex must be held whenever the queue is mutating. All methods
30+
# that acquire mutex must release it before returning. mutex
31+
# is shared between the three conditions, so acquiring and
32+
# releasing the conditions also acquires and releases mutex.
33+
self.mutex = _threading.Lock()
34+
# Notify not_empty whenever an item is added to the queue; a
35+
# thread waiting to get is notified then.
36+
self.not_empty = _threading.Condition(self.mutex)
37+
# Notify not_full whenever an item is removed from the queue;
38+
# a thread waiting to put is notified then.
39+
self.not_full = _threading.Condition(self.mutex)
40+
# Notify all_tasks_done whenever the number of unfinished tasks
41+
# drops to zero; thread waiting to join() is notified to resume
42+
self.all_tasks_done = _threading.Condition(self.mutex)
43+
self.unfinished_tasks = 0
44+
45+
def task_done(self):
46+
"""Indicate that a formerly enqueued task is complete.
47+
48+
Used by Queue consumer threads. For each get() used to fetch a task,
49+
a subsequent call to task_done() tells the queue that the processing
50+
on the task is complete.
51+
52+
If a join() is currently blocking, it will resume when all items
53+
have been processed (meaning that a task_done() call was received
54+
for every item that had been put() into the queue).
55+
56+
Raises a ValueError if called more times than there were items
57+
placed in the queue.
58+
"""
59+
self.all_tasks_done.acquire()
60+
try:
61+
unfinished = self.unfinished_tasks - 1
62+
if unfinished <= 0:
63+
if unfinished < 0:
64+
raise ValueError('task_done() called too many times')
65+
self.all_tasks_done.notify_all()
66+
self.unfinished_tasks = unfinished
67+
finally:
68+
self.all_tasks_done.release()
69+
70+
def join(self):
71+
"""Blocks until all items in the Queue have been gotten and processed.
72+
73+
The count of unfinished tasks goes up whenever an item is added to the
74+
queue. The count goes down whenever a consumer thread calls task_done()
75+
to indicate the item was retrieved and all work on it is complete.
76+
77+
When the count of unfinished tasks drops to zero, join() unblocks.
78+
"""
79+
self.all_tasks_done.acquire()
80+
try:
81+
while self.unfinished_tasks:
82+
self.all_tasks_done.wait()
83+
finally:
84+
self.all_tasks_done.release()
85+
86+
def qsize(self):
87+
"""Return the approximate size of the queue (not reliable!)."""
88+
self.mutex.acquire()
89+
n = self._qsize()
90+
self.mutex.release()
91+
return n
92+
93+
def empty(self):
94+
"""Return True if the queue is empty, False otherwise (not reliable!)."""
95+
self.mutex.acquire()
96+
n = not self._qsize()
97+
self.mutex.release()
98+
return n
99+
100+
def full(self):
101+
"""Return True if the queue is full, False otherwise (not reliable!)."""
102+
self.mutex.acquire()
103+
n = 0 < self.maxsize == self._qsize()
104+
self.mutex.release()
105+
return n
106+
107+
def put(self, item, block=True, timeout=None):
108+
"""Put an item into the queue.
109+
110+
If optional args 'block' is true and 'timeout' is None (the default),
111+
block if necessary until a free slot is available. If 'timeout' is
112+
a non-negative number, it blocks at most 'timeout' seconds and raises
113+
the Full exception if no free slot was available within that time.
114+
Otherwise ('block' is false), put an item on the queue if a free slot
115+
is immediately available, else raise the Full exception ('timeout'
116+
is ignored in that case).
117+
"""
118+
self.not_full.acquire()
119+
try:
120+
if self.maxsize > 0:
121+
if not block:
122+
if self._qsize() == self.maxsize:
123+
raise Full
124+
elif timeout is None:
125+
while self._qsize() == self.maxsize:
126+
self.not_full.wait()
127+
elif timeout < 0:
128+
raise ValueError("'timeout' must be a non-negative number")
129+
else:
130+
endtime = _time() + timeout
131+
while self._qsize() == self.maxsize:
132+
remaining = endtime - _time()
133+
if remaining <= 0.0:
134+
raise Full
135+
self.not_full.wait(remaining)
136+
self._put(item)
137+
self.unfinished_tasks += 1
138+
self.not_empty.notify()
139+
finally:
140+
self.not_full.release()
141+
142+
def put_nowait(self, item):
143+
"""Put an item into the queue without blocking.
144+
145+
Only enqueue the item if a free slot is immediately available.
146+
Otherwise raise the Full exception.
147+
"""
148+
return self.put(item, False)
149+
150+
def get(self, block=True, timeout=None):
151+
"""Remove and return an item from the queue.
152+
153+
If optional args 'block' is true and 'timeout' is None (the default),
154+
block if necessary until an item is available. If 'timeout' is
155+
a non-negative number, it blocks at most 'timeout' seconds and raises
156+
the Empty exception if no item was available within that time.
157+
Otherwise ('block' is false), return an item if one is immediately
158+
available, else raise the Empty exception ('timeout' is ignored
159+
in that case).
160+
"""
161+
self.not_empty.acquire()
162+
try:
163+
if not block:
164+
if not self._qsize():
165+
raise Empty
166+
elif timeout is None:
167+
while not self._qsize():
168+
self.not_empty.wait()
169+
elif timeout < 0:
170+
raise ValueError("'timeout' must be a non-negative number")
171+
else:
172+
endtime = _time() + timeout
173+
while not self._qsize():
174+
remaining = endtime - _time()
175+
if remaining <= 0.0:
176+
raise Empty
177+
self.not_empty.wait(remaining)
178+
item = self._get()
179+
self.not_full.notify()
180+
return item
181+
finally:
182+
self.not_empty.release()
183+
184+
def get_nowait(self):
185+
"""Remove and return an item from the queue without blocking.
186+
187+
Only get an item if one is immediately available. Otherwise
188+
raise the Empty exception.
189+
"""
190+
return self.get(False)
191+
192+
# Override these methods to implement other queue organizations
193+
# (e.g. stack or priority queue).
194+
# These will only be called with appropriate locks held
195+
196+
# Initialize the queue representation
197+
def _init(self, maxsize):
198+
self.queue = deque()
199+
200+
def _qsize(self, len=len):
201+
return len(self.queue)
202+
203+
# Put a new item in the queue
204+
def _put(self, item):
205+
self.queue.append(item)
206+
207+
# Get an item from the queue
208+
def _get(self):
209+
return self.queue.popleft()
210+
211+
212+
class PriorityQueue(Queue):
213+
'''Variant of Queue that retrieves open entries in priority order (lowest first).
214+
215+
Entries are typically tuples of the form: (priority number, data).
216+
'''
217+
218+
def _init(self, maxsize):
219+
self.queue = []
220+
221+
def _qsize(self, len=len):
222+
return len(self.queue)
223+
224+
def _put(self, item, heappush=heapq.heappush):
225+
heappush(self.queue, item)
226+
227+
def _get(self, heappop=heapq.heappop):
228+
return heappop(self.queue)
229+
230+
231+
class LifoQueue(Queue):
232+
'''Variant of Queue that retrieves most recently added entries first.'''
233+
234+
def _init(self, maxsize):
235+
self.queue = []
236+
237+
def _qsize(self, len=len):
238+
return len(self.queue)
239+
240+
def _put(self, item):
241+
self.queue.append(item)
242+
243+
def _get(self):
244+
return self.queue.pop()

0 commit comments

Comments
 (0)