Skip to content

Commit a784703

Browse files
committed
Fix: Dao.listBackupSetsToExecute
BackupSetExecutorTest.doesn't execute backup set twice in parallel
1 parent 0367406 commit a784703

File tree

5 files changed

+105
-9
lines changed

.travis.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ function client_test() {
2727
export RBACKUP_IP=$(docker inspect citests_tests_1 | jq -r '.[0] | .NetworkSettings.Ports."3369/tcp" | .[0].HostIp') && \
2828
wait_for_server && \
2929
cd .. && \
30-
mkdir public && mkdir public/bundle && touch public/bundle/js.bundle.txt && touch public/bundle/style.bundle.txt && \
30+
mkdir -p public/bundle && touch public/bundle/js.bundle.txt && touch public/bundle/style.bundle.txt && \
3131
sbt ";clean;test" && \
3232
cd ci-tests && \
3333
docker-compose down

app/lib/BackupSetsExecutor.scala

+9-4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import lib.settings.Settings
1414
import monix.eval.Task
1515
import monix.execution.{Cancelable, Scheduler}
1616

17+
import scala.concurrent.Future
1718
import scala.concurrent.duration._
1819

1920
class BackupSetsExecutor @Inject()(dao: Dao,
@@ -34,8 +35,10 @@ class BackupSetsExecutor @Inject()(dao: Dao,
3435
} yield {
3536
session match {
3637
case Some(sid) =>
37-
if (!suspended) executeWaitingBackupSets()(sid)
38-
else {
38+
if (!suspended) {
39+
executeWaitingBackupSets()(sid)
40+
()
41+
} else {
3942
logger.info("Execution of backup sets is suspended")
4043
}
4144

@@ -46,13 +49,13 @@ class BackupSetsExecutor @Inject()(dao: Dao,
4649
}
4750
}
4851

49-
private def executeWaitingBackupSets()(implicit session: ServerSession): Unit = {
52+
private[lib] def executeWaitingBackupSets()(implicit session: ServerSession): Future[Either[AppException, List[BackupSet]]] = {
5053
logger.debug("Executing waiting backup sets")
5154

5255
(for {
5356
sets <- dao.listBackupSetsToExecute()
5457
_ = logger.debug(s"Backup sets to be executed: ${sets.mkString("\n")}")
55-
_ <- sets.map(execute).sequentially
58+
_ <- sets.map(execute).inparallel
5659
} yield {
5760
sets
5861
}).runAsync {
@@ -92,10 +95,12 @@ class BackupSetsExecutor @Inject()(dao: Dao,
9295
.start(RunningTask.BackupSetUpload(bs.name)) {
9396
(for {
9497
_ <- dao.markAsProcessing(bs.id)
98+
_ = logger.debug(s"Backup set $bs marked as processing")
9599
_ <- updateUi()
96100
files <- dao.listFilesInBackupSet(bs.id)
97101
_ <- files.map(filesHandler.upload(_)).inparallel
98102
_ <- dao.markAsExecutedNow(bs.id)
103+
_ = logger.debug(s"Backup set $bs marked as executed")
99104
_ <- updateUi()
100105
_ <- wsApiController.send(
101106
"backupFinish",

app/lib/db/Dao.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ class Dao(blockingScheduler: Scheduler) extends StrictLogging {
332332

333333
def listFilesInBackupSet(id: Long): Result[List[File]] = EitherT {
334334
Task {
335-
logger.debug(s"Listing files from backup set from DB")
335+
logger.debug(s"Listing files from backup set $id from DB")
336336

337337
val files = DB.readOnly { implicit session =>
338338
sql"""select * from backup_sets_files where set_id=${id}"""
@@ -343,7 +343,7 @@ class Dao(blockingScheduler: Scheduler) extends StrictLogging {
343343
.apply()
344344
}
345345

346-
logger.debug(s"Retrieved backup set files: $files")
346+
logger.debug(s"Retrieved backup set files for BS $id: $files")
347347

348348
Right(files): Either[AppException, List[File]]
349349
}.executeOnScheduler(blockingScheduler)
@@ -369,7 +369,7 @@ class Dao(blockingScheduler: Scheduler) extends StrictLogging {
369369

370370
def markAsProcessing(backupSetId: Long, processing: Boolean = true): Result[Unit] = EitherT {
371371
Task {
372-
logger.debug(s"Updating backed up set $backupSetId processing flag in DB tp $processing")
372+
logger.debug(s"Updating backed up set $backupSetId processing flag in DB to $processing")
373373

374374
DB.autoCommit { implicit session =>
375375
sql"""update backup_sets set processing = ${processing} where id = ${backupSetId} """.update().apply()
@@ -402,7 +402,7 @@ class Dao(blockingScheduler: Scheduler) extends StrictLogging {
402402
logger.debug(s"Listing files from backup set from DB")
403403

404404
val sets = DB.readOnly { implicit session =>
405-
sql"""select * from backup_sets where last_execution is null OR (last_execution < DATEADD('MINUTE',-1 * frequency, now())) AND processing = false"""
405+
sql"""select * from backup_sets where (last_execution is null OR (last_execution < DATEADD('MINUTE',-1 * frequency, now()))) AND processing = false"""
406406
.map(BackupSet.apply)
407407
.list()
408408
.apply()

test/lib/BackupSetExecutorTest.scala

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package lib
2+
3+
import better.files.File
4+
import cats.data.EitherT
5+
import com.typesafe.scalalogging.StrictLogging
6+
import controllers.WsApiController
7+
import lib.App._
8+
import lib.db.Dao
9+
import lib.server.serverapi.{RemoteFile, UploadResponse}
10+
import lib.settings.Settings
11+
import monix.eval.Task
12+
import monix.execution.Scheduler
13+
import monix.execution.Scheduler.Implicits.global
14+
import org.http4s.Uri
15+
import org.mockito.ArgumentMatchers
16+
import org.mockito.Mockito._
17+
import org.mockito.invocation.InvocationOnMock
18+
import org.scalatest.concurrent.{Eventually, ScalaFutures}
19+
import org.scalatest.mockito.MockitoSugar
20+
import org.scalatest.time.{Seconds, Span}
21+
import scalikejdbc.{ConnectionPool, DB, _}
22+
import utils.TestOps.ResultOps
23+
24+
import scala.concurrent.duration._
25+
26+
class BackupSetExecutorTest extends TestWithDB with MockitoSugar with ScalaFutures with StrictLogging with Eventually {
27+
28+
private implicit val pat: PatienceConfig = PatienceConfig(timeout = Span(5, Seconds))
29+
30+
private implicit val ss: ServerSession = ServerSession(Uri.unsafeFromString("http://localhost"), "sesssionId", AppVersion(0, 1, 0))
31+
private val blockingScheduler = Scheduler.io()
32+
private val dao = new Dao(blockingScheduler)
33+
34+
test("doesn't execute backup set twice in parallel") {
35+
val filesHandler: FilesHandler = mock[FilesHandler]
36+
when(filesHandler.upload(ArgumentMatchers.any())(ArgumentMatchers.any())).thenReturn(EitherT {
37+
Task {
38+
Right(List(Some(UploadResponse.Uploaded(mock[RemoteFile])))): Either[AppException, List[Option[UploadResponse]]]
39+
}.delayResult(1.second)
40+
})
41+
42+
val tasksManager: TasksManager = mock[TasksManager]
43+
when(tasksManager.start(ArgumentMatchers.any())(ArgumentMatchers.any())).thenAnswer((invocation: InvocationOnMock) => {
44+
invocation.getArgument[Result[Unit]](1)
45+
})
46+
47+
val wsApiController: WsApiController = mock[WsApiController]
48+
when(wsApiController.send(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(pureResult(()))
49+
50+
val bse = new BackupSetsExecutor(dao, filesHandler, tasksManager, wsApiController, blockingScheduler, new Settings(dao))
51+
52+
assertResult(Right(List.empty))(bse.executeWaitingBackupSets().futureValue)
53+
54+
val bs1 = dao.createBackupSet("ahoj").unwrappedFutureValue
55+
val bs2 = dao.createBackupSet("ahoj2").unwrappedFutureValue
56+
57+
dao.updateFilesInBackupSet(bs1.id, Seq(File("/tmp/1"), File("/tmp/2"))).unwrappedFutureValue
58+
dao.updateFilesInBackupSet(bs2.id, Seq(File("/tmp/3"), File("/tmp/4"))).unwrappedFutureValue
59+
60+
dao.markAsProcessing(bs1.id, processing = false).unwrappedFutureValue
61+
dao.markAsProcessing(bs2.id, processing = false).unwrappedFutureValue
62+
63+
// execute the backup
64+
val executed = bse.executeWaitingBackupSets()
65+
66+
Thread.sleep(300) // delay which the backup sets need to be started
67+
68+
assertResult(Right(List.empty))(bse.executeWaitingBackupSets().futureValue)
69+
70+
assertResult(Right(List(bs1, bs2).map(_.id)))(executed.futureValue.map(_.map(_.id))) // test this AFTER the previous test
71+
}
72+
}

test/lib/db/DaoTest.scala

+19
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,25 @@ class DaoTest extends TestWithDB {
112112
bs2.copy(frequency = Duration.ofMinutes(120)),
113113
bs4.copy(frequency = Duration.ofMinutes(400))
114114
))(dao.listBackupSetsToExecute().unwrappedFutureValue.map(_.copy(lastExecution = None)))
115+
}
116+
117+
test("markAsProcessing -> listBackupSetsToExecute") {
118+
val dao = new Dao(Scheduler.io())
119+
120+
dao.createBackupSet("ahoj").unwrappedFutureValue
121+
dao.createBackupSet("ahoj2").unwrappedFutureValue
122+
dao.createBackupSet("ahoj3").unwrappedFutureValue
123+
dao.createBackupSet("ahoj4").unwrappedFutureValue
124+
dao.createBackupSet("ahoj5").unwrappedFutureValue
125+
126+
val List(bs, bs2, bs3, bs4, bs5) = dao.listAllBackupSets().unwrappedFutureValue
115127

128+
dao.markAsProcessing(bs.id).unwrappedFutureValue
129+
dao.markAsProcessing(bs3.id).unwrappedFutureValue
130+
dao.markAsProcessing(bs5.id).unwrappedFutureValue
131+
132+
assertResult(List(bs2, bs4).map(_.id)) {
133+
dao.listBackupSetsToExecute().unwrappedFutureValue.map(_.id)
134+
}
116135
}
117136
}

0 commit comments

Comments (0)