Skip to content

Commit 8a5a08d

Browse files
fix
1 parent b0cc48f commit 8a5a08d

4 files changed

Lines changed: 270 additions & 0 deletions

File tree

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "iceberg/arrow/file_io_register.h"
19+
20+
#include <mutex>
21+
22+
#include "iceberg/arrow/arrow_file_io.h"
23+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
24+
#include "iceberg/file_io_registry.h"
25+
#include "iceberg/util/macros.h"
26+
27+
namespace iceberg::arrow {
28+
29+
void RegisterFileIO() {
30+
static std::once_flag flag;
31+
std::call_once(flag, []() {
32+
// Register Arrow local filesystem FileIO
33+
FileIORegistry::Register(
34+
FileIORegistry::kArrowLocalFileIO,
35+
[](const std::string& /*warehouse*/,
36+
const std::unordered_map<std::string, std::string>& /*properties*/)
37+
-> Result<std::shared_ptr<FileIO>> {
38+
return std::shared_ptr<FileIO>(MakeLocalFileIO());
39+
});
40+
41+
// Register Arrow S3 FileIO
42+
FileIORegistry::Register(
43+
FileIORegistry::kArrowS3FileIO,
44+
[](const std::string& warehouse,
45+
const std::unordered_map<std::string, std::string>& properties)
46+
-> Result<std::shared_ptr<FileIO>> {
47+
ICEBERG_ASSIGN_OR_RAISE(auto file_io, MakeS3FileIO(warehouse, properties));
48+
return std::shared_ptr<FileIO>(std::move(file_io));
49+
});
50+
});
51+
}
52+
53+
} // namespace iceberg::arrow
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/arrow/file_io_register.h
23+
/// \brief Provide functions to register Arrow FileIO implementations.
24+
25+
#include "iceberg/iceberg_bundle_export.h"
26+
27+
namespace iceberg::arrow {
28+
29+
/// \brief Register Arrow FileIO implementations (local and S3) into the
30+
/// FileIORegistry.
31+
///
32+
/// This function is idempotent and thread-safe. It registers:
33+
/// - ArrowFileIO (local filesystem)
34+
/// - ArrowS3FileIO (S3 filesystem)
35+
///
36+
/// Must be called before using FileIORegistry::Load() with the built-in
37+
/// implementation names (e.g., from RestCatalog::Make(config)).
38+
ICEBERG_BUNDLE_EXPORT void RegisterFileIO();
39+
40+
} // namespace iceberg::arrow

src/iceberg/arrow/s3_properties.h

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <string>
23+
24+
namespace iceberg::arrow {
25+
26+
/// \brief S3 configuration property keys for ArrowS3FileIO.
27+
///
28+
/// These constants define the property keys used to configure S3 access
29+
/// via the Arrow filesystem integration, following the Iceberg spec for
30+
/// S3 configuration properties.
31+
struct S3Properties {
32+
/// AWS access key ID
33+
static constexpr const char* kAccessKeyId = "s3.access-key-id";
34+
/// AWS secret access key
35+
static constexpr const char* kSecretAccessKey = "s3.secret-access-key";
36+
/// AWS session token (for temporary credentials)
37+
static constexpr const char* kSessionToken = "s3.session-token";
38+
/// AWS region
39+
static constexpr const char* kRegion = "s3.region";
40+
/// Custom endpoint override (for MinIO, LocalStack, etc.)
41+
static constexpr const char* kEndpoint = "s3.endpoint";
42+
/// Whether to use path-style access (needed for MinIO)
43+
static constexpr const char* kPathStyleAccess = "s3.path-style-access";
44+
/// Whether SSL is enabled
45+
static constexpr const char* kSslEnabled = "s3.ssl.enabled";
46+
/// Connection timeout in milliseconds
47+
static constexpr const char* kConnectTimeoutMs = "s3.connect-timeout-ms";
48+
/// Socket timeout in milliseconds
49+
static constexpr const char* kSocketTimeoutMs = "s3.socket-timeout-ms";
50+
};
51+
52+
} // namespace iceberg::arrow
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "iceberg/file_io_registry.h"
19+
20+
#include <gmock/gmock.h>
21+
#include <gtest/gtest.h>
22+
23+
#include "iceberg/test/matchers.h"
24+
25+
namespace iceberg {
26+
27+
namespace {
28+
29+
/// A minimal FileIO implementation for testing.
30+
class MockFileIO : public FileIO {
31+
public:
32+
Result<std::string> ReadFile(const std::string& /*file_location*/,
33+
std::optional<size_t> /*length*/) override {
34+
return std::string("mock");
35+
}
36+
37+
Status WriteFile(const std::string& /*file_location*/,
38+
std::string_view /*content*/) override {
39+
return {};
40+
}
41+
42+
Status DeleteFile(const std::string& /*file_location*/) override { return {}; }
43+
};
44+
45+
} // namespace
46+
47+
TEST(FileIoRegistryTest, RegisterAndLoad) {
48+
const std::string impl_name = "com.test.MockFileIO";
49+
FileIORegistry::Register(
50+
impl_name,
51+
[](const std::string& /*warehouse*/,
52+
const std::unordered_map<std::string, std::string>& /*properties*/)
53+
-> Result<std::shared_ptr<FileIO>> {
54+
return std::make_shared<MockFileIO>();
55+
});
56+
57+
auto result = FileIORegistry::Load(impl_name, "/test/warehouse", {});
58+
ASSERT_THAT(result, IsOk());
59+
EXPECT_NE(result.value(), nullptr);
60+
61+
// Verify the loaded FileIO works
62+
auto read_result = result.value()->ReadFile("any_file", std::nullopt);
63+
ASSERT_THAT(read_result, IsOk());
64+
EXPECT_EQ(read_result.value(), "mock");
65+
}
66+
67+
TEST(FileIoRegistryTest, LoadNonExistentReturnsError) {
68+
auto result = FileIORegistry::Load("com.nonexistent.FileIO", "/test/warehouse", {});
69+
EXPECT_THAT(result, IsError(ErrorKind::kNotFound));
70+
EXPECT_THAT(result, HasErrorMessage("FileIO implementation not found"));
71+
}
72+
73+
TEST(FileIoRegistryTest, OverrideExistingRegistration) {
74+
const std::string impl_name = "com.test.OverrideFileIO";
75+
76+
// Register first implementation
77+
FileIORegistry::Register(
78+
impl_name,
79+
[](const std::string& /*warehouse*/,
80+
const std::unordered_map<std::string, std::string>& /*properties*/)
81+
-> Result<std::shared_ptr<FileIO>> {
82+
return std::make_shared<MockFileIO>();
83+
});
84+
85+
// Override with a different factory
86+
FileIORegistry::Register(
87+
impl_name,
88+
[](const std::string& /*warehouse*/,
89+
const std::unordered_map<std::string, std::string>& /*properties*/)
90+
-> Result<std::shared_ptr<FileIO>> {
91+
return std::make_shared<MockFileIO>();
92+
});
93+
94+
// Should still work (the override replaces the original)
95+
auto result = FileIORegistry::Load(impl_name, "/test/warehouse", {});
96+
ASSERT_THAT(result, IsOk());
97+
EXPECT_NE(result.value(), nullptr);
98+
}
99+
100+
TEST(FileIoRegistryTest, FactoryReceivesWarehouseAndProperties) {
101+
const std::string impl_name = "com.test.PropCheckFileIO";
102+
std::string captured_warehouse;
103+
std::unordered_map<std::string, std::string> captured_properties;
104+
105+
FileIORegistry::Register(
106+
impl_name,
107+
[&captured_warehouse, &captured_properties](
108+
const std::string& warehouse,
109+
const std::unordered_map<std::string, std::string>& properties)
110+
-> Result<std::shared_ptr<FileIO>> {
111+
captured_warehouse = warehouse;
112+
captured_properties = properties;
113+
return std::make_shared<MockFileIO>();
114+
});
115+
116+
std::unordered_map<std::string, std::string> props = {{"key1", "val1"},
117+
{"key2", "val2"}};
118+
auto result = FileIORegistry::Load(impl_name, "s3://my-bucket/warehouse", props);
119+
ASSERT_THAT(result, IsOk());
120+
EXPECT_EQ(captured_warehouse, "s3://my-bucket/warehouse");
121+
EXPECT_EQ(captured_properties.at("key1"), "val1");
122+
EXPECT_EQ(captured_properties.at("key2"), "val2");
123+
}
124+
125+
} // namespace iceberg

0 commit comments

Comments
 (0)