|
39 | 39 | ;;; We need to walk two pointers, current node and next node. Check the current node, delete if required.
|
40 | 40 | ;;; Then move the pointers.
|
41 | 41 |
|
42 |
| -(deftype ManyToManyChannel [^|LinkedList`1| takes ^|LinkedList`1| puts ^|Queue`1| buf closed ^clojure.core.async.impl.mutex.Lock mutex add!] |
| 42 | +(deftype ManyToManyChannel [^|LinkedList`1| takes ^|LinkedList`1| puts ^|Queue`1| buf closed ^clojure.core.async.impl.mutex.ILock mutex add!] |
43 | 43 | MMC
|
44 | 44 | (cleanup
|
45 | 45 | [_]
|
|
87 | 87 | (let [iter (.GetEnumerator puts)]
|
88 | 88 | (when (.MoveNext iter)
|
89 | 89 | (loop [[^ILock putter] (.Current iter)] ;;; ^Lock
|
90 |
| - (let [put-cb (and (impl/active? putter) (impl/commit putter))] |
91 |
| - (.unlock putter) |
92 |
| - (when put-cb |
93 |
| - (dispatch/run (fn [] (put-cb true)))) |
94 |
| - (when (.MoveNext iter) |
95 |
| - (recur (.Current iter))))))) |
| 90 | + (.lock putter) |
| 91 | + (let [put-cb (and (impl/active? putter) (impl/commit putter))] |
| 92 | + (.unlock putter) |
| 93 | + (when put-cb |
| 94 | + (dispatch/run (fn [] (put-cb true)))) |
| 95 | + (when (.MoveNext iter) |
| 96 | + (recur (.Current iter))))))) |
96 | 97 | (.Clear puts) ;;; .clear
|
97 | 98 | (impl/close! this))
|
98 | 99 |
|
|
135 | 136 | [take-cbs (loop [takers []
|
136 | 137 | curr-node (.First takes)]
|
137 | 138 | (if (and curr-node (pos? (count buf)))
|
138 |
| - (let [^ILock taker curr-node ;;; ^Lock |
| 139 | + (let [^ILock taker (.Value curr-node) ;;; ^Lock |
139 | 140 | next-node (.Next curr-node)]
|
140 | 141 | (.lock taker)
|
141 | 142 | (let [ret (and (impl/active? taker) (impl/commit taker))]
|
|
181 | 182 | ;;; (when (.hasNext iter)
|
182 | 183 | ;;; (recur (.next iter)))))))]
|
183 | 184 | [[put-cb take-cb] (when (.First takes)
|
184 |
| - (loop [^ILock taker (.First takes) ;;; ^Lock |
185 |
| - curr-node (.First takes)] |
| 185 | + (loop [curr-node (.First takes) |
| 186 | + ^ILock taker (.Value curr-node)] ;;; ^Lock |
186 | 187 | (if (< (impl/lock-id handler) (impl/lock-id taker))
|
187 | 188 | (do (.lock handler) (.lock taker))
|
188 | 189 | (do (.lock taker) (.lock handler)))
|
|
196 | 197 | ret)
|
197 | 198 | (let [next-node (.Next curr-node)]
|
198 | 199 | (when next-node
|
199 |
| - (recur next-node next-node)))))))] |
| 200 | + (recur next-node (.Value next-node))))))))] |
200 | 201 |
|
201 | 202 | (if (and put-cb take-cb)
|
202 | 203 | (do
|
|
0 commit comments