Skip to content

Commit 0eb829f

Browse files
committed
Merge branch 'feature/from_opfs_path' into feature/support_multi_window
# Conflicts: # packages/duckdb-wasm/test/opfs.test.ts
2 parents f53838f + d42ff67 commit 0eb829f

12 files changed

+326
-49
lines changed

lib/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,9 @@ if(EMSCRIPTEN)
292292
_malloc, \
293293
_calloc, \
294294
_free, \
295+
stringToUTF8, \
296+
lengthBytesUTF8, \
297+
stackAlloc, \
295298
_duckdb_web_clear_response, \
296299
_duckdb_web_collect_file_stats, \
297300
_duckdb_web_connect, \

lib/src/webdb_api.cc

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <exception>
12
#include <iostream>
23
#include <stdexcept>
34

@@ -94,9 +95,24 @@ void duckdb_web_fs_drop_file(WASMResponse* packed, const char* file_name) {
9495
WASMResponseBuffer::Get().Store(*packed, webdb.DropFile(file_name));
9596
}
9697
/// Drop the named files, or all files when no names are given
97-
void duckdb_web_fs_drop_files(WASMResponse* packed) {
98+
void duckdb_web_fs_drop_files(WASMResponse* packed, const char** names, int name_count) {
9899
GET_WEBDB(*packed);
99-
WASMResponseBuffer::Get().Store(*packed, webdb.DropFiles());
100+
if (name_count == 0) {
101+
WASMResponseBuffer::Get().Store(*packed, webdb.DropFiles());
102+
} else {
103+
for (int i = 0; i < name_count; i++) {
104+
const char* name = names[i];
105+
if (name == nullptr) {
106+
std::cerr << "Error: NULL pointer detected at index " << i << std::endl;
107+
continue;
108+
}
109+
if (std::strlen(name) == 0) {
110+
std::cerr << "Error: Empty string detected at index " << i << std::endl;
111+
continue;
112+
}
113+
WASMResponseBuffer::Get().Store(*packed, webdb.DropFile(name));
114+
}
115+
}
100116
}
101117
/// Glob file infos
102118
void duckdb_web_fs_glob_file_infos(WASMResponse* packed, const char* file_name) {

packages/duckdb-wasm/src/bindings/bindings_base.ts

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
525525
}
526526
}
527527
}
528-
return handle;
528+
return handle;
529529
}
530530
/** Register a file object URL async */
531531
public async registerFileHandleAsync<HandleType>(
@@ -583,12 +583,52 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
583583
dropResponseBuffers(this.mod);
584584
}
585585
/** Drop files */
586-
public dropFiles(): void {
587-
const [s, d, n] = callSRet(this.mod, 'duckdb_web_fs_drop_files', [], []);
588-
if (s !== StatusCode.SUCCESS) {
589-
throw new Error(readString(this.mod, d, n));
586+
public dropFiles(names?:string[]): void {
587+
const pointers:number[] = [];
588+
let pointerOfArray:number = -1;
589+
try {
590+
for (const str of (names ?? [])) {
591+
if (str !== null && str !== undefined && str.length > 0) {
592+
const size = this.mod.lengthBytesUTF8(str) + 1;
593+
const ret = this.mod._malloc(size);
594+
if (!ret) {
595+
throw new Error(`Failed to allocate memory for string: ${str}`);
596+
}
597+
this.mod.stringToUTF8(str, ret, size);
598+
pointers.push(ret);
599+
}
600+
}
601+
pointerOfArray = this.mod._malloc(pointers.length * 4);
602+
if (!pointerOfArray) {
603+
throw new Error(`Failed to allocate memory for pointers array`);
604+
}
605+
for (let i = 0; i < pointers.length; i++) {
606+
this.mod.HEAP32[(pointerOfArray >> 2) + i] = pointers[i];
607+
}
608+
const [s, d, n] = callSRet(
609+
this.mod,
610+
'duckdb_web_fs_drop_files',
611+
[
612+
'number',
613+
'number'
614+
],
615+
[
616+
pointerOfArray,
617+
pointers.length
618+
]
619+
);
620+
if (s !== StatusCode.SUCCESS) {
621+
throw new Error(readString(this.mod, d, n));
622+
}
623+
dropResponseBuffers(this.mod);
624+
} finally {
625+
for (const pointer of pointers) {
626+
this.mod._free(pointer);
627+
}
628+
if( pointerOfArray > 0 ){
629+
this.mod._free(pointerOfArray);
630+
}
590631
}
591-
dropResponseBuffers(this.mod);
592632
}
593633
/** Flush all files */
594634
public flushFiles(): void {
@@ -615,11 +655,11 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
615655
return copy;
616656
}
617657
/** Register an OPFS file name (must start with "opfs://") */
618-
public registerOPFSFileName(file: string): Promise<void> {
619-
if (file.startsWith("opfs://")) {
620-
return this.prepareFileHandle(file, DuckDBDataProtocol.BROWSER_FSACCESS);
621-
} else {
622-
throw new Error("Not an OPFS file name: " + file);
658+
public async registerOPFSFileName(file: string): Promise<void> {
659+
if (file.startsWith("opfs://")) {
660+
return await this.prepareFileHandle(file, DuckDBDataProtocol.BROWSER_FSACCESS);
661+
} else {
662+
throw new Error("Not an OPFS file name: " + file);
623663
}
624664
}
625665
public collectFileStatistics(file: string, enable: boolean): void {

packages/duckdb-wasm/src/bindings/bindings_interface.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ export interface DuckDBBindings {
5858
prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol, accessMode?: DuckDBAccessMode): Promise<void>;
5959
globFiles(path: string): WebFile[];
6060
dropFile(name: string): void;
61-
dropFiles(): void;
61+
dropFiles(names?: string[]): void;
6262
flushFiles(): void;
6363
copyFileToPath(name: string, path: string): void;
6464
copyFileToBuffer(name: string): Uint8Array;
65-
registerOPFSFileName(file: string): void;
65+
registerOPFSFileName(file: string): Promise<void>;
6666
collectFileStatistics(file: string, enable: boolean): void;
6767
exportFileStatistics(file: string): FileStatistics;
6868
}

packages/duckdb-wasm/src/bindings/config.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,15 @@ export interface DuckDBFilesystemConfig {
2929
allowFullHTTPReads?: boolean;
3030
}
3131

32+
export interface DuckDBOPFSConfig {
33+
/**
34+
* Defines how `opfs://` files are handled during SQL execution.
35+
* - "auto": Automatically register `opfs://` files and drop them after execution.
36+
* - "manual": Files must be manually registered and dropped.
37+
*/
38+
fileHandling?: "auto" | "manual";
39+
}
40+
3241
export enum DuckDBAccessMode {
3342
UNDEFINED = 0,
3443
AUTOMATIC = 1,
@@ -70,4 +79,8 @@ export interface DuckDBConfig {
7079
* Custom user agent string
7180
*/
7281
customUserAgent?: string;
82+
/**
83+
* OPFS file handling configuration
84+
*/
85+
opfs?: DuckDBOPFSConfig;
7386
}

packages/duckdb-wasm/src/bindings/duckdb_module.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ export interface DuckDBModule extends EmscriptenModule {
77
stackSave: typeof stackSave;
88
stackAlloc: typeof stackAlloc;
99
stackRestore: typeof stackRestore;
10+
lengthBytesUTF8: typeof lengthBytesUTF8;
11+
stringToUTF8: typeof stringToUTF8;
1012

1113
ccall: typeof ccall;
1214
PThread: PThread;

packages/duckdb-wasm/src/parallel/async_bindings.ts

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { InstantiationProgress } from '../bindings/progress';
1818
import { arrowToSQLField } from '../json_typedef';
1919
import { WebFile } from '../bindings/web_file';
2020
import { DuckDBDataProtocol } from '../bindings';
21+
import { searchOPFSFiles, isOPFSProtocol } from "../utils/opfs_util";
2122

2223
const TEXT_ENCODER = new TextEncoder();
2324

@@ -45,6 +46,8 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
4546
protected _nextMessageId = 0;
4647
/** The pending requests */
4748
protected _pendingRequests: Map<number, WorkerTaskVariant> = new Map();
49+
/** The DuckDBConfig */
50+
protected _config: DuckDBConfig = {};
4851

4952
constructor(logger: Logger, worker: Worker | null = null) {
5053
this._logger = logger;
@@ -59,6 +62,11 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
5962
return this._logger;
6063
}
6164

65+
/** Get the config (set by open(); empty before that) */
66+
public get config(): DuckDBConfig {
67+
return this._config;
68+
}
69+
6270
/** Attach to worker */
6371
protected attach(worker: Worker): void {
6472
this._worker = worker;
@@ -100,7 +108,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
100108
transfer: ArrayBuffer[] = [],
101109
): Promise<WorkerTaskReturnType<W>> {
102110
if (!this._worker) {
103-
console.error('cannot send a message since the worker is not set!');
111+
console.error('cannot send a message since the worker is not set!:' + task.type+"," + task.data);
104112
return undefined as any;
105113
}
106114
const mid = this._nextMessageId++;
@@ -317,8 +325,8 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
317325
return await this.postTask(task);
318326
}
319327
/** Try to drop files */
320-
public async dropFiles(): Promise<null> {
321-
const task = new WorkerTask<WorkerRequestType.DROP_FILES, null, null>(WorkerRequestType.DROP_FILES, null);
328+
public async dropFiles(names?: string[]): Promise<null> {
329+
const task = new WorkerTask<WorkerRequestType.DROP_FILES, string[] | undefined, null>(WorkerRequestType.DROP_FILES, names);
322330
return await this.postTask(task);
323331
}
324332
/** Flush all files */
@@ -360,6 +368,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
360368

361369
/** Open a new database */
362370
public async open(config: DuckDBConfig): Promise<void> {
371+
this._config = config;
363372
const task = new WorkerTask<WorkerRequestType.OPEN, DuckDBConfig, null>(WorkerRequestType.OPEN, config);
364373
await this.postTask(task);
365374
}
@@ -394,6 +403,21 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
394403

395404
/**
 * Run a query.
 *
 * When automatic OPFS file handling is enabled, the `opfs://` files quoted in the SQL text
 * are registered before the query is sent and dropped again once the query has settled,
 * regardless of whether it succeeded or threw.
 */
public async runQuery(conn: ConnectionID, text: string): Promise<Uint8Array> {
    // Manual handling: forward the query untouched.
    if (!this.shouldOPFSFileHandling()) {
        return await this._runQueryAsync(conn, text);
    }

    const registered = await this.registerOPFSFileFromSQL(text);
    try {
        return await this._runQueryAsync(conn, text);
    } finally {
        // Only drop when something was registered for this query.
        if (registered.length > 0) {
            await this.dropFiles(registered);
        }
    }
}
419+
420+
private async _runQueryAsync(conn: ConnectionID, text: string): Promise<Uint8Array> {
397421
const task = new WorkerTask<WorkerRequestType.RUN_QUERY, [ConnectionID, string], Uint8Array>(
398422
WorkerRequestType.RUN_QUERY,
399423
[conn, text],
@@ -406,6 +430,25 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
406430
conn: ConnectionID,
407431
text: string,
408432
allowStreamResult: boolean = false,
433+
): Promise<Uint8Array | null> {
434+
if( this.shouldOPFSFileHandling() ){
435+
const files = await this.registerOPFSFileFromSQL(text);
436+
try {
437+
return await this._startPendingQueryAsync(conn, text, allowStreamResult);
438+
} finally {
439+
if( files.length > 0 ){
440+
await this.dropFiles(files);
441+
}
442+
}
443+
} else {
444+
return await this._startPendingQueryAsync(conn, text, allowStreamResult);
445+
}
446+
}
447+
448+
private async _startPendingQueryAsync(
449+
conn: ConnectionID,
450+
text: string,
451+
allowStreamResult: boolean = false,
409452
): Promise<Uint8Array | null> {
410453
const task = new WorkerTask<
411454
WorkerRequestType.START_PENDING_QUERY,
@@ -414,6 +457,7 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
414457
>(WorkerRequestType.START_PENDING_QUERY, [conn, text, allowStreamResult]);
415458
return await this.postTask(task);
416459
}
460+
417461
/** Poll a pending query */
418462
public async pollPendingQuery(conn: ConnectionID): Promise<Uint8Array | null> {
419463
const task = new WorkerTask<WorkerRequestType.POLL_PENDING_QUERY, ConnectionID, Uint8Array | null>(
@@ -647,4 +691,26 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
647691
);
648692
await this.postTask(task);
649693
}
694+
695+
/**
 * Whether `opfs://` files referenced in SQL text should be registered and dropped automatically.
 *
 * @returns true only when `config.path` is recognised by `isOPFSProtocol` and
 *          `config.opfs.fileHandling` is exactly "auto"; false in every other case,
 *          including when either setting is missing.
 */
private shouldOPFSFileHandling(): boolean {
    const path = this.config.path ?? "";
    return isOPFSProtocol(path) && this.config.opfs?.fileHandling === "auto";
}
701+
702+
/**
 * Register every `opfs://` file that `searchOPFSFiles` finds in a SQL text.
 *
 * @param text SQL text to scan for quoted `opfs://` paths
 * @returns the file names that were registered, in the order they were found
 * @throws Error with message "File Not found:<name>" when a registration fails. Files that
 *         were already registered by this call are dropped again (best effort) before the
 *         error is thrown, because the caller never receives the partial list.
 */
private async registerOPFSFileFromSQL(text: string): Promise<string[]> {
    const registered: string[] = [];
    for (const file of searchOPFSFiles(text)) {
        try {
            await this.registerOPFSFileName(file);
            registered.push(file);
        } catch (e) {
            console.error(e);
            // Release what this call registered so far. The length guard matters:
            // dropFiles() with no names drops all files instead of none.
            if (registered.length > 0) {
                try {
                    await this.dropFiles(registered);
                } catch (dropError) {
                    // Cleanup is best effort; the registration failure is the error to report.
                    console.error(dropError);
                }
            }
            throw new Error("File Not found:" + file);
        }
    }
    return registered;
}
650716
}

packages/duckdb-wasm/src/parallel/async_bindings_interface.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,7 @@ export interface AsyncDuckDBBindings {
3232
insertArrowFromIPCStream(conn: number, buffer: Uint8Array, options?: CSVInsertOptions): Promise<void>;
3333
insertCSVFromPath(conn: number, path: string, options: CSVInsertOptions): Promise<void>;
3434
insertJSONFromPath(conn: number, path: string, options: JSONInsertOptions): Promise<void>;
35+
36+
dropFile(name: string):Promise<null>;
37+
dropFiles(names?: string[]):Promise<null>;
3538
}

packages/duckdb-wasm/src/parallel/worker_dispatcher.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger {
150150
this.sendOK(request);
151151
break;
152152
case WorkerRequestType.DROP_FILES:
153-
this._bindings.dropFiles();
153+
this._bindings.dropFiles(request.data);
154154
this.sendOK(request);
155155
break;
156156
case WorkerRequestType.FLUSH_FILES:
@@ -362,7 +362,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger {
362362
break;
363363

364364
case WorkerRequestType.REGISTER_OPFS_FILE_NAME:
365-
this._bindings.registerOPFSFileName(request.data[0]);
365+
await this._bindings.registerOPFSFileName(request.data[0]);
366366
this.sendOK(request);
367367
break;
368368

packages/duckdb-wasm/src/parallel/worker_request.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ export type WorkerRequestVariant =
116116
| WorkerRequest<WorkerRequestType.CREATE_PREPARED, [ConnectionID, string]>
117117
| WorkerRequest<WorkerRequestType.DISCONNECT, number>
118118
| WorkerRequest<WorkerRequestType.DROP_FILE, string>
119-
| WorkerRequest<WorkerRequestType.DROP_FILES, null>
119+
| WorkerRequest<WorkerRequestType.DROP_FILES, string[] | undefined>
120120
| WorkerRequest<WorkerRequestType.EXPORT_FILE_STATISTICS, string>
121121
| WorkerRequest<WorkerRequestType.FETCH_QUERY_RESULTS, number>
122122
| WorkerRequest<WorkerRequestType.FLUSH_FILES, null>
@@ -176,7 +176,7 @@ export type WorkerTaskVariant =
176176
| WorkerTask<WorkerRequestType.CREATE_PREPARED, [number, string], number>
177177
| WorkerTask<WorkerRequestType.DISCONNECT, ConnectionID, null>
178178
| WorkerTask<WorkerRequestType.DROP_FILE, string, null>
179-
| WorkerTask<WorkerRequestType.DROP_FILES, null, null>
179+
| WorkerTask<WorkerRequestType.DROP_FILES, string[] | undefined, null>
180180
| WorkerTask<WorkerRequestType.EXPORT_FILE_STATISTICS, string, FileStatistics>
181181
| WorkerTask<WorkerRequestType.FETCH_QUERY_RESULTS, ConnectionID, Uint8Array>
182182
| WorkerTask<WorkerRequestType.FLUSH_FILES, null, null>
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/**
 * Matches single-quoted `opfs://` paths (without whitespace) in a SQL text.
 * Capture group 1 is the path without the surrounding quotes.
 */
export const REGEX_OPFS_FILE = /'(opfs:\/\/\S*?)'/g;

/**
 * Matches the `opfs://` scheme anywhere in a string. The trailing lazy `\S*?` is allowed
 * to match nothing, so this is effectively a test for the substring `opfs://`.
 */
export const REGEX_OPFS_PROTOCOL = /(opfs:\/\/\S*?)/g;

/**
 * Check whether a path refers to OPFS.
 *
 * @param path the path to inspect
 * @returns true when `opfs://` occurs in `path`, false otherwise (including for "")
 */
export function isOPFSProtocol(path: string): boolean {
    return path.search(REGEX_OPFS_PROTOCOL) > -1;
}

/**
 * Collect the single-quoted `opfs://` paths referenced in a SQL text.
 *
 * @param text the SQL text to scan
 * @returns the distinct paths in order of first appearance, without the quotes.
 *          A path referenced several times (e.g. in a self-join) is reported once, so
 *          callers register and drop each file exactly once.
 */
export function searchOPFSFiles(text: string): string[] {
    // A Set keeps insertion order, so the first occurrence decides the position.
    const paths = new Set<string>();
    for (const match of text.matchAll(REGEX_OPFS_FILE)) {
        paths.add(match[1]);
    }
    return Array.from(paths);
}

0 commit comments

Comments
 (0)