Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class ConstantValue {
public static final Integer HTTP_CHECK_TIMEOUT_DEFAULT = 10 * 1000;
public static final Integer QUEUE_OFFER_TIMEOUT_DEFAULT = 60 * 1000;
public static final Integer QUEUE_POLL_TIMEOUT_DEFAULT = 60 * 1000;
public static final Long SINK_BATCH_MAX_BYTES_DEFAULT = 2 * 1024 * 1024 * 1024L;
// 50 MB. If you need a larger value, you must also increase the taskmanager memory;
// otherwise an OOM may occur.
public static final Long SINK_BATCH_MAX_BYTES_DEFAULT = 50 * 1024 * 1024L;
public static final Long SINK_BATCH_MAX_ROWS_DEFAULT = 2048 * 100L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.connector.starrocks.config.StarRocksConfig;
import com.dtstack.chunjun.connector.starrocks.connection.StarRocksJdbcConnectionProvider;
import com.dtstack.chunjun.connector.starrocks.options.ConstantValue;
import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksQueryVisitor;
import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksSinkBufferEntity;
import com.dtstack.chunjun.connector.starrocks.streamload.StarRocksStreamLoadFailedException;
import com.dtstack.chunjun.connector.starrocks.streamload.StreamLoadManager;
import com.dtstack.chunjun.constants.Metrics;
import com.dtstack.chunjun.element.ColumnRowData;
import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
import com.dtstack.chunjun.util.GsonUtil;
Expand Down Expand Up @@ -142,13 +144,18 @@ public synchronized void writeRecord(RowData rowData) {
checkTimerWriteException();
int size = 0;
rows.add(rowData);
if (rows.size() >= batchSize) {
if (rowData instanceof ColumnRowData) {
batchMaxByteSize += ((ColumnRowData) rowData).getByteSize();
} else {
batchMaxByteSize += rowSizeCalculator.getObjectSize(rowData);
}
if (rows.size() >= starRocksConfig.getBatchSize()
|| batchMaxByteSize >= ConstantValue.SINK_BATCH_MAX_BYTES_DEFAULT) {
writeRecordInternal();
size = batchSize;
}

updateDuration();
bytesWriteCounter.add(rowSizeCalculator.getObjectSize(rowData));
if (checkpointEnabled) {
snapshotWriteCounter.add(size);
}
Expand All @@ -160,6 +167,7 @@ protected synchronized void writeRecordInternal() {
try {
writeMultipleRecordsInternal();
numWriteCounter.add(rows.size());
bytesWriteCounter.add(batchMaxByteSize);
} catch (Exception e) {
if (e instanceof StarRocksStreamLoadFailedException) {
StarRocksStreamLoadFailedException exception =
Expand All @@ -178,6 +186,7 @@ protected synchronized void writeRecordInternal() {
} finally {
// At this point the data has either been recorded as dirty data or written normally
rows.clear();
batchMaxByteSize = 0;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

import java.util.stream.Collectors;

import static com.dtstack.chunjun.connector.starrocks.options.ConstantValue.SINK_BATCH_MAX_ROWS_DEFAULT;

public class StarRocksSinkFactory extends SinkFactory {

private final StarRocksConfig starRocksConfig;
Expand All @@ -47,7 +49,8 @@ public StarRocksSinkFactory(SyncConfig syncConfig) {
JsonUtil.toJson(syncConfig.getWriter().getParameter()),
StarRocksConfig.class);

int batchSize = syncConfig.getWriter().getIntVal("batchSize", 1024);
int batchSize =
(int) syncConfig.getWriter().getLongVal("batchSize", SINK_BATCH_MAX_ROWS_DEFAULT);
starRocksConfig.setBatchSize(batchSize);
super.initCommonConf(starRocksConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ private boolean tryHttpConnection(String host) {
}

private byte[] joinRows(List<byte[]> rows, int totalBytes) {
if (totalBytes < 0) {
throw new RuntimeException(
"The ByteBuffer limit has been exceeded, json assembly may fail");
}
ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : 1 - rows.size()));
bos.put("[".getBytes(StandardCharsets.UTF_8));
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ public abstract class BaseRichOutputFormat extends RichOutputFormat<RowData>
/** 最新读取的数据 */
protected RowData lastRow = null;

/** 存储用于批量写入的数据 */
/** 存储用于批量写入的数据行数 */
protected transient List<RowData> rows;
/** 存储用于批量写入的数据字节数 */
protected transient long batchMaxByteSize;
/** 数据类型转换器 */
protected AbstractRowConverter rowConverter;
/** 是否需要初始化脏数据和累加器,目前只有hive插件该参数设置为false */
Expand Down