Skip to content

Commit

Permalink
text-files-util: [#9] add mergeFilesDirect; add optimization in mergi…
Browse files Browse the repository at this point in the history
…ng for case controlDiskspace = false
  • Loading branch information
sszuev committed Mar 15, 2024
1 parent 424dcb9 commit 79ee084
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 34 deletions.
71 changes: 68 additions & 3 deletions src/main/kotlin/files/FileMerge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import kotlin.math.min
* The [invert] method can be used to rewrite content in direct order.
*
* The method allocates `[allocatedMemorySizeInBytes] * [writeToTotalMemRatio]` bytes for write operation,
* and `sourceFileSize{i} * [allocatedMemorySizeInBytes] * (1 - [writeToTotalMemRatio]) / sum (sourceFileSize{1} + ... sourceFileSize{N})` bytes for each read operation.
* and `sourceFileSize{i} * [allocatedMemorySizeInBytes] * (1 - [writeToTotalMemRatio]) / sum (sourceFileSize{1} + ... sourceFileSize{N})`
* bytes for each read operation.
*
* Note that total memory consumption is greater than [allocatedMemorySizeInBytes], since each operation requires some temporal data.
* Note that total memory consumption is greater than [allocatedMemorySizeInBytes],
* since each operation requires some temporal data.
*
* If [controlDiskspace] = `true` then source files will be truncated while process and completely deleted at the end of process.
* When control diskspace is enabled, the method execution can take a long time.
Expand All @@ -44,6 +46,7 @@ import kotlin.math.min
* @param [writeToTotalMemRatio] ratio of memory allocated for write operations to [total allocated memory][allocatedMemorySizeInBytes]
* @param [coroutineScope][CoroutineScope]
*/
@Suppress("DuplicatedCode")
fun mergeFilesInverse(
sources: Set<Path>,
target: Path,
Expand All @@ -56,7 +59,7 @@ fun mergeFilesInverse(
coroutineScope: CoroutineScope = CoroutineScope(Dispatchers.IO) + CoroutineName("mergeFilesInverse")
) {
require(sources.size > 1) { "Number of given sources (${sources.size}) must greater than 1" }
require(writeToTotalMemRatio > 0.0 && writeToTotalMemRatio < 1.0)
require(writeToTotalMemRatio > 0.0 && writeToTotalMemRatio < 1.0) { "writeToTotalMemRatio must fall within the range of 0 to 1" }

val writeBufferSize =
max((allocatedMemorySizeInBytes * writeToTotalMemRatio).toInt(), MERGE_FILES_MIN_WRITE_BUFFER_SIZE_IN_BYTES)
Expand All @@ -83,6 +86,68 @@ fun mergeFilesInverse(
)
}

/**
 * Merges files into a single one with fixed memory allocation.
 * Source files must be sorted.
 * In contrast to [mergeFilesInverse], files are read from the beginning to the end.
 *
 * The method allocates `[allocatedMemorySizeInBytes] * [writeToTotalMemRatio]` bytes for write operation,
 * and `sourceFileSize{i} * [allocatedMemorySizeInBytes] * (1 - [writeToTotalMemRatio]) / sum (sourceFileSize{1} + ... sourceFileSize{N})`
 * bytes for each read operation.
 * The write buffer is never smaller than [MERGE_FILES_MIN_WRITE_BUFFER_SIZE_IN_BYTES] and every read buffer
 * is never smaller than [MERGE_FILES_MIN_READ_BUFFER_SIZE_IN_BYTES].
 *
 * Note that total memory consumption is greater than [allocatedMemorySizeInBytes],
 * since each operation requires some temporary data.
 *
 * The merge itself is delegated to `mergeFiles` with disk-space control disabled.
 *
 * @param [sources][Set]<[Path]> the files to merge; at least two are required
 * @param [target][Path]
 * @param [comparator][Comparator]<[ByteArray]>
 * NOTE(review): the default is the reversed string comparator - confirm it matches the order of the sources
 * @param [delimiter][ByteArray] e.g. for UTF-16 `" " = [0, 32]`
 * @param [bomSymbols][ByteArray] e.g. for UTF-16 `[-2, -1]`
 * @param [allocatedMemorySizeInBytes] = `writeBufferSize + readBufferSize{1} + ... + readBufferSize{N}`,
 * approximate memory consumption; number of bytes
 * @param [writeToTotalMemRatio] ratio of memory allocated for write operations to [total allocated memory][allocatedMemorySizeInBytes]
 * @param [coroutineScope][CoroutineScope]
 * @throws IllegalArgumentException if fewer than two [sources] are given
 * or [writeToTotalMemRatio] is not strictly between 0 and 1
 */
@Suppress("DuplicatedCode")
fun mergeFilesDirect(
    sources: Set<Path>,
    target: Path,
    comparator: Comparator<ByteArray> = byteArrayStringComparator().reversed(),
    delimiter: ByteArray = "\n".toByteArray(Charsets.UTF_8),
    bomSymbols: ByteArray = byteArrayOf(),
    allocatedMemorySizeInBytes: Int = 2 * MERGE_FILES_MIN_WRITE_BUFFER_SIZE_IN_BYTES,
    writeToTotalMemRatio: Double = MERGE_FILES_WRITE_BUFFER_TO_TOTAL_MEMORY_ALLOCATION_RATIO,
    coroutineScope: CoroutineScope = CoroutineScope(Dispatchers.IO) + CoroutineName("mergeFilesDirect")
) {
    require(sources.size > 1) { "Number of given sources (${sources.size}) must greater than 1" }
    require(writeToTotalMemRatio > 0.0 && writeToTotalMemRatio < 1.0) { "writeToTotalMemRatio must fall within the range of 0 to 1" }

    val writeBufferSize =
        max((allocatedMemorySizeInBytes * writeToTotalMemRatio).toInt(), MERGE_FILES_MIN_WRITE_BUFFER_SIZE_IN_BYTES)
    val readBuffersSize = max(allocatedMemorySizeInBytes - writeBufferSize, MERGE_FILES_MIN_READ_BUFFER_SIZE_IN_BYTES)

    // Query each file size once: it is needed both for the total and for the per-file share.
    val sourceSizes = sources.associateWith { it.fileSize() }
    val filesSize = sourceSizes.values.sum()
    // If all sources are empty the ratio is not finite; the per-file size below then
    // collapses to 0 and the minimum read-buffer size is used instead.
    val readFilesSizeRatio = readBuffersSize.toDouble() / filesSize

    val writeBuffer = ByteBuffer.allocateDirect(writeBufferSize)
    // Each source gets a read buffer proportional to its share of the total input size.
    val sourceBuffers = sourceSizes.mapValues { (_, fileSize) ->
        val size = max((readFilesSizeRatio * fileSize).toInt(), MERGE_FILES_MIN_READ_BUFFER_SIZE_IN_BYTES)
        ByteBuffer.allocateDirect(size)
    }

    mergeFiles(
        sources = sources,
        target = target,
        comparator = comparator,
        delimiter = delimiter,
        bomSymbols = bomSymbols,
        sourceBuffer = { checkNotNull(sourceBuffers[it]) },
        targetBuffer = { writeBuffer },
        controlDiskspace = false,
        direct = true,
        coroutineScope = coroutineScope
    )
}

/**
* Merges two files into a single one with fixed memory allocation.
* Source files must be sorted.
Expand Down
119 changes: 98 additions & 21 deletions src/main/kotlin/files/MergeSort.kt
Original file line number Diff line number Diff line change
Expand Up @@ -294,47 +294,122 @@ suspend fun suspendSort(
tmpTarget.createFile()
val parts = mutableSetOf<Path>()
try {
parts.addAll(
suspendSplitAndSort(
if (controlDiskspace) {
parts.addAll(
suspendSplitAndSort(
source = source,
comparator = comparator.reversed(),
delimiter = delimiter,
bomSymbols = bomSymbols,
allocatedMemorySizeInBytes = allocatedMemorySizeInBytes,
controlDiskspace = true,
coroutineScope = coroutineScope,
)
)
mergeInverseParts(
parts = parts,
source = source,
comparator = comparator.reversed(),
target = tmpTarget,
comparator = comparator,
delimiter = delimiter,
bomSymbols = bomSymbols,
numberOfOpenDescriptors = numberOfOpenDescriptors,
allocatedMemorySizeInBytes = allocatedMemorySizeInBytes,
controlDiskspace = controlDiskspace,
coroutineScope = coroutineScope,
)
)
mergeInverseParts(
parts = parts,
source = source,
tmpTarget = tmpTarget,
comparator = comparator,
delimiter = delimiter,
bomSymbols = bomSymbols,
controlDiskspace = controlDiskspace,
numberOfOpenDescriptors = numberOfOpenDescriptors,
allocatedMemorySizeInBytes = allocatedMemorySizeInBytes,
coroutineScope = coroutineScope,
)
} else {
parts.addAll(
suspendSplitAndSort(
source = source,
comparator = comparator,
delimiter = delimiter,
bomSymbols = bomSymbols,
allocatedMemorySizeInBytes = allocatedMemorySizeInBytes,
controlDiskspace = false,
coroutineScope = coroutineScope,
)
)
mergeDirectParts(
parts = parts,
source = source,
target = tmpTarget,
comparator = comparator,
delimiter = delimiter,
bomSymbols = bomSymbols,
numberOfOpenDescriptors = numberOfOpenDescriptors,
allocatedMemorySizeInBytes = allocatedMemorySizeInBytes,
coroutineScope = coroutineScope,
)
}
} finally {
parts.deleteAll()
}
tmpTarget.moveTo(target = target, overwrite = true)
}

/**
 * Merges the sorted [parts] into [target] using [mergeFilesDirect].
 *
 * While there are more parts than [numberOfOpenDescriptors], the parts are merged group by group
 * into intermediate `<source>.<N>.part` files, which replace their inputs in [parts].
 * Once the remaining parts fit into [numberOfOpenDescriptors], they are merged into [target].
 * Every merged input file is deleted, and [parts] is empty on successful return.
 *
 * @param parts files to merge; the set is modified in place
 * @param source used only as the base name for intermediate part files
 * @param target the file that receives the final result
 * @param comparator passed through to [mergeFilesDirect]
 * @param delimiter passed through to [mergeFilesDirect]
 * @param bomSymbols passed through to [mergeFilesDirect]
 * @param numberOfOpenDescriptors maximum number of parts merged into [target] in a single call
 * @param allocatedMemorySizeInBytes passed through to [mergeFilesDirect]
 * @param coroutineScope passed through to [mergeFilesDirect]
 * @throws IllegalArgumentException if the calculated group size does not allow merging at least two files at once
 */
private fun mergeDirectParts(
    parts: MutableSet<Path>,
    source: Path,
    target: Path,
    comparator: Comparator<ByteArray>,
    delimiter: ByteArray,
    bomSymbols: ByteArray,
    numberOfOpenDescriptors: Int,
    allocatedMemorySizeInBytes: Int,
    coroutineScope: CoroutineScope,
) {
    var fileCounter = parts.size
    while (parts.isNotEmpty()) {
        if (parts.size == 1) {
            // mergeFilesDirect requires at least two sources, so a single part cannot be merged:
            // it is already the sorted result and is simply moved into place.
            // NOTE(review): assumes a part file already has the final layout (BOM included) - confirm.
            parts.single().moveTo(target = target, overwrite = true)
            parts.clear()
            return
        }
        if (parts.size <= numberOfOpenDescriptors) {
            mergeFilesDirect(
                sources = parts,
                target = target,
                comparator = comparator,
                delimiter = delimiter,
                bomSymbols = bomSymbols,
                allocatedMemorySizeInBytes = allocatedMemorySizeInBytes,
                coroutineScope = coroutineScope,
            )
            parts.deleteAll()
            parts.clear()
            return
        }
        val chunkSize = calcChunkSize(parts.size.toLong(), numberOfOpenDescriptors)
        // With a group size below 2 no group could ever be merged and the loop would not make progress.
        require(chunkSize > 1) { "Unable to merge ${parts.size} parts with chunk size $chunkSize" }
        val fileGroups = parts.chunked(chunkSize).map { it.toSet() }
        for (files in fileGroups) {
            if (files.size == 1) {
                // A trailing group of one file cannot be merged on its own;
                // it stays in parts and is picked up on the next pass.
                continue
            }
            val targetFile = source + ".${++fileCounter}.part"
            targetFile.createFile()
            // Register the intermediate file before merging, so that it is still
            // tracked in parts (and can be cleaned up by the caller) if the merge fails.
            parts.add(targetFile)
            mergeFilesDirect(
                sources = files,
                target = targetFile,
                comparator = comparator,
                delimiter = delimiter,
                bomSymbols = bomSymbols,
                allocatedMemorySizeInBytes = allocatedMemorySizeInBytes,
                coroutineScope = coroutineScope,
            )
            files.deleteAll()
            parts.removeAll(files)
        }
    }
}

private fun mergeInverseParts(
parts: MutableSet<Path>,
source: Path,
tmpTarget: Path,
target: Path,
comparator: Comparator<ByteArray>,
delimiter: ByteArray,
bomSymbols: ByteArray,
controlDiskspace: Boolean,
numberOfOpenDescriptors: Int,
allocatedMemorySizeInBytes: Int,
coroutineScope: CoroutineScope,
) {

//
// pseudocode:
//
// while (true) {
// if (reversed.size == 1 && direct.isEmpty) {
// return reversed(invert[0])
Expand All @@ -356,7 +431,9 @@ private fun mergeInverseParts(
// reverse.add(res)
// }
// }
//

val controlDiskspace = true
val reversed = mutableSetOf<Path>()
val direct = mutableSetOf<Path>()
reversed.addAll(parts)
Expand All @@ -366,14 +443,14 @@ private fun mergeInverseParts(
throw IllegalStateException("Should not happen")
}
if (reversed.isEmpty() && direct.size == 1) {
direct.single().moveTo(target = tmpTarget, overwrite = true)
direct.single().moveTo(target = target, overwrite = true)
parts.removeAll(direct)
break
}
if (reversed.size == 1 && direct.isEmpty()) {
invert(
source = reversed.single(),
target = tmpTarget,
target = target,
controlDiskspace = controlDiskspace,
delimiter = delimiter,
bomSymbols = bomSymbols,
Expand Down
40 changes: 30 additions & 10 deletions src/test/kotlin/MergeSortTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ internal class MergeSortTest {
}

@Test
fun `test split and sort (small file, IO dispatcher, direct, UTF-8, new-line delimiter)`(@TempDir dir: Path) =
runBlocking(Dispatchers.IO) {
fun `test split and sort (small file, default dispatcher, direct, UTF-8, new-line delimiter)`(@TempDir dir: Path) =
runBlocking(Dispatchers.Default) {
testSplitAndSort(
dir = dir,
content = (1..424).map { Random.nextInt().toString() },
Expand All @@ -164,8 +164,8 @@ internal class MergeSortTest {
}

@Test
fun `test split and sort (big file, default dispatcher, reverse, UTF-16, semicolon delimiter)`(@TempDir dir: Path) =
runBlocking(Dispatchers.Default) {
fun `test split and sort (big file, IO dispatcher, reverse, UTF-16, semicolon delimiter)`(@TempDir dir: Path) =
runBlocking(Dispatchers.IO) {
testSplitAndSort(
dir = dir,
content = (1..424242).map { Random.nextDouble().toString() },
Expand Down Expand Up @@ -204,21 +204,40 @@ internal class MergeSortTest {
}

@Test
fun `test suspended sort large file with big mem allocation`(@TempDir dir: Path): Unit =
fun `test suspended sort large file with big mem allocation and control diskspace`(@TempDir dir: Path): Unit =
runBlocking(Dispatchers.IO) {
testSuspendSortLargeFile(
dir = dir,
numLines = 200_000,
numDuplicates = 10_000,
approximateNumberOfParts = 4
approximateNumberOfParts = 4,
controlDiskspace = true,
)
}

@Test
fun `test suspended sort large file with small mem allocation`(@TempDir dir: Path): Unit =
fun `test suspended sort large file with small mem allocation and control diskspace`(@TempDir dir: Path): Unit =
runBlocking(Dispatchers.IO) {
testSuspendSortLargeFile(dir = dir, numLines = 10_000, numDuplicates = 5_000, approximateNumberOfParts = 90)
}
testSuspendSortLargeFile(
dir = dir,
numLines = 10_000,
numDuplicates = 5_000,
approximateNumberOfParts = 90,
controlDiskspace = true,
)
}

@Test
fun `test suspended sort large file with small mem allocation and without control diskspace`(
    @TempDir dir: Path,
): Unit = runBlocking(Dispatchers.IO) {
    // Same scenario as the small-memory test, but with disk-space control switched off.
    testSuspendSortLargeFile(
        dir = dir,
        numLines = 15_000,
        numDuplicates = 5_000,
        approximateNumberOfParts = 100,
        controlDiskspace = false,
    )
}

@Test
fun `test blocking sort large file`(@TempDir dir: Path) {
Expand Down Expand Up @@ -491,6 +510,7 @@ internal class MergeSortTest {
numLines: Int = 200_000,
numDuplicates: Int = 10_000,
approximateNumberOfParts: Int = 3,
controlDiskspace: Boolean = true,
) {
val source = generateLargeFile(numLines, numDuplicates) {
Files.createTempFile(dir, "xxx-merge-sort-source-", ".xxx")
Expand All @@ -509,7 +529,7 @@ internal class MergeSortTest {
source = source,
target = target,
delimiter = "\n",
controlDiskspace = true,
controlDiskspace = controlDiskspace,
charset = Charsets.UTF_8,
allocatedMemorySizeInBytes = allocatedMemory,
comparator = comparator,
Expand Down

0 comments on commit 79ee084

Please sign in to comment.