@@ -6,11 +6,12 @@ import com.google.inject.name.Named
66import com .scalableminds .util .io .PathUtils
77import com .scalableminds .util .io .PathUtils .ensureDirectoryBox
88import com .scalableminds .util .mvc .Formatter
9+ import com .scalableminds .util .objectid .ObjectId
910import com .scalableminds .util .time .Instant
1011import com .scalableminds .util .tools .{Fox , FoxImplicits , JsonHelper }
1112import com .scalableminds .webknossos .datastore .DataStoreConfig
1213import com .scalableminds .webknossos .datastore .dataformats .{MagLocator , MappingProvider }
13- import com .scalableminds .webknossos .datastore .helpers .{DatasetDeleter , IntervalScheduler }
14+ import com .scalableminds .webknossos .datastore .helpers .{DatasetDeleter , IntervalScheduler , MagLinkInfo }
1415import com .scalableminds .webknossos .datastore .models .datasource ._
1516import com .scalableminds .webknossos .datastore .models .datasource .inbox .{InboxDataSource , UnusableDataSource }
1617import com .scalableminds .webknossos .datastore .storage .{
@@ -474,7 +475,7 @@ class DataSourceService @Inject()(
474475 res
475476 }
476477
477- def datasetInControlledS3 (dataSource : DataSource ) = {
478+ def datasetInControlledS3 (dataSource : DataSource ): Boolean = {
478479 def commonPrefix (strings : Seq [String ]): String = {
479480 if (strings.isEmpty) return " "
480481
@@ -514,9 +515,7 @@ class DataSourceService @Inject()(
514515 .requestChecksumCalculation(RequestChecksumCalculation .WHEN_REQUIRED )
515516 .build()
516517
517- def deleteFromControlledS3 (dataSource : DataSource ): Fox [Unit ] = {
518- // TODO: Do we handle other datasets using the same layers?
519-
518+ def deleteFromControlledS3 (dataSource : DataSource , datasetId : ObjectId ): Fox [Unit ] = {
520519 def deleteBatch (bucket : String , keys : Seq [String ]): Fox [DeleteObjectsResponse ] =
521520 if (keys.isEmpty) Fox .empty
522521 else {
@@ -562,18 +561,27 @@ class DataSourceService @Inject()(
562561
563562 for {
564563 _ <- Fox .successful(())
565- paths = dataSource.allExplicitPaths
566- // Assume everything is in the same bucket
567- firstPath <- paths.headOption.toFox ?~> " No explicit paths found for dataset in controlled S3"
568- bucket <- S3DataVault
569- .hostBucketFromUri(new URI (firstPath))
570- .toFox ?~> s " Could not determine S3 bucket from path $firstPath"
571- prefixes <- Fox .combined(paths.map(path => S3DataVault .objectKeyFromUri(new URI (path)).toFox))
572- keys : Seq [String ] <- Fox .serialCombined(prefixes)(listKeysAtPrefix(bucket, _)).map(_.flatten)
573- uniqueKeys = keys.distinct
574- _ = logger.info(
575- s " Deleting ${uniqueKeys.length} objects from controlled S3 bucket $bucket for dataset ${dataSource.id}" )
576- _ <- Fox .serialCombined(uniqueKeys.grouped(1000 ).toSeq)(deleteBatch(bucket, _)).map(_ => ())
564+ layersAndLinkedMags <- remoteWebknossosClient.fetchPaths(datasetId)
565+ magsLinkedByOtherDatasets : Set [MagLinkInfo ] = layersAndLinkedMags
566+ .flatMap(layerInfo => layerInfo.magLinkInfos.filter(_.linkedMags.nonEmpty))
567+ .toSet
568+ linkedMagPaths = magsLinkedByOtherDatasets.flatMap(_.linkedMags).flatMap(_.path)
569+ paths = dataSource.allExplicitPaths.filterNot(path => linkedMagPaths.contains(path))
570+ _ <- Fox .runIf(paths.nonEmpty)({
571+ for {
572+ // Assume everything is in the same bucket
573+ firstPath <- paths.headOption.toFox
574+ bucket <- S3DataVault
575+ .hostBucketFromUri(new URI (firstPath))
576+ .toFox ?~> s " Could not determine S3 bucket from path $firstPath"
577+ prefixes <- Fox .combined(paths.map(path => S3DataVault .objectKeyFromUri(new URI (path)).toFox))
578+ keys : Seq [String ] <- Fox .serialCombined(prefixes)(listKeysAtPrefix(bucket, _)).map(_.flatten)
579+ uniqueKeys = keys.distinct
580+ _ = logger.info(
581+ s " Deleting ${uniqueKeys.length} objects from controlled S3 bucket $bucket for dataset ${dataSource.id}" )
582+ _ <- Fox .serialCombined(uniqueKeys.grouped(1000 ).toSeq)(deleteBatch(bucket, _)).map(_ => ())
583+ } yield ()
584+ })
577585 } yield ()
578586 }
579587}
0 commit comments