Skip to content

Commit b976623

Browse files
committed
完善协程上传下载任务的取消逻辑
1 parent ab616b2 commit b976623

File tree

4 files changed

+118
-38
lines changed

4 files changed

+118
-38
lines changed

demo/src/main/kotlin/com/icuxika/bittersweet/demo/api/SuspendApi.kt

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,13 @@ package com.icuxika.bittersweet.demo.api
33
import com.google.gson.reflect.TypeToken
44
import com.icuxika.bittersweet.demo.api.Api.HTTPRequestMethod
55
import javafx.scene.control.ProgressIndicator
6+
import kotlinx.coroutines.CancellationException
7+
import kotlinx.coroutines.Dispatchers
68
import kotlinx.coroutines.channels.awaitClose
9+
import kotlinx.coroutines.channels.trySendBlocking
710
import kotlinx.coroutines.flow.callbackFlow
11+
import kotlinx.coroutines.flow.flowOn
12+
import kotlinx.coroutines.isActive
813
import kotlinx.coroutines.suspendCancellableCoroutine
914
import okio.use
1015
import java.io.BufferedInputStream
@@ -55,11 +60,11 @@ sealed class ProgressFlowState<T> {
5560
/**
5661
* 获取文件流并保存到指定Path
5762
*/
58-
suspend fun suspendGetFileFlow(
63+
fun suspendGetFileFlow(
5964
url: String,
6065
filePath: Path,
6166
data: Any? = null,
62-
) = callbackFlow<ProgressFlowState<Int>> {
67+
) = callbackFlow<ProgressFlowState<Double>> {
6368
val result = runCatching {
6469
suspendCancellableCoroutine { cancellableContinuation ->
6570
val type = object : TypeToken<Pair<InputStream, Double>>() {}.type
@@ -79,34 +84,37 @@ suspend fun suspendGetFileFlow(
7984
var bytesAllRead = 0
8085
var bytesRead: Int
8186
while (bufferedInputStream.read(buffer).also { bytesRead = it } != -1) {
87+
if (!isActive) {
88+
throw CancellationException("cancel")
89+
}
8290
fileOutputStream.write(buffer, 0, bytesRead)
8391
bytesAllRead += bytesRead
8492
if (contentLength == ProgressIndicator.INDETERMINATE_PROGRESS) {
8593
trySend(ProgressFlowState.Progress(contentLength))
8694
} else {
87-
trySend(
95+
trySendBlocking(
8896
ProgressFlowState.Progress(
8997
(bytesAllRead.toDouble() / contentLength).coerceIn(0.0, 1.0)
9098
)
9199
)
92100
}
93101
}
94102
fileOutputStream.flush()
95-
send(ProgressFlowState.Success(0))
103+
send(ProgressFlowState.Success(0.0))
96104
}
97105
}
98106
}
99107
result.exceptionOrNull()?.let {
100108
send(ProgressFlowState.Error(it))
101109
}
102110
close()
103-
awaitClose { }
104-
}
111+
awaitClose {}
112+
}.flowOn(Dispatchers.IO)
105113

106114
/**
107115
* 上传文件并同时监听进度,监听器会在请求真正发起之前就被调用
108116
*/
109-
suspend inline fun <reified T> suspendPostFileFlow(
117+
inline fun <reified T> suspendPostFileFlow(
110118
url: String,
111119
data: Any? = null,
112120
) = callbackFlow<ProgressFlowState<T>> {
@@ -140,4 +148,4 @@ suspend inline fun <reified T> suspendPostFileFlow(
140148
}
141149
close()
142150
awaitClose { }
143-
}
151+
}.flowOn(Dispatchers.IO)

demo/src/main/kotlin/com/icuxika/bittersweet/demo/controller/MainController.kt

Lines changed: 99 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,32 @@ import com.icuxika.bittersweet.control.KButton
44
import com.icuxika.bittersweet.demo.AppResource
55
import com.icuxika.bittersweet.demo.AppView
66
import com.icuxika.bittersweet.demo.annotation.AppFXML
7-
import com.icuxika.bittersweet.demo.api.ProgressFlowState
7+
import com.icuxika.bittersweet.demo.api.*
88
import com.icuxika.bittersweet.demo.system.Theme
99
import com.icuxika.bittersweet.demo.util.FileDownloader
1010
import com.icuxika.bittersweet.dsl.onAction
1111
import com.icuxika.bittersweet.extension.logger
12+
import javafx.beans.binding.When
1213
import javafx.beans.property.SimpleBooleanProperty
1314
import javafx.beans.property.SimpleDoubleProperty
15+
import javafx.beans.property.SimpleStringProperty
1416
import javafx.collections.FXCollections
1517
import javafx.fxml.FXML
1618
import javafx.fxml.Initializable
1719
import javafx.geometry.Pos
1820
import javafx.scene.control.*
1921
import javafx.scene.layout.BorderPane
22+
import javafx.scene.layout.HBox
2023
import javafx.scene.layout.StackPane
2124
import javafx.scene.layout.VBox
2225
import javafx.scene.paint.Color
26+
import javafx.scene.text.Text
2327
import javafx.stage.Stage
2428
import javafx.util.Callback
25-
import kotlinx.coroutines.CoroutineScope
26-
import kotlinx.coroutines.Dispatchers
29+
import kotlinx.coroutines.*
30+
import kotlinx.coroutines.flow.Flow
2731
import kotlinx.coroutines.javafx.JavaFx
28-
import kotlinx.coroutines.launch
32+
import java.io.File
2933
import java.net.URL
3034
import java.nio.file.Path
3135
import java.util.*
@@ -41,9 +45,33 @@ class MainController : Initializable {
4145

4246
private val scope = CoroutineScope(Dispatchers.JavaFx)
4347

48+
private lateinit var job: Job
49+
private val downloadUrl = "https://dldir1.qq.com/qqfile/qq/QQNT/Windows/QQ_9.9.9_240403_x64_01.exe"
50+
private val savePath = Path.of(System.getProperty("user.home")).resolve("Downloads").resolve("temp")
51+
.resolve("QQ.exe")
52+
4453
private val progressProperty = SimpleDoubleProperty(ProgressIndicator.INDETERMINATE_PROGRESS)
4554
private val cButtonBadgeVisibleProperty = SimpleBooleanProperty(true)
4655

56+
private suspend fun Flow<ProgressFlowState<Double>>.resolveFlowCollector() {
57+
collect {
58+
when (it) {
59+
is ProgressFlowState.Progress -> {
60+
progressProperty.set(it.progress)
61+
}
62+
63+
is ProgressFlowState.Success -> {
64+
LOGGER.info("下载完成[${Thread.currentThread().name}]-->${it.result}")
65+
LOGGER.info("文件保存路径->$savePath")
66+
}
67+
68+
is ProgressFlowState.Error -> {
69+
LOGGER.info("下载失败[${Thread.currentThread().name}]-->${it.throwable.message}")
70+
}
71+
}
72+
}
73+
}
74+
4775
override fun initialize(location: URL?, resources: ResourceBundle?) {
4876
container.sceneProperty().addListener { _, oldScene, newScene ->
4977
if (oldScene == null && newScene != null) {
@@ -59,33 +87,79 @@ class MainController : Initializable {
5987
ProgressBar().apply {
6088
progressProperty().bind(progressProperty)
6189
},
62-
Button("下载").apply {
63-
styleClass.add("test-button")
64-
onAction {
65-
scope.launch {
66-
val fileURL =
67-
"https://dldir1.qq.com/qqfile/qq/QQNT/Windows/QQ_9.9.9_240403_x64_01.exe"
68-
val filePath =
69-
Path.of(System.getProperty("user.home")).resolve("Downloads").resolve("temp")
70-
.resolve("result1.exe");
71-
FileDownloader.downloadFile(fileURL, filePath).collect {
72-
when (it) {
73-
is ProgressFlowState.Progress -> {
74-
progressProperty.set(it.progress)
75-
}
90+
HBox(
91+
Button("下载1").apply {
92+
styleClass.add("test-button")
93+
onAction {
94+
job = scope.launch {
95+
FileDownloader.downloadFile(downloadUrl, savePath).resolveFlowCollector()
96+
}
97+
}
98+
},
99+
Button("下载2").apply {
100+
styleClass.add("test-button")
101+
onAction {
102+
job = scope.launch {
103+
suspendGetFileFlow(downloadUrl, savePath).resolveFlowCollector()
104+
}
105+
}
106+
},
107+
Button("取消").apply {
108+
styleClass.add("test-button")
109+
onAction {
110+
scope.launch {
111+
job.cancelAndJoin()
112+
progressProperty.set(ProgressIndicator.INDETERMINATE_PROGRESS)
113+
savePath.toFile().delete()
114+
}
115+
}
116+
},
117+
).apply {
118+
alignment = Pos.CENTER
119+
spacing = 12.0
120+
},
121+
Text().apply {
122+
textProperty().bind(
123+
SimpleStringProperty("进度: ").concat(
124+
When(
125+
progressProperty.isEqualTo(
126+
SimpleDoubleProperty(ProgressIndicator.INDETERMINATE_PROGRESS)
127+
)
128+
).then(SimpleStringProperty("0"))
129+
.otherwise(progressProperty.multiply(100).asString("%.2f"))
130+
).concat("%")
131+
)
132+
},
133+
HBox(
134+
Button("上传").apply {
135+
onAction {
136+
job = scope.launch {
137+
suspendPostFileFlow<ApiData<Unit>>(
138+
"http://127.0.0.1:8080/users/upload", mapOf(
139+
Api.REQUEST_KEY_FILE to File("C:\\Users\\icuxika\\Downloads\\openjdk-14.0.2_windows-x64_bin.zip"),
140+
"id" to "11",
141+
)
142+
).collect {
143+
when (it) {
144+
is ProgressFlowState.Progress -> {
145+
progressProperty.set(it.progress)
146+
}
76147

77-
is ProgressFlowState.Success -> {
78-
LOGGER.info("下载完成[${Thread.currentThread().name}]-->${it.result}")
79-
LOGGER.info("文件保存路径->$filePath")
80-
}
148+
is ProgressFlowState.Error -> {
149+
LOGGER.info("上传失败[${Thread.currentThread().name}]-->${it.throwable.message}")
150+
}
81151

82-
is ProgressFlowState.Error -> {
83-
LOGGER.info("下载失败[${Thread.currentThread().name}]-->${it.throwable.message}")
152+
is ProgressFlowState.Success -> {
153+
println(it.result)
154+
}
84155
}
85156
}
86157
}
87158
}
88-
}
159+
},
160+
).apply {
161+
alignment = Pos.CENTER
162+
spacing = 12.0
89163
},
90164
ComboBox(FXCollections.observableArrayList(Theme.entries)).apply {
91165
valueProperty().bindBidirectional(AppResource.themeProperty())

demo/src/test/kotlin/ApiTest.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
import com.icuxika.bittersweet.demo.api.*
2-
import kotlinx.coroutines.Dispatchers
3-
import kotlinx.coroutines.flow.flowOn
42
import kotlinx.coroutines.test.runTest
53
import java.io.File
64
import java.nio.file.Path
@@ -80,7 +78,7 @@ class ApiTest {
8078
Api.REQUEST_KEY_FILE to File("build.gradle.kts"),
8179
"id" to "11",
8280
)
83-
).flowOn(Dispatchers.IO).collect {
81+
).collect {
8482
when (it) {
8583
is ProgressFlowState.Progress -> {
8684
println(it.progress)
@@ -102,7 +100,7 @@ class ApiTest {
102100
suspendGetFileFlow(
103101
"http://127.0.0.1:8080/users/download",
104102
Path.of(System.getProperty("user.home")).resolve("Downloads").resolve("temp").resolve("build.gradle.kts")
105-
).flowOn(Dispatchers.IO).collect {
103+
).collect {
106104
when (it) {
107105
is ProgressFlowState.Progress -> {
108106
println(it.progress)

server/src/main/kotlin/com/icuxika/bittersweet/server/Modules.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ fun Application.module() {
4646
}
4747
post<Users.Upload> {
4848
val receivedFiles = arrayListOf<Pair<String, Int>>()
49-
val multipartData = call.receiveMultipart()
49+
val multipartData = call.receiveMultipart(formFieldLimit = 200 * 1024 * 1024)
5050
multipartData.forEachPart { part ->
5151
val fileName: String
5252
val fileSize: Int

0 commit comments

Comments
 (0)