Skip to content

Commit b31a1e9

Browse files
committed
fix transaction api v1 exmaple
1 parent a4fe585 commit b31a1e9

File tree

7 files changed

+147
-113
lines changed

7 files changed

+147
-113
lines changed

examples/transaction.rs

+13-7
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,15 @@
33
mod common;
44

55
use crate::common::parse_args;
6-
use tikv_client::{BoundRange, Config, Key, KvPair, TransactionClient as Client, Value};
6+
use tikv_client::{
7+
request::request_codec::{RequestCodec, TxnApiV1},
8+
BoundRange, Config, Key, KvPair, TransactionClient as Client, Value,
9+
};
710

8-
async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
11+
async fn puts<C: RequestCodec>(
12+
client: &Client<C>,
13+
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
14+
) {
915
let mut txn = client
1016
.begin_optimistic()
1117
.await
@@ -17,7 +23,7 @@ async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>
1723
txn.commit().await.expect("Could not commit transaction");
1824
}
1925

20-
async fn get(client: &Client, key: Key) -> Option<Value> {
26+
async fn get<C: RequestCodec>(client: &Client<C>, key: Key) -> Option<Value> {
2127
let mut txn = client
2228
.begin_optimistic()
2329
.await
@@ -29,7 +35,7 @@ async fn get(client: &Client, key: Key) -> Option<Value> {
2935
res
3036
}
3137

32-
async fn key_exists(client: &Client, key: Key) -> bool {
38+
async fn key_exists<C: RequestCodec>(client: &Client<C>, key: Key) -> bool {
3339
let mut txn = client
3440
.begin_optimistic()
3541
.await
@@ -44,7 +50,7 @@ async fn key_exists(client: &Client, key: Key) -> bool {
4450
res
4551
}
4652

47-
async fn scan(client: &Client, range: impl Into<BoundRange>, limit: u32) {
53+
async fn scan<C: RequestCodec>(client: &Client<C>, range: impl Into<BoundRange>, limit: u32) {
4854
let mut txn = client
4955
.begin_optimistic()
5056
.await
@@ -56,7 +62,7 @@ async fn scan(client: &Client, range: impl Into<BoundRange>, limit: u32) {
5662
txn.commit().await.expect("Could not commit transaction");
5763
}
5864

59-
async fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
65+
async fn dels<C: RequestCodec>(client: &Client<C>, keys: impl IntoIterator<Item = Key>) {
6066
let mut txn = client
6167
.begin_optimistic()
6268
.await
@@ -81,7 +87,7 @@ async fn main() {
8187
Config::default()
8288
};
8389

84-
let txn = Client::new_with_config(args.pd, config, None)
90+
let txn = Client::new_with_config(args.pd, config, TxnApiV1, None)
8591
.await
8692
.expect("Could not connect to tikv");
8793

src/lib.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@
9090
//! # })}
9191
//! ```
9292
93-
#![allow(incomplete_features)]
94-
#![feature(specialization)]
93+
#![feature(min_specialization)]
9594
#![feature(explicit_generic_args_with_impl_trait)]
9695
#[macro_use]
9796
pub mod request;

src/pd/client.rs

-9
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use tikv_client_store::{KvClient, KvConnect, TikvConnect};
1414

1515
use crate::{
1616
compat::stream_fn,
17-
kv::codec,
1817
pd::{retry::RetryClientTrait, RetryClient},
1918
region::{RegionId, RegionVerId, RegionWithLeader},
2019
region_cache::RegionCache,
@@ -198,14 +197,6 @@ pub trait PdClient: Send + Sync + 'static {
198197
.boxed()
199198
}
200199

201-
fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result<RegionWithLeader> {
202-
if enable_codec {
203-
codec::decode_bytes_in_place(region.region.mut_start_key(), false)?;
204-
codec::decode_bytes_in_place(region.region.mut_end_key(), false)?;
205-
}
206-
Ok(region)
207-
}
208-
209200
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;
210201

211202
async fn invalidate_region_cache(&self, ver_id: RegionVerId);

src/region_cache.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,14 @@ impl<C: RequestCodec, R: RetryClientTrait> RegionCache<C, R> {
132132

133133
/// Force read through (query from PD) and update cache
134134
pub async fn read_through_region_by_key(&self, key: Key) -> Result<RegionWithLeader> {
135-
let mut region = self
135+
let mut r = self
136136
.inner_client
137137
.clone()
138138
.get_region(self.codec.encode_pd_query(key).into())
139139
.await?;
140-
region.region = self.codec.decode_region(region.region)?;
141-
self.add_region(region.clone()).await;
142-
Ok(region)
140+
r.region = self.codec.decode_region(r.region)?;
141+
self.add_region(r.clone()).await;
142+
Ok(r)
143143
}
144144

145145
/// Force read through (query from PD) and update cache

src/request/mod.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub mod request_codec;
3232
#[async_trait]
3333
pub trait KvRequest<C>: Request + Sized + Clone + Sync + Send + 'static {
3434
/// The expected response to the request.
35-
type Response: HasKeyErrors + HasLocks + HasRegionError + Clone + Send + 'static;
35+
type Response: HasKeyErrors + HasLocks + HasRegionError + Clone + Send + 'static;
3636
fn encode_request(&self, _codec: &C) -> Cow<Self>;
3737
fn decode_response(&self, _codec: &C, _resp: Self::Response) -> crate::Result<Self::Response>;
3838
}
@@ -73,7 +73,7 @@ mod test {
7373
use std::{
7474
any::Any,
7575
iter,
76-
sync::{atomic::AtomicUsize, Arc},
76+
sync::{Arc, atomic::AtomicUsize},
7777
};
7878

7979
use grpcio::CallOption;
@@ -82,10 +82,10 @@ mod test {
8282
use tikv_client_store::HasRegionError;
8383

8484
use crate::{
85+
Error,
86+
Key,
8587
mock::{MockKvClient, MockPdClient},
86-
store::store_stream_for_keys,
87-
transaction::lowering::new_commit_request,
88-
Error, Key, Result,
88+
Result, store::store_stream_for_keys, transaction::lowering::new_commit_request,
8989
};
9090

9191
use super::*;

src/request/request_codec.rs

+33-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,24 @@
11
use tikv_client_proto::metapb::Region;
22

3-
use crate::{Key, Result};
3+
use crate::{kv::codec::decode_bytes_in_place, Key, Result};
4+
5+
#[macro_export]
6+
macro_rules! plain_request {
7+
($req: ident, $codec: ident) => {
8+
if $codec.is_plain() {
9+
return ::std::borrow::Cow::Borrowed($req);
10+
}
11+
};
12+
}
13+
14+
#[macro_export]
15+
macro_rules! plain_response {
16+
($resp: ident, $codec: ident) => {
17+
if $codec.is_plain() {
18+
return Ok($resp);
19+
}
20+
};
21+
}
422

523
pub trait RequestCodec: Sized + Clone + Sync + Send + 'static {
624
fn encode_key(&self, key: Key) -> Key {
@@ -18,6 +36,9 @@ pub trait RequestCodec: Sized + Clone + Sync + Send + 'static {
1836
fn decode_region(&self, region: Region) -> Result<Region> {
1937
Ok(region)
2038
}
39+
fn is_plain(&self) -> bool {
40+
true
41+
}
2142
}
2243

2344
#[derive(Clone)]
@@ -28,4 +49,14 @@ impl RequestCodec for RawApiV1 {}
2849
#[derive(Clone)]
2950
pub struct TxnApiV1;
3051

31-
impl RequestCodec for TxnApiV1 {}
52+
impl RequestCodec for TxnApiV1 {
53+
fn encode_pd_query(&self, key: Key) -> Key {
54+
key.to_encoded()
55+
}
56+
57+
fn decode_region(&self, mut region: Region) -> Result<Region> {
58+
decode_bytes_in_place(region.mut_start_key(), false)?;
59+
decode_bytes_in_place(region.mut_end_key(), false)?;
60+
Ok(region)
61+
}
62+
}

0 commit comments

Comments
 (0)