Skip to content
30 changes: 29 additions & 1 deletion src/manifold/debug.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns manifold.debug
{:no-doc true})
{:no-doc true}
(:require [clojure.tools.logging :as log]))

(def ^:dynamic *dropped-error-logging-enabled?* true)

Expand All @@ -8,3 +9,30 @@

(defn disable-dropped-error-logging! []
(.bindRoot #'*dropped-error-logging-enabled?* false))

(def ^:dynamic *leak-aware-deferred-rate* 1024)

(defn set-leak-aware-deferred-rate! [n]
(.bindRoot #'*leak-aware-deferred-rate* n))

(def dropped-errors nil)

(defn log-dropped-error! [error]
(some-> dropped-errors (swap! inc))
(log/warn error "unconsumed deferred in error state, make sure you're using `catch`."))

(defn with-dropped-error-detection
"Calls f, then attempts to trigger dropped errors to be detected and finally calls
handle-dropped-errors with the number of detected dropped errors. Details about these are logged
as warnings."
[f handle-dropped-errors]
(assert (nil? dropped-errors) "with-dropped-error-detection may not be nested")
;; Flush out any pending dropped errors from before
(System/gc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was told before that System/gc is more of a suggestion to the VM than an actual call

Did you observe this working reliably?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, there is no guarantee that System/gc actually triggers a GC, let alone whether it reclaims all garbage. Quoting from the docs

Calling the gc method suggests that the Java Virtual Machine expend effort toward recycling unused objects in order to make the memory they currently occupy available for reuse by the Java Virtual Machine. When control returns from the method call, the Java Virtual Machine has made a best effort to reclaim space from all unused objects. There is no guarantee that this effort will recycle any particular number of unused objects, reclaim any particular amount of space, or complete at any particular time, if at all, before the method returns or ever. There is also no guarantee that this effort will determine the change of reachability in any particular number of objects, or that any particular number of Reference objects will be cleared and enqueued.

However, it does work pretty well in practice, see for example the failing tests for this PR which uses OpenJDK 8. Locally, I've also successfully used it with OpenJDK 21. So yeah, it's not deterministic but still quite useful as evidenced by the issues it uncovered 🙂

(System/runFinalization)
(with-redefs [dropped-errors (atom 0)]
(f)
;; Flush out any errors which were dropped during f
(System/gc)
(System/runFinalization)
(handle-dropped-errors @dropped-errors)))
29 changes: 19 additions & 10 deletions src/manifold/deferred.clj
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,10 @@
[Object
(finalize [_]
(utils/with-lock lock
(when (and (identical? ::error state) (not consumed?))
(log/warn val "unconsumed deferred in error state, make sure you're using `catch`."))))]
(when (and (identical? ::error state)
(not consumed?)
debug/*dropped-error-logging-enabled?*)
(debug/log-dropped-error! val))))]
nil)

clojure.lang.IReference
Expand Down Expand Up @@ -631,7 +633,7 @@
(when (and
(not consumed?)
debug/*dropped-error-logging-enabled?*)
(log/warn error "unconsumed deferred in error state, make sure you're using `catch`.")))
(debug/log-dropped-error! error)))

clojure.lang.IReference
(meta [_] mta)
Expand Down Expand Up @@ -690,8 +692,9 @@
([]
(deferred (ex/executor)))
([executor]
(if (and (p/zero? (rem (.incrementAndGet created) 1024))
debug/*dropped-error-logging-enabled?*)
(if (and debug/*dropped-error-logging-enabled?*
(p/zero? (rem (.incrementAndGet created)
^long debug/*leak-aware-deferred-rate*)))
(LeakAwareDeferred. nil ::unset nil (utils/mutex) (LinkedList.) nil false executor)
(Deferred. nil ::unset nil (utils/mutex) (LinkedList.) nil false executor)))))

Expand Down Expand Up @@ -1290,9 +1293,12 @@
(let [timeout-d (time/in interval
#(error! d
(TimeoutException.
(str "timed out after " interval " milliseconds"))))]
(chain d (fn [_]
(success! timeout-d true)))))
(str "timed out after " interval " milliseconds"))))]
(on-realized d
(fn [_]
(success! timeout-d true))
(fn [_]
(success! timeout-d false)))))
d)
([d interval timeout-value]
(cond
Expand All @@ -1305,8 +1311,11 @@
:else
(let [timeout-d (time/in interval
#(success! d timeout-value))]
(chain d (fn [_]
(success! timeout-d true)))))
(on-realized d
(fn [_]
(success! timeout-d true))
(fn [_]
(success! timeout-d false)))))
d))

(deftype+ Recur [s]
Expand Down
7 changes: 2 additions & 5 deletions src/manifold/go_off.clj
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@
(defn ^:no-doc run-state-machine-wrapped [state]
(try (async-runtime/run-state-machine state)
(catch Throwable ex
(d/error! (async-runtime/aget-object state async-runtime/USER-START-IDX) ex)
(throw ex))))
(d/error! (async-runtime/aget-object state async-runtime/USER-START-IDX) ex))))

(defn ^:no-doc take! [state blk d]
(let [handler (fn [x]
Expand All @@ -69,9 +68,7 @@
:recur)

;; resume processing state machine once d has been realized
(do (-> d
(d/chain handler)
(d/catch handler))
(do (d/on-realized d handler handler)
nil))))))

(def ^:no-doc async-custom-terminators
Expand Down
6 changes: 3 additions & 3 deletions src/manifold/stream.clj
Original file line number Diff line number Diff line change
Expand Up @@ -816,14 +816,14 @@
(d/chain' (take! in ::none)
(fn [s']
(cond
(closed? out)
(close! s')

(identical? ::none s')
(do
(close! out)
s')

(closed? out)
(close! s')

:else
(d/loop []
(d/chain' (take! s' ::none)
Expand Down
16 changes: 12 additions & 4 deletions src/manifold/time.clj
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
;;;

(in-ns 'manifold.deferred)
(clojure.core/declare success! error! deferred realized? chain connect)
(clojure.core/declare success! error! deferred realized? chain connect on-realized)
(in-ns 'manifold.time)

;;;
Expand Down Expand Up @@ -191,8 +191,14 @@
(reify
IClock
(in [this interval-millis f]
(swap! events update-in [(p/+ ^double @now interval-millis)] #(conj (or % []) f))
(advance this 0))
(let [t (p/+ ^double @now interval-millis)
cancel-fn (fn []
(swap! events #(cond-> %
(contains? % t)
(update t disj f))))]
(swap! events update t (fnil conj #{}) f)
(advance this 0)
cancel-fn))
(every [this delay-millis period-millis f]
(assert (p/< 0.0 period-millis))
(let [period (atom period-millis)
Expand Down Expand Up @@ -263,7 +269,9 @@
(catch Throwable e
(manifold.deferred/error! d e)))))
cancel-fn (.in *clock* interval f)]
(manifold.deferred/chain d (fn [_] (cancel-fn)))
(manifold.deferred/on-realized d
(fn [_] (cancel-fn))
(fn [_] (cancel-fn)))
d))

(defn every
Expand Down
2 changes: 2 additions & 0 deletions test/manifold/bus_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@
d (b/publish! b (long 1) 42)]
(is (= 42 @(s/take! s)))
(is (= true @d))))

(instrument-tests-with-dropped-error-detection!)
3 changes: 3 additions & 0 deletions test/manifold/deferred_stage_test.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns manifold.deferred-stage-test
(:require [manifold.deferred :as d]
[manifold.test-utils :refer :all]
[manifold.utils :refer
[fn->Function fn->Consumer fn->BiFunction fn->BiConsumer]]
[clojure.test :refer [deftest is testing]])
Expand Down Expand Up @@ -598,3 +599,5 @@
(d/success-deferred (d/success-deferred x)))))]

(is (d/deferred? @d2)))))

(instrument-tests-with-dropped-error-detection!)
87 changes: 65 additions & 22 deletions test/manifold/deferred_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
(:require
[clojure.test :refer :all]
[manifold.test-utils :refer :all]
[manifold.debug :as debug]
[manifold.deferred :as d]
[manifold.executor :as ex])
(:import
(java.util.concurrent
CompletableFuture
CompletionStage)
CompletionStage
TimeoutException)
(manifold.deferred IDeferred)))

(defmacro future' [& body]
Expand Down Expand Up @@ -237,6 +239,28 @@
(is (= 1 @d))
(is (= 1 (deref d 10 :foo)))))

(deftest test-timeout
(testing "exception by default"
(let [d (d/deferred)
t (d/timeout! d 1)]
(is (identical? d t))
(is (thrown-with-msg? TimeoutException
#"^timed out after 1 milliseconds$"
(deref d 100 ::error)))))

(testing "custom default value"
(let [d (d/deferred)
t (d/timeout! d 1 ::timeout)]
(is (identical? d t))
(is (deref (capture-success d ::timeout) 100 ::error))))

(testing "error before timeout"
(let [ex (Exception.)
d (d/deferred)
t (d/timeout! d 1000)]
(d/error! d ex)
(is (= ex (deref (capture-error t) 10 ::error))))))

(deftest test-loop
;; body produces a non-deferred value
(is @(capture-success
Expand Down Expand Up @@ -285,21 +309,35 @@
(deftest test-finally
(let [target-d (d/deferred)
d (d/deferred)
fd (d/finally
d
(fn []
(d/success! target-d ::delivered)))]
fd (-> d
(d/finally
(fn []
(d/success! target-d ::delivered)))
;; to silence dropped error detection
(d/catch identity))]
(d/error! d (Exception.))
(is (= ::delivered (deref target-d 0 ::not-delivered)))))

(deftest test-alt
(is (#{1 2 3} @(d/alt 1 2 3)))
(is (= 2 @(d/alt (d/future (Thread/sleep 10) 1) 2)))

(is (= 2 @(d/alt (d/future (Thread/sleep 10) (throw (Exception. "boom"))) 2)))

(is (thrown-with-msg? Exception #"boom"
@(d/alt (d/future (throw (Exception. "boom"))) (d/future (Thread/sleep 10)))))
(let [d (d/deferred)
a (d/alt d 2)]
(d/success! d 1)
(is (= 2 @a)))

(let [d (d/deferred)
a (d/alt d 2)]
(doto d
(d/error! (Exception. "boom 1"))
;; to silence dropped error detection
(d/catch identity))
(is (= 2 @a)))

(let [e (d/error-deferred (Exception. "boom 2"))
d (d/deferred)
a (d/alt e d)]
(d/success! d 1)
(is (thrown-with-msg? Exception #"boom" @a)))

(testing "uniformly distributed"
(let [results (atom {})
Expand All @@ -314,7 +352,7 @@

;;;

(deftest ^:benchmark benchmark-chain
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-chain
(bench "invoke comp x1"
((comp inc) 0))
(bench "chain x1"
Expand All @@ -334,7 +372,7 @@
(bench "chain' x5"
@(d/chain' 0 inc inc inc inc inc)))

(deftest ^:benchmark benchmark-deferred
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-deferred
(bench "create deferred"
(d/deferred))
(bench "add-listener and success"
Expand Down Expand Up @@ -381,16 +419,19 @@
(deliver d 1)
@d)))

(deftest ^:stress test-error-leak-detection

(d/error-deferred (Throwable.))
(System/gc)
(deftest ^:ignore-dropped-errors ^:stress test-error-leak-detection
(testing "error-deferred always detects dropped errors"
(expect-dropped-errors 1
(d/error-deferred (Throwable.))))

(dotimes [_ 2e3]
(d/error! (d/deferred) (Throwable.)))
(System/gc))
(testing "regular deferreds detect errors on every debug/*leak-aware-deferred-rate*'th instance (1024 by default)"
(expect-dropped-errors 2
;; Explicitly restating the (current) default here for clarity
(binding [debug/*leak-aware-deferred-rate* 1024]
(dotimes [_ 2048]
(d/error! (d/deferred) (Throwable.)))))))

(deftest ^:stress test-deferred-chain
(deftest ^:ignore-dropped-errors ^:stress test-deferred-chain
(dotimes [_ 1e4]
(let [d (d/deferred)
result (d/future
Expand All @@ -401,7 +442,7 @@
(d/connect % d')
d')
d))))]
(Thread/sleep (rand-int 10))
(Thread/sleep ^long (rand-int 10))
(d/success! d 1)
(is (= 1 @@result)))))

Expand Down Expand Up @@ -431,3 +472,5 @@

(finally
(remove-method print-method CompletionStage))))

(instrument-tests-with-dropped-error-detection!)
5 changes: 4 additions & 1 deletion test/manifold/executor_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns manifold.executor-test
(:require
[clojure.test :refer :all]
[manifold.executor :as e])
[manifold.executor :as e]
[manifold.test-utils :refer :all])
(:import
[io.aleph.dirigiste
Executor
Expand Down Expand Up @@ -68,3 +69,5 @@
500)
thread (.newThread tf (constantly nil))]
(is (= "custom-name" (.getName thread)))))

(instrument-tests-with-dropped-error-detection!)
12 changes: 7 additions & 5 deletions test/manifold/go_off_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@
(testing "timeouts"
(is (= ::timeout @(go-off (<!? (d/timeout! (d/deferred) 10 ::timeout)))))
(is (= ::timeout @(d/timeout! (go-off (<!? (d/deferred))) 10 ::timeout)))
(is (thrown? TimeoutException @(go-off (<!? (d/timeout! (d/deferred) 10)))))
(is (thrown? TimeoutException @(d/timeout! (go-off (<!? (d/deferred))) 10))))
(is (thrown? TimeoutException @(go-off (<!? (d/timeout! (d/deferred) 11)))))
(is (thrown? TimeoutException @(d/timeout! (go-off (<!? (d/deferred))) 12))))

(testing "alt"
(is (= ::timeout @(go-off (<!? (d/alt (d/deferred) (d/timeout! (d/deferred) 10 ::timeout))))))
(is (= ::timeout @(d/alt (go-off (<!? (d/deferred))) (d/timeout! (d/deferred) 10 ::timeout))))
(is (= ::timeout @(go-off (<!? (d/alt (d/deferred) (d/timeout! (d/deferred) 13 ::timeout))))))
(is (= ::timeout @(d/alt (go-off (<!? (d/deferred))) (d/timeout! (d/deferred) 14 ::timeout))))
(is (= 1 @(go-off (<!? (d/alt (d/deferred) (d/success-deferred 1))))))
(is (= 1 @(d/alt (go-off (<!? (d/deferred))) (d/success-deferred 1))))))

Expand Down Expand Up @@ -149,7 +149,7 @@
(s/close! test-stream)
(is (= @test-d [0 1 2 nil nil]))))

(deftest ^:benchmark benchmark-go-off
(deftest ^:ignore-dropped-errors ^:benchmark benchmark-go-off
(bench "invoke comp x1"
((comp inc) 0))
(bench "go-off x1"
Expand All @@ -174,3 +174,5 @@
@(go-off (inc (<!? (inc (<!? (inc (<!? (inc (<!? (inc (<!? (d/success-deferred 0))))))))))))
(bench "go-off future 200 x5"
@(go-off (inc (<!? (inc (<!? (inc (<!? (inc (<!? (inc (<!? (d/future (Thread/sleep 200) 0)))))))))))))))

(instrument-tests-with-dropped-error-detection!)
Loading