Skip to content

Commit aeccdf8

Browse files
committed
Address review comments
1 parent d62498d commit aeccdf8

File tree

3 files changed

+108
-2
lines changed

3 files changed

+108
-2
lines changed

src/api/workspace.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,30 @@ import { type CoderApi } from "./coderApi";
1818
/** Opens a stream once; subsequent open() calls are no-ops until closed. */
1919
export class LazyStream<T> {
2020
private stream: UnidirectionalStream<T> | null = null;
21+
private opening: Promise<void> | null = null;
2122

2223
async open(factory: () => Promise<UnidirectionalStream<T>>): Promise<void> {
23-
this.stream ??= await factory();
24+
if (this.stream) return;
25+
26+
// Deduplicate concurrent calls; close() clears the reference to cancel.
27+
if (!this.opening) {
28+
const promise = factory().then((s) => {
29+
if (this.opening === promise) {
30+
this.stream = s;
31+
this.opening = null;
32+
} else {
33+
s.close();
34+
}
35+
});
36+
this.opening = promise;
37+
}
38+
await this.opening;
2439
}
2540

2641
close(): void {
2742
this.stream?.close();
2843
this.stream = null;
44+
this.opening = null;
2945
}
3046
}
3147

src/webviews/tasks/tasksPanel.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,8 @@ export class TasksPanel
480480

481481
const onOutput = (line: string) => {
482482
const clean = stripAnsi(line);
483-
if (clean.length === 0) return;
483+
// Skip lines that were purely ANSI codes, but keep intentional blank lines.
484+
if (line.length > 0 && clean.length === 0) return;
484485
this.sendNotification({
485486
type: TasksApi.workspaceLogsAppend.method,
486487
data: [clean],
@@ -644,6 +645,7 @@ export class TasksPanel
644645
dispose(): void {
645646
this.buildLogStream.close();
646647
this.agentLogStream.close();
648+
this.streamingTaskId = null;
647649
for (const d of this.disposables) {
648650
d.dispose();
649651
}

test/unit/api/workspace.test.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
3+
import { LazyStream } from "@/api/workspace";
4+
import { type UnidirectionalStream } from "@/websocket/eventStreamConnection";
5+
6+
function mockStream(): UnidirectionalStream<unknown> {
7+
return {
8+
url: "ws://test",
9+
addEventListener: vi.fn(),
10+
removeEventListener: vi.fn(),
11+
close: vi.fn(),
12+
};
13+
}
14+
15+
type StreamFactory = () => Promise<UnidirectionalStream<unknown>>;
16+
17+
/** Creates a factory whose promise can be resolved manually. */
18+
function deferredFactory() {
19+
let resolve!: (s: UnidirectionalStream<unknown>) => void;
20+
const factory: StreamFactory = vi.fn().mockReturnValue(
21+
new Promise<UnidirectionalStream<unknown>>((r) => {
22+
resolve = r;
23+
}),
24+
);
25+
return {
26+
factory,
27+
resolve: (s?: UnidirectionalStream<unknown>) => resolve(s ?? mockStream()),
28+
};
29+
}
30+
31+
describe("LazyStream", () => {
32+
it("opens once and ignores subsequent calls", async () => {
33+
const factory: StreamFactory = vi.fn().mockResolvedValue(mockStream());
34+
const lazy = new LazyStream();
35+
36+
await lazy.open(factory);
37+
await lazy.open(factory);
38+
39+
expect(factory).toHaveBeenCalledOnce();
40+
});
41+
42+
it("can reopen after close", async () => {
43+
const factory: StreamFactory = vi.fn().mockResolvedValue(mockStream());
44+
const lazy = new LazyStream();
45+
46+
await lazy.open(factory);
47+
lazy.close();
48+
await lazy.open(factory);
49+
50+
expect(factory).toHaveBeenCalledTimes(2);
51+
});
52+
53+
it("closes the underlying stream", async () => {
54+
const stream = mockStream();
55+
const lazy = new LazyStream();
56+
57+
await lazy.open(() => Promise.resolve(stream));
58+
lazy.close();
59+
60+
expect(stream.close).toHaveBeenCalledOnce();
61+
});
62+
63+
it("deduplicates concurrent opens", async () => {
64+
const { factory, resolve } = deferredFactory();
65+
const lazy = new LazyStream();
66+
67+
const p1 = lazy.open(factory);
68+
const p2 = lazy.open(factory);
69+
resolve();
70+
await Promise.all([p1, p2]);
71+
72+
expect(factory).toHaveBeenCalledOnce();
73+
});
74+
75+
it("allows reopening after close during pending open", async () => {
76+
const { factory, resolve } = deferredFactory();
77+
const lazy = new LazyStream();
78+
79+
const p = lazy.open(factory);
80+
lazy.close();
81+
resolve();
82+
await p.catch(() => {});
83+
84+
const factory2: StreamFactory = vi.fn().mockResolvedValue(mockStream());
85+
await lazy.open(factory2);
86+
expect(factory2).toHaveBeenCalledOnce();
87+
});
88+
});

0 commit comments

Comments
 (0)