Skip to content

Commit f26ba13

Browse files
committed
1. remove extra # in cmake files
2. fix a bug in FindANTLR.cmake
3. download antlr jar to build directory instead of the source directory
4. add StorageType to ClpTableHandle
5. use consistent types for filteredRows across different methods
6. remove archiveSource_ from ClpConfig and config docs, use storageType_ in ClpDataSource
7. minor improvements in logs and method signatures
8. rename splitPath to path in ClpConnectorSplit
1 parent 0cd9943 commit f26ba13

13 files changed

+68
-85
lines changed

CMake/ExternalAntlr4Cpp.cmake

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ if(ANTLR4_ZIP_REPOSITORY)
9999
-DDISABLE_WARNINGS:BOOL=ON
100100
# -DCMAKE_CXX_STANDARD:STRING=17 # if desired, compile the runtime with a
101101
# different C++ standard -DCMAKE_CXX_STANDARD:STRING=${CMAKE_CXX_STANDARD}
102-
# # alternatively, compile the runtime with the same C++ standard as the
102+
# alternatively, compile the runtime with the same C++ standard as the
103103
# outer project
104104
INSTALL_COMMAND ""
105105
EXCLUDE_FROM_ALL 1)
@@ -120,7 +120,7 @@ else()
120120
-DDISABLE_WARNINGS:BOOL=ON
121121
# -DCMAKE_CXX_STANDARD:STRING=17 # if desired, compile the runtime with a
122122
# different C++ standard -DCMAKE_CXX_STANDARD:STRING=${CMAKE_CXX_STANDARD}
123-
# # alternatively, compile the runtime with the same C++ standard as the
123+
# alternatively, compile the runtime with the same C++ standard as the
124124
# outer project
125125
INSTALL_COMMAND ""
126126
EXCLUDE_FROM_ALL 1)

CMake/FindANTLR.cmake

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@ add_definitions(-DANTLR4CPP_STATIC)
2121

2222
# Define the JAR name and path
2323
set(ANTLR_JAR_NAME antlr-${ANTLR4_TAG}-complete.jar)
24-
set(ANTLR_EXECUTABLE ${PROJECT_SOURCE_DIR}/third_party/antlr/${ANTLR_JAR_NAME})
2524

26-
# Ensure the output directory exists
27-
file(MAKE_DIRECTORY ${PROJECT_SOURCE_DIR}/third_party/antlr)
25+
# Set the download directory
26+
set(ANTLR_DOWNLOAD_DIR ${CMAKE_BINARY_DIR}/antlr)
27+
file(MAKE_DIRECTORY ${ANTLR_DOWNLOAD_DIR})
28+
29+
# Define the full path to the ANTLR JAR
30+
set(ANTLR_EXECUTABLE ${ANTLR_DOWNLOAD_DIR}/${ANTLR_JAR_NAME})
2831

2932
# Download the ANTLR JAR if it does not exist
3033
if(NOT EXISTS ${ANTLR_EXECUTABLE})
@@ -84,7 +87,6 @@ if(ANTLR_EXECUTABLE AND Java_JAVA_EXECUTABLE)
8487
if(ANTLR_TARGET_OUTPUT_DIRECTORY)
8588
set(ANTLR_${Name}_OUTPUT_DIR ${ANTLR_TARGET_OUTPUT_DIRECTORY})
8689
else()
87-
message("ANTLR output: ${CMAKE_CURRENT_BINARY_DIR}")
8890
set(ANTLR_${Name}_OUTPUT_DIR
8991
${CMAKE_CURRENT_BINARY_DIR}/antlr4cpp_generated_src/${ANTLR_INPUT})
9092
endif()

velox/connectors/clp/ClpConfig.h

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,6 @@ class ConfigBase;
2727
namespace facebook::velox::connector::clp {
2828
class ClpConfig {
2929
public:
30-
enum class SplitSource {
31-
kLocal,
32-
kS3,
33-
};
34-
35-
static constexpr const char* kSplitSource = "clp.split-source";
36-
37-
[[nodiscard]] SplitSource splitSource() const {
38-
auto const value = config_->get<std::string>(kSplitSource, "local");
39-
auto const upperValue = boost::algorithm::to_upper_copy(value);
40-
if (upperValue == "LOCAL") {
41-
return SplitSource::kLocal;
42-
}
43-
if (upperValue == "S3") {
44-
return SplitSource::kS3;
45-
}
46-
VELOX_UNSUPPORTED("Unsupported split source: {}.", value);
47-
}
48-
4930
explicit ClpConfig(std::shared_ptr<const config::ConfigBase> config) {
5031
VELOX_CHECK_NOT_NULL(config, "Config is null for CLP initialization");
5132
config_ = std::move(config);

velox/connectors/clp/ClpConnector.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ class ClpConnectorFactory : public ConnectorFactory {
6767
std::shared_ptr<Connector> newConnector(
6868
const std::string& id,
6969
std::shared_ptr<const config::ConfigBase> config,
70-
folly::Executor* ioExecutor,
71-
folly::Executor* cpuExecutor) override {
70+
folly::Executor* /*ioExecutor*/,
71+
folly::Executor* /*cpuExecutor*/) override {
7272
return std::make_shared<ClpConnector>(id, config);
7373
}
7474
};

velox/connectors/clp/ClpConnectorSplit.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,13 @@
2020

2121
namespace facebook::velox::connector::clp {
2222
struct ClpConnectorSplit : public connector::ConnectorSplit {
23-
ClpConnectorSplit(
24-
const std::string& connectorId,
25-
const std::string& splitPath)
26-
: connector::ConnectorSplit(connectorId), splitPath_(splitPath) {}
23+
ClpConnectorSplit(const std::string& connectorId, const std::string& path)
24+
: connector::ConnectorSplit(connectorId), path_(path) {}
2725

2826
[[nodiscard]] std::string toString() const override {
29-
return fmt::format("CLP: {}", splitPath_);
27+
return fmt::format("CLP Split: {}", path_);
3028
}
3129

32-
const std::string splitPath_;
30+
const std::string path_;
3331
};
3432
} // namespace facebook::velox::connector::clp

velox/connectors/clp/ClpDataSource.cpp

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
* limitations under the License.
1515
*/
1616

17-
#include <iostream>
1817
#include <optional>
1918

2019
#include "velox/connectors/clp/ClpColumnHandle.h"
@@ -35,10 +34,9 @@ ClpDataSource::ClpDataSource(
3534
std::shared_ptr<connector::ColumnHandle>>& columnHandles,
3635
velox::memory::MemoryPool* pool,
3736
std::shared_ptr<const ClpConfig>& clpConfig)
38-
: splitSource_(clpConfig->splitSource()),
39-
pool_(pool),
40-
outputType_(outputType) {
37+
: pool_(pool), outputType_(outputType) {
4138
auto clpTableHandle = std::dynamic_pointer_cast<ClpTableHandle>(tableHandle);
39+
storageType_ = clpTableHandle->storageType();
4240
if (auto query = clpTableHandle->kqlQuery(); query && !query->empty()) {
4341
kqlQuery_ = *query;
4442
} else {
@@ -104,12 +102,12 @@ void ClpDataSource::addFieldsRecursively(
104102
void ClpDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
105103
auto clpSplit = std::dynamic_pointer_cast<ClpConnectorSplit>(split);
106104

107-
if (splitSource_ == ClpConfig::SplitSource::kLocal) {
105+
if (storageType_ == ClpTableHandle::StorageType::kFS) {
108106
cursor_ = std::make_unique<search_lib::ClpCursor>(
109-
clp_s::InputSource::Filesystem, clpSplit->splitPath_);
110-
} else if (splitSource_ == ClpConfig::SplitSource::kS3) {
107+
clp_s::InputSource::Filesystem, clpSplit->path_);
108+
} else if (storageType_ == ClpTableHandle::StorageType::kS3) {
111109
cursor_ = std::make_unique<search_lib::ClpCursor>(
112-
clp_s::InputSource::Network, clpSplit->splitPath_);
110+
clp_s::InputSource::Network, clpSplit->path_);
113111
}
114112

115113
cursor_->executeQuery(kqlQuery_, fields_);
@@ -119,7 +117,7 @@ VectorPtr ClpDataSource::createVector(
119117
const TypePtr& type,
120118
size_t size,
121119
const std::vector<clp_s::BaseColumnReader*>& projectedColumns,
122-
const std::shared_ptr<std::vector<size_t>>& filteredRows,
120+
const std::shared_ptr<std::vector<uint64_t>>& filteredRows,
123121
size_t& readerIndex) {
124122
if (type->kind() == TypeKind::ROW) {
125123
std::vector<VectorPtr> children;
@@ -155,8 +153,7 @@ VectorPtr ClpDataSource::createVector(
155153
std::optional<RowVectorPtr> ClpDataSource::next(
156154
uint64_t size,
157155
ContinueFuture& future) {
158-
std::shared_ptr<std::vector<uint64_t>> filteredRows =
159-
std::make_shared<std::vector<uint64_t>>();
156+
auto filteredRows = std::make_shared<std::vector<uint64_t>>();
160157
auto rowsScanned = cursor_->fetchNext(size, filteredRows);
161158
auto rowsFiltered = filteredRows->size();
162159
if (rowsFiltered == 0) {

velox/connectors/clp/ClpDataSource.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include "velox/connectors/Connector.h"
2222
#include "velox/connectors/clp/ClpConfig.h"
23+
#include "velox/connectors/clp/ClpTableHandle.h"
2324
#include "velox/connectors/clp/search_lib/ClpCursor.h"
2425

2526
namespace facebook::velox::connector::clp {
@@ -87,10 +88,10 @@ class ClpDataSource : public DataSource {
8788
const TypePtr& type,
8889
size_t size,
8990
const std::vector<clp_s::BaseColumnReader*>& projectedColumns,
90-
const std::shared_ptr<std::vector<size_t>>& filteredRows,
91+
const std::shared_ptr<std::vector<uint64_t>>& filteredRows,
9192
size_t& readerIndex);
9293

93-
ClpConfig::SplitSource splitSource_;
94+
ClpTableHandle::StorageType storageType_;
9495
std::string kqlQuery_;
9596
velox::memory::MemoryPool* pool_;
9697
RowTypePtr outputType_;

velox/connectors/clp/ClpTableHandle.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,29 @@
2121
namespace facebook::velox::connector::clp {
2222
class ClpTableHandle : public ConnectorTableHandle {
2323
public:
24+
enum class StorageType {
25+
kFS,
26+
kS3,
27+
};
28+
2429
ClpTableHandle(
25-
std::string connectorId,
30+
const std::string& connectorId,
2631
const std::string& tableName,
32+
StorageType storageType,
2733
std::shared_ptr<std::string> kqlQuery)
2834
: ConnectorTableHandle(connectorId),
2935
tableName_(tableName),
36+
storageType_(storageType),
3037
kqlQuery_(std::move(kqlQuery)) {}
3138

3239
[[nodiscard]] const std::string& tableName() const {
3340
return tableName_;
3441
}
3542

43+
[[nodiscard]] const StorageType storageType() const {
44+
return storageType_;
45+
}
46+
3647
[[nodiscard]] const std::shared_ptr<std::string>& kqlQuery() const {
3748
return kqlQuery_;
3849
}
@@ -43,6 +54,7 @@ class ClpTableHandle : public ConnectorTableHandle {
4354

4455
private:
4556
const std::string tableName_;
57+
const StorageType storageType_;
4658
std::shared_ptr<std::string> kqlQuery_;
4759
};
4860
} // namespace facebook::velox::connector::clp

velox/connectors/clp/search_lib/ClpCursor.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,13 @@ uint64_t ClpCursor::fetchNext(
101101
return 0;
102102
}
103103

104-
std::vector<clp_s::BaseColumnReader*>& ClpCursor::getProjectedColumns() const {
104+
const std::vector<clp_s::BaseColumnReader*>& ClpCursor::getProjectedColumns()
105+
const {
105106
if (queryRunner_) {
106107
return queryRunner_->getProjectedColumns();
107108
}
108-
static std::vector<clp_s::BaseColumnReader*> empty;
109-
return empty;
109+
static std::vector<clp_s::BaseColumnReader*> kEmpty;
110+
return kEmpty;
110111
}
111112

112113
ErrorCode ClpCursor::preprocessQuery() {

velox/connectors/clp/search_lib/ClpCursor.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ enum class ErrorCode {
3737
InternalError
3838
};
3939

40-
enum class ColumnType { String, Integer, Float, Array, Boolean, Unknown };
40+
enum class ColumnType { String, Integer, Float, Array, Boolean, Unknown = -1 };
4141

4242
struct Field {
4343
ColumnType type;
@@ -87,7 +87,7 @@ class ClpCursor {
8787
* @return A vector of BaseColumnReader pointers representing the projected
8888
* columns.
8989
*/
90-
std::vector<clp_s::BaseColumnReader*>& getProjectedColumns() const;
90+
const std::vector<clp_s::BaseColumnReader*>& getProjectedColumns() const;
9191

9292
private:
9393
/**

velox/connectors/clp/search_lib/ClpQueryRunner.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ void ClpQueryRunner::init(
6161

6262
uint64_t ClpQueryRunner::fetchNext(
6363
size_t numRows,
64-
const std::shared_ptr<std::vector<size_t>>& filteredRowIndices) {
64+
const std::shared_ptr<std::vector<uint64_t>>& filteredRowIndices) {
6565
size_t rowsfiltered = 0;
6666
size_t rowsScanned = 0;
6767
while (curMessage_ < numMessages_) {

velox/connectors/clp/tests/ClpConnectorTest.cpp

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@ class ClpConnectorTest : public exec::test::OperatorTestBase {
6060
OperatorTestBase::TearDown();
6161
}
6262

63-
exec::Split makeClpSplit(
64-
const std::string& tableName,
65-
const std::string& splitPath) {
63+
exec::Split makeClpSplit(const std::string& splitPath) {
6664
return exec::Split(
6765
std::make_shared<ClpConnectorSplit>(kClpConnectorId, splitPath));
6866
}
@@ -76,7 +74,7 @@ class ClpConnectorTest : public exec::test::OperatorTestBase {
7674
}
7775

7876
static std::string getExampleFilePath(const std::string& filePath) {
79-
std::string current_path = fs::current_path().c_str();
77+
std::string current_path = fs::current_path().string();
8078
return current_path + "/examples/" + filePath;
8179
}
8280
};
@@ -88,7 +86,10 @@ TEST_F(ClpConnectorTest, test1NoPushdown) {
8886
ROW({"requestId", "userId", "method"},
8987
{VARCHAR(), VARCHAR(), VARCHAR()}))
9088
.tableHandle(std::make_shared<ClpTableHandle>(
91-
kClpConnectorId, "test_1", nullptr))
89+
kClpConnectorId,
90+
"test_1",
91+
ClpTableHandle::StorageType::kFS,
92+
nullptr))
9293
.assignments({
9394
{"requestId",
9495
std::make_shared<ClpColumnHandle>(
@@ -104,8 +105,8 @@ TEST_F(ClpConnectorTest, test1NoPushdown) {
104105
.filter("method = 'GET'")
105106
.planNode();
106107

107-
auto output = getResults(
108-
plan, {makeClpSplit("test_1", getExampleFilePath("test_1.clps"))});
108+
auto output =
109+
getResults(plan, {makeClpSplit(getExampleFilePath("test_1.clps"))});
109110
auto expected = makeRowVector(
110111
{// requestId
111112
makeFlatVector<StringView>(
@@ -133,6 +134,7 @@ TEST_F(ClpConnectorTest, test1Pushdown) {
133134
.tableHandle(std::make_shared<ClpTableHandle>(
134135
kClpConnectorId,
135136
"test_1",
137+
ClpTableHandle::StorageType::kFS,
136138
std::make_shared<std::string>(
137139
"method: \"POST\" AND status: 200")))
138140
.assignments({
@@ -149,8 +151,8 @@ TEST_F(ClpConnectorTest, test1Pushdown) {
149151
.endTableScan()
150152
.planNode();
151153

152-
auto output = getResults(
153-
plan, {makeClpSplit("test_1", getExampleFilePath("test_1.clps"))});
154+
auto output =
155+
getResults(plan, {makeClpSplit(getExampleFilePath("test_1.clps"))});
154156
auto expected =
155157
makeRowVector({// requestId
156158
makeFlatVector<StringView>({"req-106"}),
@@ -171,7 +173,10 @@ TEST_F(ClpConnectorTest, test2NoPushdown) {
171173
ROW({"type", "subtype", "severity"},
172174
{VARCHAR(), VARCHAR(), VARCHAR()})}))
173175
.tableHandle(std::make_shared<ClpTableHandle>(
174-
kClpConnectorId, "test_2", nullptr))
176+
kClpConnectorId,
177+
"test_2",
178+
ClpTableHandle::StorageType::kFS,
179+
nullptr))
175180
.assignments(
176181
{{"timestamp",
177182
std::make_shared<ClpColumnHandle>(
@@ -190,8 +195,8 @@ TEST_F(ClpConnectorTest, test2NoPushdown) {
190195
"(event.type = 'storage' AND event.subtype LIKE 'disk_usage%'))")
191196
.planNode();
192197

193-
auto output = getResults(
194-
plan, {makeClpSplit("test_2", getExampleFilePath("test_2.clps"))});
198+
auto output =
199+
getResults(plan, {makeClpSplit(getExampleFilePath("test_2.clps"))});
195200
auto expected =
196201
makeRowVector({// timestamp
197202
makeFlatVector<StringView>({"2025-04-30T08:50:05Z"}),
@@ -219,6 +224,7 @@ TEST_F(ClpConnectorTest, test2Pushdown) {
219224
.tableHandle(std::make_shared<ClpTableHandle>(
220225
kClpConnectorId,
221226
"test_2",
227+
ClpTableHandle::StorageType::kFS,
222228
std::make_shared<std::string>(
223229
"(event.severity: \"WARNING\" OR event.severity: \"ERROR\") AND "
224230
"((event.type: \"network\" AND event.subtype: \"connection\") OR "
@@ -237,8 +243,8 @@ TEST_F(ClpConnectorTest, test2Pushdown) {
237243
.endTableScan()
238244
.planNode();
239245

240-
auto output = getResults(
241-
plan, {makeClpSplit("test_2", getExampleFilePath("test_2.clps"))});
246+
auto output =
247+
getResults(plan, {makeClpSplit(getExampleFilePath("test_2.clps"))});
242248
auto expected =
243249
makeRowVector({// timestamp
244250
makeFlatVector<StringView>({"2025-04-30T08:50:05Z"}),
@@ -266,6 +272,7 @@ TEST_F(ClpConnectorTest, test2Hybrid) {
266272
.tableHandle(std::make_shared<ClpTableHandle>(
267273
kClpConnectorId,
268274
"test_2",
275+
ClpTableHandle::StorageType::kFS,
269276
std::make_shared<std::string>(
270277
"((event.type: \"network\" AND event.subtype: \"connection\") OR "
271278
"(event.type: \"storage\" AND event.subtype: \"disk*\"))")))
@@ -284,8 +291,8 @@ TEST_F(ClpConnectorTest, test2Hybrid) {
284291
.filter("upper(event.severity) IN ('WARNING', 'ERROR')")
285292
.planNode();
286293

287-
auto output = getResults(
288-
plan, {makeClpSplit("test_2", getExampleFilePath("test_2.clps"))});
294+
auto output =
295+
getResults(plan, {makeClpSplit(getExampleFilePath("test_2.clps"))});
289296
auto expected =
290297
makeRowVector({// timestamp
291298
makeFlatVector<StringView>({"2025-04-30T08:50:05Z"}),

0 commit comments

Comments
 (0)