
Commit d2e26ee
Use a use-as-needed ByteBuf to reduce the memory footprint of each core
Eugene-Mark committed Jun 28, 2020
1 parent b63a27c commit d2e26ee
Showing 2 changed files with 30 additions and 13 deletions.
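
The change swaps a full-size, up-front direct-buffer allocation for a small buffer that Netty grows on demand, capped at twice the requested size. Below is a minimal sketch of the idea using Netty's public allocator API; the 4 MiB `bufSize` is a hypothetical value, while the 65536-byte initial capacity matches the diff that follows.

```scala
import io.netty.buffer.{ByteBuf, UnpooledByteBufAllocator}

object GrowableBufferSketch {
  def main(args: Array[String]): Unit = {
    val allocator = UnpooledByteBufAllocator.DEFAULT
    val bufSize = 4 * 1024 * 1024 // hypothetical per-core buffer size

    // Old approach: the whole capacity is reserved immediately.
    val fixed: ByteBuf = allocator.directBuffer(bufSize, bufSize)

    // New approach: start small; Netty grows the buffer on demand,
    // never beyond maxCapacity (2x the requested size in this commit).
    val flexible: ByteBuf = allocator.directBuffer(65536, bufSize * 2)
    println(s"fixed: ${fixed.capacity()} bytes up front")       // 4194304
    println(s"flexible: ${flexible.capacity()} bytes up front") // 65536

    // Writing past the current capacity triggers automatic growth.
    flexible.writeBytes(new Array[Byte](128 * 1024))
    println(s"flexible after 128 KiB written: ${flexible.capacity()}")

    fixed.release()
    flexible.release()
  }
}
```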
NettyByteBufferPool.scala
@@ -15,13 +15,9 @@ object NettyByteBufferPool extends Logging {
   private val allocatedBufferPool: Stack[ByteBuf] = Stack[ByteBuf]()
   private var reachRead = false
   private val allocator = UnpooledByteBufAllocator.DEFAULT
+  private var bufferMap: Map[ByteBuf, Long] = Map()

   def allocateNewBuffer(bufSize: Int): ByteBuf = synchronized {
-    if (fixedBufferSize == 0) {
-      fixedBufferSize = bufSize
-    } else if (bufSize > fixedBufferSize) {
-      throw new RuntimeException(s"allocateNewBuffer, expected size is ${fixedBufferSize}, actual size is ${bufSize}")
-    }
     allocatedBufRenCnt.getAndIncrement()
     allocatedBytes.getAndAdd(bufSize)
     if (allocatedBytes.get > peakAllocatedBytes.get) {
@@ -33,17 +29,35 @@
       } else {
         allocator.directBuffer(bufSize, bufSize)
       }*/
-      allocator.directBuffer(bufSize, bufSize)
+      val byteBuf = allocator.directBuffer(bufSize, bufSize)
+      bufferMap += (byteBuf -> bufSize)
+      byteBuf
     } catch {
       case e: Throwable =>
         logError(s"allocateNewBuffer size is ${bufSize}")
         throw e
     }
   }

+  def allocateFlexibleNewBuffer(bufSize: Int): ByteBuf = synchronized {
+    val initialCapacity = 65536
+    val maxCapacity = bufSize * 2
+    val byteBuf = allocator.directBuffer(initialCapacity, maxCapacity)
+    bufferMap += (byteBuf -> bufSize)
+    byteBuf
+  }
+
   def releaseBuffer(buf: ByteBuf): Unit = synchronized {
     allocatedBufRenCnt.getAndDecrement()
-    allocatedBytes.getAndAdd(0 - fixedBufferSize)
+    try {
+      val bufSize = bufferMap(buf)
+      allocatedBytes.getAndAdd(0 - bufSize)
+    } catch {
+      case e: NoSuchElementException => {}
+    }
     buf.clear()
     //allocatedBufferPool.push(buf)
     buf.release(buf.refCnt())
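
Because buffers can now be requested at different sizes, the pool records each buffer's requested size in `bufferMap` and uses it to adjust the `allocatedBytes` counter on release, instead of assuming a pool-wide `fixedBufferSize`. The following is a simplified, self-contained sketch of that accounting pattern; the names mirror the diff, but this is not the project's code.

```scala
import java.util.concurrent.atomic.AtomicLong

import io.netty.buffer.{ByteBuf, UnpooledByteBufAllocator}

object TrackedPoolSketch {
  private val allocator = UnpooledByteBufAllocator.DEFAULT
  private val allocatedBytes = new AtomicLong(0)
  private var bufferMap: Map[ByteBuf, Long] = Map()

  def allocate(bufSize: Int): ByteBuf = synchronized {
    val byteBuf = allocator.directBuffer(65536, bufSize * 2)
    bufferMap += (byteBuf -> bufSize.toLong) // remember the requested size
    allocatedBytes.getAndAdd(bufSize)
    byteBuf
  }

  def release(buf: ByteBuf): Unit = synchronized {
    // Decrement by the size recorded at allocation time; a buffer that
    // was never registered simply skips the accounting, as in the diff.
    bufferMap.get(buf).foreach { size =>
      allocatedBytes.getAndAdd(-size)
      bufferMap -= buf
    }
    buf.release(buf.refCnt())
  }
}
```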
PmemOutputStream.scala
@@ -1,7 +1,6 @@
 package org.apache.spark.storage.pmof

 import java.io.OutputStream
-import java.nio.ByteBuffer

 import io.netty.buffer.{ByteBuf, PooledByteBufAllocator}
 import org.apache.spark.internal.Logging
@@ -19,21 +18,25 @@ class PmemOutputStream(
   val length: Int = bufferSize
   var bufferFlushedSize: Int = 0
   var bufferRemainingSize: Int = 0
-  val buf: ByteBuf = NettyByteBufferPool.allocateNewBuffer(length)
-  val byteBuffer: ByteBuffer = buf.nioBuffer(0, length)
+  val buf: ByteBuf = NettyByteBufferPool.allocateFlexibleNewBuffer(length)
+  /**
+   * A fixed-size byteBuffer would make each core occupy unwanted extra memory:
+   * val byteBuffer: ByteBuffer = buf.nioBuffer(0, length)
+   */

   override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
-    byteBuffer.put(bytes, off, len)
+    buf.writeBytes(bytes, off, len)
     bufferRemainingSize += len
   }

   override def write(byte: Int): Unit = {
-    byteBuffer.putInt(byte)
+    buf.writeInt(byte)
     bufferRemainingSize += 4
   }

   override def flush(): Unit = {
     if (bufferRemainingSize > 0) {
+      val byteBuffer = buf.nioBuffer()
       persistentMemoryWriter.setPartition(numPartitions, blockId, byteBuffer, bufferRemainingSize, set_clean)
       bufferFlushedSize += bufferRemainingSize
       bufferRemainingSize = 0
@@ -54,7 +57,7 @@
   def reset(): Unit = {
     bufferRemainingSize = 0
     bufferFlushedSize = 0
-    byteBuffer.clear()
+    buf.clear()
   }

   override def close(): Unit = synchronized {
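
On the stream side, writes now go through the ByteBuf's internal writer index (`buf.writeBytes` / `buf.writeInt`) rather than a pre-sized NIO view, and `flush()` takes a zero-copy snapshot of the readable bytes with `buf.nioBuffer()`. Note that `write(byte: Int)` still stores four bytes via `writeInt`, mirroring the old `putInt` behavior, even though `java.io.OutputStream#write(int)` is specified to write only the low-order byte. A short sketch of the index semantics, using only public Netty API; the capacities are hypothetical:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import io.netty.buffer.UnpooledByteBufAllocator

object WriterIndexSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical sizes; the commit uses 65536 and 2x the block size.
    val buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(65536, 1 << 20)

    // writeBytes advances the writer index, so the stream no longer needs
    // to track a position in a separately allocated ByteBuffer.
    buf.writeBytes("hello pmem".getBytes(StandardCharsets.UTF_8))

    // At flush time, nioBuffer() exposes the readable region
    // (readerIndex..writerIndex) as a NIO view without copying.
    val view: ByteBuffer = buf.nioBuffer()
    println(s"bytes to flush: ${view.remaining()}") // 10

    // clear() just resets both indices for reuse; it neither zeroes
    // the memory nor shrinks the capacity.
    buf.clear()
    println(s"readable after clear: ${buf.readableBytes()}") // 0

    buf.release()
  }
}
```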
