Skip to content

Commit 11fcf47

Browse files
committed
refactor: start a meta-service as local meta for testing
- Replaced the in-memory meta-store with a meta-service running in a temporary directory for testing purposes. - The in-memory meta-store was limited in functionality and did not provide a complete feature set. - The new approach ensures full functionality during testing by simulating a real meta-service.
1 parent 23f254f commit 11fcf47

File tree

8 files changed

+361
-92
lines changed

8 files changed

+361
-92
lines changed

Cargo.lock

+5-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/store/Cargo.toml

+5-1
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,20 @@ edition = { workspace = true }
1111
io-uring = []
1212

1313
[dependencies]
14+
anyhow = { workspace = true }
1415
async-trait = { workspace = true }
1516
databend-common-base = { workspace = true }
1617
databend-common-grpc = { workspace = true }
1718
databend-common-meta-client = { workspace = true }
18-
databend-common-meta-embedded = { workspace = true }
1919
databend-common-meta-kvapi = { workspace = true }
2020
databend-common-meta-semaphore = { workspace = true }
2121
databend-common-meta-types = { workspace = true }
22+
databend-meta = { workspace = true }
2223
log = { workspace = true }
24+
tempfile = { workspace = true }
25+
tokio = { workspace = true }
2326
tokio-stream = { workspace = true }
27+
tonic = { workspace = true }
2428

2529
[lints]
2630
workspace = true

src/meta/store/src/lib.rs

+45-77
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,32 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::hash_map::Entry;
15+
pub(crate) mod local;
16+
17+
use std::ops::Deref;
1618
use std::pin::Pin;
1719
use std::sync::Arc;
1820
use std::task::Context;
1921
use std::task::Poll;
2022
use std::time::Duration;
2123

22-
use databend_common_base::base::tokio::sync::Semaphore as TokioSemaphore;
2324
use databend_common_grpc::RpcClientConf;
2425
use databend_common_meta_client::errors::CreationError;
2526
use databend_common_meta_client::ClientHandle;
2627
use databend_common_meta_client::MetaGrpcClient;
27-
use databend_common_meta_embedded::MemMeta;
2828
use databend_common_meta_kvapi::kvapi;
2929
use databend_common_meta_kvapi::kvapi::KVStream;
3030
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
3131
use databend_common_meta_semaphore::acquirer::Permit;
32-
use databend_common_meta_semaphore::acquirer::SharedAcquirerStat;
3332
use databend_common_meta_semaphore::errors::AcquireError;
34-
use databend_common_meta_semaphore::errors::ConnectionClosed;
3533
use databend_common_meta_semaphore::Semaphore;
3634
use databend_common_meta_types::protobuf::WatchRequest;
3735
use databend_common_meta_types::protobuf::WatchResponse;
3836
use databend_common_meta_types::MetaError;
3937
use databend_common_meta_types::TxnReply;
4038
use databend_common_meta_types::TxnRequest;
4139
use databend_common_meta_types::UpsertKV;
40+
pub use local::LocalMetaService;
4241
use log::info;
4342
use tokio_stream::Stream;
4443

@@ -53,11 +52,22 @@ pub struct MetaStoreProvider {
5352
/// MetaStore is impl with either a local embedded meta store, or a grpc-client of metasrv
5453
#[derive(Clone)]
5554
pub enum MetaStore {
56-
L(Arc<MemMeta>),
55+
L(Arc<LocalMetaService>),
5756
R(Arc<ClientHandle>),
5857
}
5958

6059
impl MetaStore {
60+
/// Create a local meta service for testing.
61+
///
62+
/// It is required to assign a base port as the port number range.
63+
pub async fn new_local_testing(base_port: u16) -> Self {
64+
MetaStore::L(Arc::new(
65+
LocalMetaService::new("MetaStore-new-local-testing", Some(base_port))
66+
.await
67+
.unwrap(),
68+
))
69+
}
70+
6171
pub fn arc(self) -> Arc<Self> {
6272
Arc::new(self)
6373
}
@@ -70,23 +80,23 @@ impl MetaStore {
7080
}
7181

7282
pub async fn get_local_addr(&self) -> std::result::Result<Option<String>, MetaError> {
73-
match self {
74-
MetaStore::L(_) => Ok(None),
75-
MetaStore::R(grpc_client) => {
76-
let client_info = grpc_client.get_client_info().await?;
77-
Ok(Some(client_info.client_addr))
78-
}
79-
}
83+
let client = match self {
84+
MetaStore::L(l) => l.deref().deref(),
85+
MetaStore::R(grpc_client) => grpc_client,
86+
};
87+
88+
let client_info = client.get_client_info().await?;
89+
Ok(Some(client_info.client_addr))
8090
}
8191

8292
pub async fn watch(&self, request: WatchRequest) -> Result<WatchStream, MetaError> {
83-
match self {
84-
MetaStore::L(_) => unreachable!(),
85-
MetaStore::R(grpc_client) => {
86-
let streaming = grpc_client.request(request).await?;
87-
Ok(Box::pin(WatchResponseStream::create(streaming)))
88-
}
89-
}
93+
let client = match self {
94+
MetaStore::L(l) => l.deref(),
95+
MetaStore::R(grpc_client) => grpc_client,
96+
};
97+
98+
let streaming = client.request(request).await?;
99+
Ok(Box::pin(WatchResponseStream::create(streaming)))
90100
}
91101

92102
pub async fn new_acquired(
@@ -96,34 +106,12 @@ impl MetaStore {
96106
id: impl ToString,
97107
lease: Duration,
98108
) -> Result<Permit, AcquireError> {
99-
match self {
100-
MetaStore::L(v) => {
101-
let mut local_lock_map = v.locks.lock().await;
102-
103-
let acquire_res = match local_lock_map.entry(prefix.to_string()) {
104-
Entry::Occupied(v) => v.get().clone(),
105-
Entry::Vacant(v) => v
106-
.insert(Arc::new(TokioSemaphore::new(capacity as usize)))
107-
.clone(),
108-
};
109+
let client = match self {
110+
MetaStore::L(l) => l.deref(),
111+
MetaStore::R(grpc_client) => grpc_client,
112+
};
109113

110-
match acquire_res.acquire_owned().await {
111-
Ok(guard) => Ok(Permit {
112-
stat: SharedAcquirerStat::new(),
113-
fu: Box::pin(async move {
114-
let _guard = guard;
115-
Ok(())
116-
}),
117-
}),
118-
Err(_e) => Err(AcquireError::ConnectionClosed(ConnectionClosed::new_str(
119-
"",
120-
))),
121-
}
122-
}
123-
MetaStore::R(grpc_client) => {
124-
Semaphore::new_acquired(grpc_client.clone(), prefix, capacity, id, lease).await
125-
}
126-
}
114+
Semaphore::new_acquired(client.clone(), prefix, capacity, id, lease).await
127115
}
128116

129117
pub async fn new_acquired_by_time(
@@ -133,35 +121,12 @@ impl MetaStore {
133121
id: impl ToString,
134122
lease: Duration,
135123
) -> Result<Permit, AcquireError> {
136-
match self {
137-
MetaStore::L(v) => {
138-
let mut local_lock_map = v.locks.lock().await;
124+
let client = match self {
125+
MetaStore::L(l) => l.deref(),
126+
MetaStore::R(grpc_client) => grpc_client,
127+
};
139128

140-
let acquire_res = match local_lock_map.entry(prefix.to_string()) {
141-
Entry::Occupied(v) => v.get().clone(),
142-
Entry::Vacant(v) => v
143-
.insert(Arc::new(TokioSemaphore::new(capacity as usize)))
144-
.clone(),
145-
};
146-
147-
match acquire_res.acquire_owned().await {
148-
Ok(guard) => Ok(Permit {
149-
stat: SharedAcquirerStat::new(),
150-
fu: Box::pin(async move {
151-
let _guard = guard;
152-
Ok(())
153-
}),
154-
}),
155-
Err(_e) => Err(AcquireError::ConnectionClosed(ConnectionClosed::new_str(
156-
"",
157-
))),
158-
}
159-
}
160-
MetaStore::R(grpc_client) => {
161-
Semaphore::new_acquired_by_time(grpc_client.clone(), prefix, capacity, id, lease)
162-
.await
163-
}
164-
}
129+
Semaphore::new_acquired_by_time(client.clone(), prefix, capacity, id, lease).await
165130
}
166131
}
167132

@@ -211,8 +176,11 @@ impl MetaStoreProvider {
211176
);
212177

213178
// NOTE: This can only be used for test: data will be removed when program quit.
214-
let meta_store = MemMeta::default();
215-
Ok(MetaStore::L(Arc::new(meta_store)))
179+
Ok(MetaStore::L(Arc::new(
180+
LocalMetaService::new("MetaStoreProvider-created", None)
181+
.await
182+
.unwrap(),
183+
)))
216184
} else {
217185
info!(conf :? =(&self.rpc_conf); "use remote meta");
218186
let client = MetaGrpcClient::try_new(&self.rpc_conf)?;

0 commit comments

Comments
 (0)