-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathindex.ts
163 lines (141 loc) · 4.33 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import { CustomProgressEvent } from 'progress-events'
import { InvalidContentError } from '../errors.js'
import { defaultDirBuilder, type DirBuilder, type DirBuilderOptions } from './dir.js'
import { defaultFileBuilder, type FileBuilder, type FileBuilderOptions } from './file.js'
import type { ChunkValidator } from './validate-chunks.js'
import type { Chunker } from '../chunker/index.js'
import type {
Directory,
File,
FileCandidate,
ImportCandidate,
ImporterProgressEvents,
InProgressImportResult,
WritableStorage
} from '../index.js'
import type { ProgressEvent, ProgressOptions } from 'progress-events'
/**
* Passed to the onProgress callback while importing files
*/
export interface ImportReadProgress {
/**
* How many bytes we have read from this source so far
*/
bytesRead: bigint
/**
* The size of the current chunk
*/
chunkSize: bigint
/**
* The path of the file being imported, if one was specified
*/
path?: string
}
export type DagBuilderProgressEvents = ProgressEvent<
'unixfs:importer:progress:file:read',
ImportReadProgress
>
function isIterable (thing: any): thing is Iterable<any> {
return Symbol.iterator in thing
}
function isAsyncIterable (thing: any): thing is AsyncIterable<any> {
return Symbol.asyncIterator in thing
}
function contentAsAsyncIterable (
content: Uint8Array | AsyncIterable<Uint8Array> | Iterable<Uint8Array>
): AsyncIterable<Uint8Array> {
try {
if (content instanceof Uint8Array) {
return (async function * () {
yield content
})()
} else if (isIterable(content)) {
return (async function * () {
yield * content
})()
} else if (isAsyncIterable(content)) {
return content
}
} catch {
throw new InvalidContentError('Content was invalid')
}
throw new InvalidContentError('Content was invalid')
}
export interface DagBuilderOptions
extends FileBuilderOptions,
DirBuilderOptions,
ProgressOptions<ImporterProgressEvents> {
chunker: Chunker
chunkValidator: ChunkValidator
wrapWithDirectory: boolean
dirBuilder?: DirBuilder
fileBuilder?: FileBuilder
}
export type ImporterSourceStream =
| AsyncIterable<ImportCandidate>
| Iterable<ImportCandidate>
export interface DAGBuilder {
(source: ImporterSourceStream, blockstore: WritableStorage): AsyncIterable<
() => Promise<InProgressImportResult>
>
}
export function defaultDagBuilder (options: DagBuilderOptions): DAGBuilder {
return async function * dagBuilder (source, blockstore) {
for await (const entry of source) {
let originalPath: string | undefined
if (entry.path != null) {
originalPath = entry.path
entry.path = entry.path
.split('/')
.filter((path) => path != null && path !== '.')
.join('/')
}
if (isFileCandidate(entry)) {
const file: File = {
path: entry.path,
mtime: entry.mtime,
mode: entry.mode,
content: (async function * () {
let bytesRead = 0n
for await (const chunk of options.chunker(
options.chunkValidator(contentAsAsyncIterable(entry.content))
)) {
const currentChunkSize = BigInt(chunk.byteLength)
bytesRead += currentChunkSize
options.onProgress?.(
new CustomProgressEvent<ImportReadProgress>(
'unixfs:importer:progress:file:read',
{
bytesRead,
chunkSize: currentChunkSize,
path: entry.path
}
)
)
yield chunk
}
})(),
originalPath
}
const fileBuilder = options.fileBuilder ?? defaultFileBuilder
yield async () => fileBuilder(file, blockstore, options)
} else if (entry.path != null) {
const dir: Directory = {
path: entry.path,
mtime: entry.mtime,
mode: entry.mode,
originalPath
}
const dirBuilder =
options.dirBuilder ??
defaultDirBuilder
yield async () => dirBuilder(dir, blockstore, options)
} else {
throw new Error('Import candidate must have content or path or both')
}
}
}
}
function isFileCandidate (entry: any): entry is FileCandidate {
return entry.content != null
}