@@ -20,6 +20,7 @@ use crate::region::RegionId;
20
20
use crate :: region:: RegionVerId ;
21
21
use crate :: region:: RegionWithLeader ;
22
22
use crate :: region_cache:: RegionCache ;
23
+ use crate :: request:: codec:: { ApiV1TxnCodec , Codec } ;
23
24
use crate :: store:: KvClient ;
24
25
use crate :: store:: KvConnect ;
25
26
use crate :: store:: RegionStore ;
@@ -50,6 +51,7 @@ use crate::Timestamp;
50
51
/// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff.
51
52
#[ async_trait]
52
53
pub trait PdClient : Send + Sync + ' static {
54
+ type Codec : Codec ;
53
55
type KvClient : KvClient + Send + Sync + ' static ;
54
56
55
57
/// In transactional API, `region` is decoded (keys in raw format).
@@ -189,8 +191,11 @@ pub trait PdClient: Send + Sync + 'static {
189
191
. boxed ( )
190
192
}
191
193
192
- fn decode_region ( mut region : RegionWithLeader , enable_codec : bool ) -> Result < RegionWithLeader > {
193
- if enable_codec {
194
+ fn decode_region (
195
+ mut region : RegionWithLeader ,
196
+ enable_mvcc_codec : bool ,
197
+ ) -> Result < RegionWithLeader > {
198
+ if enable_mvcc_codec {
194
199
codec:: decode_bytes_in_place ( & mut region. region . start_key , false ) ?;
195
200
codec:: decode_bytes_in_place ( & mut region. region . end_key , false ) ?;
196
201
}
@@ -200,20 +205,30 @@ pub trait PdClient: Send + Sync + 'static {
200
205
async fn update_leader ( & self , ver_id : RegionVerId , leader : metapb:: Peer ) -> Result < ( ) > ;
201
206
202
207
async fn invalidate_region_cache ( & self , ver_id : RegionVerId ) ;
208
+
209
+ /// Get the codec carried by `PdClient`.
210
+ /// The purpose of carrying the codec is to avoid passing it on so many calling paths.
211
+ fn get_codec ( & self ) -> & Self :: Codec ;
203
212
}
204
213
205
214
/// This client converts requests for the logical TiKV cluster into requests
206
215
/// for a single TiKV store using PD and internal logic.
207
- pub struct PdRpcClient < KvC : KvConnect + Send + Sync + ' static = TikvConnect , Cl = Cluster > {
216
+ pub struct PdRpcClient <
217
+ Cod : Codec = ApiV1TxnCodec ,
218
+ KvC : KvConnect + Send + Sync + ' static = TikvConnect ,
219
+ Cl = Cluster ,
220
+ > {
208
221
pd : Arc < RetryClient < Cl > > ,
209
222
kv_connect : KvC ,
210
223
kv_client_cache : Arc < RwLock < HashMap < String , KvC :: KvClient > > > ,
211
- enable_codec : bool ,
224
+ enable_mvcc_codec : bool ,
212
225
region_cache : RegionCache < RetryClient < Cl > > ,
226
+ codec : Option < Cod > ,
213
227
}
214
228
215
229
#[ async_trait]
216
- impl < KvC : KvConnect + Send + Sync + ' static > PdClient for PdRpcClient < KvC > {
230
+ impl < Cod : Codec , KvC : KvConnect + Send + Sync + ' static > PdClient for PdRpcClient < Cod , KvC > {
231
+ type Codec = Cod ;
217
232
type KvClient = KvC :: KvClient ;
218
233
219
234
async fn map_region_to_store ( self : Arc < Self > , region : RegionWithLeader ) -> Result < RegionStore > {
@@ -224,20 +239,20 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
224
239
}
225
240
226
241
async fn region_for_key ( & self , key : & Key ) -> Result < RegionWithLeader > {
227
- let enable_codec = self . enable_codec ;
228
- let key = if enable_codec {
242
+ let enable_mvcc_codec = self . enable_mvcc_codec ;
243
+ let key = if enable_mvcc_codec {
229
244
key. to_encoded ( )
230
245
} else {
231
246
key. clone ( )
232
247
} ;
233
248
234
249
let region = self . region_cache . get_region_by_key ( & key) . await ?;
235
- Self :: decode_region ( region, enable_codec )
250
+ Self :: decode_region ( region, enable_mvcc_codec )
236
251
}
237
252
238
253
async fn region_for_id ( & self , id : RegionId ) -> Result < RegionWithLeader > {
239
254
let region = self . region_cache . get_region_by_id ( id) . await ?;
240
- Self :: decode_region ( region, self . enable_codec )
255
+ Self :: decode_region ( region, self . enable_mvcc_codec )
241
256
}
242
257
243
258
async fn get_timestamp ( self : Arc < Self > ) -> Result < Timestamp > {
@@ -255,31 +270,40 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
255
270
async fn invalidate_region_cache ( & self , ver_id : RegionVerId ) {
256
271
self . region_cache . invalidate_region_cache ( ver_id) . await
257
272
}
273
+
274
+ fn get_codec ( & self ) -> & Self :: Codec {
275
+ self . codec
276
+ . as_ref ( )
277
+ . unwrap_or_else ( || panic ! ( "codec not set" ) )
278
+ }
258
279
}
259
280
260
- impl PdRpcClient < TikvConnect , Cluster > {
281
+ impl < Cod : Codec > PdRpcClient < Cod , TikvConnect , Cluster > {
261
282
pub async fn connect (
262
283
pd_endpoints : & [ String ] ,
263
284
config : Config ,
264
- enable_codec : bool ,
265
- ) -> Result < PdRpcClient > {
285
+ enable_mvcc_codec : bool , // TODO: infer from `codec`.
286
+ codec : Option < Cod > ,
287
+ ) -> Result < PdRpcClient < Cod > > {
266
288
PdRpcClient :: new (
267
289
config. clone ( ) ,
268
290
|security_mgr| TikvConnect :: new ( security_mgr, config. timeout ) ,
269
291
|security_mgr| RetryClient :: connect ( pd_endpoints, security_mgr, config. timeout ) ,
270
- enable_codec,
292
+ enable_mvcc_codec,
293
+ codec,
271
294
)
272
295
. await
273
296
}
274
297
}
275
298
276
- impl < KvC : KvConnect + Send + Sync + ' static , Cl > PdRpcClient < KvC , Cl > {
299
+ impl < Cod : Codec , KvC : KvConnect + Send + Sync + ' static , Cl > PdRpcClient < Cod , KvC , Cl > {
277
300
pub async fn new < PdFut , MakeKvC , MakePd > (
278
301
config : Config ,
279
302
kv_connect : MakeKvC ,
280
303
pd : MakePd ,
281
- enable_codec : bool ,
282
- ) -> Result < PdRpcClient < KvC , Cl > >
304
+ enable_mvcc_codec : bool ,
305
+ codec : Option < Cod > ,
306
+ ) -> Result < PdRpcClient < Cod , KvC , Cl > >
283
307
where
284
308
PdFut : Future < Output = Result < RetryClient < Cl > > > ,
285
309
MakeKvC : FnOnce ( Arc < SecurityManager > ) -> KvC ,
@@ -301,8 +325,9 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
301
325
pd : pd. clone ( ) ,
302
326
kv_client_cache,
303
327
kv_connect : kv_connect ( security_mgr) ,
304
- enable_codec ,
328
+ enable_mvcc_codec ,
305
329
region_cache : RegionCache :: new ( pd) ,
330
+ codec,
306
331
} )
307
332
}
308
333
@@ -322,6 +347,10 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
322
347
Err ( e) => Err ( e) ,
323
348
}
324
349
}
350
+
351
+ pub fn set_codec ( & mut self , codec : Cod ) {
352
+ self . codec = Some ( codec) ;
353
+ }
325
354
}
326
355
327
356
fn make_key_range ( start_key : Vec < u8 > , end_key : Vec < u8 > ) -> kvrpcpb:: KeyRange {
0 commit comments