// Package pahlimiter contains a PollAndHistoryLimiter, used to share resources between polls and history loading,
// to prevent flooding the server with history requests that will not complete in a reasonable time.
package pahlimiter

import (
    "context"
    "errors"
    "sync"
)

type (
    // PollAndHistoryLimiter defines an interface used to share request resources between pollers and history iterator
    // funcs, to prevent unsustainable growth of history-loading requests.
    //
    // this is intended to be used with other poller limiters and retry backoffs, not on its own.
    //
    // implementations include:
    // - NewUnlimited (historical behavior, a noop)
    // - NewHistoryLimited (limits history requests, does not limit polls)
    // - NewWeighted (history requests "consume" poll requests, and can reduce or stop polls)
    PollAndHistoryLimiter interface {
        // Poll will try to acquire a poll resource,
        // blocking until it succeeds or the context is canceled.
        //
        // The done func will release the resource; it will always be returned and can be called multiple times,
        // but only the first call will have an effect.
        // TODO: see if this is necessary... but it's easy and safe.
        Poll(context.Context) (ok bool, done func())
        // GetHistory will try to acquire a history-downloading resource,
        // blocking until it succeeds or the context is canceled.
        //
        // The done func will release the resource; it will always be returned and can be called multiple times,
        // but only the first call will have an effect.
        // TODO: see if this is necessary... but it's easy and safe.
        GetHistory(context.Context) (ok bool, done func())

        // Close will clean up any resources; call it at worker shutdown.
        // This blocks until they are cleaned up.
        Close()
    }
    unlimited struct{}
    history struct {
        tokens chan struct{} // sized at startup
    }
    weighted struct {
        stopOnce sync.Once

        // close to clean up resources
        stop chan struct{}
        // closed when cleaned up
        stopped chan struct{}

        // used to signal history requests starting and stopping
        historyStart, historyDone chan struct{}
        // used to signal poll requests starting and stopping
        pollStart, pollDone chan struct{}
    }
)
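// A minimal usage sketch (illustrative only; `limiter` may be any of the implementations below, and
// Poll follows the same pattern as GetHistory):
//
//     ok, done := limiter.GetHistory(ctx)
//     if !ok {
//         return // context canceled (or the limiter was closed); nothing was acquired
//     }
//     defer done() // safe to call more than once; only the first call releases the resource
//     // ... perform the history request ...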

var _ PollAndHistoryLimiter = (*unlimited)(nil)
var _ PollAndHistoryLimiter = (*history)(nil)
var _ PollAndHistoryLimiter = (*weighted)(nil)

// NewUnlimited creates a new "unlimited" poll-and-history limiter, which does not constrain either operation.
// This is the default, historical behavior.
func NewUnlimited() (PollAndHistoryLimiter, error) {
    return (*unlimited)(nil), nil
}

func (*unlimited) Poll(_ context.Context) (ok bool, done func()) { return true, func() {} }
func (*unlimited) GetHistory(_ context.Context) (ok bool, done func()) { return true, func() {} }
func (*unlimited) Close() {}

// NewHistoryLimited creates a simple limiter, which allows a specified number of concurrent history requests,
// and does not limit polls at all.
//
// This implementation is NOT expected to be used widely, but it exists as a trivially-safe fallback implementation
// that will still behave better than the historical default.
//
// This is very simple and should be sufficient to stop request floods during rate-limiting with many pending decision
// tasks, but it seems likely to allow too many workflows to *attempt* to make progress on a host, starving progress
// when the sticky cache is larger than this limit and leading to interrupted or timed-out decision tasks.
func NewHistoryLimited(concurrentHistoryRequests int) (PollAndHistoryLimiter, error) {
    l := &history{
        tokens: make(chan struct{}, concurrentHistoryRequests),
    }
    // fill the token buffer
    for i := 0; i < concurrentHistoryRequests; i++ {
        l.tokens <- struct{}{}
    }
    return l, nil
}
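// Construction is illustrative only (the size is an arbitrary example, not a recommendation):
//
//     limiter, _ := NewHistoryLimited(10) // at most 10 history requests in flight; polls are unaffected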

func (p *history) Poll(_ context.Context) (ok bool, done func()) { return true, func() {} }
func (p *history) Close() {}
func (p *history) GetHistory(ctx context.Context) (ok bool, done func()) {
    select {
    case <-p.tokens:
        var once sync.Once
        return true, func() {
            once.Do(func() {
                p.tokens <- struct{}{}
            })
        }
    case <-ctx.Done():
        return false, func() {} // canceled, nothing to release
    }
}

// NewWeighted creates a new "weighted" poll-and-history limiter, which shares resources between history requests
// and polls.
//
// Each running poll or history request consumes its weight in total available (capped at max) resources, and one
// request type is allowed to reduce resources for or starve the other completely.
//
// Since this runs "inside" other poller limiting, having equal or lesser poll-resources than the poller limiter
// will allow history requests to block polls... and if history weights are lower, they can perpetually starve polls
// by not releasing enough resources.
//
// **This is intended behavior**, as it can be used to cause a heavily-history-loading worker to stop pulling more
// workflows that may also need their history loaded, until some resources free up.
//
// ---
//
// The reverse situation, where history resources cannot prevent polls, may lead to some undesirable behavior.
// Continually adding workflows while not allowing them to pull history degrades to NewHistoryLimited behavior:
// it is easily possible to have hundreds or thousands of workflows trying to load history, but few or none of them
// are allowed through this limiter to actually perform that request.
//
// In this situation it will still limit the number of actual concurrent requests to load history, but with a very
// large increase in complexity. If you want this, strongly consider just using NewHistoryLimited.
//
// ---
//
// All that said: this is NOT built to be a reliable blocker of polls, for at least two reasons:
//   - History iterators do not hold their resources between loading (and executing) pages of history, causing a gap
//     where a poller could claim resources despite the service being "too busy" loading history from a human's view.
//   - History iterators race with polls. If enough resources are available and both possibilities can be satisfied,
//     Go's select chooses between them fairly (pseudo-randomly).
//
// To reduce the chance of this happening, keep history weights relatively small compared to polls, so many concurrent
// workflows loading history will be unlikely to free up enough resources for a poll to occur.
func NewWeighted(pollRequestWeight, historyRequestWeight, maxResources int) (PollAndHistoryLimiter, error) {
    if historyRequestWeight > maxResources || pollRequestWeight > maxResources {
        return nil, errors.New("weights must not exceed max resources, or no requests can be sent")
    }

    l := &weighted{
        stopOnce:     sync.Once{},
        stop:         make(chan struct{}),
        stopped:      make(chan struct{}),
        historyStart: make(chan struct{}),
        historyDone:  make(chan struct{}),
        pollStart:    make(chan struct{}),
        pollDone:     make(chan struct{}),
    }
    l.init(pollRequestWeight, historyRequestWeight, maxResources)
    return l, nil
}
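// As an illustration (these numbers are hypothetical, not recommendations): NewWeighted(2, 1, 10)
// allows up to 5 concurrent polls or up to 10 concurrent history loads, or any mix in between,
// e.g. 3 polls and 4 history loads consume 3*2 + 4*1 == 10 resources, blocking everything else
// until one of them finishes and releases its weight.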

func (p *weighted) init(pollRequestWeight, historyRequestWeight, maxResources int) {
    // mutated only by the actor goroutine
    available := maxResources

    // start an actor-goroutine to simplify concurrency logic with many possibilities at any time.
    // all logic is decided single-threaded, run by this goroutine, and every operation (except stop) is blocking.
    //
    // this actor only sends to history/poll channels.
    // modifying functions only read from them.
    // both read from "stop" and "stopped".
    //
    // - by reading from a channel, the caller has successfully acquired or released resources, and it can immediately proceed.
    // - by sending on a channel, this actor has observed that resources are changed, and it must update its state.
    // - by closing `p.stop`, this limiter will stop reading from channels.
    //   - ALL channel operations (except stop) will block forever.
    //     - this means "xDone" resource-releasing must also read from `p.stop`.
    //   - because `p.stop` races with other channel operations, stop DOES NOT guarantee no further polls will start,
    //     even on the same goroutine, until `Close()` returns.
    //     - this is one reason why `Close()` waits for the actor to exit. without it, you do not have sequential
    //       logic guarantees.
    //   - you can `Close()` any number of times from any goroutines, all calls will wait for the actor to stop.
    //
    // all operations are "fast", and it must remain this way.
    // callers block while waiting on this actor, including when releasing resources.
    go func() {
        defer func() { close(p.stopped) }()
        for {
            // every branch must:
            // 1. read from `p.stop`, so this limiter can be stopped.
            // 2. write to "done" chans, so resources can be freed.
            // 3. optionally write to "start" chans, so resources can be acquired.
            //
            // doing otherwise for any reason risks deadlocks or invalid resource values.

            if available >= pollRequestWeight && available >= historyRequestWeight {
                // resources available for either == wait for either
                select {
                case <-p.stop:
                    return

                case p.historyStart <- struct{}{}:
                    available -= historyRequestWeight
                case p.pollStart <- struct{}{}:
                    available -= pollRequestWeight

                case p.historyDone <- struct{}{}:
                    available += historyRequestWeight
                case p.pollDone <- struct{}{}:
                    available += pollRequestWeight
                }
            } else if available >= pollRequestWeight && available < historyRequestWeight {
                // only poll resources available
                select {
                case <-p.stop:
                    return

                // case p.historyStart <- struct{}{}: // insufficient resources
                case p.pollStart <- struct{}{}:
                    available -= pollRequestWeight

                case p.historyDone <- struct{}{}:
                    available += historyRequestWeight
                case p.pollDone <- struct{}{}:
                    available += pollRequestWeight
                }
            } else if available < pollRequestWeight && available >= historyRequestWeight {
                // only history resources available
                select {
                case <-p.stop:
                    return

                case p.historyStart <- struct{}{}:
                    available -= historyRequestWeight
                // case p.pollStart <- struct{}{}: // insufficient resources

                case p.historyDone <- struct{}{}:
                    available += historyRequestWeight
                case p.pollDone <- struct{}{}:
                    available += pollRequestWeight
                }
            } else {
                // no resources for either, wait for something to finish
                select {
                case <-p.stop:
                    return

                // case p.historyStart <- struct{}{}: // insufficient resources
                // case p.pollStart <- struct{}{}: // insufficient resources

                case p.historyDone <- struct{}{}:
                    available += historyRequestWeight
                case p.pollDone <- struct{}{}:
                    available += pollRequestWeight
                }
            }
        }
    }()
}
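// To make the accounting above concrete (the weights here are hypothetical): with pollRequestWeight=2,
// historyRequestWeight=1, and maxResources=3, the actor starts with available=3; a poll start drops it
// to 1, a history start drops it to 0, and then only `<-p.stop` or one of the "done" sends can unblock
// the loop, so no new polls or history loads begin until a running one releases its weight.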

func (p *weighted) Close() {
    p.stopOnce.Do(func() {
        close(p.stop)
    })
    <-p.stopped
}

func (p *weighted) Poll(ctx context.Context) (ok bool, done func()) {
    select {
    case <-ctx.Done():
        return false, func() {} // canceled
    case <-p.stop:
        return false, func() {} // shutting down
    case <-p.pollStart:
        // resource acquired
        var once sync.Once
        return true, func() {
            once.Do(func() {
                select {
                case <-p.pollDone: // released
                case <-p.stop: // shutting down
                }
            })
        }
    }
}

func (p *weighted) GetHistory(ctx context.Context) (ok bool, done func()) {
    select {
    case <-ctx.Done():
        return false, func() {} // canceled
    case <-p.stop:
        return false, func() {} // shutting down
    case <-p.historyStart:
        // resource acquired
        var once sync.Once
        return true, func() {
            once.Do(func() {
                select {
                case <-p.historyDone: // released
                case <-p.stop: // shutting down
                }
            })
        }
    }
}