Skip to content

Commit a616f13

Browse files
authored
feat(server): support interrupted by ctrl+C (#8)
1 parent bbf918c commit a616f13

File tree

10 files changed

+163
-30
lines changed

10 files changed

+163
-30
lines changed

browser/src/store/scanning_store.ts

+19-5
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ type Event = {
1515
} | {
1616
readonly kind: "startHandingFile";
1717
readonly path: string;
18+
readonly operation: FileOperation;
1819
}| {
1920
readonly kind: "completeHandingFile";
2021
readonly path: string;
22+
readonly operation: FileOperation;
2123
} | {
2224
readonly kind: "completeParsePdfPage";
2325
readonly index: number;
@@ -35,7 +37,7 @@ export type ScanningStore$ = {
3537
readonly phase: ReadonlyVal<ScanningPhase>;
3638
readonly scanCount: ReadonlyVal<number>;
3739
readonly handlingFile: ReadonlyVal<HandingFile | null>;
38-
readonly completedFiles: ReadonlyVal<readonly string[]>;
40+
readonly completedFiles: ReadonlyVal<readonly File[]>;
3941
readonly error: ReadonlyVal<string | null>;
4042
readonly isInterrupting: ReadonlyVal<boolean>;
4143
readonly isInterrupted: ReadonlyVal<boolean>;
@@ -49,8 +51,14 @@ export enum ScanningPhase {
4951
Completed,
5052
}
5153

52-
export type HandingFile = {
54+
export type File = {
5355
readonly path: string;
56+
readonly operation: FileOperation;
57+
};
58+
59+
export type FileOperation = "create" | "update" | "remove";
60+
61+
export type HandingFile = File & {
5462
readonly handlePdfPage?: {
5563
readonly index: number;
5664
readonly total: number;
@@ -68,7 +76,7 @@ export class ScanningStore {
6876
readonly #phase$: Val<ScanningPhase> = val(ScanningPhase.Ready);
6977
readonly #scanCount$: Val<number> = val(0);
7078
readonly #handlingFile$: Val<HandingFile | null> = val<HandingFile | null>(null);
71-
readonly #completedFiles$: Val<readonly string[]> = val<readonly string[]>([]);
79+
readonly #completedFiles$: Val<readonly File[]> = val<readonly File[]>([]);
7280
readonly #error$: Val<string | null> = val<string | null>(null);
7381
readonly #isInterrupting$: Val<boolean> = val(false);
7482
readonly #isInterrupted$: Val<boolean> = val(false);
@@ -138,14 +146,20 @@ export class ScanningStore {
138146
break;
139147
}
140148
case "startHandingFile": {
141-
this.#handlingFile$.set({ path: event.path });
149+
this.#handlingFile$.set({
150+
path: event.path,
151+
operation: event.operation,
152+
});
142153
break;
143154
}
144155
case "completeHandingFile": {
145156
this.#handlingFile$.set(null);
146157
this.#completedFiles$.set([
147158
...this.#completedFiles$.value,
148-
event.path,
159+
{
160+
path: event.path,
161+
operation: event.operation,
162+
},
149163
]);
150164
break;
151165
}

browser/src/views/App.module.less

-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ body * {
1616
align-items: stretch;
1717

1818
> div {
19-
padding: 0 24px;
2019
flex-grow: 0;
2120
flex-shrink: 1;
2221
flex-basis: 1024px;

browser/src/views/ScannerPage.module.less

-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
.root {
2-
margin-bottom: 68px;
32
overflow-y: scroll;
43
}
54

browser/src/views/ScannerPage.tsx

+24-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { Skeleton, Result, Steps, List, Button, Divider, Progress, Typography }
55
import { ScanOutlined, ProfileTwoTone, SyncOutlined, FilePdfTwoTone, PauseOutlined } from "@ant-design/icons";
66
import { val } from "value-enhancer";
77
import { useVal } from "use-value-enhancer";
8-
import { ScannerStore, ScanningStore, ScanningPhase } from "../store";
8+
import { ScannerStore, ScanningStore, ScanningPhase, FileOperation } from "../store";
99
import { Sources } from "./Sources";
1010

1111
const { Title, Paragraph } = Typography;
@@ -149,15 +149,15 @@ const ScanningPanel: React.FC<ScanningPanelProps> = ({ store }) => {
149149
for (const file of completedFiles) {
150150
records.push({
151151
icon: <FilePdfTwoTone />,
152-
title: "录入 PDF 文件",
153-
content: file,
152+
title: `${textWithOperation(file.operation)} PDF 文件`,
153+
content: file.path,
154154
loading: false,
155155
});
156156
}
157157
if (handlingFile) {
158158
records.push({
159159
icon: <FilePdfTwoTone />,
160-
title: "录入 PDF 文件",
160+
title: `${textWithOperation(handlingFile.operation)} PDF 文件`,
161161
content: handlingFile.path,
162162
loading: true,
163163
});
@@ -242,4 +242,23 @@ const ProgressBar: React.FC<ProgressBarProps> = ({ name, error, pdfPage }) => {
242242
status={status} />
243243
</div>
244244
);
245-
};
245+
};
246+
247+
/**
 * Display verb for each file operation. Typing the table as
 * `Record<FileOperation, string>` makes the compiler reject it whenever a
 * member of `FileOperation` has no entry, so a newly added operation cannot
 * silently render an empty label.
 */
const OPERATION_TEXT: Readonly<Record<FileOperation, string>> = {
  create: "录入",
  update: "更新",
  remove: "删除",
};

/**
 * Returns the verb placed in front of " PDF 文件" in a scanning record title.
 *
 * @param operation - The operation that was (or is being) applied to the file.
 * @returns "录入" for "create", "更新" for "update", "删除" for "remove". If a
 *   value outside the union arrives at runtime, an empty string is returned.
 */
function textWithOperation(operation: FileOperation): string {
  // Own-property check so inherited keys (e.g. "toString") never leak through
  // when an unexpected value arrives at runtime.
  if (Object.prototype.hasOwnProperty.call(OPERATION_TEXT, operation)) {
    return OPERATION_TEXT[operation];
  }
  return "";
}

index_package/service/service.py

-3
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,6 @@ def query(self, text: str, results_limit: Optional[int]) -> QueryResult:
4646
def page_content(self, pdf_hash: str, page_index: int) -> str:
4747
return self._get_service_in_thread().page_content(pdf_hash, page_index)
4848

49-
def freeze_database(self):
50-
pass # TODO: 因为强制退出导致数据结构损坏,此处需要冻结数据库并重新开始
51-
5249
def scan_job(self, max_workers: int = 1, progress_event_listener: Optional[ProgressEventListener] = None) -> ServiceScanJob:
5350
if progress_event_listener is None:
5451
progress_event_listener = lambda _: None

server/launcher.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ def launch():
2222
app_dir = os.path.abspath(app_dir)
2323
sources = Sources(os.path.join(app_dir, "app.sqlite3"))
2424
service = ServiceRef(
25+
app=app,
2526
workspace_path=app_dir,
26-
embedding_model="shibing624/text2vec-base-chinese",
2727
sources=sources,
28+
embedding_model="shibing624/text2vec-base-chinese",
2829
)
2930
routes(app, service)
3031
app.run(host="0.0.0.0", port=port)

server/progress_events.py

+31-12
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
CompleteHandleFileEvent,
1111
PDFFileProgressEvent,
1212
PDFFileStep,
13+
HandleFileOperation,
1314
)
1415

1516

@@ -27,9 +28,15 @@ class InterruptionStatus(IntEnum):
2728
@dataclass
2829
class HandingFile:
2930
path: str
31+
operation: HandleFileOperation
3032
pdf_handing: tuple[int, int] | None = None
3133
pdf_indexing: tuple[int, int] | None = None
3234

35+
@dataclass
36+
class File:
37+
path: str
38+
operation: HandleFileOperation
39+
3340
class ProgressEvents:
3441
def __init__(self):
3542
self._phase: ProgressPhase = ProgressPhase.READY
@@ -38,7 +45,7 @@ def __init__(self):
3845
self._handing_file: HandingFile | None = None
3946
self._error: str | None = None
4047
self._interruption_status: InterruptionStatus = InterruptionStatus.No
41-
self._completed_files: list[str] = []
48+
self._completed_files: list[File] = []
4249
self._fetcher_lock: Lock = Lock()
4350
self._fetcher_queues: list[Queue[dict]] = []
4451

@@ -69,10 +76,11 @@ def _init_events(self) -> list[dict]:
6976
"kind": "scanCompleted",
7077
"count": self._updated_files,
7178
})
72-
for path in self._completed_files:
79+
for file in self._completed_files:
7380
events.append({
7481
"kind": "completeHandingFile",
75-
"path": path,
82+
"path": file.path,
83+
"operation": file.operation.value,
7684
})
7785
if self._phase == ProgressPhase.COMPLETED:
7886
events.append({ "kind": "completed" })
@@ -82,6 +90,7 @@ def _init_events(self) -> list[dict]:
8290
events.append({
8391
"kind": "startHandingFile",
8492
"path": self._handing_file.path,
93+
"operation": self._handing_file.operation.value,
8594
})
8695
if self._handing_file.pdf_handing is not None:
8796
index, total = self._handing_file.pdf_handing
@@ -114,7 +123,7 @@ def receive_event(self, event: ProgressEvent):
114123
if isinstance(event, ScanCompletedEvent):
115124
self._on_scan_completed(event.updated_files)
116125
elif isinstance(event, StartHandleFileEvent):
117-
self._on_start_handle_file(event.path)
126+
self._on_start_handle_file(event.path, event.operation)
118127
elif isinstance(event, CompleteHandleFileEvent):
119128
self._on_complete_handle_file(event.path)
120129
elif isinstance(event, PDFFileProgressEvent):
@@ -133,25 +142,35 @@ def _on_scan_completed(self, updated_files: int):
133142
"count": updated_files,
134143
})
135144

136-
def _on_start_handle_file(self, path: str):
145+
def _on_start_handle_file(self, path: str, operation: HandleFileOperation):
137146
with self._status_lock:
138-
self._handing_file = HandingFile(path=path)
139-
147+
self._handing_file = HandingFile(
148+
path=path,
149+
operation=operation,
150+
)
140151
self._emit_event({
141152
"kind": "startHandingFile",
142153
"path": path,
154+
"operation": operation.value,
143155
})
144156

145157
def _on_complete_handle_file(self, path: str):
158+
file: File | None = None
146159
with self._status_lock:
147-
self._completed_files.append(path)
148160
if self._handing_file is not None and self._handing_file.path == path:
161+
file = File(
162+
path=path,
163+
operation=self._handing_file.operation,
164+
)
165+
self._completed_files.append(file)
149166
self._handing_file = None
150167

151-
self._emit_event({
152-
"kind": "completeHandingFile",
153-
"path": path,
154-
})
168+
if file is not None:
169+
self._emit_event({
170+
"kind": "completeHandingFile",
171+
"path": file.path,
172+
"operation": file.operation.value,
173+
})
155174

156175
def _on_pdf_parse_progress(self, page_index: int, total_pages: int):
157176
with self._status_lock:

server/routes.py

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import sys
23

34
from .service import ServiceRef
45
from flask import (

server/service.py

+17-2
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,31 @@
11
from threading import Thread, Lock, Event
22
from typing import Generator
33
from json import dumps
4+
from flask import Flask
45
from index_package import Service, ServiceScanJob
56
from .sources import Sources
67
from .progress_events import ProgressEvents
8+
from .signal_handler import SignalHandler
79

810

911
class ServiceRef:
10-
def __init__(self, workspace_path: str, embedding_model: str, sources: Sources):
12+
def __init__(self,
13+
app: Flask,
14+
sources: Sources,
15+
workspace_path: str,
16+
embedding_model: str,
17+
):
18+
self._app: Flask = app
19+
self._sources: Sources = sources
1120
self._workspace_path: str = workspace_path
1221
self._embedding_model: str = embedding_model
13-
self._sources: Sources = sources
1422
self._lock: Lock = Lock()
1523
self._service: Service | None = None
1624
self._is_scanning: bool = False
1725
self._scan_job: ServiceScanJob | None = None
1826
self._scan_job_event: Event | None = None
1927
self._progress_events: ProgressEvents = ProgressEvents()
28+
self._signal_handler = SignalHandler()
2029

2130
@property
2231
def ref(self) -> Service:
@@ -70,6 +79,11 @@ def _scan(self):
7079
self._scan_job_event.set()
7180
self._scan_job_event = None
7281

82+
success_bind = self._signal_handler.bind_scan_job(scan_job)
83+
if not success_bind:
84+
self._progress_events.set_interrupted()
85+
return
86+
7387
try:
7488
try:
7589
completed = scan_job.start({
@@ -89,6 +103,7 @@ def _scan(self):
89103
self._progress_events.set_interrupted()
90104

91105
finally:
106+
self._signal_handler.unbind_scan_job()
92107
with self._lock:
93108
self._is_scanning = False
94109
self._scan_job = None

server/signal_handler.py

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import signal
2+
import time
3+
import sys
4+
import threading
5+
6+
from typing import Optional
7+
from index_package import ServiceScanJob
8+
9+
class SignalHandler:
10+
def __init__(self):
11+
self._scan_job: Optional[ServiceScanJob] = None
12+
self._first_interrupted_at: Optional[float] = None
13+
self._lock: threading.Lock = threading.Lock()
14+
self._scan_unbidden_event: threading.Event | None = None
15+
signal.signal(signal.SIGINT, self._on_sigint)
16+
17+
@property
18+
def is_interrupting(self) -> bool:
19+
with self._lock:
20+
return self._first_interrupted_at is not None
21+
22+
# Returns False (without binding the job) when an interruption is already in progress.
23+
def bind_scan_job(self, scan_job: ServiceScanJob) -> bool:
24+
with self._lock:
25+
if self._scan_job is not None:
26+
raise Exception("SignalHandler already watching a scan job")
27+
if self._first_interrupted_at is not None:
28+
return False
29+
self._scan_job = scan_job
30+
return True
31+
32+
def unbind_scan_job(self):
33+
with self._lock:
34+
self._scan_job = None
35+
if self._scan_unbidden_event is not None:
36+
self._scan_unbidden_event.set()
37+
self._scan_unbidden_event = None
38+
39+
def _on_sigint(self, sig, frame):
40+
limit_seconds = 12.0
41+
with self._lock:
42+
scan_job = self._scan_job
43+
first_interrupted_at = self._first_interrupted_at
44+
45+
if scan_job is not None and \
46+
first_interrupted_at is None:
47+
event = threading.Event()
48+
print("\nInterrupting...")
49+
with self._lock:
50+
self._first_interrupted_at = time.time()
51+
self._scan_unbidden_event = event
52+
scan_job.interrupt()
53+
event.wait()
54+
sys.exit(0)
55+
56+
elif first_interrupted_at is None:
57+
print("\nExiting...")
58+
sys.exit(130)
59+
60+
else:
61+
duration_seconds = time.time() - first_interrupted_at
62+
if duration_seconds <= limit_seconds:
63+
str_seconds = "{:.2f}".format(limit_seconds - duration_seconds)
64+
print(f"\nForce stopping... (press again to force stop after {str_seconds}s)")
65+
else:
66+
print("\nForce stopping...")
67+
print("It may corrupt the data structure of the database")
68+
# TODO: 因为强制退出导致数据结构损坏,此处需要冻结数据库并重新开始
69+
sys.exit(1)

0 commit comments

Comments
 (0)