@@ -31,6 +31,7 @@ export class BulkInsertOperation {
31
31
32
32
private _first : boolean = true ;
33
33
private _operationId = - 1 ;
34
+ private _nodeTag : string ;
34
35
35
36
private _useCompression : boolean = false ;
36
37
@@ -45,6 +46,9 @@ export class BulkInsertOperation {
45
46
46
47
public constructor ( database : string , store : IDocumentStore ) {
47
48
this . _conventions = store . conventions ;
49
+ if ( StringUtil . isNullOrEmpty ( database ) ) {
50
+ this . _throwNoDatabase ( ) ;
51
+ }
48
52
this . _requestExecutor = store . getRequestExecutor ( database ) ;
49
53
50
54
this . _generateEntityIdOnTheClient = new GenerateEntityIdOnTheClient ( this . _requestExecutor . conventions ,
@@ -65,8 +69,23 @@ export class BulkInsertOperation {
65
69
}
66
70
67
71
private async _throwBulkInsertAborted ( e : Error ) {
68
- const error = await this . _getExceptionFromOperation ( ) ;
69
- throwError ( "BulkInsertAbortedException" , "Failed to execute bulk insert" , error || e ) ;
72
+ let errorFromServer : Error ;
73
+ try {
74
+ errorFromServer = await this . _getExceptionFromOperation ( ) ;
75
+ } catch ( ee ) {
76
+ // server is probably down, will propagate the original exception
77
+ }
78
+
79
+ if ( errorFromServer ) {
80
+ throw errorFromServer ;
81
+ }
82
+
83
+ throwError ( "BulkInsertAbortedException" , "Failed to execute bulk insert" , e ) ;
84
+ }
85
+
86
+ private _throwNoDatabase ( ) : void {
87
+ throwError ( "InvalidOperationException" , "Cannot start bulk insert operation without specifying a name of a database to operate on."
88
+ + "Database name can be passed as an argument when bulk insert is being created or default database can be defined using 'DocumentStore.setDatabase' method." ) ;
70
89
}
71
90
72
91
private async _waitForId ( ) : Promise < void > {
@@ -77,6 +96,7 @@ export class BulkInsertOperation {
77
96
const bulkInsertGetIdRequest = new GetNextOperationIdCommand ( ) ;
78
97
await this . _requestExecutor . execute ( bulkInsertGetIdRequest ) ;
79
98
this . _operationId = bulkInsertGetIdRequest . result ;
99
+ this . _nodeTag = bulkInsertGetIdRequest . nodeTag ;
80
100
}
81
101
82
102
private static _typeCheckStoreArgs (
@@ -242,7 +262,7 @@ export class BulkInsertOperation {
242
262
}
243
263
244
264
private async _getExceptionFromOperation ( ) : Promise < Error > {
245
- const stateRequest = new GetOperationStateCommand ( this . _conventions , this . _operationId ) ;
265
+ const stateRequest = new GetOperationStateCommand ( this . _conventions , this . _operationId , this . _nodeTag ) ;
246
266
await this . _requestExecutor . execute ( stateRequest ) ;
247
267
if ( ! stateRequest . result ) {
248
268
return null ;
@@ -263,7 +283,8 @@ export class BulkInsertOperation {
263
283
264
284
this . _requestBodyStream = new stream . PassThrough ( ) ;
265
285
const bulkCommand =
266
- new BulkInsertCommand ( this . _operationId , this . _requestBodyStream , this . _useCompression ) ;
286
+ new BulkInsertCommand ( this . _operationId , this . _requestBodyStream , this . _nodeTag ) ;
287
+ bulkCommand . useCompression = this . _useCompression ;
267
288
268
289
const bulkCommandPromise = this . _requestExecutor . execute ( bulkCommand ) ;
269
290
@@ -298,7 +319,7 @@ export class BulkInsertOperation {
298
319
await this . _waitForId ( ) ;
299
320
300
321
try {
301
- await this . _requestExecutor . execute ( new KillOperationCommand ( this . _operationId ) ) ;
322
+ await this . _requestExecutor . execute ( new KillOperationCommand ( this . _operationId , this . _nodeTag ) ) ;
302
323
} catch ( err ) {
303
324
const bulkInsertError = getError ( "BulkInsertAbortedException" ,
304
325
"Unable to kill bulk insert operation, because it was not found on the server." , err ) ;
@@ -365,14 +386,14 @@ export class BulkInsertCommand extends RavenCommand<void> {
365
386
366
387
private readonly _stream : stream . Readable ;
367
388
private readonly _id : number ;
368
- private _useCompression : boolean ;
389
+ public useCompression : boolean ;
369
390
370
- public constructor ( id : number , stream : stream . Readable , useCompression : boolean ) {
391
+ public constructor ( id : number , stream : stream . Readable , nodeTag : string ) {
371
392
super ( ) ;
372
393
373
394
this . _stream = stream ;
374
395
this . _id = id ;
375
- this . _useCompression = useCompression ;
396
+ this . _selectedNodeTag = nodeTag ;
376
397
}
377
398
378
399
public createRequest ( node : ServerNode ) : HttpRequestParameters {
0 commit comments