Skip to content

Commit f21f3b4

Browse files
committed
Fix chunk upload with ChunkedStreamReader to cancel underlying stream
1 parent 5d08f44 commit f21f3b4

File tree

6 files changed

+87
-51
lines changed

6 files changed

+87
-51
lines changed

data/lib/src/repository/flow/flow_uploader_impl.dart

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import 'dart:async';
3535
import 'dart:developer' as developer;
3636
import 'dart:io';
3737

38+
import 'package:async/async.dart';
3839
import 'package:dartz/dartz.dart';
3940
import 'package:data/src/network/config/endpoint.dart';
4041
import 'package:data/src/network/dio_client.dart';
@@ -43,7 +44,6 @@ import 'package:data/src/network/model/query/query_parameter.dart';
4344
import 'package:data/src/repository/flow/flow_response.dart';
4445
import 'package:dio/dio.dart';
4546
import 'package:domain/domain.dart';
46-
import 'package:domain/src/model/async_task/async_task.dart';
4747

4848
class FlowUploaderImpl extends FlowUploader {
4949
final DioClient _dioClient;
@@ -102,7 +102,7 @@ class FlowUploaderImpl extends FlowUploader {
102102

103103
@override
104104
Future<Flow> uploadChunk(
105-
File file,
105+
ChunkedStreamReader<int> chunkedStreamReaderFile,
106106
int chunkNumber,
107107
int chunkSize,
108108
int currentChunkSize,
@@ -116,30 +116,51 @@ class FlowUploaderImpl extends FlowUploader {
116116
{String? sharedSpaceId,
117117
String? parentNodeId}
118118
) async {
119-
final _fileSize = file.lengthSync();
120-
developer.log('uploadChunk(): chunk: $chunkNumber - currentChunkSize: $currentChunkSize', name: 'FlowUploaderImpl');
121-
122-
final formData = generateFormData(file, chunkNumber, chunkSize, currentChunkSize,
123-
startByte, endByte, _fileSize, flowIdentifier, flowFile, totalChunk, uploadedByte,
124-
onSendController, sharedSpaceId: sharedSpaceId, parentNodeId: parentNodeId);
125-
126-
final response = await _dioClient.post(
127-
Endpoint.flow.generateEndpointPath(),
128-
data: formData,
129-
options: Options(
130-
headers: _getRangeHeadersForChunkUpload(startByte, endByte, _fileSize)
131-
),
132-
onSendProgress: (progress, total) {
133-
onSendController.add(Right(UploadingFlowUploadState(flowFile, uploadedByte + progress, flowFile.fileInfo.fileSize)));
134-
}
135-
);
136-
final flowResponse = FlowResponse.fromJson(response);
137-
developer.log('uploadChunk(): ${flowResponse.toString()}', name: 'FlowUploaderImpl');
138-
return flowResponse.toFlow();
119+
try {
120+
final _fileSize = flowFile.fileInfo.fileSize;
121+
developer.log(
122+
'uploadChunk(): chunk: $chunkNumber - currentChunkSize: $currentChunkSize',
123+
name: 'FlowUploaderImpl');
124+
125+
final formData = generateFormData(
126+
chunkedStreamReaderFile,
127+
chunkNumber,
128+
chunkSize,
129+
currentChunkSize,
130+
startByte,
131+
endByte,
132+
_fileSize,
133+
flowIdentifier,
134+
flowFile,
135+
totalChunk,
136+
uploadedByte,
137+
onSendController,
138+
sharedSpaceId: sharedSpaceId,
139+
parentNodeId: parentNodeId);
140+
141+
final response = await _dioClient.post(
142+
Endpoint.flow.generateEndpointPath(),
143+
data: formData,
144+
options: Options(
145+
headers: _getRangeHeadersForChunkUpload(startByte, endByte, _fileSize)
146+
),
147+
onSendProgress: (progress, total) {
148+
onSendController.add(Right(UploadingFlowUploadState(
149+
flowFile, uploadedByte + progress,
150+
flowFile.fileInfo.fileSize)));
151+
}
152+
);
153+
final flowResponse = FlowResponse.fromJson(response);
154+
developer.log('uploadChunk(): ${flowResponse.toString()}', name: 'FlowUploaderImpl');
155+
return flowResponse.toFlow();
156+
} catch (e) {
157+
developer.log('uploadChunk(): exception vkl ${e}', name: 'FlowUploaderImpl');
158+
rethrow;
159+
}
139160
}
140161

141162
FormData generateFormData(
142-
File file,
163+
ChunkedStreamReader<int> chunkedStreamReaderFile,
143164
int chunkNumber,
144165
int chunkSize,
145166
int currentChunkSize,
@@ -163,7 +184,7 @@ class FlowUploaderImpl extends FlowUploader {
163184
IDENTIFIER: flowIdentifier,
164185
FILENAME: flowFile.fileInfo.fileName,
165186
RELATIVE_PATH: flowFile.fileInfo.fileName,
166-
FILE: MultipartFile(file.openRead(startByte, endByte), endByte - startByte),
187+
FILE: MultipartFile(chunkedStreamReaderFile.readStream(currentChunkSize), currentChunkSize),
167188
ASYNC_TASK: true,
168189
};
169190

domain/lib/src/model/flow/flow_chunk.dart

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@
3232
*/
3333

3434
import 'dart:async';
35+
import 'dart:developer' as developer;
3536
import 'dart:io';
3637
import 'dart:math';
3738

39+
import 'package:async/async.dart';
3840
import 'package:dartz/dartz.dart';
3941
import 'package:domain/domain.dart';
4042
import 'package:domain/src/model/flow/flow_chunk_upload_state.dart';
@@ -58,6 +60,7 @@ class FlowChunk extends Equatable {
5860
FlowChunkUploadState status = FlowChunkUploadState.pending;
5961

6062
bool error = false;
63+
ChunkedStreamReader<int>? _chunkedStreamReader;
6164

6265
final FlowUploader _flowUploader;
6366

@@ -75,8 +78,9 @@ class FlowChunk extends Equatable {
7578

7679
Future<Flow> upload(int uploadedByte, StreamController<Either<Failure, Success>> onSendController) async {
7780
status = FlowChunkUploadState.uploading;
81+
_chunkedStreamReader = ChunkedStreamReader(file.openRead(startByte, endByte));
7882
return _flowUploader.uploadChunk(
79-
file,
83+
_chunkedStreamReader!,
8084
offset + 1,
8185
chunkSize,
8286
currentChunkSize,
@@ -108,11 +112,16 @@ class FlowChunk extends Equatable {
108112
if (isSuccess) {
109113
status = FlowChunkUploadState.success;
110114
} else {
115+
developer.log('test(): update status error for offset $offset', name: 'FlowChunk');
111116
status = FlowChunkUploadState.error;
112117
}
113118
return isSuccess;
114119
}
115120

121+
void completed() {
122+
_chunkedStreamReader?.cancel();
123+
}
124+
116125
@override
117126
List<Object?> get props => [offset, flowFile, identifier];
118127
}

domain/lib/src/model/flow/flow_file.dart

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,12 @@ import 'dart:async';
3535
import 'dart:developer' as developer;
3636
import 'dart:io';
3737

38+
import 'package:async/async.dart';
3839
import 'package:dartz/dartz.dart';
3940
import 'package:domain/domain.dart';
40-
import 'package:domain/src/model/async_task/async_task.dart';
4141
import 'package:domain/src/model/async_task/async_task_exception.dart';
4242
import 'package:domain/src/model/async_task/async_task_status.dart';
43-
import 'package:domain/src/model/flow/flow_chunk.dart';
4443
import 'package:domain/src/model/flow/flow_chunk_upload_state.dart';
45-
import 'package:domain/src/repository/flow/flow_uploader.dart';
46-
import 'package:domain/src/usecases/upload_file/flow_upload_state.dart';
4744
import 'package:equatable/equatable.dart';
4845
import 'package:retry/retry.dart';
4946

@@ -110,21 +107,26 @@ class FlowFile extends Equatable {
110107
}
111108

112109
void _updateEvent(Either<Failure, Success> flowUploadState) {
110+
developer.log('_updateEvent() for $this = $flowUploadState', name: 'FlowFile');
113111
_progressStateController.add(flowUploadState);
114112
}
115113

116114
void _handleCompleted() {
117-
if (chunks.every((chunk) => chunk.status == FlowChunkUploadState.success)) {
118-
final asyncTaskId = _getProcessingTaskId();
119-
if (asyncTaskId != null) {
120-
_handleProcessing(asyncTaskId);
115+
try {
116+
if (chunks.every((chunk) => chunk.status == FlowChunkUploadState.success)) {
117+
final asyncTaskId = _getProcessingTaskId();
118+
if (asyncTaskId != null) {
119+
_handleProcessing(asyncTaskId);
120+
} else {
121+
_handleSuccess();
122+
}
121123
} else {
122-
_handleSuccess();
124+
developer.log('handleCompleted(): error', name: 'FlowFile');
125+
_updateEvent(Left(ErrorFlowUploadState(this)));
126+
_progressStateController.close();
123127
}
124-
} else {
125-
developer.log('handleCompleted(): error', name: 'FlowFile');
126-
_updateEvent(Left(ErrorFlowUploadState(this)));
127-
_progressStateController.close();
128+
} catch (e) {
129+
developer.log('_handleCompleted(): exception $e', name: 'FlowFile');
128130
}
129131
}
130132

@@ -176,6 +178,7 @@ class FlowFile extends Equatable {
176178
final success = await chunk.test();
177179
if (success) {
178180
_updateProgress(chunk);
181+
chunk.completed();
179182
}
180183
}
181184
}

domain/lib/src/repository/flow/flow_uploader.dart

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,19 @@
3232
*/
3333

3434
import 'dart:async';
35-
import 'dart:io';
3635

36+
import 'package:async/async.dart';
3737
import 'package:dartz/dartz.dart';
38-
import 'package:domain/domain.dart';
3938
import 'package:domain/src/model/async_task/async_task.dart';
39+
import 'package:domain/src/model/file_info.dart';
4040
import 'package:domain/src/model/flow/flow.dart';
41+
import 'package:domain/src/model/flow/flow_file.dart';
42+
import 'package:domain/src/state/failure.dart';
43+
import 'package:domain/src/state/success.dart';
4144

4245
abstract class FlowUploader {
4346
Future<Flow> uploadChunk(
44-
File file,
47+
ChunkedStreamReader<int> chunkedStreamReaderFile,
4548
int chunkNumber,
4649
int chunkSize,
4750
int currentChunkSize,

lib/presentation/manager/upload_and_share_file/upload_and_share_file_manager.dart

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ class UploadShareFileManager {
144144
_progressStateStreamGroup.stream.listen((flowUploadState) {
145145
flowUploadState.fold(
146146
(failure) {
147+
developer.log('_handleFlowProgressState(): [Failure] $failure', name: 'UploadShareFileManager');
147148
if (failure is ErrorFlowUploadState) {
148149
_uploadingStateFiles.updateElementByUploadTaskId(
149150
failure.flowFile.uploadTaskId,
@@ -154,8 +155,6 @@ class UploadShareFileManager {
154155
},
155156
(success) {
156157
if (success is UploadingFlowUploadState) {
157-
developer.log('_handleFlowProgressState(): uploading: ${success.progress}', name: 'UploadShareFileManager');
158-
159158
_uploadingStateFiles.updateElementByUploadTaskId(
160159
success.flowFile.uploadTaskId,
161160
(currentState) => (currentState?.uploadStatus.completed ?? false)
@@ -177,7 +176,7 @@ class UploadShareFileManager {
177176
}
178177

179178
void _handleUploadFileSucceedWithResource(SuccessWithResourceFlowUploadState successWithResourceFlowUploadState) {
180-
developer.log('_handleUploadFileSucceed()', name: 'UploadShareFileManager');
179+
developer.log('_handleUploadFileSucceedWithResource()', name: 'UploadShareFileManager');
181180
final fileState = _uploadingStateFiles.getElementByUploadTaskId(successWithResourceFlowUploadState.flowFile.uploadTaskId);
182181
if (fileState != null) {
183182
_fileHelper.deleteFile(fileState.file);
@@ -227,6 +226,7 @@ class UploadShareFileManager {
227226
}
228227

229228
void _handleFlowUploadFileFailure(ErrorFlowUploadState errorFlowUploadState) {
229+
developer.log('_handleFlowUploadFileFailure(): $errorFlowUploadState', name: 'UploadShareFileManager');
230230
final fileState = _uploadingStateFiles.getElementByUploadTaskId(errorFlowUploadState.flowFile.uploadTaskId);
231231
if (fileState != null) {
232232
_fileHelper.deleteFile(fileState.file);

lib/presentation/util/helper/file_helper.dart

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,19 @@
3030
// the Additional Terms applicable to LinShare software.
3131
//
3232

33+
import 'dart:developer' as developer;
3334
import 'dart:io';
3435

3536
import 'package:domain/domain.dart';
3637

3738
class FileHelper {
3839
void deleteFile(FileInfo fileInfo) async {
39-
if (fileInfo != null) {
40-
final file = File(fileInfo.filePath + fileInfo.fileName);
41-
try {
42-
await file.delete();
43-
} catch (exception) {
44-
print('error when delete file: ${fileInfo.fileName} ' + exception.toString());
45-
}
40+
developer.log('deleteFile(): $fileInfo', name: 'FileHelper');
41+
final file = File(fileInfo.filePath + fileInfo.fileName);
42+
try {
43+
await file.delete();
44+
} catch (exception) {
45+
developer.log('deleteFile(): error when delete file: ${fileInfo.fileName}' + exception.toString(), name: 'FileHelper');
4646
}
4747
}
4848
}

0 commit comments

Comments
 (0)