-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathringbuffer.py
More file actions
137 lines (105 loc) · 5.12 KB
/
ringbuffer.py
File metadata and controls
137 lines (105 loc) · 5.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python3
class RingBuffer:
    '''
    A fixed-capacity circular buffer (ring buffer) with FIFO semantics.

    Items are enqueued at the rear and dequeued from the front. Both positions
    advance with modular arithmetic, so the underlying list is reused in place
    and no elements are ever shifted.

    The buffer never overwrites stored items: enqueuing into a full buffer
    raises RingBufferFull, and dequeuing from or peeking at an empty buffer
    raises RingBufferEmpty.
    '''

    def __init__(self, capacity: int):
        '''
        Initialize an empty ring buffer with a given maximum capacity.

        Args:
            capacity (int): The maximum number of elements the buffer can hold.
                Must not be negative. A capacity of 0 is allowed and yields a
                buffer that is always both empty and full.

        Raises:
            ValueError: If capacity is negative.
        '''
        # A negative capacity would allocate an empty list while is_full()
        # could never become true, so enqueue would fail with an IndexError.
        # Reject it up front with a clear message instead.
        if capacity < 0:
            raise ValueError(f"capacity must be non-negative, got {capacity}")

        self.MAX_CAP = capacity  # Maximum capacity of the ring buffer
        self._front = 0  # Index of the front element (dequeue position)
        self._rear = 0  # Index of the next free slot (enqueue position)
        self._size = 0  # Current number of elements in the buffer
        self.buffer = [None] * capacity  # Pre-allocated slots, None as placeholder

    def size(self) -> int:
        '''
        Get the number of items currently in the buffer.

        Returns:
            int: The current number of items in the buffer.
        '''
        return self._size

    def is_empty(self) -> bool:
        '''
        Check whether the buffer is empty.

        Returns:
            bool: True if the number of stored items is zero, False otherwise.
        '''
        return self._size == 0

    def is_full(self) -> bool:
        '''
        Check whether the buffer is full.

        Returns:
            bool: True if the number of stored items equals the maximum
            capacity, False otherwise.
        '''
        return self._size == self.MAX_CAP

    def enqueue(self, x: float):
        '''
        Add an item `x` to the rear of the buffer.

        Args:
            x (float): The item to be added to the buffer.

        Raises:
            RingBufferFull: If the buffer is full and cannot accommodate new items.
        '''
        if self.is_full():
            raise RingBufferFull(f"The buffer is full, cannot add {x} to the buffer.")

        self.buffer[self._rear] = x  # Store the item at the rear position
        # Advance the rear pointer, wrapping to index 0 past the last slot.
        # MAX_CAP is never 0 here: a zero-capacity buffer is always full.
        self._rear = (self._rear + 1) % self.MAX_CAP
        self._size += 1

    def dequeue(self) -> float:
        '''
        Remove and return the item from the front of the buffer.

        Returns:
            float: The item that was at the front of the buffer.

        Raises:
            RingBufferEmpty: If the buffer is empty and there are no items to remove.
        '''
        if self.is_empty():
            raise RingBufferEmpty("The buffer is empty, there is nothing to remove.")

        item = self.buffer[self._front]
        # Advance the front pointer, wrapping around if needed. The slot is
        # not cleared; it is simply treated as free and reused by enqueue.
        self._front = (self._front + 1) % self.MAX_CAP
        self._size -= 1
        return item

    def peek(self) -> float:
        '''
        Return (but do not remove) the item from the front of the buffer.

        Returns:
            float: The item currently at the front of the buffer.

        Raises:
            RingBufferEmpty: If the buffer is empty and there is nothing to peek at.
        '''
        if self.is_empty():
            raise RingBufferEmpty("The buffer is empty, there is nothing to see.")

        return self.buffer[self._front]
class RingBufferFull(Exception):
    '''
    Signals an attempt to add an item to a ring buffer that has no free slots.

    Raised by the `enqueue` method; no further items can be added until at
    least one item has been removed from the buffer.
    '''
class RingBufferEmpty(Exception):
    '''
    Signals an attempt to remove or inspect an item when the ring buffer holds none.

    Raised by the `dequeue` and `peek` methods; there is nothing available to
    return until an item has been added to the buffer.
    '''