Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public static int write(Map<String, String> map, ByteBuffer buffer) {
if (entry.getKey() == null) {
buffer.putInt(-1);
} else {
bytes = entry.getKey().getBytes();
bytes = entry.getKey().getBytes(TSFileConfig.STRING_CHARSET);
buffer.putInt(bytes.length);
buffer.put(bytes);
length += bytes.length;
Expand All @@ -194,7 +194,7 @@ public static int write(Map<String, String> map, ByteBuffer buffer) {
if (entry.getValue() == null) {
buffer.putInt(-1);
} else {
bytes = entry.getValue().getBytes();
bytes = entry.getValue().getBytes(TSFileConfig.STRING_CHARSET);
buffer.putInt(bytes.length);
buffer.put(bytes);
length += bytes.length;
Expand Down Expand Up @@ -509,7 +509,7 @@ public static int sizeToWrite(String s) {
if (s == null) {
return INT_LEN;
}
return INT_LEN + s.getBytes().length;
return INT_LEN + s.getBytes(TSFileConfig.STRING_CHARSET).length;
}

/** read a byte var from inputStream. */
Expand Down Expand Up @@ -1202,7 +1202,7 @@ public static void writeObject(Object value, DataOutputStream outputStream) {
outputStream.write(NONE.ordinal());
} else {
outputStream.write(STRING.ordinal());
byte[] bytes = value.toString().getBytes();
byte[] bytes = value.toString().getBytes(TSFileConfig.STRING_CHARSET);
outputStream.writeInt(bytes.length);
outputStream.write(bytes);
}
Expand Down Expand Up @@ -1238,7 +1238,7 @@ public static void writeObject(Object value, ByteBuffer byteBuffer) {
byteBuffer.putInt(NONE.ordinal());
} else {
byteBuffer.putInt(STRING.ordinal());
byte[] bytes = value.toString().getBytes();
byte[] bytes = value.toString().getBytes(TSFileConfig.STRING_CHARSET);
byteBuffer.putInt(bytes.length);
byteBuffer.put(bytes);
}
Expand Down Expand Up @@ -1271,7 +1271,7 @@ public static Object readObject(ByteBuffer buffer) {
length = buffer.getInt();
bytes = new byte[length];
buffer.get(bytes);
return new String(bytes);
return new String(bytes, TSFileConfig.STRING_CHARSET);
}
}

Expand Down
116 changes: 115 additions & 1 deletion java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -748,13 +748,26 @@ private Object createValueColumnOfDataType(TSDataType dataType, int capacity) {

/** Serialize {@link Tablet} */
public ByteBuffer serialize() throws IOException {
try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
final int serializedSize = serializedSize();
try (PublicBAOS byteArrayOutputStream = new PublicBAOS(serializedSize);
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
serialize(outputStream);
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
}

/** Return the exact serialized byte size of this tablet. */
public int serializedSize() {
int size = 0;
size = Math.addExact(size, ReadWriteIOUtils.sizeToWrite(insertTargetName));
size = Math.addExact(size, Integer.BYTES);
size = Math.addExact(size, serializedSizeOfMeasurementSchemas());
size = Math.addExact(size, serializedSizeOfTimes());
size = Math.addExact(size, serializedSizeOfBitMaps());
size = Math.addExact(size, serializedSizeOfValues());
return size;
}

public void serialize(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(insertTargetName, stream);
ReadWriteIOUtils.write(rowSize, stream);
Expand All @@ -764,6 +777,107 @@ public void serialize(DataOutputStream stream) throws IOException {
writeValues(stream);
}

private int serializedSizeOfMeasurementSchemas() {
int size = Byte.BYTES;
if (schemas != null) {
size = Math.addExact(size, Integer.BYTES);
for (int i = 0; i < schemas.size(); i++) {
size = Math.addExact(size, Byte.BYTES);
final IMeasurementSchema schema = schemas.get(i);
if (schema != null) {
size = Math.addExact(size, schema.serializedSize());
size = Math.addExact(size, Byte.BYTES);
}
}
}
return size;
}

private int serializedSizeOfTimes() {
int size = Byte.BYTES;
if (timestamps != null) {
size = Math.addExact(size, Math.multiplyExact(Long.BYTES, rowSize));
}
return size;
}

private int serializedSizeOfBitMaps() {
int size = Byte.BYTES;
if (bitMaps != null) {
final int columnCount = schemas == null ? 0 : schemas.size();
for (int i = 0; i < columnCount; i++) {
if (bitMaps[i] == null || bitMaps[i].isAllUnmarked(rowSize)) {
size = Math.addExact(size, Byte.BYTES);
} else {
size = Math.addExact(size, Byte.BYTES);
size = Math.addExact(size, Integer.BYTES);
size =
Math.addExact(
size,
ReadWriteIOUtils.sizeToWrite(
new Binary(bitMaps[i].getTruncatedByteArray(rowSize))));
}
}
}
return size;
}

private int serializedSizeOfValues() {
int size = Byte.BYTES;
if (values != null) {
final int columnCount = schemas == null ? 0 : schemas.size();
for (int i = 0; i < columnCount; i++) {
size = Math.addExact(size, serializedSizeOfColumn(schemas.get(i).getType(), values[i]));
}
}
return size;
}

private int serializedSizeOfColumn(final TSDataType dataType, final Object column) {
int size = Byte.BYTES;
if (column == null) {
return size;
}
switch (dataType) {
case INT32:
return Math.addExact(size, Math.multiplyExact(Integer.BYTES, rowSize));
case DATE:
return Math.addExact(size, Math.multiplyExact(Integer.BYTES, rowSize));
case INT64:
case TIMESTAMP:
return Math.addExact(size, Math.multiplyExact(Long.BYTES, rowSize));
case FLOAT:
return Math.addExact(size, Math.multiplyExact(Float.BYTES, rowSize));
case DOUBLE:
return Math.addExact(size, Math.multiplyExact(Double.BYTES, rowSize));
case BOOLEAN:
return Math.addExact(size, rowSize);
case TEXT:
case STRING:
case BLOB:
case OBJECT:
return Math.addExact(size, serializedSizeOfBinaryValues((Binary[]) column));
default:
throw new UnSupportedDataTypeException(
Messages.format("error.write.type_not_supported", dataType));
}
}

private static int serializedSizeOfBinaryValues(final Binary[] binaryValues, final int rowSize) {
int size = 0;
for (int j = 0; j < rowSize; j++) {
size = Math.addExact(size, Byte.BYTES);
if (binaryValues[j] != null) {
size = Math.addExact(size, ReadWriteIOUtils.sizeToWrite(binaryValues[j]));
}
}
return size;
}

private int serializedSizeOfBinaryValues(final Binary[] binaryValues) {
return serializedSizeOfBinaryValues(binaryValues, rowSize);
}

/** Serialize {@link MeasurementSchema}s */
private void writeMeasurementSchemas(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(BytesUtils.boolToByte(schemas != null), stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,15 @@ public int serializeTo(OutputStream outputStream) throws IOException {
@Override
public int serializedSize() {
int byteLen = 0;
byteLen += ReadWriteIOUtils.sizeToWrite(measurementName);
byteLen += 3 * Byte.BYTES;
byteLen = Math.addExact(byteLen, ReadWriteIOUtils.sizeToWrite(measurementName));
byteLen = Math.addExact(byteLen, 3 * Byte.BYTES);
if (props == null) {
byteLen += Integer.BYTES;
byteLen = Math.addExact(byteLen, Integer.BYTES);
} else {
byteLen += Integer.BYTES;
byteLen = Math.addExact(byteLen, Integer.BYTES);
for (Map.Entry<String, String> entry : props.entrySet()) {
byteLen += ReadWriteIOUtils.sizeToWrite(entry.getKey());
byteLen += ReadWriteIOUtils.sizeToWrite(entry.getValue());
byteLen = Math.addExact(byteLen, ReadWriteIOUtils.sizeToWrite(entry.getKey()));
byteLen = Math.addExact(byteLen, ReadWriteIOUtils.sizeToWrite(entry.getValue()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ public void mapSerdeTest() {
Assert.assertNotNull(result);
Assert.assertEquals(map, result);

ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
ReadWriteIOUtils.write(map, buffer);
buffer.flip();
result = ReadWriteIOUtils.readMap(buffer);
Assert.assertNotNull(result);
Assert.assertEquals(map, result);

// 7. null
map = null;
byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
Expand Down
Loading
Loading