12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- use std:: collections:: hash_map:: Entry ;
15
+ pub ( crate ) mod local;
16
+
17
+ use std:: ops:: Deref ;
16
18
use std:: pin:: Pin ;
17
19
use std:: sync:: Arc ;
18
20
use std:: task:: Context ;
19
21
use std:: task:: Poll ;
20
22
use std:: time:: Duration ;
21
23
22
- use databend_common_base:: base:: tokio:: sync:: Semaphore as TokioSemaphore ;
23
24
use databend_common_grpc:: RpcClientConf ;
24
25
use databend_common_meta_client:: errors:: CreationError ;
25
26
use databend_common_meta_client:: ClientHandle ;
26
27
use databend_common_meta_client:: MetaGrpcClient ;
27
- use databend_common_meta_embedded:: MemMeta ;
28
28
use databend_common_meta_kvapi:: kvapi;
29
29
use databend_common_meta_kvapi:: kvapi:: KVStream ;
30
30
use databend_common_meta_kvapi:: kvapi:: UpsertKVReply ;
31
31
use databend_common_meta_semaphore:: acquirer:: Permit ;
32
- use databend_common_meta_semaphore:: acquirer:: SharedAcquirerStat ;
33
32
use databend_common_meta_semaphore:: errors:: AcquireError ;
34
- use databend_common_meta_semaphore:: errors:: ConnectionClosed ;
35
33
use databend_common_meta_semaphore:: Semaphore ;
36
34
use databend_common_meta_types:: protobuf:: WatchRequest ;
37
35
use databend_common_meta_types:: protobuf:: WatchResponse ;
38
36
use databend_common_meta_types:: MetaError ;
39
37
use databend_common_meta_types:: TxnReply ;
40
38
use databend_common_meta_types:: TxnRequest ;
41
39
use databend_common_meta_types:: UpsertKV ;
40
+ pub use local:: LocalMetaService ;
42
41
use log:: info;
43
42
use tokio_stream:: Stream ;
44
43
@@ -53,11 +52,22 @@ pub struct MetaStoreProvider {
53
52
/// MetaStore is impl with either a local embedded meta store, or a grpc-client of metasrv
54
53
#[ derive( Clone ) ]
55
54
pub enum MetaStore {
56
- L ( Arc < MemMeta > ) ,
55
+ L ( Arc < LocalMetaService > ) ,
57
56
R ( Arc < ClientHandle > ) ,
58
57
}
59
58
60
59
impl MetaStore {
60
+ /// Create a local meta service for testing.
61
+ ///
62
+ /// It is required to assign a base port as the port number range.
63
+ pub async fn new_local_testing ( ) -> Self {
64
+ MetaStore :: L ( Arc :: new (
65
+ LocalMetaService :: new ( "MetaStore-new-local-testing" )
66
+ . await
67
+ . unwrap ( ) ,
68
+ ) )
69
+ }
70
+
61
71
pub fn arc ( self ) -> Arc < Self > {
62
72
Arc :: new ( self )
63
73
}
@@ -70,23 +80,23 @@ impl MetaStore {
70
80
}
71
81
72
82
pub async fn get_local_addr ( & self ) -> std:: result:: Result < Option < String > , MetaError > {
73
- match self {
74
- MetaStore :: L ( _ ) => Ok ( None ) ,
75
- MetaStore :: R ( grpc_client) => {
76
- let client_info = grpc_client . get_client_info ( ) . await ? ;
77
- Ok ( Some ( client_info . client_addr ) )
78
- }
79
- }
83
+ let client = match self {
84
+ MetaStore :: L ( l ) => l . deref ( ) . deref ( ) ,
85
+ MetaStore :: R ( grpc_client) => grpc_client ,
86
+ } ;
87
+
88
+ let client_info = client . get_client_info ( ) . await ? ;
89
+ Ok ( Some ( client_info . client_addr ) )
80
90
}
81
91
82
92
pub async fn watch ( & self , request : WatchRequest ) -> Result < WatchStream , MetaError > {
83
- match self {
84
- MetaStore :: L ( _ ) => unreachable ! ( ) ,
85
- MetaStore :: R ( grpc_client) => {
86
- let streaming = grpc_client . request ( request ) . await ? ;
87
- Ok ( Box :: pin ( WatchResponseStream :: create ( streaming ) ) )
88
- }
89
- }
93
+ let client = match self {
94
+ MetaStore :: L ( l ) => l . deref ( ) ,
95
+ MetaStore :: R ( grpc_client) => grpc_client ,
96
+ } ;
97
+
98
+ let streaming = client . request ( request ) . await ? ;
99
+ Ok ( Box :: pin ( WatchResponseStream :: create ( streaming ) ) )
90
100
}
91
101
92
102
pub async fn new_acquired (
@@ -96,34 +106,12 @@ impl MetaStore {
96
106
id : impl ToString ,
97
107
lease : Duration ,
98
108
) -> Result < Permit , AcquireError > {
99
- match self {
100
- MetaStore :: L ( v) => {
101
- let mut local_lock_map = v. locks . lock ( ) . await ;
102
-
103
- let acquire_res = match local_lock_map. entry ( prefix. to_string ( ) ) {
104
- Entry :: Occupied ( v) => v. get ( ) . clone ( ) ,
105
- Entry :: Vacant ( v) => v
106
- . insert ( Arc :: new ( TokioSemaphore :: new ( capacity as usize ) ) )
107
- . clone ( ) ,
108
- } ;
109
+ let client = match self {
110
+ MetaStore :: L ( l) => l. deref ( ) ,
111
+ MetaStore :: R ( grpc_client) => grpc_client,
112
+ } ;
109
113
110
- match acquire_res. acquire_owned ( ) . await {
111
- Ok ( guard) => Ok ( Permit {
112
- stat : SharedAcquirerStat :: new ( ) ,
113
- fu : Box :: pin ( async move {
114
- let _guard = guard;
115
- Ok ( ( ) )
116
- } ) ,
117
- } ) ,
118
- Err ( _e) => Err ( AcquireError :: ConnectionClosed ( ConnectionClosed :: new_str (
119
- "" ,
120
- ) ) ) ,
121
- }
122
- }
123
- MetaStore :: R ( grpc_client) => {
124
- Semaphore :: new_acquired ( grpc_client. clone ( ) , prefix, capacity, id, lease) . await
125
- }
126
- }
114
+ Semaphore :: new_acquired ( client. clone ( ) , prefix, capacity, id, lease) . await
127
115
}
128
116
129
117
pub async fn new_acquired_by_time (
@@ -133,35 +121,12 @@ impl MetaStore {
133
121
id : impl ToString ,
134
122
lease : Duration ,
135
123
) -> Result < Permit , AcquireError > {
136
- match self {
137
- MetaStore :: L ( v) => {
138
- let mut local_lock_map = v. locks . lock ( ) . await ;
124
+ let client = match self {
125
+ MetaStore :: L ( l) => l. deref ( ) ,
126
+ MetaStore :: R ( grpc_client) => grpc_client,
127
+ } ;
139
128
140
- let acquire_res = match local_lock_map. entry ( prefix. to_string ( ) ) {
141
- Entry :: Occupied ( v) => v. get ( ) . clone ( ) ,
142
- Entry :: Vacant ( v) => v
143
- . insert ( Arc :: new ( TokioSemaphore :: new ( capacity as usize ) ) )
144
- . clone ( ) ,
145
- } ;
146
-
147
- match acquire_res. acquire_owned ( ) . await {
148
- Ok ( guard) => Ok ( Permit {
149
- stat : SharedAcquirerStat :: new ( ) ,
150
- fu : Box :: pin ( async move {
151
- let _guard = guard;
152
- Ok ( ( ) )
153
- } ) ,
154
- } ) ,
155
- Err ( _e) => Err ( AcquireError :: ConnectionClosed ( ConnectionClosed :: new_str (
156
- "" ,
157
- ) ) ) ,
158
- }
159
- }
160
- MetaStore :: R ( grpc_client) => {
161
- Semaphore :: new_acquired_by_time ( grpc_client. clone ( ) , prefix, capacity, id, lease)
162
- . await
163
- }
164
- }
129
+ Semaphore :: new_acquired_by_time ( client. clone ( ) , prefix, capacity, id, lease) . await
165
130
}
166
131
}
167
132
@@ -211,8 +176,11 @@ impl MetaStoreProvider {
211
176
) ;
212
177
213
178
// NOTE: This can only be used for test: data will be removed when program quit.
214
- let meta_store = MemMeta :: default ( ) ;
215
- Ok ( MetaStore :: L ( Arc :: new ( meta_store) ) )
179
+ Ok ( MetaStore :: L ( Arc :: new (
180
+ LocalMetaService :: new ( "MetaStoreProvider-created" )
181
+ . await
182
+ . unwrap ( ) ,
183
+ ) ) )
216
184
} else {
217
185
info ! ( conf : ? =( & self . rpc_conf) ; "use remote meta" ) ;
218
186
let client = MetaGrpcClient :: try_new ( & self . rpc_conf ) ?;
0 commit comments