diff --git a/racket/repl-output.rkt b/racket/repl-output.rkt index c7857cf2..df50da60 100644 --- a/racket/repl-output.rkt +++ b/racket/repl-output.rkt @@ -4,6 +4,7 @@ #lang racket/base (require racket/async-channel + racket/match "repl-session.rkt") (provide repl-output-channel @@ -14,6 +15,7 @@ repl-output-exit repl-output-value repl-output-value-special + make-repl-output-manager make-repl-output-port make-repl-error-port repl-error-port?) @@ -37,9 +39,93 @@ ;; A channel from which the command-server can sync. (define repl-output-channel (make-async-channel)) +;; This manager thread mediates between the `repl-output' function and +;; the `repl-output-channel` async-channel. It seeks a warm bowl of +;; porridge for the number and size of stdout and stderr outputs. +;; +;; - stdout/stderr items may be held awhile in case the next item is +;; the same kind. A run of consecutive items within a time span are +;; consolidated into one. +;; +;; - On the other hand a very large stdout/stderr item is split into +;; multiple smaller ones. +;; +;; So this is a kind of buffering or "batching", but using a timer +;; instead of needing explicit flushing. At the same time, any +;; non-stdout/stderr kind of output will automatically "flush", +;; including items like 'prompt or 'run, so this works out fine as +;; well. +(struct repl-output-item (kind value)) +(define ((repl-output-manager-thread session-id)) + (define msec-threshold 500) + (define size-threshold 4096) + + (define (put* kind value) + (async-channel-put repl-output-channel + (list 'repl-output session-id kind value))) + (define (put item) + (match item + [(repl-output-item (and kind (or (== 'stdout) (== 'stderr))) bstr) + (define len (bytes-length bstr)) + (for ([beg (in-range 0 len size-threshold)]) + (put* kind (subbytes bstr beg (min len (+ beg size-threshold)))))] + [(repl-output-item kind value) + (put* kind value)])) + + (define pending-item #f) + (define pending-flush-alarm-evt never-evt) + + (define (queue item) + (match-define (repl-output-item kind value) item) + (match pending-item + ;; No pending item. When the new item is stdout or stderr, and + ;; doesn't already exceed the size-threshold, set it as the + ;; pending item and start our countdown. + [#f + #:when (and (memq kind '(stdout stderr)) + (< (bytes-length value) size-threshold)) + (set! pending-item item) + (set! pending-flush-alarm-evt + (alarm-evt (+ (current-inexact-milliseconds) + msec-threshold)))] + ;; No pending item. Just send new item now. + [#f + (put item)] + ;; There's a pending item. New item is same kind. When appending + ;; their values is under the size-threshold, combine them. + [(repl-output-item (== kind) pending-value) + #:when (< (+ (bytes-length pending-value) (bytes-length value)) + size-threshold) + (set! pending-item + (repl-output-item kind + (bytes-append pending-value value)))] + ;; There's a pending item. Send it then the new item, now. + [(? repl-output-item?) + (flush-pending) + (put item)])) + + (define (flush-pending) + (when pending-item + (put pending-item) + (set! pending-item #f)) + (set! pending-flush-alarm-evt never-evt)) + + (let loop () + (sync (handle-evt (thread-receive-evt) + (λ (_evt) (queue (thread-receive)))) + (handle-evt pending-flush-alarm-evt + (λ (_evt) (flush-pending)))) + (loop))) + +(define (make-repl-output-manager session-id) + (thread (repl-output-manager-thread session-id))) + (define (repl-output kind value) - (async-channel-put repl-output-channel - (list 'repl-output (current-session-id) kind value))) + (define t (current-repl-output-manager)) + (when t + (thread-send t + (repl-output-item kind value) + void))) ;; Various wrappers around repl-output: @@ -89,10 +175,8 @@ (define (make-repl-port kind) (define name (format "racket-mode-repl-~a" kind)) - (define special-kind (string->symbol (format "~a-special" kind))) (define (write-out bstr start end non-block? breakable?) - (async-channel-put repl-output-channel - (repl-output kind (subbytes bstr start end))) + (repl-output kind (subbytes bstr start end)) (- end start)) (define close void) (make-output-port name diff --git a/racket/repl-session.rkt b/racket/repl-session.rkt index 245466d6..014be91f 100644 --- a/racket/repl-session.rkt +++ b/racket/repl-session.rkt @@ -12,6 +12,7 @@ current-repl-msg-chan current-submissions current-session-maybe-mod + current-repl-output-manager (struct-out session) get-session set-session! @@ -24,6 +25,7 @@ (struct session (thread ;thread? the repl manager thread + repl-out-mgr ;thread? the repl output manager thread repl-msg-chan ;channel? submissions ;channel? maybe-mod ;(or/c #f module-path?) @@ -35,6 +37,7 @@ (define (set-session! sid maybe-mod) (hash-set! sessions sid (session (current-thread) + (current-repl-output-manager) (current-repl-msg-chan) (current-submissions) maybe-mod @@ -49,6 +52,7 @@ (define current-repl-msg-chan (make-parameter #f)) (define current-submissions (make-parameter #f)) (define current-session-maybe-mod (make-parameter #f)) +(define current-repl-output-manager (make-parameter #f)) ;; A way to parameterize e.g. commands that need to work with a ;; specific REPL session. Called from e.g. a command-server thread. @@ -57,6 +61,7 @@ [(? session? s) (log-racket-mode-debug @~a{@~v[@car[args]]: using session ID @~v[sid]}) (parameterize ([current-session-id sid] + [current-repl-output-manager (session-repl-out-mgr s)] [current-repl-msg-chan (session-repl-msg-chan s)] [current-submissions (session-submissions s)] [current-session-maybe-mod (session-maybe-mod s)] diff --git a/racket/repl.rkt b/racket/repl.rkt index abae84e3..7c964113 100644 --- a/racket/repl.rkt +++ b/racket/repl.rkt @@ -167,10 +167,11 @@ (log-racket-mode-info "starting repl session ~v" session-id) ;; Make pipe for user program input (as distinct form repl-submit ;; input). - (parameterize* ([current-session-id session-id] - [current-repl-msg-chan (make-channel)] - [current-submissions (make-channel)] - [error-display-handler racket-mode-error-display-handler]) + (parameterize* ([current-session-id session-id] + [current-repl-output-manager (make-repl-output-manager session-id)] + [current-repl-msg-chan (make-channel)] + [current-submissions (make-channel)] + [error-display-handler racket-mode-error-display-handler]) (set-session! session-id #f) (do-run (initial-run-config