Skip to content

Commit

Permalink
racket-repl-output: buffer/"batch" stdout/stderr
Browse files Browse the repository at this point in the history
Performance can be bad at either of two extremes: Very many outputs,
or very large outputs. This can arise with stdout or stderr outputs:
e.g. (for ([_ huge-number]) (print something)) or (print huge-thing).
Use a manager thread to mediate and buffer/batch.

(Note that very long lines can still be slow in any Emacs buffer, but
(a) that can be mitigated by so-long and IIUC Emacs 29.1, and (b)
anyway that's not a problem we were contributing.)
  • Loading branch information
greghendershott committed Nov 21, 2023
1 parent 2d1fb86 commit 1d108ed
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 9 deletions.
94 changes: 89 additions & 5 deletions racket/repl-output.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#lang racket/base

(require racket/async-channel
racket/match
"repl-session.rkt")

(provide repl-output-channel
Expand All @@ -14,6 +15,7 @@
repl-output-exit
repl-output-value
repl-output-value-special
make-repl-output-manager
make-repl-output-port
make-repl-error-port
repl-error-port?)
Expand All @@ -37,9 +39,93 @@
;; A channel from which the command-server can sync.
(define repl-output-channel (make-async-channel))

;; This manager thread mediates between the `repl-output' function and
;; the `repl-output-channel` async-channel. It seeks a warm bowl of
;; porridge for the number and size of stdout and stderr outputs.
;;
;; - stdout/stderr items may be held awhile in case the next item is
;; the same kind. A run of consecutive items within a time span are
;; consolidated into one.
;;
;; - On the other hand a very large stdout/stderr item is split into
;; multiple smaller ones.
;;
;; So this is a kind of buffering or "batching", but using a timer
;; instead of needing explicit flushing. At the same time, any
;; non-stdout/stderr kind of output will automatically "flush",
;; including items like 'prompt or 'run, so this works out fine as
;; well.
(struct repl-output-item (kind value))
(define ((repl-output-manager-thread session-id))
(define msec-threshold 500)
(define size-threshold 4096)

(define (put* kind value)
(async-channel-put repl-output-channel
(list 'repl-output session-id kind value)))
(define (put item)
(match item
[(repl-output-item (and kind (or (== 'stdout) (== 'stderr))) bstr)
(define len (bytes-length bstr))
(for ([beg (in-range 0 len size-threshold)])
(put* kind (subbytes bstr beg (min len (+ beg size-threshold)))))]
[(repl-output-item kind value)
(put* kind value)]))

(define pending-item #f)
(define pending-flush-alarm-evt never-evt)

(define (queue item)
(match-define (repl-output-item kind value) item)
(match pending-item
;; No pending item. When the new item is stdout or stderr, and
;; doesn't already exceed the size-threshold, set it as the
;; pending item and start our countdown.
[#f
#:when (and (memq kind '(stdout stderr))
(< (bytes-length value) size-threshold))
(set! pending-item item)
(set! pending-flush-alarm-evt
(alarm-evt (+ (current-inexact-milliseconds)
msec-threshold)))]
;; No pending item. Just send new item now.
[#f
(put item)]
;; There's a pending item. New item is same kind. When appending
;; their values is under the size-threshold, combine them.
[(repl-output-item (== kind) pending-value)
#:when (< (+ (bytes-length pending-value) (bytes-length value))
size-threshold)
(set! pending-item
(repl-output-item kind
(bytes-append pending-value value)))]
;; There's a pending item. Send it then the new item, now.
[(? repl-output-item?)
(flush-pending)
(put item)]))

(define (flush-pending)
(when pending-item
(put pending-item)
(set! pending-item #f))
(set! pending-flush-alarm-evt never-evt))

(let loop ()
(sync (handle-evt (thread-receive-evt)
(λ (_evt) (queue (thread-receive))))
(handle-evt pending-flush-alarm-evt
(λ (_evt) (flush-pending))))
(loop)))

(define (make-repl-output-manager session-id)
(thread (repl-output-manager-thread session-id)))

(define (repl-output kind value)
(async-channel-put repl-output-channel
(list 'repl-output (current-session-id) kind value)))
(define t (current-repl-output-manager))
(when t
(thread-send t
(repl-output-item kind value)
void)))

;; Various wrappers around repl-output:

Expand Down Expand Up @@ -89,10 +175,8 @@

(define (make-repl-port kind)
(define name (format "racket-mode-repl-~a" kind))
(define special-kind (string->symbol (format "~a-special" kind)))
(define (write-out bstr start end non-block? breakable?)
(async-channel-put repl-output-channel
(repl-output kind (subbytes bstr start end)))
(repl-output kind (subbytes bstr start end))
(- end start))
(define close void)
(make-output-port name
Expand Down
5 changes: 5 additions & 0 deletions racket/repl-session.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
current-repl-msg-chan
current-submissions
current-session-maybe-mod
current-repl-output-manager
(struct-out session)
get-session
set-session!
Expand All @@ -24,6 +25,7 @@

(struct session
(thread ;thread? the repl manager thread
repl-out-mgr ;thread? the repl output manager thread
repl-msg-chan ;channel?
submissions ;channel?
maybe-mod ;(or/c #f module-path?)
Expand All @@ -35,6 +37,7 @@

(define (set-session! sid maybe-mod)
(hash-set! sessions sid (session (current-thread)
(current-repl-output-manager)
(current-repl-msg-chan)
(current-submissions)
maybe-mod
Expand All @@ -49,6 +52,7 @@
(define current-repl-msg-chan (make-parameter #f))
(define current-submissions (make-parameter #f))
(define current-session-maybe-mod (make-parameter #f))
(define current-repl-output-manager (make-parameter #f))

;; A way to parameterize e.g. commands that need to work with a
;; specific REPL session. Called from e.g. a command-server thread.
Expand All @@ -57,6 +61,7 @@
[(? session? s)
(log-racket-mode-debug @~a{@~v[@car[args]]: using session ID @~v[sid]})
(parameterize ([current-session-id sid]
[current-repl-output-manager (session-repl-out-mgr s)]
[current-repl-msg-chan (session-repl-msg-chan s)]
[current-submissions (session-submissions s)]
[current-session-maybe-mod (session-maybe-mod s)]
Expand Down
9 changes: 5 additions & 4 deletions racket/repl.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,11 @@
(log-racket-mode-info "starting repl session ~v" session-id)
;; Make pipe for user program input (as distinct form repl-submit
;; input).
(parameterize* ([current-session-id session-id]
[current-repl-msg-chan (make-channel)]
[current-submissions (make-channel)]
[error-display-handler racket-mode-error-display-handler])
(parameterize* ([current-session-id session-id]
[current-repl-output-manager (make-repl-output-manager session-id)]
[current-repl-msg-chan (make-channel)]
[current-submissions (make-channel)]
[error-display-handler racket-mode-error-display-handler])
(set-session! session-id #f)
(do-run
(initial-run-config
Expand Down

0 comments on commit 1d108ed

Please sign in to comment.