Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
72cf061
[WIP] OPEN_STRUCT storage layer — columnar two-tier dense/sparse inde…
tarun11Mavani Jun 1, 2026
d58308e
fix(open_struct): support BIG_DECIMAL in dense materialization
tarun11Mavani Jun 1, 2026
8b4c0df
refactor(stats): add FieldSpec-based constructor to AbstractColumnSta…
tarun11Mavani Jun 1, 2026
8d438bf
refactor(stats): add FieldSpec-based collector factory overload
tarun11Mavani Jun 1, 2026
a3621d4
refactor(open_struct): build dense dictionary and stats from a column…
tarun11Mavani Jun 1, 2026
ea87ba3
refactor(open_struct): emit dense child metadata via shared addColumn…
tarun11Mavani Jun 1, 2026
c2aa947
refactor(open_struct): size dict-vs-raw from collector stats; drop _d…
tarun11Mavani Jun 1, 2026
d8bd6d5
refactor(open_struct): drop redundant dictionaryElementSize alias
tarun11Mavani Jun 1, 2026
4e778ab
refactor(open_struct): write dense child indexes via standard index c…
tarun11Mavani Jun 1, 2026
302460b
refactor(open_struct): extract dict decision and index writing from w…
tarun11Mavani Jun 2, 2026
25e7bf9
refactor(open_struct): move inferDataType out of OpenStructNaming int…
tarun11Mavani Jun 2, 2026
8fe4042
test(open_struct): add unit tests for OpenStructTypeInference.inferDa…
tarun11Mavani Jun 2, 2026
c895874
feat(open_struct): add FieldIndexConfigsUtil.fromFieldConfig (per-Fie…
tarun11Mavani Jun 2, 2026
2a37cf9
feat(open_struct): validate per-key indexes against vetted allowlist …
tarun11Mavani Jun 2, 2026
a7063ff
feat(open_struct): build dense-key indexes via a generic creator loop…
tarun11Mavani Jun 2, 2026
73afe36
fix(open_struct): seal dictionary in finally block so it survives Ind…
tarun11Mavani Jun 3, 2026
8be0582
fix(open_struct): wire OPEN_STRUCT through the offline batch segment-…
tarun11Mavani Jun 3, 2026
c377b53
fix(open_struct): realtime consume→seal pipeline + e2e test
tarun11Mavani Jun 3, 2026
2c0312b
fix(open_struct): preserve per-key inverted indexes through SegmentPr…
tarun11Mavani Jun 3, 2026
b42d391
test(open_struct): add offline OPEN_STRUCT ingestion + commit e2e test
tarun11Mavani Jun 3, 2026
6ccb988
style(open_struct): convert Javadoc to /// markdown style
tarun11Mavani Jun 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests.custom;

import org.testng.annotations.Test;


/// End-to-end ingestion test for an OPEN_STRUCT column on an OFFLINE table. Segments are built from
/// Avro through the standard offline pipeline (row-major RecordReader path) and uploaded; the
/// index_map + dense/sparse validation inherited from the base class is then reused against the
/// loaded OFFLINE segment.
@Test(suiteName = "CustomClusterIntegrationTest")
public class OpenStructIngestionCommitOfflineTest extends OpenStructIngestionCommitTestBase {

  private static final String OFFLINE_TEST_TABLE_NAME = "OpenStructIngestionCommitOfflineTest";

  /// Name of the table this test runs against.
  @Override
  public String getTableName() {
    return OFFLINE_TEST_TABLE_NAME;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests.custom;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


/// REALTIME end-to-end ingestion + commit test for an OPEN_STRUCT column. Consumes the data from
/// Kafka, triggers forceCommit so the consuming segment seals to a committed ONLINE segment, then
/// reuses the inherited index_map + dense/sparse validation against the committed REALTIME segment.
@Test(suiteName = "CustomClusterIntegrationTest")
public class OpenStructIngestionCommitRealtimeTest extends OpenStructIngestionCommitTestBase {

  private static final String DEFAULT_TABLE_NAME = "OpenStructIngestionCommitRealtimeTest";
  private static final String FORCE_COMMIT_JOB_ID_KEY = "forceCommitJobId";
  private static final long FORCE_COMMIT_POLL_INTERVAL_MS = 1000L;
  private static final long FORCE_COMMIT_TIMEOUT_MS = 120_000L;

  /// Name of the table this test runs against.
  @Override
  public String getTableName() {
    return DEFAULT_TABLE_NAME;
  }

  /// This test always runs against a realtime table.
  @Override
  public boolean isRealtimeTable() {
    return true;
  }

  @Override
  protected int getRealtimeSegmentFlushSize() {
    // <= NUM_DOCS so a segment seals during consumption; forceCommit then seals any remainder.
    return 500;
  }

  /// The segments validated by the inherited checks are REALTIME segments.
  @Override
  protected TableType getSegmentTableType() {
    return TableType.REALTIME;
  }

  /// Runs the inherited setup, then force-commits and blocks until the force-commit job reports no
  /// consuming segments left to commit.
  @BeforeClass
  @Override
  public void setUp()
      throws Exception {
    super.setUp();
    forceCommitAndWait();
  }

  /// Issues a forceCommit for the realtime table and polls the job status until it completes or
  /// {@link #FORCE_COMMIT_TIMEOUT_MS} elapses.
  ///
  /// @throws IllegalStateException if the forceCommit response carries no job id
  private void forceCommitAndWait()
      throws Exception {
    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
    String response = getOrCreateAdminClient().getTableClient().forceCommit(realtimeTableName);

    // JsonNode.get returns null for an absent field; fail with the raw response instead of a bare NPE.
    JsonNode jobIdNode = JsonUtils.stringToJsonNode(response).get(FORCE_COMMIT_JOB_ID_KEY);
    if (jobIdNode == null || jobIdNode.isNull()) {
      throw new IllegalStateException(
          "Force commit response for " + realtimeTableName + " has no '" + FORCE_COMMIT_JOB_ID_KEY + "': "
              + response);
    }
    String jobId = jobIdNode.asText();

    TestUtils.waitForCondition(aVoid -> {
      try {
        return isForceCommitJobCompleted(jobId);
      } catch (Exception ignored) {
        // Best-effort polling: treat a failed status lookup as "not done yet" and retry until timeout.
        return false;
      }
    }, FORCE_COMMIT_POLL_INTERVAL_MS, FORCE_COMMIT_TIMEOUT_MS,
        "Force commit did not complete for " + realtimeTableName);
  }

  /// Checks the force-commit job status for the given job id.
  ///
  /// @return {@code true} only when the status payload contains the "consuming segments yet to be
  ///     committed" list and that list is empty; a missing list is treated as not completed
  private boolean isForceCommitJobCompleted(String jobId)
      throws Exception {
    String status = getOrCreateAdminClient().getTableClient().getForceCommitJobStatus(jobId);
    JsonNode node = JsonUtils.stringToJsonNode(status);
    JsonNode pending = node.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST);
    return pending != null && pending.size() == 0;
  }
}
Loading
Loading