Skip to content

Commit 8b83f6e

Browse files
committed
Using buffer recycler to minimize memory allocation cost
1 parent 50164bc commit 8b83f6e

File tree

3 files changed

+196
-11
lines changed

3 files changed

+196
-11
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Copyright (C) 2011 the original author or authors.
3+
* See the NOTICE file distributed with this work for additional
4+
* information regarding copyright ownership.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.xerial.snappy;
19+
import java.lang.ref.SoftReference;
20+
21+
/**
22+
* Simple helper class to encapsulate details of basic buffer
23+
* recycling scheme, which helps a lot (as per profiling) for
24+
* smaller encoding cases.
25+
*
26+
* @author tatu
27+
*/
28+
class BufferRecycler
29+
{
30+
private final static int MIN_ENCODING_BUFFER = 4000;
31+
32+
private final static int MIN_OUTPUT_BUFFER = 8000;
33+
34+
/**
35+
* This <code>ThreadLocal</code> contains a {@link java.lang.ref.SoftReference}
36+
* to a {@link BufferRecycler} used to provide a low-cost
37+
* buffer recycling for buffers we need for encoding, decoding.
38+
*/
39+
final protected static ThreadLocal<SoftReference<BufferRecycler>> recyclerRef
40+
= new ThreadLocal<SoftReference<BufferRecycler>>();
41+
42+
43+
private byte[] inputBuffer;
44+
private byte[] outputBuffer;
45+
46+
private byte[] decodingBuffer;
47+
private byte[] encodingBuffer;
48+
49+
private short[] encodingHash;
50+
51+
52+
/**
53+
* Accessor to get thread-local recycler instance
54+
*/
55+
public static BufferRecycler instance()
56+
{
57+
SoftReference<BufferRecycler> ref = recyclerRef.get();
58+
59+
BufferRecycler bufferRecycler;
60+
if (ref == null) {
61+
bufferRecycler = null;
62+
}
63+
else {
64+
bufferRecycler = ref.get();
65+
}
66+
67+
if (bufferRecycler == null) {
68+
bufferRecycler = new BufferRecycler();
69+
recyclerRef.set(new SoftReference<BufferRecycler>(bufferRecycler));
70+
}
71+
return bufferRecycler;
72+
}
73+
74+
///////////////////////////////////////////////////////////////////////
75+
// Buffers for encoding (output)
76+
///////////////////////////////////////////////////////////////////////
77+
78+
public byte[] allocEncodingBuffer(int minSize)
79+
{
80+
byte[] buf = encodingBuffer;
81+
if (buf == null || buf.length < minSize) {
82+
buf = new byte[Math.max(minSize, MIN_ENCODING_BUFFER)];
83+
}
84+
else {
85+
encodingBuffer = null;
86+
}
87+
return buf;
88+
}
89+
90+
public void releaseEncodeBuffer(byte[] buffer)
91+
{
92+
if (encodingBuffer == null || buffer.length > encodingBuffer.length) {
93+
encodingBuffer = buffer;
94+
}
95+
}
96+
97+
public byte[] allocOutputBuffer(int minSize)
98+
{
99+
byte[] buf = outputBuffer;
100+
if (buf == null || buf.length < minSize) {
101+
buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)];
102+
}
103+
else {
104+
outputBuffer = null;
105+
}
106+
return buf;
107+
}
108+
109+
public void releaseOutputBuffer(byte[] buffer)
110+
{
111+
if (outputBuffer == null || (buffer != null && buffer.length > outputBuffer.length)) {
112+
outputBuffer = buffer;
113+
}
114+
}
115+
116+
public short[] allocEncodingHash(int suggestedSize)
117+
{
118+
short[] buf = encodingHash;
119+
if (buf == null || buf.length < suggestedSize) {
120+
buf = new short[suggestedSize];
121+
}
122+
else {
123+
encodingHash = null;
124+
}
125+
return buf;
126+
}
127+
128+
public void releaseEncodingHash(short[] buffer)
129+
{
130+
if (encodingHash == null || (buffer != null && buffer.length > encodingHash.length)) {
131+
encodingHash = buffer;
132+
}
133+
}
134+
135+
///////////////////////////////////////////////////////////////////////
136+
// Buffers for decoding (input)
137+
///////////////////////////////////////////////////////////////////////
138+
139+
public byte[] allocInputBuffer(int minSize)
140+
{
141+
byte[] buf = inputBuffer;
142+
if (buf == null || buf.length < minSize) {
143+
buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)];
144+
}
145+
else {
146+
inputBuffer = null;
147+
}
148+
return buf;
149+
}
150+
151+
public void releaseInputBuffer(byte[] buffer)
152+
{
153+
if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
154+
inputBuffer = buffer;
155+
}
156+
}
157+
158+
public byte[] allocDecodeBuffer(int size)
159+
{
160+
byte[] buf = decodingBuffer;
161+
if (buf == null || buf.length < size) {
162+
buf = new byte[size];
163+
}
164+
else {
165+
decodingBuffer = null;
166+
}
167+
return buf;
168+
}
169+
170+
public void releaseDecodeBuffer(byte[] buffer)
171+
{
172+
if (decodingBuffer == null || (buffer != null && buffer.length > decodingBuffer.length)) {
173+
decodingBuffer = buffer;
174+
}
175+
}
176+
}

src/main/java/org/xerial/snappy/SnappyOutputStream.java

+19-10
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,13 @@ public class SnappyOutputStream extends OutputStream {
5656
static final int DEFAULT_BLOCK_SIZE = 32 * 1024; // Use 32kb for the default block size
5757

5858
protected final OutputStream out;
59+
60+
private final BufferRecycler recycler;
5961
private final int blockSize;
62+
protected final byte[] inputBuffer;
63+
protected final byte[] outputBuffer;
6064
private int inputCursor = 0;
61-
protected byte[] uncompressed;
6265
private int outputCursor = 0;
63-
protected byte[] outputBuffer;
6466

6567
public SnappyOutputStream(OutputStream out) {
6668
this(out, DEFAULT_BLOCK_SIZE);
@@ -73,9 +75,10 @@ public SnappyOutputStream(OutputStream out) {
7375
*/
7476
public SnappyOutputStream(OutputStream out, int blockSize) {
7577
this.out = out;
78+
this.recycler = BufferRecycler.instance();
7679
this.blockSize = Math.max(MIN_BLOCK_SIZE, blockSize);
77-
uncompressed = new byte[this.blockSize];
78-
outputBuffer = new byte[SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(this.blockSize)];
80+
inputBuffer = recycler.allocInputBuffer(this.blockSize);
81+
outputBuffer = recycler.allocOutputBuffer(SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(this.blockSize));
7982
outputCursor = SnappyCodec.currentHeader.writeHeader(outputBuffer, 0);
8083
}
8184

@@ -214,7 +217,7 @@ public void rawWrite(Object array, int byteOffset, int byteLength) throws IOExce
214217

215218
if(inputCursor + byteLength < MIN_BLOCK_SIZE) {
216219
// copy the input data to uncompressed buffer
217-
Snappy.arrayCopy(array, byteOffset, byteLength, uncompressed, inputCursor);
220+
Snappy.arrayCopy(array, byteOffset, byteLength, inputBuffer, inputCursor);
218221
inputCursor += byteLength;
219222
return;
220223
}
@@ -244,10 +247,10 @@ public void rawWrite(Object array, int byteOffset, int byteLength) throws IOExce
244247
*/
245248
@Override
246249
public void write(int b) throws IOException {
247-
if(inputCursor >= uncompressed.length) {
250+
if(inputCursor >= inputBuffer.length) {
248251
compressInput();
249252
}
250-
uncompressed[inputCursor++] = (byte) b;
253+
inputBuffer[inputCursor++] = (byte) b;
251254
}
252255

253256
/* (non-Javadoc)
@@ -291,7 +294,7 @@ protected void compressInput() throws IOException {
291294
if(!hasSufficientOutputBufferFor(inputCursor)) {
292295
dumpOutput();
293296
}
294-
int compressedSize = Snappy.compress(uncompressed, 0, inputCursor, outputBuffer, outputCursor + 4);
297+
int compressedSize = Snappy.compress(inputBuffer, 0, inputCursor, outputBuffer, outputCursor + 4);
295298
// Write compressed data size
296299
writeInt(outputBuffer, outputCursor, compressedSize);
297300
outputCursor += 4 + compressedSize;
@@ -306,8 +309,14 @@ protected void compressInput() throws IOException {
306309
*/
307310
@Override
308311
public void close() throws IOException {
309-
flush();
310-
out.close();
312+
try {
313+
flush();
314+
out.close();
315+
}
316+
finally {
317+
recycler.releaseInputBuffer(inputBuffer);
318+
recycler.releaseOutputBuffer(outputBuffer);
319+
}
311320
}
312321

313322
}

version.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version in ThisBuild := "1.1.1.2"
1+
version in ThisBuild := "1.1.1.3-SNAPSHOT"

0 commit comments

Comments
 (0)