TiKV Source Code Reading: The Read and Write Paths


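Background

TiKV is a distributed key-value database with transaction support, and it is now a top-level (graduated) CNCF project.

To contribute to TiKV development, you first need to learn Rust, then understand how TiKV works, and finally build on both to get familiar with the TiKV source code.

During my internship on the transaction team at PingCAP, I first picked up Rust, then drew on what I already knew to study the evolution of TiKV's transaction layer systematically, and finally started reading the TiKV source code and doing some exploratory pre-research work in it.

The official TiKV source code analysis series describes in detail the design highlights, main flows, and relevant code snippets of the key modules in TiKV 3.x, and it is required reading for anyone studying the TiKV source. TiKV has since reached 6.x, which not only adds many features and optimizations but has also been through several rounds of refactoring. As a result, some code snippets quoted in the official series no longer exist, so readers cannot check them against the latest source to deepen their understanding. In addition, although the official series covers a number of important modules systematically, it does not string the read and write paths together end to end and show which modules and code snippets a request passes through. In practice, getting familiar with the full read and write paths early helps newcomers understand the code from a global perspective.

To address these gaps, this post first introduces some basic TiKV concepts, then walks through three important modules (KVService, Storage, and RaftStore) and the read and write paths from the source code's point of view, and finally briefly recommends a way to learn the source by debugging TiKV with breakpoints.

I hope this post helps newcomers who are interested in TiKV development get to know the TiKV codebase quickly.

Note: the source code walkthrough below is based on TiKV 6.1.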

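Basic Concepts

For an overview of TiKV's architecture, see the official documentation. At a high level, TiKV is a distributed KV database built on Multi-Raft.

Each TiKV process owns one store, and a store holds a number of regions. Each region is a Raft group that manages the data of one contiguous KV range, and it is replicated on as many stores as the configured replica count.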
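
To make the store/region relationship concrete, here is a minimal sketch of how a key could be mapped to the region whose range contains it. The Region and RegionIndex types are hypothetical and exist only for illustration; they are not TiKV's actual data structures, and the sketch assumes half-open ranges where an empty end key means "unbounded".

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified region descriptor covering [start_key, end_key).
/// An empty end_key is assumed to mean "no upper bound".
#[derive(Debug, Clone, PartialEq)]
struct Region {
    id: u64,
    start_key: Vec<u8>,
    end_key: Vec<u8>,
}

/// Hypothetical per-store index from region start key to region.
struct RegionIndex {
    by_start: BTreeMap<Vec<u8>, Region>,
}

impl RegionIndex {
    fn new(regions: Vec<Region>) -> Self {
        let by_start = regions
            .into_iter()
            .map(|r| (r.start_key.clone(), r))
            .collect();
        RegionIndex { by_start }
    }

    /// Returns the region whose range contains `key`, if any.
    fn locate(&self, key: &[u8]) -> Option<&Region> {
        // The only candidate is the region with the greatest start key <= key.
        let (_, region) = self.by_start.range(..=key.to_vec()).next_back()?;
        if region.end_key.is_empty() || key < region.end_key.as_slice() {
            Some(region)
        } else {
            None
        }
    }
}
```

With two regions splitting the key space at "m", locate(b"apple") would return the first region and locate(b"m") the second, because the start key is inclusive and the end key is exclusive.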

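Key Modules

KVService

TiKV's Service layer lives under src/server. Its responsibilities include serving RPCs, resolving store ids to addresses, and handling communication between TiKV nodes. For a conceptual walkthrough of the Service layer, see TiKV Source Code Analysis series (9): Service layer request handling.

TiKV contains multiple gRPC services. The most important one is KVService, defined in src/server/service/kv.rs.

KVService defines TiKV's transactional APIs such as kv_get, kv_scan, kv_prewrite, and kv_commit; the coprocessor API, which executes complex queries and computation pushed down from TiDB; and Raw KV APIs such as raw_get and raw_put. The batch_commands interface batches the interfaces above to optimize high-throughput scenarios. The raft and batch_raft interfaces, which the members of a TiKV Raft group use to talk to each other, are also served here.

This section briefly introduces KVService and its startup flow, and along the way covers how several important TiKV structures are initialized.

cmd/tikv-server/main.rs is the entry point of the TiKV process. It mainly does two things:

  • Parse the configuration parameters
  • Start the TiKV process with server::server::run_tikv(config)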
fn main() {
let build_timestamp = option_env!("TIKV_BUILD_TIME");
let version_info = tikv::tikv_version_info(build_timestamp);

// config parsing
// ...
// config parsing

server::server::run_tikv(config);
}

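The run_tikv function in components/server/src/server.rs calls run_impl, instantiated with the engine type selected by the configuration (in the snippet below, RocksEngine or RaftLogEngine depending on config.raft_engine.enable).

In run_impl, TikvServer::<CER>::init::<F>(config) is called first to initialize a number of important structures, including but not limited to batch_system, concurrency_manager, background_worker, and quota_limiter. Next, tikv.init_servers::<F>() binds the RPC handlers to KVService. Finally, tikv.run_server(server_config) uses the gRPC server described in TiKV Source Code Analysis series (7): gRPC server initialization and startup to bind the configured port and start listening for connections.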

/// Run a TiKV server. Returns when the server is shutdown by the user, in which
/// case the server will be properly stopped.
pub fn run_tikv(config: TikvConfig) {

...

// Do some prepare works before start.
pre_start();

let _m = Monitor::default();

dispatch_api_version!(config.storage.api_version(), {
if !config.raft_engine.enable {
run_impl::<RocksEngine, API>(config)
} else {
run_impl::<RaftLogEngine, API>(config)
}
})
}

#[inline]
fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(config: TikvConfig) {
let mut tikv = TikvServer::<CER>::init::<F>(config);

...

let server_config = tikv.init_servers::<F>();

...

tikv.run_server(server_config);

signal_handler::wait_for_signal(Some(tikv.engines.take().unwrap().engines));
tikv.stop();
}

fn run_server(&mut self, server_config: Arc<VersionTrack<ServerConfig>>) {
let server = self.servers.as_mut().unwrap();
server
.server
.build_and_bind()
.unwrap_or_else(|e| fatal!("failed to build server: {}", e));
server
.server
.start(server_config, self.security_mgr.clone())
.unwrap_or_else(|e| fatal!("failed to start server: {}", e));
}

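Once KVService is up, every request sent to the listening port is routed to the corresponding KVService handler. For the interfaces KVService currently supports, look directly at service Tikv in kvproto. There are now close to 60 RPC interfaces, and each has a corresponding handler in the code.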

// Key/value store API for TiKV.
service Tikv {
// Commands using a transactional interface.
rpc KvGet(kvrpcpb.GetRequest) returns (kvrpcpb.GetResponse) {}
rpc KvScan(kvrpcpb.ScanRequest) returns (kvrpcpb.ScanResponse) {}
rpc KvPrewrite(kvrpcpb.PrewriteRequest) returns (kvrpcpb.PrewriteResponse) {}
rpc KvPessimisticLock(kvrpcpb.PessimisticLockRequest) returns (kvrpcpb.PessimisticLockResponse) {}
rpc KVPessimisticRollback(kvrpcpb.PessimisticRollbackRequest) returns (kvrpcpb.PessimisticRollbackResponse) {}
...
}

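When KVService receives a request, it forwards it to a different module depending on the request type. Read requests pushed down from TiDB, such as sum and avg, are forwarded to the Coprocessor module, while KV requests are forwarded directly to the Storage module.

KV operations fall into two broad categories by function: Raw KV and Txn KV. Raw KV operations are plain KV operations such as raw put, raw get, raw delete, raw batch get, raw batch put, raw batch delete, and raw scan. Txn KV operations are a set of operations designed to implement transactions; for example, prewrite and commit correspond to the prepare and commit phases of 2PC respectively.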
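
As a rough illustration of how prewrite and commit map onto the two phases of 2PC, the toy store below keeps a lock per key during prewrite and turns it into a committed write during commit. This is only a sketch under strong assumptions: ToyTxnStore, TxnError, and the conflict rules are hypothetical simplifications, and it leaves out primary locks, rollback, and everything else the real transaction layer does.

```rust
use std::collections::{BTreeMap, HashMap};

#[derive(Debug, PartialEq)]
enum TxnError {
    KeyIsLocked { lock_ts: u64 },
    WriteConflict { commit_ts: u64 },
    LockNotFound,
}

/// Hypothetical in-memory store, for illustration only.
#[derive(Default)]
struct ToyTxnStore {
    // key -> (start_ts of the transaction holding the lock, buffered value)
    locks: HashMap<Vec<u8>, (u64, Vec<u8>)>,
    // (key, commit_ts) -> committed value
    writes: BTreeMap<(Vec<u8>, u64), Vec<u8>>,
}

impl ToyTxnStore {
    /// Phase 1: lock the key and buffer the value.
    fn prewrite(&mut self, key: &[u8], value: &[u8], start_ts: u64) -> Result<(), TxnError> {
        // Another transaction already holds the lock on this key.
        if let Some((lock_ts, _)) = self.locks.get(key) {
            if *lock_ts != start_ts {
                return Err(TxnError::KeyIsLocked { lock_ts: *lock_ts });
            }
        }
        // A write committed at or after our start_ts is a conflict.
        let newest = self
            .writes
            .range((key.to_vec(), start_ts)..=(key.to_vec(), u64::MAX))
            .next_back();
        if let Some(((_, commit_ts), _)) = newest {
            return Err(TxnError::WriteConflict { commit_ts: *commit_ts });
        }
        self.locks.insert(key.to_vec(), (start_ts, value.to_vec()));
        Ok(())
    }

    /// Phase 2: replace the lock with a committed write at commit_ts.
    fn commit(&mut self, key: &[u8], start_ts: u64, commit_ts: u64) -> Result<(), TxnError> {
        match self.locks.get(key) {
            Some((lock_ts, _)) if *lock_ts == start_ts => {}
            _ => return Err(TxnError::LockNotFound),
        }
        let (_, value) = self.locks.remove(key).expect("lock checked above");
        self.writes.insert((key.to_vec(), commit_ts), value);
        Ok(())
    }
}
```

The point of the sketch is only the split of responsibilities: prewrite detects conflicts and reserves the key, while commit makes the buffered value visible at a commit timestamp.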

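Unlike the handler example shown in TiKV Source Code Analysis series (7): gRPC server initialization and startup, the current KVService wraps the transactional APIs (for example kv_prewrite and kv_commit) and the Raw APIs (for example raw_get and raw_scan). Because they are all routed to the Storage module, the interface-independent logic is factored into the handle_request macro, while the interface-specific logic lives in future_xxx functions such as future_prewrite and future_commit. Note that the coprocessor API, the raft API, and similar interfaces still talk to grpc-rs directly.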

macro_rules! handle_request {
($fn_name: ident, $future_name: ident, $req_ty: ident, $resp_ty: ident) => {
handle_request!($fn_name, $future_name, $req_ty, $resp_ty, no_time_detail);
};
($fn_name: ident, $future_name: ident, $req_ty: ident, $resp_ty: ident, $time_detail: tt) => {
fn $fn_name(&mut self, ctx: RpcContext<'_>, mut req: $req_ty, sink: UnarySink<$resp_ty>) {
forward_unary!(self.proxy, $fn_name, ctx, req, sink);
let begin_instant = Instant::now();

let source = req.mut_context().take_request_source();
let resp = $future_name(&self.storage, req);
let task = async move {
let resp = resp.await?;
let elapsed = begin_instant.saturating_elapsed();
set_total_time!(resp, elapsed, $time_detail);
sink.success(resp).await?;
GRPC_MSG_HISTOGRAM_STATIC
.$fn_name
.observe(elapsed.as_secs_f64());
record_request_source_metrics(source, elapsed);
ServerResult::Ok(())
}
.map_err(|e| {
log_net_error!(e, "kv rpc failed";
"request" => stringify!($fn_name)
);
GRPC_MSG_FAIL_COUNTER.$fn_name.inc();
})
.map(|_|());

ctx.spawn(task);
}
}
}

impl<T: RaftStoreRouter<E::Local> + 'static, E: Engine, L: LockManager, F: KvFormat> Tikv
for Service<T, E, L, F>
{
handle_request!(kv_get, future_get, GetRequest, GetResponse, has_time_detail);
handle_request!(kv_scan, future_scan, ScanRequest, ScanResponse);
handle_request!(
kv_prewrite,
future_prewrite,
PrewriteRequest,
PrewriteResponse,
has_time_detail
);

...

handle_request!(raw_get, future_raw_get, RawGetRequest, RawGetResponse);
handle_request!(
raw_batch_get,
future_raw_batch_get,
RawBatchGetRequest,
RawBatchGetResponse
);
handle_request!(raw_scan, future_raw_scan, RawScanRequest, RawScanResponse);

...

fn coprocessor(&mut self, ctx: RpcContext<'_>, mut req: Request, sink: UnarySink<Response>) {
forward_unary!(self.proxy, coprocessor, ctx, req, sink);
let source = req.mut_context().take_request_source();
let begin_instant = Instant::now();
let future = future_copr(&self.copr, Some(ctx.peer()), req);
let task = async move {
let resp = future.await?.consume();
sink.success(resp).await?;
let elapsed = begin_instant.saturating_elapsed();
GRPC_MSG_HISTOGRAM_STATIC
.coprocessor
.observe(elapsed.as_secs_f64());
record_request_source_metrics(source, elapsed);
ServerResult::Ok(())
}
.map_err(|e| {
log_net_error!(e, "kv rpc failed";
"request" => "coprocessor"
);
GRPC_MSG_FAIL_COUNTER.coprocessor.inc();
})
.map(|_| ());

ctx.spawn(task);
}

...
}

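Within the future_xxx functions of the transactional APIs, those with write semantics, such as future_prewrite and future_commit, are all dispatched to the Storage module's sched_txn_command function, so a txn_command_future macro has been introduced to cut down the duplicated code. Those with read semantics, such as future_get and future_scan, each call a different Storage function (get, scan, and so on), so they have not been abstracted further.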

macro_rules! txn_command_future {
($fn_name: ident, $req_ty: ident, $resp_ty: ident, ($req: ident) $prelude: stmt; ($v: ident, $resp: ident, $tracker: ident) { $else_branch: expr }) => {
fn $fn_name<E: Engine, L: LockManager, F: KvFormat>(
storage: &Storage<E, L, F>,
$req: $req_ty,
) -> impl Future<Output = ServerResult<$resp_ty>> {
$prelude
let $tracker = GLOBAL_TRACKERS.insert(Tracker::new(RequestInfo::new(
$req.get_context(),
RequestType::Unknown,
0,
)));
set_tls_tracker_token($tracker);
let (cb, f) = paired_future_callback();
let res = storage.sched_txn_command($req.into(), cb);

async move {
defer!{{
GLOBAL_TRACKERS.remove($tracker);
}};
let $v = match res {
Err(e) => Err(e),
Ok(_) => f.await?,
};
let mut $resp = $resp_ty::default();
if let Some(err) = extract_region_error(&$v) {
$resp.set_region_error(err);
} else {
$else_branch;
}
Ok($resp)
}
}
};
($fn_name: ident, $req_ty: ident, $resp_ty: ident, ($v: ident, $resp: ident, $tracker: ident) { $else_branch: expr }) => {
txn_command_future!($fn_name, $req_ty, $resp_ty, (req) {}; ($v, $resp, $tracker) { $else_branch });
};
($fn_name: ident, $req_ty: ident, $resp_ty: ident, ($v: ident, $resp: ident) { $else_branch: expr }) => {
txn_command_future!($fn_name, $req_ty, $resp_ty, (req) {}; ($v, $resp, tracker) { $else_branch });
};
}

txn_command_future!(future_prewrite, PrewriteRequest, PrewriteResponse, (v, resp, tracker) {{
if let Ok(v) = &v {
resp.set_min_commit_ts(v.min_commit_ts.into_inner());
resp.set_one_pc_commit_ts(v.one_pc_commit_ts.into_inner());
GLOBAL_TRACKERS.with_tracker(tracker, |tracker| {
tracker.write_scan_detail(resp.mut_exec_details_v2().mut_scan_detail_v2());
tracker.write_write_detail(resp.mut_exec_details_v2().mut_write_detail());
});
}
resp.set_errors(extract_key_errors(v.map(|v| v.locks)).into());
}});

fn future_get<E: Engine, L: LockManager, F: KvFormat>(
storage: &Storage<E, L, F>,
mut req: GetRequest,
) -> impl Future<Output = ServerResult<GetResponse>> {
let tracker = GLOBAL_TRACKERS.insert(Tracker::new(RequestInfo::new(
req.get_context(),
RequestType::KvGet,
req.get_version(),
)));
set_tls_tracker_token(tracker);
let start = Instant::now();
let v = storage.get(
req.take_context(),
Key::from_raw(req.get_key()),
req.get_version().into(),
);

async move {
let v = v.await;
let duration_ms = duration_to_ms(start.saturating_elapsed());
let mut resp = GetResponse::default();
if let Some(err) = extract_region_error(&v) {
resp.set_region_error(err);
} else {
match v {
Ok((val, stats)) => {
...
}
Err(e) => resp.set_error(extract_key_error(&e)),
}
}
GLOBAL_TRACKERS.remove(tracker);
Ok(resp)
}
}

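Since 3.x, KVService has used several macros to significantly reduce duplicated code across RPC handlers. However, debuggers and IDEs such as CLion currently cannot follow these macros when building a call hierarchy, which can confuse newcomers who start tracing a call chain and cannot find the corresponding handler.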
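
The following stripped-down sketch shows the general shape of this macro pattern: one macro generates a handler method per RPC, keeps the shared logic in one place, and delegates the interface-specific part to a named function. All types here (Service, Storage, the request and response structs) are hypothetical stand-ins, and the handlers are synchronous for brevity, unlike the real future-based ones.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the kvproto request and response types.
#[derive(Debug, PartialEq)]
struct GetRequest { key: String }
#[derive(Debug, PartialEq)]
struct GetResponse { value: Option<String> }
#[derive(Debug, PartialEq)]
struct ExistsRequest { key: String }
#[derive(Debug, PartialEq)]
struct ExistsResponse { exists: bool }

// Hypothetical stand-in for the Storage module.
struct Storage { data: HashMap<String, String> }

// Interface-specific logic: one plain function per RPC.
fn future_get(storage: &Storage, req: GetRequest) -> GetResponse {
    GetResponse { value: storage.data.get(&req.key).cloned() }
}

fn future_exists(storage: &Storage, req: ExistsRequest) -> ExistsResponse {
    ExistsResponse { exists: storage.data.contains_key(&req.key) }
}

struct Service { storage: Storage, handled: u64 }

// Interface-independent logic lives in the macro. Here it only counts
// requests, standing in for metrics, timing, and error handling.
macro_rules! handle_request {
    ($fn_name:ident, $future_name:ident, $req_ty:ident, $resp_ty:ident) => {
        fn $fn_name(&mut self, req: $req_ty) -> $resp_ty {
            self.handled += 1;
            $future_name(&self.storage, req)
        }
    };
}

impl Service {
    handle_request!(kv_get, future_get, GetRequest, GetResponse);
    handle_request!(kv_exists, future_exists, ExistsRequest, ExistsResponse);
}
```

Because kv_get and kv_exists only exist after macro expansion, a text search for "fn kv_get" finds nothing; searching for the macro invocation or for the future_xxx function name is usually the quicker way to the handler.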

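After this section, you should understand what KVService does and how it starts up, be able to locate the initialization code of important global structures, and be able to find the RPC handler you need quickly.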

Storage

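The Storage module sits between the Service layer and the underlying KV storage engine, and is mainly responsible for transaction concurrency control. All transaction-related logic on the TiKV side is implemented in the Storage module. For the 3.x version of the Storage module, see TiKV Source Code Analysis series (11): Storage, the transaction control layer.

After three major versions, the Storage and Scheduler structs have changed somewhat. This section updates and supplements the earlier source code analysis.

The Storage struct:

  • engine: the underlying KV storage engine. Its interface is constrained through trait bounds, and there are several implementations. In practice TiKV uses the RaftKV engine. When RaftKV's async_write is called for a write and its callback reports success, the write has been replicated to a majority of replicas through Raft and has been applied on the leader (the TiKV node the caller is on), so subsequent reads on the leader can see the written data
  • sched: the transaction scheduler, responsible for scheduling concurrent transactional requests
  • read_pool: the read thread pool. All read-only KV requests, transactional or not (raw get, txn kv get, and so on), are ultimately executed in this pool. Because read-only requests do not need to acquire latches, they get a dedicated thread pool and run directly instead of sharing the transaction scheduler with non-read-only transactions. Notably, the current read_pool can schedule read requests differently according to the priority field of the request, rather than treating them all as equal-priority tasks and scheduling them fairly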
pub struct Storage<E: Engine, L: LockManager, F: KvFormat> {
// TODO: Too many Arcs, would be slow when clone.
engine: E,

sched: TxnScheduler<E, L>,

/// The thread pool used to run most read operations.
read_pool: ReadPoolHandle,

...
}

#[derive(Clone)]
pub enum ReadPoolHandle {
FuturePools {
read_pool_high: FuturePool,
read_pool_normal: FuturePool,
read_pool_low: FuturePool,
},
Yatp {
remote: Remote<TaskCell>,
running_tasks: IntGauge,
max_tasks: usize,
pool_size: usize,
},
}

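The Scheduler struct:

  • id_alloc: every request that reaches the Scheduler is assigned a unique command id
  • latches: when a write request reaches the Scheduler, it tries to acquire the latches it needs. If it cannot get them all yet, its command id is put into the waiting list of the latch; when the requests ahead of it finish, they wake up the requests in the waiting list so that they can continue. For why latches are needed, see the Scheduler and Latch section of TiKV Source Code Analysis series (12): Distributed transactions
  • task_slots: stores the context of every request in the Scheduler; for example, requests that have not yet acquired all the latches they need are parked in task_slots
  • lock_mgr: the pessimistic transaction conflict manager. When concurrent pessimistic transactions conflict, some of them may be blocked temporarily. For how pessimistic transactions work in TiKV, see the blog post "New TiDB Features: Pessimistic Transactions"
  • pipelined_pessimistic_lock / in_memory_pessimistic_lock / enable_async_apply_prewrite: new fields introduced by several optimizations to TiKV pessimistic transactions. For details, see the blog post "TiDB 6.0 in Practice: How In-Memory Pessimistic Locks Work"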
/// Scheduler which schedules the execution of `storage::Command`s.
#[derive(Clone)]
pub struct Scheduler<E: Engine, L: LockManager> {
inner: Arc<SchedulerInner<L>>,
// The engine can be fetched from the thread local storage of scheduler threads.
// So, we don't store the engine here.
_engine: PhantomData<E>,
}

struct SchedulerInner<L: LockManager> {
// slot_id -> { cid -> `TaskContext` } in the slot.
task_slots: Vec<CachePadded<Mutex<HashMap<u64, TaskContext>>>>,

// cmd id generator
id_alloc: CachePadded<AtomicU64>,

// write concurrency control
latches: Latches,

sched_pending_write_threshold: usize,

// worker pool
worker_pool: SchedPool,

// high priority commands and system commands will be delivered to this pool
high_priority_pool: SchedPool,

// used to control write flow
running_write_bytes: CachePadded<AtomicUsize>,

flow_controller: Arc<FlowController>,

control_mutex: Arc<tokio::sync::Mutex<bool>>,

lock_mgr: L,

concurrency_manager: ConcurrencyManager,

pipelined_pessimistic_lock: Arc<AtomicBool>,

in_memory_pessimistic_lock: Arc<AtomicBool>,

enable_async_apply_prewrite: bool,

resource_tag_factory: ResourceTagFactory,

quota_limiter: Arc<QuotaLimiter>,
feature_gate: FeatureGate,
}

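When first reading about id_alloc and task_slots, it is natural to wonder: what is the point of generating a unique id for every command, and what exactly is the context stored in task_slots? The answer has to do with TiKV's asynchronous framework.

Below is schedule_command, the key function the Storage module uses to run transactional requests. On entry, each request first obtains a unique, monotonically increasing cid. The request's command is then wrapped in a task keyed by that cid, and the task together with its callback becomes a TaskContext that is inserted into task_slots. Next the function tries to acquire the latches. On success it goes on to call execute to actually run the task. Otherwise, nothing more seems to happen. So if a task fails to acquire its latches, when does it get executed?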

fn schedule_command(&self, cmd: Command, callback: StorageCallback) {
let cid = self.inner.gen_id();
let tracker = get_tls_tracker_token();
debug!("received new command"; "cid" => cid, "cmd" => ?cmd, "tracker" => ?tracker);
let tag = cmd.tag();
let priority_tag = get_priority_tag(cmd.priority());
SCHED_STAGE_COUNTER_VEC.get(tag).new.inc();
SCHED_COMMANDS_PRI_COUNTER_VEC_STATIC
.get(priority_tag)
.inc();

let mut task_slot = self.inner.get_task_slot(cid);
let tctx = task_slot.entry(cid).or_insert_with(|| {
self.inner
.new_task_context(Task::new(cid, tracker, cmd), callback)
});

if self.inner.latches.acquire(&mut tctx.lock, cid) {
fail_point!("txn_scheduler_acquire_success");
tctx.on_schedule();
let task = tctx.task.take().unwrap();
drop(task_slot);
self.execute(task);
return;
}
let task = tctx.task.as_ref().unwrap();
let deadline = task.cmd.deadline();
let cmd_ctx = task.cmd.ctx().clone();
self.fail_fast_or_check_deadline(cid, tag, cmd_ctx, deadline);
fail_point!("txn_scheduler_acquire_fail");
}

/// Task is a running command.
pub(super) struct Task {
pub(super) cid: u64,
pub(super) tracker: TrackerToken,
pub(super) cmd: Command,
pub(super) extra_op: ExtraOp,
}

// It stores context of a task.
struct TaskContext {
task: Option<Task>,

lock: Lock,
cb: Option<StorageCallback>,
pr: Option<ProcessResult>,
// The one who sets `owned` from false to true is allowed to take
// `cb` and `pr` safely.
owned: AtomicBool,
write_bytes: usize,
tag: CommandKind,
// How long it waits on latches.
// latch_timer: Option<Instant>,
latch_timer: Instant,
// Total duration of a command.
_cmd_timer: CmdTimer,
}

/// Latches which are used for concurrency control in the scheduler.
///
/// Each latch is indexed by a slot ID, hence the term latch and slot are used
/// interchangeably, but conceptually a latch is a queue, and a slot is an index
/// to the queue.
pub struct Latches {
slots: Vec<CachePadded<Mutex<Latch>>>,
size: usize,
}

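Looking into Latches::acquire, you can see that it collects latches incrementally. If it does not collect all of them in this call, the current thread is not blocked at all; the function simply returns false. Before returning false, it uses latch.wait_for_wake to put the task's id into the waiting queue of the corresponding latch, so the current thread can move on to other work instead of blocking on this task. Every task that obtained all its latches and ran calls scheduler::release_lock when it finishes to release all the latches it holds. While releasing, it collects every other task that is blocked on one of these latches and now sits at the head of its waiting queue. The releasing thread then calls scheduler::try_to_wake_up on each of these tasks, which tries to acquire the latches again: if that succeeds the task is executed, otherwise it keeps waiting until another thread wakes it up.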

/// Tries to acquire the latches specified by the `lock` for command with ID
/// `who`.
///
/// This method will enqueue the command ID into the waiting queues of the
/// latches. A latch is considered acquired if the command ID is the first
/// one of elements in the queue which have the same hash value. Returns
/// true if all the Latches are acquired, false otherwise.
pub fn acquire(&self, lock: &mut Lock, who: u64) -> bool {
let mut acquired_count: usize = 0;
for &key_hash in &lock.required_hashes[lock.owned_count..] {
let mut latch = self.lock_latch(key_hash);
match latch.get_first_req_by_hash(key_hash) {
Some(cid) => {
if cid == who {
acquired_count += 1;
} else {
latch.wait_for_wake(key_hash, who);
break;
}
}
None => {
latch.wait_for_wake(key_hash, who);
acquired_count += 1;
}
}
}
lock.owned_count += acquired_count;
lock.acquired()
}

pub fn wait_for_wake(&mut self, key_hash: u64, cid: u64) {
self.waiting.push_back(Some((key_hash, cid)));
}

/// Releases all the latches held by a command.
fn release_lock(&self, lock: &Lock, cid: u64) {
let wakeup_list = self.inner.latches.release(lock, cid);
for wcid in wakeup_list {
self.try_to_wake_up(wcid);
}
}

/// Releases all latches owned by the `lock` of command with ID `who`,
/// returns the wakeup list.
///
/// Preconditions: the caller must ensure the command is at the front of the
/// latches.
pub fn release(&self, lock: &Lock, who: u64) -> Vec<u64> {
let mut wakeup_list: Vec<u64> = vec![];
for &key_hash in &lock.required_hashes[..lock.owned_count] {
let mut latch = self.lock_latch(key_hash);
let (v, front) = latch.pop_front(key_hash).unwrap();
assert_eq!(front, who);
assert_eq!(v, key_hash);
if let Some(wakeup) = latch.get_first_req_by_hash(key_hash) {
wakeup_list.push(wakeup);
}
}
wakeup_list
}

/// Tries to acquire all the necessary latches. If all the necessary latches
/// are acquired, the method initiates a get snapshot operation for further
/// processing.
fn try_to_wake_up(&self, cid: u64) {
match self.inner.acquire_lock_on_wakeup(cid) {
Ok(Some(task)) => {
fail_point!("txn_scheduler_try_to_wake_up");
self.execute(task);
}
Ok(None) => {}
Err(err) => {
// Spawn the finish task to the pool to avoid stack overflow
// when many queuing tasks fail successively.
let this = self.clone();
self.inner
.worker_pool
.pool
.spawn(async move {
this.finish_with_err(cid, err);
})
.unwrap();
}
}
}

In fact, once a TaskContext has been built and inserted into task_slots, anyone holding the id can fetch the task and its callback from task_slots, so any thread can execute the task and return the result to the client.
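
A small sketch of this idea, using only the standard library: a command is parked in sharded slots under a freshly allocated cid, and a different thread that only knows the cid later takes the context out and invokes the callback. ToyScheduler, enqueue, and finish are hypothetical names chosen for the example, and the "execution" is just string formatting.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Callback = Box<dyn FnOnce(String) + Send>;

/// Hypothetical task context: the command plus the callback to the client.
struct TaskContext {
    cmd: String,
    cb: Callback,
}

/// Hypothetical scheduler core: an id allocator plus sharded task slots.
struct ToyScheduler {
    id_alloc: AtomicU64,
    task_slots: Vec<Mutex<HashMap<u64, TaskContext>>>,
}

impl ToyScheduler {
    fn new(slot_count: usize) -> Self {
        ToyScheduler {
            id_alloc: AtomicU64::new(1),
            task_slots: (0..slot_count).map(|_| Mutex::new(HashMap::new())).collect(),
        }
    }

    fn slot(&self, cid: u64) -> &Mutex<HashMap<u64, TaskContext>> {
        &self.task_slots[(cid as usize) % self.task_slots.len()]
    }

    /// Park a command and its callback, returning the allocated cid.
    fn enqueue(&self, cmd: String, cb: Callback) -> u64 {
        let cid = self.id_alloc.fetch_add(1, Ordering::Relaxed);
        self.slot(cid).lock().unwrap().insert(cid, TaskContext { cmd, cb });
        cid
    }

    /// Any thread that knows the cid can finish the task.
    fn finish(&self, cid: u64) -> bool {
        // The guard is dropped at the end of this statement, so the
        // callback below runs without holding the slot lock.
        let tctx = self.slot(cid).lock().unwrap().remove(&cid);
        match tctx {
            Some(t) => {
                let result = format!("done: {}", t.cmd);
                (t.cb)(result);
                true
            }
            None => false,
        }
    }
}

/// Enqueue on the calling thread, finish on a worker thread.
fn run_on_other_thread(cmd: &str) -> String {
    let sched = Arc::new(ToyScheduler::new(4));
    let (tx, rx) = mpsc::channel();
    let cid = sched.enqueue(
        cmd.to_string(),
        Box::new(move |res: String| {
            tx.send(res).unwrap();
        }),
    );
    let worker = {
        let sched = Arc::clone(&sched);
        thread::spawn(move || sched.finish(cid))
    };
    assert!(worker.join().unwrap());
    rx.recv().unwrap()
}
```

The thread that enqueued the command never blocks on it; the cid is the only handle that has to travel between threads.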

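Overall, this design amounts to a coroutine-like scheduling layer at the command level. Combined with Rust's native stackless coroutines, it removes much of the blocking synchronization and context switching between threads, and it is an asynchronous design worth learning from.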
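
To see the non-blocking acquire and wake-up cycle in isolation, here is a single-threaded sketch modelled on the acquire and release functions quoted above. It is an illustration rather than TiKV's implementation: the per-slot mutexes are dropped, the slot is chosen by a plain modulo, and sorting the hashes in Lock::new is an assumption made here so that every command takes latches in the same order.

```rust
use std::collections::VecDeque;

/// The latches one command needs, plus how many it already owns.
struct Lock {
    required_hashes: Vec<u64>,
    owned_count: usize,
}

impl Lock {
    fn new(mut required_hashes: Vec<u64>) -> Self {
        required_hashes.sort_unstable();
        required_hashes.dedup();
        Lock { required_hashes, owned_count: 0 }
    }

    fn acquired(&self) -> bool {
        self.owned_count == self.required_hashes.len()
    }
}

/// Each slot is a waiting queue of (key_hash, cid) pairs.
struct Latches {
    slots: Vec<VecDeque<(u64, u64)>>,
}

fn first_by_hash(slot: &VecDeque<(u64, u64)>, key_hash: u64) -> Option<u64> {
    slot.iter().find(|(hash, _)| *hash == key_hash).map(|(_, cid)| *cid)
}

impl Latches {
    fn new(size: usize) -> Self {
        Latches { slots: vec![VecDeque::new(); size] }
    }

    fn slot_index(&self, key_hash: u64) -> usize {
        (key_hash as usize) % self.slots.len()
    }

    /// Never blocks: either all latches are owned, or `who` is left queued.
    fn acquire(&mut self, lock: &mut Lock, who: u64) -> bool {
        let mut acquired_count = 0;
        for &key_hash in &lock.required_hashes[lock.owned_count..] {
            let idx = self.slot_index(key_hash);
            let slot = &mut self.slots[idx];
            match first_by_hash(slot, key_hash) {
                Some(cid) if cid == who => acquired_count += 1,
                Some(_) => {
                    slot.push_back((key_hash, who));
                    break;
                }
                None => {
                    slot.push_back((key_hash, who));
                    acquired_count += 1;
                }
            }
        }
        lock.owned_count += acquired_count;
        lock.acquired()
    }

    /// Releases the owned latches and returns the commands to wake up.
    fn release(&mut self, lock: &Lock, who: u64) -> Vec<u64> {
        let mut wakeup_list = Vec::new();
        for &key_hash in &lock.required_hashes[..lock.owned_count] {
            let idx = self.slot_index(key_hash);
            let slot = &mut self.slots[idx];
            let pos = slot
                .iter()
                .position(|(hash, _)| *hash == key_hash)
                .expect("an owned latch must be queued");
            let (_, front) = slot.remove(pos).unwrap();
            assert_eq!(front, who);
            if let Some(next) = first_by_hash(slot, key_hash) {
                wakeup_list.push(next);
            }
        }
        wakeup_list
    }
}
```

If command 1 needs hashes {1, 2} and command 2 needs {2, 3}, command 2's first acquire returns false and leaves it queued behind command 1 on hash 2. Releasing command 1 returns command 2 in the wake-up list, and its second acquire then succeeds.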

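After this section, you should understand how the Storage module is organized, have a reasonable picture of how the scheduler schedules concurrent requests, and be able to trace the asynchronous call path of a single request from the right places.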

RaftStore

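RaftStore is often regarded as the most complex and most obscure module in TiKV, and it has scared off quite a few developers.

In my view this is mainly because it has to guarantee the consistency and correctness of multi-raft plus split/merge in all kinds of cases. The semantics are inherently complex, so the implementation can hardly be simple. There are a great many details to watch out for, but understanding the overall framework of RaftStore is still feasible.

TiKV Source Code Analysis series (17): raftstore overview covers RaftStore in 3.x. RaftStore has changed somewhat since then, and this section briefly adds my own understanding.

The Batch System is the foundation of RaftStore's processing: a mechanism for driving state machines concurrently.

The core definition of a state machine is as follows: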

/// A `Fsm` is a finite state machine. It should be able to be notified for
/// updating internal state according to incoming messages.
pub trait Fsm {
type Message: Send;

fn is_stopped(&self) -> bool;

/// Set a mailbox to FSM, which should be used to send message to itself.
fn set_mailbox(&mut self, _mailbox: Cow<'_, BasicMailbox<Self>>)
where
Self: Sized,
{
}
/// Take the mailbox from FSM. Implementation should ensure there will be
/// no reference to mailbox after calling this method.
fn take_mailbox(&mut self) -> Option<BasicMailbox<Self>>
where
Self: Sized,
{
None
}

fn get_priority(&self) -> Priority {
Priority::Normal
}
}

/// A unify type for FSMs so that they can be sent to channel easily.
pub enum FsmTypes<N, C> {
Normal(Box<N>),
Control(Box<C>),
// Used as a signal that scheduler should be shutdown.
Empty,
}

State machines are driven by a PollHandler, defined as follows:

/// A handler that polls all FSMs in ready.
///
/// A general process works like the following:
///
/// loop {
/// begin
/// if control is ready:
/// handle_control
/// foreach ready normal:
/// handle_normal
/// light_end
/// end
/// }
///
/// A [`PollHandler`] doesn't have to be [`Sync`] because each poll thread has
/// its own handler.
pub trait PollHandler<N, C>: Send + 'static {
/// This function is called at the very beginning of every round.
fn begin<F>(&mut self, _batch_size: usize, update_cfg: F)
where
for<'a> F: FnOnce(&'a Config);

/// This function is called when the control FSM is ready.
///
/// If `Some(len)` is returned, this function will not be called again until
/// there are more than `len` pending messages in `control` FSM.
///
/// If `None` is returned, this function will be called again with the same
/// FSM `control` in the next round, unless it is stopped.
fn handle_control(&mut self, control: &mut C) -> Option<usize>;

/// This function is called when some normal FSMs are ready.
fn handle_normal(&mut self, normal: &mut impl DerefMut<Target = N>) -> HandleResult;

/// This function is called after [`handle_normal`] is called for all FSMs
/// and before calling [`end`]. The function is expected to run lightweight
/// works.
fn light_end(&mut self, _batch: &mut [Option<impl DerefMut<Target = N>>]) {}

/// This function is called at the end of every round.
fn end(&mut self, batch: &mut [Option<impl DerefMut<Target = N>>]);

/// This function is called when batch system is going to sleep.
fn pause(&mut self) {}

/// This function returns the priority of this handler.
fn get_priority(&self) -> Priority {
Priority::Normal
}
}

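Broadly, there are two kinds of state machines: normal and control. Each Batch System has exactly one control state machine, which manages and handles tasks that need a global view. The normal state machines handle tasks related to themselves. Every state machine has its own message type and message queue. The PollHandler drives state machines by processing the messages in their queues. The Batch System's job is to detect which state machines need driving and then call the PollHandler to consume their messages. Consuming messages produces side effects, which either have to be persisted to disk or require network interaction. A PollHandler can process multiple normal state machines in one batch.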
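
The round structure (begin, handle each ready normal FSM, end) can be sketched in a few lines. NormalFsm, CountingHandler, and poll below are hypothetical simplifications written for this post: there is no control FSM, no router, and the loop exits when nothing is ready instead of blocking for new work as a real poller would.

```rust
use std::collections::VecDeque;

/// Hypothetical normal FSM: a mailbox of pending messages plus some state.
struct NormalFsm {
    region_id: u64,
    mailbox: VecDeque<u64>,
    state: u64,
}

impl NormalFsm {
    fn new(region_id: u64, msgs: &[u64]) -> Self {
        NormalFsm { region_id, mailbox: msgs.iter().copied().collect(), state: 0 }
    }
}

/// Hypothetical handler that mirrors the begin / handle_normal / end rounds.
#[derive(Default)]
struct CountingHandler {
    rounds: usize,
    handled_msgs: usize,
}

impl CountingHandler {
    fn begin(&mut self) {
        self.rounds += 1;
    }

    /// Drain the FSM's mailbox and fold every message into its state.
    fn handle_normal(&mut self, fsm: &mut NormalFsm) {
        while let Some(msg) = fsm.mailbox.pop_front() {
            fsm.state += msg;
            self.handled_msgs += 1;
        }
    }

    /// A real handler would flush batched side effects (IO, network) here.
    fn end(&mut self) {}
}

/// Drive all FSMs with pending messages, at most `max_batch_size` per round.
fn poll(handler: &mut CountingHandler, fsms: &mut [NormalFsm], max_batch_size: usize) {
    loop {
        let batch: Vec<usize> = fsms
            .iter()
            .enumerate()
            .filter(|(_, fsm)| !fsm.mailbox.is_empty())
            .map(|(i, _)| i)
            .take(max_batch_size)
            .collect();
        if batch.is_empty() {
            break;
        }
        handler.begin();
        for &i in &batch {
            handler.handle_normal(&mut fsms[i]);
        }
        handler.end();
    }
}
```

Collecting side effects during handle_normal and flushing them once in end is what lets one round amortize expensive work across every FSM in the batch.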

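RaftStore has two Batch Systems: RaftBatchSystem and ApplyBatchSystem. RaftBatchSystem drives the Raft state machines, including log distribution, persistence, and state transitions. Committed logs are sent to ApplyBatchSystem, which parses them, applies them to the underlying KV database, and runs the callbacks. All write operations follow this flow.

More specifically:

  • Each PollHandler corresponds to one thread. In its poll function it keeps detecting state machines that need driving and processes them. It may also reschedule some hot regions to other PollHandlers as a form of load balancing.
  • Each region corresponds to one Raft group, and each Raft group corresponds to one normal state machine in a BatchSystem.
    • For RaftBatchSystem, following the raft-rs interface described in TiKV Source Code Analysis series (2): raft-rs proposal walkthrough, the PollHandler fetches a ready from each normal state machine once per loop iteration. A ready typically contains uncommitted log entries to persist, messages to send, and committed log entries to apply. For the uncommitted entries, the most direct approach is to buffer them in memory as a batch and then handle them together, in a blocking fashion, in the end function at the end of the loop, which clearly hurts the efficiency of each iteration. TiKV 6.x moves this blocking IO out of the loop and hands it to a separate thread pool, which further improves the efficiency of the store loop; see the related issue for details. Messages to send are sent asynchronously to the target store through the Transport. Committed entries to apply are sent to ApplyBatchSystem, together with their callbacks, through the applyRouter.
    • For ApplyBatchSystem, in each loop iteration the PollHandler takes, for each normal state machine, the committed log entries that RaftBatchSystem has sent over to be applied. It writes them in a batch and afterwards runs the corresponding callbacks to return results to the client. Note that after returning results to the client, ApplyBatchSystem also has to send an ApplyRes message back to RaftBatchSystem to update some of its in-memory state, such as applyIndex. Updating this field can unblock read requests that are waiting on a particular ReadIndex.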
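
The hand-off between the two systems can be pictured as a pair of channels. In the sketch below, the calling thread plays the Raft side: it sends committed entries, each carrying a callback, to an apply thread. The apply thread writes them into a map, runs the callback, and reports the applied index back. CommittedEntry and run_pipeline are hypothetical names, a u64 stands in for the ApplyRes message, and replication itself is skipped entirely.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical committed log entry with the callback to the client.
struct CommittedEntry {
    index: u64,
    key: String,
    value: String,
    cb: Box<dyn FnOnce(u64) + Send>,
}

/// Returns (final KV state, last applied index seen by the raft side,
/// indexes acknowledged to the client in order).
fn run_pipeline(writes: Vec<(String, String)>) -> (HashMap<String, String>, u64, Vec<u64>) {
    let (apply_tx, apply_rx) = mpsc::channel::<CommittedEntry>();
    let (res_tx, res_rx) = mpsc::channel::<u64>();
    let (client_tx, client_rx) = mpsc::channel::<u64>();

    // The "apply" side: apply entries, run callbacks, report progress.
    let apply = thread::spawn(move || {
        let mut kv = HashMap::new();
        // The loop ends once every sender of apply_tx has been dropped.
        for entry in apply_rx {
            kv.insert(entry.key, entry.value);
            (entry.cb)(entry.index);
            res_tx.send(entry.index).unwrap();
        }
        kv
    });

    // The "raft" side: pretend every write has already been committed.
    for (i, (key, value)) in writes.into_iter().enumerate() {
        let client_tx = client_tx.clone();
        apply_tx
            .send(CommittedEntry {
                index: i as u64 + 1,
                key,
                value,
                cb: Box::new(move |idx: u64| {
                    client_tx.send(idx).unwrap();
                }),
            })
            .unwrap();
    }
    drop(apply_tx);
    drop(client_tx);

    let kv = apply.join().unwrap();
    let applied_index = res_rx.iter().last().unwrap_or(0);
    let acked: Vec<u64> = client_rx.iter().collect();
    (kv, applied_index, acked)
}
```

The client callback fires on the apply thread, not on the thread that proposed the write, which is why a write request's call path seems to "disappear" when you follow it with a debugger on a single thread.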

The BatchSystem startup flow and the poll function are shown below:

/// A system that can poll FSMs concurrently and in batch.
///
/// To use the system, two type of FSMs and their PollHandlers need to be
/// defined: Normal and Control. Normal FSM handles the general task while
/// Control FSM creates normal FSM instances.
pub struct BatchSystem<N: Fsm, C: Fsm> {
name_prefix: Option<String>,
router: BatchRouter<N, C>,
receiver: channel::Receiver<FsmTypes<N, C>>,
low_receiver: channel::Receiver<FsmTypes<N, C>>,
pool_size: usize,
max_batch_size: usize,
workers: Arc<Mutex<Vec<JoinHandle<()>>>>,
joinable_workers: Arc<Mutex<Vec<ThreadId>>>,
reschedule_duration: Duration,
low_priority_pool_size: usize,
pool_state_builder: Option<PoolStateBuilder<N, C>>,
}

impl<N, C> BatchSystem<N, C>
where
N: Fsm + Send + 'static,
C: Fsm + Send + 'static,
{
fn start_poller<B>(&mut self, name: String, priority: Priority, builder: &mut B)
where
B: HandlerBuilder<N, C>,
B::Handler: Send + 'static,
{
let handler = builder.build(priority);
let receiver = match priority {
Priority::Normal => self.receiver.clone(),
Priority::Low => self.low_receiver.clone(),
};
let mut poller = Poller {
router: self.router.clone(),
fsm_receiver: receiver,
handler,
max_batch_size: self.max_batch_size,
reschedule_duration: self.reschedule_duration,
joinable_workers: if priority == Priority::Normal {
Some(Arc::clone(&self.joinable_workers))
} else {
None
},
};
let props = tikv_util::thread_group::current_properties();
let t = thread::Builder::new()
.name(name)
.spawn_wrapper(move || {
tikv_util::thread_group::set_properties(props);
set_io_type(IoType::ForegroundWrite);
poller.poll();
})
.unwrap();
self.workers.lock().unwrap().push(t);
}

/// Start the batch system.
pub fn spawn<B>(&mut self, name_prefix: String, mut builder: B)
where
B: HandlerBuilder<N, C>,
B::Handler: Send + 'static,
{
for i in 0..self.pool_size {
self.start_poller(
thd_name!(format!("{}-{}", name_prefix, i)),
Priority::Normal,
&mut builder,
);
}
for i in 0..self.low_priority_pool_size {
self.start_poller(
thd_name!(format!("{}-low-{}", name_prefix, i)),
Priority::Low,
&mut builder,
);
}
self.name_prefix = Some(name_prefix);
}
}

/// Polls for readiness and forwards them to handler. Removes stale peers if
/// necessary.
pub fn poll(&mut self) {
fail_point!("poll");
let mut batch = Batch::with_capacity(self.max_batch_size);
let mut reschedule_fsms = Vec::with_capacity(self.max_batch_size);
let mut to_skip_end = Vec::with_capacity(self.max_batch_size);

// Fetch batch after every round is finished. It's helpful to protect regions
// from becoming hungry if some regions are hot points. Since we fetch new FSM
// every time calling `poll`, we do not need to configure a large value for
// `self.max_batch_size`.
let mut run = true;
while run && self.fetch_fsm(&mut batch) {
// If there is some region wait to be deal, we must deal with it even if it has
// overhead max size of batch. It's helpful to protect regions from becoming
// hungry if some regions are hot points.
let mut max_batch_size = std::cmp::max(self.max_batch_size, batch.normals.len());
// Update some online config if needed.
{
// TODO: rust 2018 does not support capture disjoint field within a closure.
// See https://github.com/rust-lang/rust/issues/53488 for more details.
// We can remove this once we upgrade to rust 2021 or later edition.
let batch_size = &mut self.max_batch_size;
self.handler.begin(max_batch_size, |cfg| {
*batch_size = cfg.max_batch_size();
});
}
max_batch_size = std::cmp::max(self.max_batch_size, batch.normals.len());

if batch.control.is_some() {
let len = self.handler.handle_control(batch.control.as_mut().unwrap());
if batch.control.as_ref().unwrap().is_stopped() {
batch.remove_control(&self.router.control_box);
} else if let Some(len) = len {
batch.release_control(&self.router.control_box, len);
}
}

let mut hot_fsm_count = 0;
for (i, p) in batch.normals.iter_mut().enumerate() {
let p = p.as_mut().unwrap();
let res = self.handler.handle_normal(p);
if p.is_stopped() {
p.policy = Some(ReschedulePolicy::Remove);
reschedule_fsms.push(i);
} else if p.get_priority() != self.handler.get_priority() {
p.policy = Some(ReschedulePolicy::Schedule);
reschedule_fsms.push(i);
} else {
if p.timer.saturating_elapsed() >= self.reschedule_duration {
hot_fsm_count += 1;
// We should only reschedule a half of the hot regions, otherwise,
// it's possible all the hot regions are fetched in a batch the
// next time.
if hot_fsm_count % 2 == 0 {
p.policy = Some(ReschedulePolicy::Schedule);
reschedule_fsms.push(i);
continue;
}
}
if let HandleResult::StopAt { progress, skip_end } = res {
p.policy = Some(ReschedulePolicy::Release(progress));
reschedule_fsms.push(i);
if skip_end {
to_skip_end.push(i);
}
}
}
}
let mut fsm_cnt = batch.normals.len();
while batch.normals.len() < max_batch_size {
if let Ok(fsm) = self.fsm_receiver.try_recv() {
run = batch.push(fsm);
}
// When `fsm_cnt >= batch.normals.len()`:
// - No more FSMs in `fsm_receiver`.
// - We receive a control FSM. Break the loop because ControlFsm may change
// state of the handler, we shall deal with it immediately after calling
// `begin` of `Handler`.
if !run || fsm_cnt >= batch.normals.len() {
break;
}
let p = batch.normals[fsm_cnt].as_mut().unwrap();
let res = self.handler.handle_normal(p);
if p.is_stopped() {
p.policy = Some(ReschedulePolicy::Remove);
reschedule_fsms.push(fsm_cnt);
} else if let HandleResult::StopAt { progress, skip_end } = res {
p.policy = Some(ReschedulePolicy::Release(progress));
reschedule_fsms.push(fsm_cnt);
if skip_end {
to_skip_end.push(fsm_cnt);
}
}
fsm_cnt += 1;
}
self.handler.light_end(&mut batch.normals);
for index in &to_skip_end {
batch.schedule(&self.router, *index);
}
to_skip_end.clear();
self.handler.end(&mut batch.normals);

// Iterate larger index first, so that `swap_reclaim` won't affect other FSMs
// in the list.
for index in reschedule_fsms.iter().rev() {
batch.schedule(&self.router, *index);
batch.swap_reclaim(*index);
}
reschedule_fsms.clear();
}
if let Some(fsm) = batch.control.take() {
self.router.control_scheduler.schedule(fsm);
info!("poller will exit, release the left ControlFsm");
}
let left_fsm_cnt = batch.normals.len();
if left_fsm_cnt > 0 {
info!(
"poller will exit, schedule {} left NormalFsms",
left_fsm_cnt
);
for i in 0..left_fsm_cnt {
let to_schedule = match batch.normals[i].take() {
Some(f) => f,
None => continue,
};
self.router.normal_scheduler.schedule(to_schedule.fsm);
}
}
batch.clear();
}
}

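After this section, you should understand the overall framework of BatchSystem, which will help with the end-to-end read and write paths in the following sections.

Read Path

TiKV Source Code Analysis series (19): read index and local read walkthrough describes the ReadIndex/LeaseRead implementation in TiKV 3.x.

Based on the TiKV 6.x source, this section takes one read request as an example and walks through its end-to-end execution in the current version.

As mentioned earlier, the RPC interfaces TiKV currently supports can be found in service Tikv in kvproto.

After a little tidying up, the commonly used read interfaces are: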

// Key/value store API for TiKV.
service Tikv {

rpc KvGet(kvrpcpb.GetRequest) returns (kvrpcpb.GetResponse) {}
rpc KvScan(kvrpcpb.ScanRequest) returns (kvrpcpb.ScanResponse) {}
rpc KvBatchGet(kvrpcpb.BatchGetRequest) returns (kvrpcpb.BatchGetResponse) {}

rpc RawGet(kvrpcpb.RawGetRequest) returns (kvrpcpb.RawGetResponse) {}
rpc RawBatchGet(kvrpcpb.RawBatchGetRequest) returns (kvrpcpb.RawBatchGetResponse) {}
rpc RawScan(kvrpcpb.RawScanRequest) returns (kvrpcpb.RawScanResponse) {}
rpc RawBatchScan(kvrpcpb.RawBatchScanRequest) returns (kvrpcpb.RawBatchScanResponse) {}

...
}

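The rest of this subsection uses KvGet, the most commonly used interface, as the example. The other read interfaces pass through largely the same modules, and you can trace them yourself later with the breakpoint-debugging approach.

In KVService, the handle_request macro wraps the business logic in the future_get function. future_get mainly calls storage.get(req.take_context(), Key::from_raw(req.get_key()), req.get_version().into()) to route the request to the Storage module for execution.

For observability, TiKV now records many global and request-level metrics on the critical read and write paths, which makes a first reading of the code somewhat harder. While you are getting familiar with the code, focus on the core logic; the metric-related code can be skipped for now.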

impl<T: RaftStoreRouter<E::Local> + 'static, E: Engine, L: LockManager, F: KvFormat> Tikv
for Service<T, E, L, F>
{
handle_request!(kv_get, future_get, GetRequest, GetResponse, has_time_detail);
}

fn future_get<E: Engine, L: LockManager, F: KvFormat>(
storage: &Storage<E, L, F>,
mut req: GetRequest,
) -> impl Future<Output = ServerResult<GetResponse>> {

...

let v = storage.get(
req.take_context(),
Key::from_raw(req.get_key()),
req.get_version().into(),
);

async move {
let v = v.await;

...

Ok(resp)
}
}

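In the get function of the Storage module, every task is spawned onto the read pool. The task itself does two main things:

  • Use Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await? to obtain a snapshot.
  • Use snap_store.get(&key, &mut statistics) to read, from that snapshot, the data that satisfies the transaction semantics.

The second step is fairly simple and is not covered further here. The rest of this subsection traces the code path of the first step.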

/// Get value of the given key from a snapshot.
///
/// Only writes that are committed before `start_ts` are visible.
pub fn get(
&self,
mut ctx: Context,
key: Key,
start_ts: TimeStamp,
) -> impl Future<Output = Result<(Option<Value>, KvGetStatistics)>> {

...

let res = self.read_pool.spawn_handle(
async move {

...

let snap_ctx = prepare_snap_ctx(
&ctx,
iter::once(&key),
start_ts,
&bypass_locks,
&concurrency_manager,
CMD,
)?;
let snapshot =
Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await?;

{
let begin_instant = Instant::now();
let stage_snap_recv_ts = begin_instant;
let buckets = snapshot.ext().get_buckets();
let mut statistics = Statistics::default();
let result = Self::with_perf_context(CMD, || {
let _guard = sample.observe_cpu();
let snap_store = SnapshotStore::new(
snapshot,
start_ts,
ctx.get_isolation_level(),
!ctx.get_not_fill_cache(),
bypass_locks,
access_locks,
false,
);
snap_store
.get(&key, &mut statistics)
// map storage::txn::Error -> storage::Error
.map_err(Error::from)
.map(|r| {
KV_COMMAND_KEYREAD_HISTOGRAM_STATIC.get(CMD).observe(1_f64);
r
})
});

...

Ok((
result?,
KvGetStatistics {
stats: statistics,
latency_stats,
},
))
}
}
.in_resource_metering_tag(resource_tag),
priority,
thread_rng().next_u64(),
);
async move {
res.map_err(|_| Error::from(ErrorInner::SchedTooBusy))
.await?
}
}
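
The second step, reading "the newest version visible at `start_ts`" from a snapshot, can be modelled in a few lines. This is a minimal sketch rather than TiKV's `SnapshotStore`: `ToySnapshot` and its methods are hypothetical names, lock checking is left out entirely, and timestamps are assumed to be unique so that "committed at or before `start_ts`" and "committed before `start_ts`" coincide.

```rust
use std::collections::BTreeMap;

/// Toy store of committed versions: (key, commit_ts) -> value.
/// A `None` value stands for a committed delete.
#[derive(Default)]
struct ToySnapshot {
    versions: BTreeMap<(String, u64), Option<String>>,
}

impl ToySnapshot {
    fn commit(&mut self, key: &str, commit_ts: u64, value: Option<&str>) {
        self.versions
            .insert((key.to_string(), commit_ts), value.map(str::to_string));
    }

    /// Returns the newest version of `key` whose commit_ts <= start_ts.
    fn get(&self, key: &str, start_ts: u64) -> Option<String> {
        self.versions
            .range((key.to_string(), 0)..=(key.to_string(), start_ts))
            .next_back()
            .and_then(|(_, value)| value.clone())
    }
}
```

Because the map is ordered by `(key, commit_ts)`, the last entry in the range is the newest visible version, which is the same idea as seeking to `key@start_ts` in an ordered engine.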

Self::snapshot(engine, snap_ctx) goes through the call chain storage::snapshot -> kv::snapshot -> raftkv::async_snapshot -> raftkv::exec_snapshot and arrives at ServerRaftStoreRouter::read.

/// Get a snapshot of `engine`.
fn snapshot(
engine: &E,
ctx: SnapContext<'_>,
) -> impl std::future::Future<Output = Result<E::Snap>> {
kv::snapshot(engine, ctx)
.map_err(txn::Error::from)
.map_err(Error::from)
}

/// Get a snapshot of `engine`.
pub fn snapshot<E: Engine>(
engine: &E,
ctx: SnapContext<'_>,
) -> impl std::future::Future<Output = Result<E::Snap>> {
let begin = Instant::now();
let (callback, future) =
tikv_util::future::paired_must_called_future_callback(drop_snapshot_callback::<E>);
let val = engine.async_snapshot(ctx, callback);
// make engine not cross yield point
async move {
val?; // propagate error
let result = future
.map_err(|cancel| Error::from(ErrorInner::Other(box_err!(cancel))))
.await?;
with_tls_tracker(|tracker| {
tracker.metrics.get_snapshot_nanos += begin.elapsed().as_nanos() as u64;
});
fail_point!("after-snapshot");
result
}
}

fn async_snapshot(&self, mut ctx: SnapContext<'_>, cb: Callback<Self::Snap>) -> kv::Result<()> {

...

self.exec_snapshot(
ctx,
req,
Box::new(move |res| match res {
...
}),
)
.map_err(|e| {
let status_kind = get_status_kind_from_error(&e);
ASYNC_REQUESTS_COUNTER_VEC.snapshot.get(status_kind).inc();
e.into()
})
}

fn exec_snapshot(
&self,
ctx: SnapContext<'_>,
req: Request,
cb: Callback<CmdRes<E::Snapshot>>,
) -> Result<()> {

...

let mut cmd = RaftCmdRequest::default();
cmd.set_header(header);
cmd.set_requests(vec![req].into());
self.router
.read(
ctx.read_id,
cmd,
StoreCallback::read(Box::new(move |resp| {
cb(on_read_result(resp).map_err(Error::into));
})),
)
.map_err(From::from)
}

impl<EK: KvEngine, ER: RaftEngine> LocalReadRouter<EK> for ServerRaftStoreRouter<EK, ER> {
fn read(
&self,
read_id: Option<ThreadReadId>,
req: RaftCmdRequest,
cb: Callback<EK::Snapshot>,
) -> RaftStoreResult<()> {
let mut local_reader = self.local_reader.borrow_mut();
local_reader.read(read_id, req, cb);
Ok(())
}
}
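
The listing above hands a callback to the engine and then waits on a paired future. The sketch below shows the same pairing idea with a blocking `std::sync::mpsc` channel instead of a future, purely to keep the example free of an async runtime. `paired_callback`, `async_snapshot` and `snapshot_blocking` are hypothetical stand-ins for illustration, not the `tikv_util` or `Engine` APIs.

```rust
use std::sync::mpsc;
use std::thread;

type Callback<T> = Box<dyn FnOnce(T) + Send>;

/// Returns a callback plus the receiving end that yields whatever the
/// callback is eventually invoked with.
fn paired_callback<T: Send + 'static>() -> (Callback<T>, mpsc::Receiver<T>) {
    let (tx, rx) = mpsc::channel();
    let cb: Callback<T> = Box::new(move |value| {
        // The receiver may already be gone; this sketch ignores that case.
        let _ = tx.send(value);
    });
    (cb, rx)
}

/// Hypothetical engine call: delivers the result to `cb` from another thread.
fn async_snapshot(region_id: u64, cb: Callback<Result<String, String>>) {
    thread::spawn(move || {
        if region_id == 0 {
            cb(Err("region not found".to_string()));
        } else {
            cb(Ok(format!("snapshot-of-region-{}", region_id)));
        }
    });
}

/// Caller side: issue the request, then wait for the callback to fire.
fn snapshot_blocking(region_id: u64) -> Result<String, String> {
    let (cb, rx) = paired_callback();
    async_snapshot(region_id, cb);
    // If the callback is dropped without being called, `recv` fails,
    // which plays the role of the "cancel" error in the listing above.
    rx.recv().map_err(|e| e.to_string())?
}
```

The caller never learns which thread produced the snapshot; it only observes the value passed to the callback, which is what lets the request hop between thread pools.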

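ServerRaftStoreRouter::read calls the read function of local_reader, which in turn routes to LocalReader::propose_raft_command. That function uses LocalReader::pre_propose_raft_command to decide whether the request can be served as ReadLocal. If it can, the function takes a snapshot of the local engine directly and invokes the callback to return. Otherwise it calls redirect to route the request, together with its callback, to the corresponding normal state machine in RaftBatchSystem, and the current thread is done with the task.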

#[inline]
pub fn read(
&mut self,
read_id: Option<ThreadReadId>,
req: RaftCmdRequest,
cb: Callback<E::Snapshot>,
) {
self.propose_raft_command(read_id, req, cb);
maybe_tls_local_read_metrics_flush();
}

pub fn propose_raft_command(
&mut self,
mut read_id: Option<ThreadReadId>,
req: RaftCmdRequest,
cb: Callback<E::Snapshot>,
) {
match self.pre_propose_raft_command(&req) {
Ok(Some((mut delegate, policy))) => {
let delegate_ext: LocalReadContext<'_, E>;
let mut response = match policy {
// Leader can read local if and only if it is in lease.
RequestPolicy::ReadLocal => {

...

let region = Arc::clone(&delegate.region);
let response =
delegate.execute(&req, &region, None, read_id, Some(delegate_ext));
// Try renew lease in advance
delegate.maybe_renew_lease_advance(&self.router, snapshot_ts);
response
}
// Replica can serve stale read if and only if its `safe_ts` >= `read_ts`
RequestPolicy::StaleRead => {

...

let region = Arc::clone(&delegate.region);
// Getting the snapshot
let response =
delegate.execute(&req, &region, None, read_id, Some(delegate_ext));

...

}
_ => unreachable!(),
};

...

cb.invoke_read(response);
}
// Forward to raftstore.
Ok(None) => self.redirect(RaftCommand::new(req, cb)),
Err(e) => {
let mut response = cmd_resp::new_error(e);
if let Some(delegate) = self.delegates.get(&req.get_header().get_region_id()) {
cmd_resp::bind_term(&mut response, delegate.term);
}
cb.invoke_read(ReadResponse {
response,
snapshot: None,
txn_extra_op: TxnExtraOp::Noop,
});
}
}
}

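Note that the ReadLocal check here can run in parallel: in the optimistic case, concurrent read requests obtain snapshots of the underlying engine in parallel without going through RaftBatchSystem.

So when exactly can a snapshot be read directly without going through RaftStore? The answer is the lease mechanism; a good starting point is the feature introduction for TiKV Lease Read.

Back to LocalReader::pre_propose_raft_command. It runs a series of checks (omitted here) and, if they all pass, calls inspector.inspect(req), which runs a further series of checks and returns whether the request can be served as ReadLocal:

  • req.get_header().get_read_quorum(): if the request explicitly asks to be handled through read index, return ReadIndex.
  • self.has_applied_to_current_term(): if the leader has not yet applied an entry from its own term, use ReadIndex. This is a corner case of linearizable reads in Raft.
  • self.inspect_lease(): if the leader's lease has expired or is uncertain, something may have gone wrong, for example an unstable network or failed heartbeats, so use ReadIndex. Otherwise the request can be served as ReadLocal.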
pub fn pre_propose_raft_command(
&mut self,
req: &RaftCmdRequest,
) -> Result<Option<(D, RequestPolicy)>> {

...

match inspector.inspect(req) {
Ok(RequestPolicy::ReadLocal) => Ok(Some((delegate, RequestPolicy::ReadLocal))),
Ok(RequestPolicy::StaleRead) => Ok(Some((delegate, RequestPolicy::StaleRead))),
// It can not handle other policies.
Ok(_) => Ok(None),
Err(e) => Err(e),
}
}

fn inspect(&mut self, req: &RaftCmdRequest) -> Result<RequestPolicy> {

...

fail_point!("perform_read_index", |_| Ok(RequestPolicy::ReadIndex));

let flags = WriteBatchFlags::from_bits_check(req.get_header().get_flags());
if flags.contains(WriteBatchFlags::STALE_READ) {
return Ok(RequestPolicy::StaleRead);
}

if req.get_header().get_read_quorum() {
return Ok(RequestPolicy::ReadIndex);
}

// If applied index's term is differ from current raft's term, leader transfer
// must happened, if read locally, we may read old value.
if !self.has_applied_to_current_term() {
return Ok(RequestPolicy::ReadIndex);
}

// Local read should be performed, if and only if leader is in lease.
// None for now.
match self.inspect_lease() {
LeaseState::Valid => Ok(RequestPolicy::ReadLocal),
LeaseState::Expired | LeaseState::Suspect => {
// Perform a consistent read to Raft quorum and try to renew the leader lease.
Ok(RequestPolicy::ReadIndex)
}
}
}
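
The decision order in `inspect` can be condensed into a standalone function. This is a simplified sketch: `ReadCtx` is a hypothetical struct that flattens the request header flags and peer state the real code consults, and the write-related policies are left out.

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum LeaseState {
    Valid,
    Expired,
    Suspect,
}

#[derive(Debug, PartialEq)]
enum RequestPolicy {
    ReadLocal,
    StaleRead,
    ReadIndex,
}

/// Hypothetical flattened view of the inputs that `inspect` looks at.
struct ReadCtx {
    stale_read: bool,
    read_quorum: bool,
    applied_to_current_term: bool,
    lease: LeaseState,
}

/// Applies the checks in the same order as the listing above.
fn inspect_read(ctx: &ReadCtx) -> RequestPolicy {
    if ctx.stale_read {
        return RequestPolicy::StaleRead;
    }
    if ctx.read_quorum {
        return RequestPolicy::ReadIndex;
    }
    if !ctx.applied_to_current_term {
        return RequestPolicy::ReadIndex;
    }
    match ctx.lease {
        LeaseState::Valid => RequestPolicy::ReadLocal,
        LeaseState::Expired | LeaseState::Suspect => RequestPolicy::ReadIndex,
    }
}

/// Mirrors the `match` in `pre_propose_raft_command`: only ReadLocal and
/// StaleRead are served on the calling thread; the rest are redirected.
fn served_locally(policy: &RequestPolicy) -> bool {
    matches!(policy, RequestPolicy::ReadLocal | RequestPolicy::StaleRead)
}
```

Note that the lease is consulted last: a request that asks for a quorum read, or a leader that has not applied an entry from its own term, falls back to ReadIndex even when the lease is still valid.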

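That covers the optimistic ReadLocal path. Next, let's look at the execution path of ReadIndex.

The earlier section described the overall framework of RaftBatchSystem: several PollHandler threads call the poll function and sit in a long-running loop that drives many normal state machines in an event-driven way.

Once a ReadIndex request has been routed to the corresponding normal state machine in RaftBatchSystem, some PollHandler will process that state machine's messages in one of its next loop iterations.

Go straight to the handle_normal function of RaftPoller. It first tries to receive up to messages_per_tick messages that were routed to this state machine, and then calls PeerFsmDelegate::handle_msgs to process them. Only the message types relevant here are listed:

  • RaftMessage: Raft messages sent by other peers, including heartbeats, log entries and vote messages.
  • RaftCommand: a proposal raised by the upper layer. It carries the operations that need to be replicated through Raft and the callback to invoke once they succeed. A ReadIndex request is a special kind of proposal.
  • ApplyRes: the message ApplyFsm sends to PeerFsm after applying log entries to the state machine; it is used to update some in-memory state afterwards.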
impl<EK: KvEngine, ER: RaftEngine, T: Transport> PollHandler<PeerFsm<EK, ER>, StoreFsm<EK>>
for RaftPoller<EK, ER, T>
{
fn handle_normal(
&mut self,
peer: &mut impl DerefMut<Target = PeerFsm<EK, ER>>,
) -> HandleResult {
let mut handle_result = HandleResult::KeepProcessing;

...

while self.peer_msg_buf.len() < self.messages_per_tick {
match peer.receiver.try_recv() {
// TODO: we may need a way to optimize the message copy.
Ok(msg) => {
...
self.peer_msg_buf.push(msg);
}
Err(TryRecvError::Empty) => {
handle_result = HandleResult::stop_at(0, false);
break;
}
Err(TryRecvError::Disconnected) => {
peer.stop();
handle_result = HandleResult::stop_at(0, false);
break;
}
}
}

let mut delegate = PeerFsmDelegate::new(peer, &mut self.poll_ctx);
delegate.handle_msgs(&mut self.peer_msg_buf);
// No readiness is generated and using sync write, skipping calling ready and
// release early.
if !delegate.collect_ready() && self.poll_ctx.sync_write_worker.is_some() {
if let HandleResult::StopAt { skip_end, .. } = &mut handle_result {
*skip_end = true;
}
}

handle_result
}
}

impl<'a, EK, ER, T: Transport> PeerFsmDelegate<'a, EK, ER, T>
where
EK: KvEngine,
ER: RaftEngine,
{
pub fn handle_msgs(&mut self, msgs: &mut Vec<PeerMsg<EK>>) {
for m in msgs.drain(..) {
match m {
PeerMsg::RaftMessage(msg) => {
if let Err(e) = self.on_raft_message(msg) {
error!(%e;
"handle raft message err";
"region_id" => self.fsm.region_id(),
"peer_id" => self.fsm.peer_id(),
);
}
}
PeerMsg::RaftCommand(cmd) => {
...
self.propose_raft_command(
cmd.request,
cmd.callback,
cmd.extra_opts.disk_full_opt,
);
}
}
PeerMsg::ApplyRes { res } => {
self.on_apply_res(res);
}
...
}
}
}
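
The receive loop in `handle_normal` is a bounded, non-blocking drain of the state machine's mailbox. The sketch below reproduces that pattern with `std::sync::mpsc`; `Drained` and `drain_bounded` are hypothetical names, and the three outcomes are only loosely analogous to the `HandleResult` values used in the listing.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

#[derive(Debug, PartialEq)]
enum Drained {
    /// The mailbox was emptied before the budget ran out.
    Empty,
    /// The budget was used up; more messages may still be waiting.
    BudgetExhausted,
    /// Every sender is gone; the state machine should stop.
    Disconnected,
}

/// Moves at most `budget` messages from `rx` into `buf` without blocking.
fn drain_bounded<T>(rx: &Receiver<T>, buf: &mut Vec<T>, budget: usize) -> Drained {
    while buf.len() < budget {
        match rx.try_recv() {
            Ok(msg) => buf.push(msg),
            Err(TryRecvError::Empty) => return Drained::Empty,
            Err(TryRecvError::Disconnected) => return Drained::Disconnected,
        }
    }
    Drained::BudgetExhausted
}
```

Capping each round at a fixed budget keeps one busy state machine from starving the others that share the same poller thread.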

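A ReadIndex request enters the PeerMsg::RaftCommand(cmd) branch and then follows the call chain PeerFsmDelegate::propose_raft_command -> PeerFsmDelegate::propose_raft_command_internal into the store::propose function. That function calls self.inspect() once more. If the leader's lease has become valid by then, it calls read_local to take a snapshot of the engine directly and invokes the callback to return; otherwise it calls read_index to run the ReadIndex flow.

In read_index, the ReadIndex request and its callback are wrapped into a ReadIndexRequest and pushed onto pending_reads, which is a ReadIndexQueue. The current thread can then finish this round; later events trigger the execution of that ReadIndexRequest.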

pub fn propose<T: Transport>(
&mut self,
ctx: &mut PollContext<EK, ER, T>,
mut cb: Callback<EK::Snapshot>,
req: RaftCmdRequest,
mut err_resp: RaftCmdResponse,
mut disk_full_opt: DiskFullOpt,
) -> bool {

...

let policy = self.inspect(&req);
let res = match policy {
Ok(RequestPolicy::ReadLocal) | Ok(RequestPolicy::StaleRead) => {
self.read_local(ctx, req, cb);
return false;
}
Ok(RequestPolicy::ReadIndex) => return self.read_index(ctx, req, err_resp, cb),
Ok(RequestPolicy::ProposeTransferLeader) => {
return self.propose_transfer_leader(ctx, req, cb);
}
Ok(RequestPolicy::ProposeNormal) => {
// For admin cmds, only region split/merge comes here.
if req.has_admin_request() {
disk_full_opt = DiskFullOpt::AllowedOnAlmostFull;
}
self.check_normal_proposal_with_disk_full_opt(ctx, disk_full_opt)
.and_then(|_| self.propose_normal(ctx, req))
}
Ok(RequestPolicy::ProposeConfChange) => self.propose_conf_change(ctx, &req),
Err(e) => Err(e),
};
fail_point!("after_propose");

...
}

fn read_index<T: Transport>(
&mut self,
poll_ctx: &mut PollContext<EK, ER, T>,
mut req: RaftCmdRequest,
mut err_resp: RaftCmdResponse,
cb: Callback<EK::Snapshot>,
) -> bool {

...

let mut read = ReadIndexRequest::with_command(id, req, cb, now);
read.addition_request = request.map(Box::new);
self.push_pending_read(read, self.is_leader());
self.should_wake_up = true;

...

true
}

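So what condition has to hold before that ReadIndexRequest is popped from the queue and executed?

As mentioned earlier, after ApplyBatchSystem applies a batch of log entries it first invokes the corresponding callbacks so that clients get their replies as soon as possible, and then sends an ApplyRes message to RaftBatchSystem. Like the ReadIndex request above, that message is handled by a PollHandler in one loop iteration and ends up in the PeerMsg::ApplyRes { res } branch of PeerFsmDelegate::handle_msgs. From there, fsm::on_apply_res is called, which enters store::post_apply. In that function the information carried by ApplyRes is used to update in-memory state such as raft_group and cmd_epoch_checker. The same information also releases the ReadIndexRequests whose conditions are now met: for each of them, a snapshot of the underlying engine can be taken and the callback invoked to return.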

fn on_apply_res(&mut self, res: ApplyTaskRes<EK::Snapshot>) {
fail_point!("on_apply_res", |_| {});
match res {
ApplyTaskRes::Apply(mut res) => {

...

self.fsm.has_ready |= self.fsm.peer.post_apply(
self.ctx,
res.apply_state,
res.applied_term,
&res.metrics,
);

...
}
ApplyTaskRes::Destroy {
region_id,
peer_id,
merge_from_snapshot,
} => {
...
}
}
}

pub fn post_apply<T>(
&mut self,
ctx: &mut PollContext<EK, ER, T>,
apply_state: RaftApplyState,
applied_term: u64,
apply_metrics: &ApplyMetrics,
) -> bool {
let mut has_ready = false;

if self.is_handling_snapshot() {
panic!("{} should not applying snapshot.", self.tag);
}

let applied_index = apply_state.get_applied_index();
self.raft_group.advance_apply_to(applied_index);

self.cmd_epoch_checker.advance_apply(
applied_index,
self.term(),
self.raft_group.store().region(),
);

...

if self.has_pending_snapshot() && self.ready_to_handle_pending_snap() {
has_ready = true;
}
if !self.is_leader() {
self.post_pending_read_index_on_replica(ctx)
} else if self.ready_to_handle_read() {
while let Some(mut read) = self.pending_reads.pop_front() {
self.response_read(&mut read, ctx, false);
}
}
self.pending_reads.gc();

...

has_ready
}
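
The enqueue and dequeue sides of a pending-read queue can be modelled as follows. This sketch encodes the general ReadIndex rule (a read may be answered once the applied index has reached the index recorded for it) and is not the exact condition that `ready_to_handle_read` or `response_read` checks in TiKV. `PendingRead`, `ReadQueue` and `on_applied` are hypothetical names.

```rust
use std::collections::VecDeque;

/// A queued read: it may be answered once the state machine has applied
/// every entry up to and including `read_index`.
struct PendingRead {
    id: u64,
    read_index: u64,
}

#[derive(Default)]
struct ReadQueue {
    reads: VecDeque<PendingRead>,
}

impl ReadQueue {
    /// Enqueue side: called when a read cannot be served immediately.
    fn push(&mut self, id: u64, read_index: u64) {
        self.reads.push_back(PendingRead { id, read_index });
    }

    /// Dequeue side: called after apply progress advances. Returns the ids
    /// of the reads that can now be served from a local snapshot, in FIFO
    /// order, and stops at the first read that still has to wait.
    fn on_applied(&mut self, applied_index: u64) -> Vec<u64> {
        let mut ready = Vec::new();
        while let Some(front) = self.reads.front() {
            if front.read_index > applied_index {
                break;
            }
            ready.push(front.id);
            self.reads.pop_front();
        }
        ready
    }
}
```

In this model the thread that enqueues a read never waits; progress is driven entirely by later apply notifications, which matches the event-driven structure described above.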

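With both the enqueue and dequeue points of ReadIndexRequest covered, the overall ReadIndex flow is essentially complete.

This subsection should give you the complete path of a KvGet read request, and with it the ability to trace other read requests end to end.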

Write flow

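The following four blog posts describe, from the top layer down, how the KVService, Storage and RaftStore modules of TiKV 3.x execute distributed transaction requests.

This subsection follows a single PreWrite request through TiKV 6.x to show the end-to-end write path in the current version.

In the KVService layer, the handle_request and txn_command_future macros route a request on the PreWrite interface directly to Storage::sched_txn_command.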

impl<T: RaftStoreRouter<E::Local> + 'static, E: Engine, L: LockManager, F: KvFormat> Tikv
for Service<T, E, L, F>
{
handle_request!(
kv_prewrite,
future_prewrite,
PrewriteRequest,
PrewriteResponse,
has_time_detail
);
}

txn_command_future!(future_prewrite, PrewriteRequest, PrewriteResponse, (v, resp, tracker) {{
if let Ok(v) = &v {
resp.set_min_commit_ts(v.min_commit_ts.into_inner());
resp.set_one_pc_commit_ts(v.one_pc_commit_ts.into_inner());
GLOBAL_TRACKERS.with_tracker(tracker, |tracker| {
tracker.write_scan_detail(resp.mut_exec_details_v2().mut_scan_detail_v2());
tracker.write_write_detail(resp.mut_exec_details_v2().mut_write_detail());
});
}
resp.set_errors(extract_key_errors(v.map(|v| v.locks)).into());
}});

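In the Storage module, the request is routed to Scheduler::run_cmd and from there to Scheduler::schedule_command. In schedule_command, the command is saved into task_slots together with its callback and other context. If the current thread acquires all the latches, it calls execute to continue running the task. Otherwise, as described earlier, the task stays blocked on some latches until another thread wakes it up and runs it, and the current thread returns immediately to do other work.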

// The entry point of the storage scheduler. Not only transaction commands need
// to access keys serially.
pub fn sched_txn_command<T: StorageCallbackType>(
&self,
cmd: TypedCommand<T>,
callback: Callback<T>,
) -> Result<()> {

...

self.sched.run_cmd(cmd, T::callback(callback));

Ok(())
}

pub(in crate::storage) fn run_cmd(&self, cmd: Command, callback: StorageCallback) {
// write flow control
if cmd.need_flow_control() && self.inner.too_busy(cmd.ctx().region_id) {
SCHED_TOO_BUSY_COUNTER_VEC.get(cmd.tag()).inc();
callback.execute(ProcessResult::Failed {
err: StorageError::from(StorageErrorInner::SchedTooBusy),
});
return;
}
self.schedule_command(cmd, callback);
}

fn schedule_command(&self, cmd: Command, callback: StorageCallback) {
let cid = self.inner.gen_id();
let tracker = get_tls_tracker_token();
debug!("received new command"; "cid" => cid, "cmd" => ?cmd, "tracker" => ?tracker);
let tag = cmd.tag();
let priority_tag = get_priority_tag(cmd.priority());
SCHED_STAGE_COUNTER_VEC.get(tag).new.inc();
SCHED_COMMANDS_PRI_COUNTER_VEC_STATIC
.get(priority_tag)
.inc();

let mut task_slot = self.inner.get_task_slot(cid);
let tctx = task_slot.entry(cid).or_insert_with(|| {
self.inner
.new_task_context(Task::new(cid, tracker, cmd), callback)
});

if self.inner.latches.acquire(&mut tctx.lock, cid) {
fail_point!("txn_scheduler_acquire_success");
tctx.on_schedule();
let task = tctx.task.take().unwrap();
drop(task_slot);
self.execute(task);
return;
}
let task = tctx.task.as_ref().unwrap();
let deadline = task.cmd.deadline();
let cmd_ctx = task.cmd.ctx().clone();
self.fail_fast_or_check_deadline(cid, tag, cmd_ctx, deadline);
fail_point!("txn_scheduler_acquire_fail");
}
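
The acquire-or-wait behaviour of the latches can be illustrated with a small model in which each latch slot is a FIFO queue of command ids. This is a simplified sketch under that assumption, not the real `Latches` implementation: `slots_for`, `acquire` and `release` are hypothetical signatures, and the real code tracks acquisition progress per command rather than rescanning every slot.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::VecDeque;
use std::hash::{Hash, Hasher};

struct Latches {
    slots: Vec<VecDeque<u64>>,
}

impl Latches {
    fn new(size: usize) -> Self {
        Latches { slots: vec![VecDeque::new(); size] }
    }

    /// Maps keys to sorted, de-duplicated slot indexes.
    fn slots_for(&self, keys: &[&str]) -> Vec<usize> {
        let mut idx: Vec<usize> = keys
            .iter()
            .map(|k| {
                let mut h = DefaultHasher::new();
                k.hash(&mut h);
                (h.finish() as usize) % self.slots.len()
            })
            .collect();
        idx.sort_unstable();
        idx.dedup();
        idx
    }

    /// Queues `cid` on every slot (once) and reports whether it is at the
    /// front of all of them, i.e. whether it may run now.
    fn acquire(&mut self, cid: u64, slots: &[usize]) -> bool {
        let mut acquired = true;
        for &s in slots {
            let q = &mut self.slots[s];
            if !q.contains(&cid) {
                q.push_back(cid);
            }
            if q.front() != Some(&cid) {
                acquired = false;
            }
        }
        acquired
    }

    /// Removes `cid` from its slots and returns the commands that are now
    /// at the front of a slot and should retry `acquire`.
    fn release(&mut self, cid: u64, slots: &[usize]) -> Vec<u64> {
        let mut wakeup = Vec::new();
        for &s in slots {
            let q = &mut self.slots[s];
            q.retain(|&c| c != cid);
            if let Some(&next) = q.front() {
                if !wakeup.contains(&next) {
                    wakeup.push(next);
                }
            }
        }
        wakeup
    }
}
```

A command that fails to acquire stays queued, so the thread that submitted it can return immediately; whichever thread later releases the conflicting latches is responsible for waking it.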

In execute, the current thread builds an asynchronous task and spawns it onto another worker thread pool. The task has two main steps:

  • Use Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await to obtain a snapshot. This is the same as obtaining a snapshot in the read flow above: the engine snapshot may come from ReadLocal or from ReadIndex, so it is not repeated here.
  • Use sched.process(snapshot, task).await to call scheduler::process with the snapshot and the task, which then routes to scheduler::process_write.
/// Executes the task in the sched pool.
fn execute(&self, mut task: Task) {
set_tls_tracker_token(task.tracker);
let sched = self.clone();
self.get_sched_pool(task.cmd.priority())
.pool
.spawn(async move {

...

// The program is currently in scheduler worker threads.
// Safety: `self.inner.worker_pool` should ensure that a TLS engine exists.
match unsafe { with_tls_engine(|engine: &E| kv::snapshot(engine, snap_ctx)) }.await
{
Ok(snapshot) => {

...

sched.process(snapshot, task).await;
}
Err(err) => {
...
}
}
})
.unwrap();
}

/// Process the task in the current thread.
async fn process(self, snapshot: E::Snap, task: Task) {
if self.check_task_deadline_exceeded(&task) {
return;
}

let resource_tag = self.inner.resource_tag_factory.new_tag(task.cmd.ctx());
async {

...

if task.cmd.readonly() {
self.process_read(snapshot, task, &mut statistics);
} else {
self.process_write(snapshot, task, &mut statistics).await;
};

...
}
.in_resource_metering_tag(resource_tag)
.await;
}

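scheduler::process_write is the key function of transaction processing. It is now close to four hundred lines long and mixes in complex logic for many new features and optimizations. Its two most important pieces are:

  • Use task.cmd.process_write(snapshot, context).map_err(StorageError::from) to execute the transaction semantics of the command against the snapshot and the task. As Command::process_write shows, each request type has its own implementation, and each may read some data from the underlying engine through the snapshot and try to write some data. For what PreWrite and the other requests do in detail, see Part 12 of the TiKV source code analysis series on distributed transactions; it is not repeated here. Note that at this point the writes are only buffered in WriteData and the underlying engine has not been modified.
  • Use engine.async_write_ext(&ctx, to_be_write, engine_cb, proposed_cb, committed_cb) to actually write the buffered WriteData to the engine layer. For RaftKV this amounts to a propose: a request to get this batch of WriteData committed and applied.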
async fn process_write(self, snapshot: E::Snap, task: Task, statistics: &mut Statistics) {

...

let write_result = {
let _guard = sample.observe_cpu();
let context = WriteContext {
lock_mgr: &self.inner.lock_mgr,
concurrency_manager: self.inner.concurrency_manager.clone(),
extra_op: task.extra_op,
statistics,
async_apply_prewrite: self.inner.enable_async_apply_prewrite,
};
let begin_instant = Instant::now();
let res = unsafe {
with_perf_context::<E, _, _>(tag, || {
task.cmd
.process_write(snapshot, context)
.map_err(StorageError::from)
})
};
SCHED_PROCESSING_READ_HISTOGRAM_STATIC
.get(tag)
.observe(begin_instant.saturating_elapsed_secs());
res
};

...

// Safety: `self.sched_pool` ensures a TLS engine exists.
unsafe {
with_tls_engine(|engine: &E| {
if let Err(e) =
engine.async_write_ext(&ctx, to_be_write, engine_cb, proposed_cb, committed_cb)
{
SCHED_STAGE_COUNTER_VEC.get(tag).async_write_err.inc();

info!("engine async_write failed"; "cid" => cid, "err" => ?e);
scheduler.finish_with_err(cid, e);
}
})
}
}

pub(crate) fn process_write<S: Snapshot, L: LockManager>(
self,
snapshot: S,
context: WriteContext<'_, L>,
) -> Result<WriteResult> {
match self {
Command::Prewrite(t) => t.process_write(snapshot, context),
Command::PrewritePessimistic(t) => t.process_write(snapshot, context),
Command::AcquirePessimisticLock(t) => t.process_write(snapshot, context),
Command::Commit(t) => t.process_write(snapshot, context),
Command::Cleanup(t) => t.process_write(snapshot, context),
Command::Rollback(t) => t.process_write(snapshot, context),
Command::PessimisticRollback(t) => t.process_write(snapshot, context),
Command::ResolveLock(t) => t.process_write(snapshot, context),
Command::ResolveLockLite(t) => t.process_write(snapshot, context),
Command::TxnHeartBeat(t) => t.process_write(snapshot, context),
Command::CheckTxnStatus(t) => t.process_write(snapshot, context),
Command::CheckSecondaryLocks(t) => t.process_write(snapshot, context),
Command::Pause(t) => t.process_write(snapshot, context),
Command::RawCompareAndSwap(t) => t.process_write(snapshot, context),
Command::RawAtomicStore(t) => t.process_write(snapshot, context),
_ => panic!("unsupported write command"),
}
}

fn async_write_ext(
&self,
ctx: &Context,
batch: WriteData,
write_cb: Callback<()>,
proposed_cb: Option<ExtCallback>,
committed_cb: Option<ExtCallback>,
) -> kv::Result<()> {
fail_point!("raftkv_async_write");
if batch.modifies.is_empty() {
return Err(KvError::from(KvErrorInner::EmptyRequest));
}

ASYNC_REQUESTS_COUNTER_VEC.write.all.inc();
let begin_instant = Instant::now_coarse();

self.exec_write_requests(
ctx,
batch,
Box::new(move |res| match res {

...

}),
proposed_cb,
committed_cb,
)
.map_err(|e| {
let status_kind = get_status_kind_from_error(&e);
ASYNC_REQUESTS_COUNTER_VEC.write.get(status_kind).inc();
e.into()
})
}
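
The split between "compute the modifications against a snapshot" and "hand the buffered modifications to the engine" can be sketched with a toy prewrite. The types below (`Modify`, `PrewriteView`, `toy_prewrite`) are hypothetical and far simpler than TiKV's; the checks follow the textbook Percolator rules (reject a lock held by another transaction, reject a newer committed write) as an assumption about what a prewrite verifies.

```rust
use std::collections::BTreeMap;

/// A buffered modification; nothing is written until these are proposed.
#[derive(Debug, PartialEq)]
enum Modify {
    PutLock { key: String, start_ts: u64 },
    PutDefault { key: String, start_ts: u64, value: String },
}

/// Read-only view consulted while the command is processed.
#[derive(Default)]
struct PrewriteView {
    /// key -> start_ts of the transaction holding the lock
    locks: BTreeMap<String, u64>,
    /// key -> newest commit_ts
    last_commit_ts: BTreeMap<String, u64>,
}

/// Checks one key against the view and returns the modifications to
/// buffer. The view is only borrowed immutably and is never changed.
fn toy_prewrite(
    view: &PrewriteView,
    key: &str,
    value: &str,
    start_ts: u64,
) -> Result<Vec<Modify>, String> {
    if let Some(&holder) = view.locks.get(key) {
        if holder != start_ts {
            return Err(format!("key is locked by txn {}", holder));
        }
    }
    if let Some(&commit_ts) = view.last_commit_ts.get(key) {
        if commit_ts >= start_ts {
            return Err(format!("write conflict with commit_ts {}", commit_ts));
        }
    }
    Ok(vec![
        Modify::PutLock { key: key.to_string(), start_ts },
        Modify::PutDefault {
            key: key.to_string(),
            start_ts,
            value: value.to_string(),
        },
    ])
}
```

Returning the modifications instead of applying them is what allows the second step to replicate them through Raft before any replica changes its engine.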

After entering raftkv::async_write_ext, the call stack raftkv::exec_write_requests -> RaftStoreRouter::send_command sends the task, together with its callback, to RaftBatchSystem.

fn exec_write_requests(
&self,
ctx: &Context,
batch: WriteData,
write_cb: Callback<CmdRes<E::Snapshot>>,
proposed_cb: Option<ExtCallback>,
committed_cb: Option<ExtCallback>,
) -> Result<()> {

...

let cb = StoreCallback::write_ext(
Box::new(move |resp| {
write_cb(on_write_result(resp).map_err(Error::into));
}),
proposed_cb,
committed_cb,
);
let extra_opts = RaftCmdExtraOpts {
deadline: batch.deadline,
disk_full_opt: batch.disk_full_opt,
};
self.router.send_command(cmd, cb, extra_opts)?;

Ok(())
}

/// Sends RaftCmdRequest to local store.
fn send_command(
&self,
req: RaftCmdRequest,
cb: Callback<EK::Snapshot>,
extra_opts: RaftCmdExtraOpts,
) -> RaftStoreResult<()> {
send_command_impl::<EK, _>(self, req, cb, extra_opts)
}

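Go straight to the handle_normal function of RaftPoller.

As with the ReadIndex request, RaftPoller first tries to receive up to messages_per_tick messages that were routed to this state machine, and then calls PeerFsmDelegate::handle_msgs to process them. Again, only the message types relevant here are listed:

  • RaftMessage: Raft messages sent by other peers, including heartbeats, log entries and vote messages.
  • RaftCommand: a proposal raised by the upper layer. It carries the operations that need to be replicated through Raft and the callback to invoke once they succeed. The RaftCommand that wraps a PreWrite is the most ordinary kind of proposal.
  • ApplyRes: the message ApplyFsm sends to PeerFsm after applying log entries to the state machine; it is used to update some in-memory state afterwards.

A PreWrite request enters the PeerMsg::RaftCommand(cmd) branch and then follows the call chain PeerFsmDelegate::propose_raft_command -> PeerFsmDelegate::propose_raft_command_internal -> Peer::propose -> Peer::propose_normal until it is finally proposed through the RawNode interface of raft-rs. At the same time its callback, together with the log index of the request, is pushed onto the proposals queue of that Peer.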

impl<EK: KvEngine, ER: RaftEngine, T: Transport> PollHandler<PeerFsm<EK, ER>, StoreFsm<EK>>
for RaftPoller<EK, ER, T>
{
fn handle_normal(
&mut self,
peer: &mut impl DerefMut<Target = PeerFsm<EK, ER>>,
) -> HandleResult {
let mut handle_result = HandleResult::KeepProcessing;

...

while self.peer_msg_buf.len() < self.messages_per_tick {
match peer.receiver.try_recv() {
// TODO: we may need a way to optimize the message copy.
Ok(msg) => {
...
self.peer_msg_buf.push(msg);
}
Err(TryRecvError::Empty) => {
handle_result = HandleResult::stop_at(0, false);
break;
}
Err(TryRecvError::Disconnected) => {
peer.stop();
handle_result = HandleResult::stop_at(0, false);
break;
}
}
}

let mut delegate = PeerFsmDelegate::new(peer, &mut self.poll_ctx);
delegate.handle_msgs(&mut self.peer_msg_buf);
// No readiness is generated and using sync write, skipping calling ready and
// release early.
if !delegate.collect_ready() && self.poll_ctx.sync_write_worker.is_some() {
if let HandleResult::StopAt { skip_end, .. } = &mut handle_result {
*skip_end = true;
}
}

handle_result
}
}

impl<'a, EK, ER, T: Transport> PeerFsmDelegate<'a, EK, ER, T>
where
EK: KvEngine,
ER: RaftEngine,
{
pub fn handle_msgs(&mut self, msgs: &mut Vec<PeerMsg<EK>>) {
for m in msgs.drain(..) {
match m {
PeerMsg::RaftMessage(msg) => {
if let Err(e) = self.on_raft_message(msg) {
error!(%e;
"handle raft message err";
"region_id" => self.fsm.region_id(),
"peer_id" => self.fsm.peer_id(),
);
}
}
PeerMsg::RaftCommand(cmd) => {
...
self.propose_raft_command(
cmd.request,
cmd.callback,
cmd.extra_opts.disk_full_opt,
);
}
}
PeerMsg::ApplyRes { res } => {
self.on_apply_res(res);
}
...
}
}
}

pub fn propose<T: Transport>(
&mut self,
ctx: &mut PollContext<EK, ER, T>,
mut cb: Callback<EK::Snapshot>,
req: RaftCmdRequest,
mut err_resp: RaftCmdResponse,
mut disk_full_opt: DiskFullOpt,
) -> bool {

...

let policy = self.inspect(&req);
let res = match policy {
Ok(RequestPolicy::ReadLocal) | Ok(RequestPolicy::StaleRead) => {
self.read_local(ctx, req, cb);
return false;
}
Ok(RequestPolicy::ReadIndex) => return self.read_index(ctx, req, err_resp, cb),
Ok(RequestPolicy::ProposeTransferLeader) => {
return self.propose_transfer_leader(ctx, req, cb);
}
Ok(RequestPolicy::ProposeNormal) => {
// For admin cmds, only region split/merge comes here.
if req.has_admin_request() {
disk_full_opt = DiskFullOpt::AllowedOnAlmostFull;
}
self.check_normal_proposal_with_disk_full_opt(ctx, disk_full_opt)
.and_then(|_| self.propose_normal(ctx, req))
}
Ok(RequestPolicy::ProposeConfChange) => self.propose_conf_change(ctx, &req),
Err(e) => Err(e),
};
fail_point!("after_propose");

match res {
Err(e) => {
cmd_resp::bind_error(&mut err_resp, e);
cb.invoke_with_response(err_resp);
self.post_propose_fail(req_admin_cmd_type);
false
}
Ok(Either::Right(idx)) => {
if !cb.is_none() {
self.cmd_epoch_checker.attach_to_conflict_cmd(idx, cb);
}
self.post_propose_fail(req_admin_cmd_type);
false
}
Ok(Either::Left(idx)) => {
let has_applied_to_current_term = self.has_applied_to_current_term();
if has_applied_to_current_term {
// After this peer has applied to current term and passed above checking
// including `cmd_epoch_checker`, we can safely guarantee
// that this proposal will be committed if there is no abnormal leader transfer
// in the near future. Thus proposed callback can be called.
cb.invoke_proposed();
}
if is_urgent {
self.last_urgent_proposal_idx = idx;
// Eager flush to make urgent proposal be applied on all nodes as soon as
// possible.
self.raft_group.skip_bcast_commit(false);
}
self.should_wake_up = true;
let p = Proposal {
is_conf_change: req_admin_cmd_type == Some(AdminCmdType::ChangePeer)
|| req_admin_cmd_type == Some(AdminCmdType::ChangePeerV2),
index: idx,
term: self.term(),
cb,
propose_time: None,
must_pass_epoch_check: has_applied_to_current_term,
};
if let Some(cmd_type) = req_admin_cmd_type {
self.cmd_epoch_checker
.post_propose(cmd_type, idx, self.term());
}
self.post_propose(ctx, p);
true
}
}
}
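
Keeping each callback alongside the log index and term of its proposal is what lets the apply side find the right callback later. The sketch below models that bookkeeping; `ProposalQueue` and `take_for` are hypothetical names, and the rule for notifying stale proposals is a simplification rather than TiKV's exact logic.

```rust
use std::collections::VecDeque;

type WriteCallback = Box<dyn FnOnce(Result<(), String>)>;

struct Proposal {
    index: u64,
    term: u64,
    cb: WriteCallback,
}

#[derive(Default)]
struct ProposalQueue {
    queue: VecDeque<Proposal>,
}

impl ProposalQueue {
    /// Called right after a successful propose.
    fn push(&mut self, p: Proposal) {
        self.queue.push_back(p);
    }

    /// Called when the entry at (`index`, `term`) is applied. Returns the
    /// matching callback if this peer proposed that entry. Earlier
    /// proposals that can no longer match are told that they are stale.
    fn take_for(&mut self, index: u64, term: u64) -> Option<WriteCallback> {
        while let Some(p) = self.queue.pop_front() {
            if p.index == index && p.term == term {
                return Some(p.cb);
            }
            if p.index > index {
                // Belongs to a later entry: put it back and stop.
                self.queue.push_front(p);
                return None;
            }
            let msg = format!("stale proposal at index {} term {}", p.index, p.term);
            (p.cb)(Err(msg));
        }
        None
    }
}
```

Comparing the term as well as the index matters because a new leader may overwrite an uncommitted entry at the same index; in that case the original proposer must be told that its write did not go through.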

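After PeerFsmDelegate::handle_msgs has processed the messages, PeerFsmDelegate::collect_ready() is called, which enters Peer::handle_raft_ready_append. That function collects one ready from the normal state machine and then persists the uncommitted log entries that need persisting (deferred and batched), sends the outgoing messages asynchronously, and hands the committed entries that need applying to ApplyBatchSystem.

With three replicas, the ready obtained this time contains the PreWrite request both in the log entries to persist and in the messages destined for the other two peers. Messages are handed to Transport for asynchronous sending as soon as they are obtained. Entries to persist are staged in memory and actually written to the underlying engine in the end function at the end of the current loop.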

/// Collect ready if any.
///
/// Returns false is no readiness is generated.
pub fn collect_ready(&mut self) -> bool {
...

let res = self.fsm.peer.handle_raft_ready_append(self.ctx);

...

}
pub fn handle_raft_ready_append<T: Transport>(
&mut self,
ctx: &mut PollContext<EK, ER, T>,
) -> Option<ReadyResult> {

...

if !self.raft_group.has_ready() {
fail_point!("before_no_ready_gen_snap_task", |_| None);
// Generating snapshot task won't set ready for raft group.
if let Some(gen_task) = self.mut_store().take_gen_snap_task() {
self.pending_request_snapshot_count
.fetch_add(1, Ordering::SeqCst);
ctx.apply_router
.schedule_task(self.region_id, ApplyTask::Snapshot(gen_task));
}
return None;
}

...

let mut ready = self.raft_group.ready();

...

if !ready.must_sync() {
// If this ready need not to sync, the term, vote must not be changed,
// entries and snapshot must be empty.
if let Some(hs) = ready.hs() {
assert_eq!(hs.get_term(), self.get_store().hard_state().get_term());
assert_eq!(hs.get_vote(), self.get_store().hard_state().get_vote());
}
assert!(ready.entries().is_empty());
assert!(ready.snapshot().is_empty());
}

self.on_role_changed(ctx, &ready);

if let Some(hs) = ready.hs() {
let pre_commit_index = self.get_store().commit_index();
assert!(hs.get_commit() >= pre_commit_index);
if self.is_leader() {
self.on_leader_commit_idx_changed(pre_commit_index, hs.get_commit());
}
}

if !ready.messages().is_empty() {
assert!(self.is_leader());
let raft_msgs = self.build_raft_messages(ctx, ready.take_messages());
self.send_raft_messages(ctx, raft_msgs);
}

self.apply_reads(ctx, &ready);

if !ready.committed_entries().is_empty() {
self.handle_raft_committed_entries(ctx, ready.take_committed_entries());
}

...

let ready_number = ready.number();
let persisted_msgs = ready.take_persisted_messages();
let mut has_write_ready = false;
match &res {
HandleReadyResult::SendIoTask | HandleReadyResult::Snapshot { .. } => {
if !persisted_msgs.is_empty() {
task.messages = self.build_raft_messages(ctx, persisted_msgs);
}

if !trackers.is_empty() {
task.trackers = trackers;
}

if let Some(write_worker) = &mut ctx.sync_write_worker {
write_worker.handle_write_task(task);

assert_eq!(self.unpersisted_ready, None);
self.unpersisted_ready = Some(ready);
has_write_ready = true;
} else {
self.write_router.send_write_msg(
ctx,
self.unpersisted_readies.back().map(|r| r.number),
WriteMsg::WriteTask(task),
);

self.unpersisted_readies.push_back(UnpersistedReady {
number: ready_number,
max_empty_number: ready_number,
raft_msgs: vec![],
});

self.raft_group.advance_append_async(ready);
}
}
HandleReadyResult::NoIoTask => {
if let Some(last) = self.unpersisted_readies.back_mut() {
// Attach to the last unpersisted ready so that it can be considered to be
// persisted with the last ready at the same time.
if ready_number <= last.max_empty_number {
panic!(
"{} ready number is not monotonically increaing, {} <= {}",
self.tag, ready_number, last.max_empty_number
);
}
last.max_empty_number = ready_number;

if !persisted_msgs.is_empty() {
self.unpersisted_message_count += persisted_msgs.capacity();
last.raft_msgs.push(persisted_msgs);
}
} else {
// If this ready don't need to be persisted and there is no previous unpersisted
// ready, we can safely consider it is persisted so the persisted msgs can be
// sent immediately.
self.persisted_number = ready_number;

if !persisted_msgs.is_empty() {
fail_point!("raft_before_follower_send");
let msgs = self.build_raft_messages(ctx, persisted_msgs);
self.send_raft_messages(ctx, msgs);
}

// The commit index and messages of light ready should be empty because no data
// needs to be persisted.
let mut light_rd = self.raft_group.advance_append(ready);

self.add_light_ready_metric(&light_rd, &mut ctx.raft_metrics);

if let Some(idx) = light_rd.commit_index() {
panic!(
"{} advance ready that has no io task but commit index is changed to {}",
self.tag, idx
);
}
if !light_rd.messages().is_empty() {
panic!(
"{} advance ready that has no io task but message is not empty {:?}",
self.tag,
light_rd.messages()
);
}
// The committed entries may not be empty when the size is too large to
// be fetched in the previous ready.
if !light_rd.committed_entries().is_empty() {
self.handle_raft_committed_entries(ctx, light_rd.take_committed_entries());
}
}
}
}

...
}
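
The three-way split described above (send messages right away, stage log entries until the end of the loop, forward committed entries to the apply side) can be sketched roughly as follows. This is a toy model under stated assumptions: `Ready` and `Poller` here are hypothetical types invented for illustration, not the raft-rs or TiKV structures, and the "engine" is just a vector.

```rust
/// Hypothetical stand-in for one batch of output from a Raft state machine.
#[derive(Default)]
struct Ready {
    entries: Vec<u64>,           // indexes of uncommitted entries to persist
    messages: Vec<String>,       // messages addressed to other peers
    committed_entries: Vec<u64>, // indexes of committed entries to apply
}

/// Hypothetical poller that serves several peers in one loop iteration.
#[derive(Default)]
struct Poller {
    sent: Vec<String>,      // messages already handed to the transport
    write_buffer: Vec<u64>, // entries staged for the end-of-loop write
    persisted: Vec<u64>,    // entries written to the simulated log engine
    apply_queue: Vec<u64>,  // committed entries forwarded to the apply side
    flushes: usize,         // number of engine writes issued so far
}

impl Poller {
    /// Handles one ready: messages leave immediately, entries are only staged.
    fn handle_ready(&mut self, mut ready: Ready) {
        self.sent.append(&mut ready.messages);
        self.write_buffer.append(&mut ready.entries);
        self.apply_queue.append(&mut ready.committed_entries);
    }

    /// End of the loop: everything staged by all peers goes out in one write.
    fn end(&mut self) {
        if self.write_buffer.is_empty() {
            return;
        }
        self.persisted.append(&mut self.write_buffer);
        self.flushes += 1;
    }
}
```

Calling `handle_ready` for several peers and then `end` once produces a single simulated engine write, which is the batching effect the paragraph above refers to.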

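Once any one follower returns an acknowledgement, the response is routed to the RaftBatchSystem and the PollHandler processes it in a subsequent loop. The message reaches the PeerMsg::RaftMessage(msg) branch of PeerFsmDelegate::handle_msgs, which calls step to hand it over to the raft-rs state machine.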
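
In a three-replica group, the leader's own copy plus one follower acknowledgement already form a majority, which is why a single response is enough. A minimal sketch of that arithmetic is shown here; `quorum_commit_index` is a hypothetical helper, not a raft-rs function, and it deliberately ignores the additional Raft rule that a leader only commits entries from its own term.

```rust
/// Returns the highest log index that is stored on a majority of peers.
/// `match_indexes` holds, for every peer including the leader, the highest
/// index known to be replicated on that peer.
fn quorum_commit_index(match_indexes: &[u64]) -> u64 {
    assert!(!match_indexes.is_empty(), "a Raft group has at least one peer");
    let mut sorted = match_indexes.to_vec();
    // Sort in descending order so that position `quorum - 1` is the largest
    // index that at least `quorum` peers have reached.
    sorted.sort_unstable_by(|a, b| b.cmp(a));
    let quorum = sorted.len() / 2 + 1;
    sorted[quorum - 1]
}
```

With match indexes `[10, 10, 7]` (leader, one caught-up follower, one lagging follower) the function yields 10; with `[10, 7, 7]` it stays at 7 until a follower acknowledges.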

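Since the write has now reached a quorum, raft-rs commits the raft log entry for the PreWrite request and returns it the next time a ready is fetched. In this loop's PeerFsmDelegate::collect_ready() and Peer::handle_raft_ready_append, self.handle_raft_committed_entries(ctx, ready.take_committed_entries()) is called. That function looks up the callbacks for the committed entries in the Peer's proposals, bundles them with the whole batch of committed entries into an Apply task, and sends it to the ApplyBatchSystem through apply_router.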

impl<'a, EK, ER, T: Transport> PeerFsmDelegate<'a, EK, ER, T>
where
EK: KvEngine,
ER: RaftEngine,
{
pub fn handle_msgs(&mut self, msgs: &mut Vec<PeerMsg<EK>>) {
for m in msgs.drain(..) {
match m {
PeerMsg::RaftMessage(msg) => {
if let Err(e) = self.on_raft_message(msg) {
error!(%e;
"handle raft message err";
"region_id" => self.fsm.region_id(),
"peer_id" => self.fsm.peer_id(),
);
}
}
PeerMsg::RaftCommand(cmd) => {
...
self.propose_raft_command(
cmd.request,
cmd.callback,
cmd.extra_opts.disk_full_opt,
);
}
}
PeerMsg::ApplyRes { res } => {
self.on_apply_res(res);
}
...
}
}
}

fn handle_raft_committed_entries<T>(
&mut self,
ctx: &mut PollContext<EK, ER, T>,
committed_entries: Vec<Entry>,
) {
if committed_entries.is_empty() {
return;
}

...

if let Some(last_entry) = committed_entries.last() {
self.last_applying_idx = last_entry.get_index();
if self.last_applying_idx >= self.last_urgent_proposal_idx {
// Urgent requests are flushed, make it lazy again.
self.raft_group.skip_bcast_commit(true);
self.last_urgent_proposal_idx = u64::MAX;
}
let cbs = if !self.proposals.is_empty() {
let current_term = self.term();
let cbs = committed_entries
.iter()
.filter_map(|e| {
self.proposals
.find_proposal(e.get_term(), e.get_index(), current_term)
})
.map(|mut p| {
if p.must_pass_epoch_check {
// In this case the apply can be guaranteed to be successful. Invoke the
// on_committed callback if necessary.
p.cb.invoke_committed();
}
p
})
.collect();
self.proposals.gc();
cbs
} else {
vec![]
};
// Note that the `commit_index` and `commit_term` here may be used to
// forward the commit index. So it must be less than or equal to persist
// index.
let commit_index = cmp::min(
self.raft_group.raft.raft_log.committed,
self.raft_group.raft.raft_log.persisted,
);
let commit_term = self.get_store().term(commit_index).unwrap();
let mut apply = Apply::new(
self.peer_id(),
self.region_id,
self.term(),
commit_index,
commit_term,
committed_entries,
cbs,
self.region_buckets.as_ref().map(|b| b.meta.clone()),
);
apply.on_schedule(&ctx.raft_metrics);
self.mut_store()
.trace_cached_entries(apply.entries[0].clone());
if needs_evict_entry_cache(ctx.cfg.evict_cache_on_memory_ratio) {
// Compact all cached entries instead of half evict.
self.mut_store().evict_entry_cache(false);
}
ctx.apply_router
.schedule_task(self.region_id, ApplyTask::apply(apply));
}
fail_point!("after_send_to_apply_1003", self.peer_id() == 1003, |_| {});
}
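
The step of pairing committed entries with their pending callbacks can be sketched as below. This is loosely modelled on the `find_proposal` call in the excerpt, but the types are hypothetical and simplified: a proposal carries a `client` label in place of a real callback, and stale proposals are silently dropped rather than notified.

```rust
use std::collections::VecDeque;

/// Hypothetical pending proposal: the log position it was proposed at and a
/// label standing in for the client callback.
#[derive(Debug, PartialEq)]
struct Proposal {
    term: u64,
    index: u64,
    client: &'static str,
}

/// Proposals are queued in the order they were proposed.
#[derive(Default)]
struct ProposalQueue {
    queue: VecDeque<Proposal>,
}

impl ProposalQueue {
    fn push(&mut self, p: Proposal) {
        self.queue.push_back(p);
    }

    /// Returns the proposal made at exactly (term, index), if any. Proposals
    /// at earlier log positions can no longer commit and are discarded.
    fn find_proposal(&mut self, term: u64, index: u64) -> Option<Proposal> {
        while let Some(front) = self.queue.front() {
            if (front.term, front.index) > (term, index) {
                // The oldest pending proposal is newer than this entry.
                return None;
            }
            let p = self.queue.pop_front()?;
            if p.term == term && p.index == index {
                return Some(p);
            }
        }
        None
    }
}

/// Collects the client labels for a batch of committed (term, index) pairs.
fn collect_callbacks(
    queue: &mut ProposalQueue,
    committed: &[(u64, u64)],
) -> Vec<&'static str> {
    committed
        .iter()
        .filter_map(|&(term, index)| queue.find_proposal(term, index))
        .map(|p| p.client)
        .collect()
}
```

Entries that have no matching proposal (for example, entries this peer did not propose) simply contribute no callback to the resulting list.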

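Now jump straight to ApplyPoller's handle_normal function. Like the raftstore poller, ApplyPoller first tries to fetch up to messages_per_tick messages routed to this state machine and then calls ApplyFsm::handle_tasks to process them. The call chain ApplyFsm::handle_apply -> ApplyDelegate::handle_raft_committed_entries then leads to ApplyDelegate::handle_raft_entry_normal, which calls ApplyDelegate::process_raft_cmd to buffer this write in kv_write_batch. Notably, before buffering the write it first checks whether a commit is due; if so, the entries batched so far are written to the underlying engine before the new write is buffered.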

fn handle_normal(&mut self, normal: &mut impl DerefMut<Target = ApplyFsm<EK>>) -> HandleResult {

...

while self.msg_buf.len() < self.messages_per_tick {
match normal.receiver.try_recv() {
Ok(msg) => self.msg_buf.push(msg),
Err(TryRecvError::Empty) => {
handle_result = HandleResult::stop_at(0, false);
break;
}
Err(TryRecvError::Disconnected) => {
normal.delegate.stopped = true;
handle_result = HandleResult::stop_at(0, false);
break;
}
}
}

normal.handle_tasks(&mut self.apply_ctx, &mut self.msg_buf);

if normal.delegate.wait_merge_state.is_some() {
// Check it again immediately as catching up logs can be very fast.
handle_result = HandleResult::stop_at(0, false);
} else if normal.delegate.yield_state.is_some() {
// Let it continue to run next time.
handle_result = HandleResult::KeepProcessing;
}
handle_result
}

fn handle_raft_entry_normal(
&mut self,
apply_ctx: &mut ApplyContext<EK>,
entry: &Entry,
) -> ApplyResult<EK::Snapshot> {
fail_point!(
"yield_apply_first_region",
self.region.get_start_key().is_empty() && !self.region.get_end_key().is_empty(),
|_| ApplyResult::Yield
);

let index = entry.get_index();
let term = entry.get_term();
let data = entry.get_data();

if !data.is_empty() {
let cmd = util::parse_data_at(data, index, &self.tag);

if apply_ctx.yield_high_latency_operation && has_high_latency_operation(&cmd) {
self.priority = Priority::Low;
}
let mut has_unflushed_data =
self.last_flush_applied_index != self.apply_state.get_applied_index();
if has_unflushed_data && should_write_to_engine(&cmd)
|| apply_ctx.kv_wb().should_write_to_engine()
{
apply_ctx.commit(self);
if let Some(start) = self.handle_start.as_ref() {
if start.saturating_elapsed() >= apply_ctx.yield_duration {
return ApplyResult::Yield;
}
}
has_unflushed_data = false;
}
if self.priority != apply_ctx.priority {
if has_unflushed_data {
apply_ctx.commit(self);
}
return ApplyResult::Yield;
}

return self.process_raft_cmd(apply_ctx, index, term, cmd);
}

...
}
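
The bounded receive loop at the top of handle_normal can be isolated into a small sketch. It uses the standard library's `std::sync::mpsc` channel as a stand-in for the channel type TiKV actually uses, and `drain_tick` and `DrainOutcome` are hypothetical names introduced only for this example.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Why the receive loop stopped.
#[derive(Debug, PartialEq)]
enum DrainOutcome {
    BatchFull,    // the per-tick limit was reached; more messages may wait
    Empty,        // the mailbox is empty for now
    Disconnected, // every sender is gone and the mailbox is empty
}

/// Moves at most `messages_per_tick` messages from `receiver` into `buf`
/// without blocking, so one busy state machine cannot starve the others.
fn drain_tick<T>(
    receiver: &Receiver<T>,
    buf: &mut Vec<T>,
    messages_per_tick: usize,
) -> DrainOutcome {
    while buf.len() < messages_per_tick {
        match receiver.try_recv() {
            Ok(msg) => buf.push(msg),
            Err(TryRecvError::Empty) => return DrainOutcome::Empty,
            Err(TryRecvError::Disconnected) => return DrainOutcome::Disconnected,
        }
    }
    DrainOutcome::BatchFull
}
```

A caller would process `buf`, clear it, and decide from the outcome whether to poll this state machine again soon or to release it.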

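Why not batch everything and commit once in the end function, as RaftBatchSystem does? Because here, once a batch is large enough and writing it does not put excessive load on the underlying engine, it can be committed right away and the client answered. Deferring the write to the end would only add write latency with little gain.

Let's read the batch commit logic. The call chain ApplyContext::commit -> ApplyContext::commit_opt leads to ApplyContext::write_to_db, which calls self.kv_wb_mut().write_opt(&write_opts) to write the WriteBatch to the underlying engine and finally calls cb.invoke_with_response(resp) to run the callbacks, so that the client receives its response as early as possible.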

/// Commits all changes have done for delegate. `persistent` indicates
/// whether write the changes into rocksdb.
///
/// This call is valid only when it's between a `prepare_for` and
/// `finish_for`.
pub fn commit(&mut self, delegate: &mut ApplyDelegate<EK>) {
if delegate.last_flush_applied_index < delegate.apply_state.get_applied_index() {
delegate.write_apply_state(self.kv_wb_mut());
}
self.commit_opt(delegate, true);
}
fn commit_opt(&mut self, delegate: &mut ApplyDelegate<EK>, persistent: bool) {
delegate.update_metrics(self);
if persistent {
self.write_to_db();
self.prepare_for(delegate);
delegate.last_flush_applied_index = delegate.apply_state.get_applied_index()
}
self.kv_wb_last_bytes = self.kv_wb().data_size() as u64;
self.kv_wb_last_keys = self.kv_wb().count() as u64;
}

/// Writes all the changes into RocksDB.
/// If it returns true, all pending writes are persisted in engines.
pub fn write_to_db(&mut self) -> bool {
let need_sync = self.sync_log_hint;
// There may be put and delete requests after ingest request in the same fsm.
// To guarantee the correct order, we must ingest the pending_sst first, and
// then persist the kv write batch to engine.
if !self.pending_ssts.is_empty() {
let tag = self.tag.clone();
self.importer
.ingest(&self.pending_ssts, &self.engine)
.unwrap_or_else(|e| {
panic!(
"{} failed to ingest ssts {:?}: {:?}",
tag, self.pending_ssts, e
);
});
self.pending_ssts = vec![];
}
if !self.kv_wb_mut().is_empty() {
self.perf_context.start_observe();
let mut write_opts = engine_traits::WriteOptions::new();
write_opts.set_sync(need_sync);
self.kv_wb_mut().write_opt(&write_opts).unwrap_or_else(|e| {
panic!("failed to write to engine: {:?}", e);
});
let trackers: Vec<_> = self
.applied_batch
.cb_batch
.iter()
.flat_map(|(cb, _)| cb.write_trackers())
.flat_map(|trackers| trackers.iter().map(|t| t.as_tracker_token()))
.flatten()
.collect();
self.perf_context.report_metrics(&trackers);
self.sync_log_hint = false;
let data_size = self.kv_wb().data_size();
if data_size > APPLY_WB_SHRINK_SIZE {
// Control the memory usage for the WriteBatch.
self.kv_wb = self.engine.write_batch_with_cap(DEFAULT_APPLY_WB_SIZE);
} else {
// Clear data, reuse the WriteBatch, this can reduce memory allocations and
// deallocations.
self.kv_wb_mut().clear();
}
self.kv_wb_last_bytes = 0;
self.kv_wb_last_keys = 0;
}
if !self.delete_ssts.is_empty() {
let tag = self.tag.clone();
for sst in self.delete_ssts.drain(..) {
self.importer.delete(&sst.meta).unwrap_or_else(|e| {
panic!("{} cleanup ingested file {:?}: {:?}", tag, sst, e);
});
}
}
// Take the applied commands and their callback
let ApplyCallbackBatch {
cmd_batch,
batch_max_level,
mut cb_batch,
} = mem::replace(&mut self.applied_batch, ApplyCallbackBatch::new());
// Call it before invoking callback for preventing Commit is executed before
// Prewrite is observed.
self.host
.on_flush_applied_cmd_batch(batch_max_level, cmd_batch, &self.engine);
// Invoke callbacks
let now = std::time::Instant::now();
for (cb, resp) in cb_batch.drain(..) {
for tracker in cb.write_trackers().iter().flat_map(|v| *v) {
tracker.observe(now, &self.apply_time, |t| &mut t.metrics.apply_time_nanos);
}
cb.invoke_with_response(resp);
}
self.apply_time.flush();
self.apply_wait.flush();
need_sync
}
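
The "flush early once the batch is big enough, then answer the clients" behaviour can be illustrated with a toy model. Everything here is an assumption made for illustration: `BatchedApplier` is a hypothetical type, the engine is a `BTreeMap`, the threshold is a plain byte count, and a request id stands in for the callback.

```rust
use std::collections::BTreeMap;

/// Hypothetical apply-side context: buffers writes, flushes them in batches,
/// and acknowledges a request only after its batch reached the engine.
struct BatchedApplier {
    engine: BTreeMap<String, String>,   // simulated KV engine
    write_batch: Vec<(String, String)>, // buffered, not yet written
    batch_bytes: usize,                 // size of the buffered batch
    flush_threshold: usize,             // flush once the batch is this large
    pending_acks: Vec<u64>,             // requests whose writes are buffered
    acked: Vec<u64>,                    // requests already answered
    engine_writes: usize,               // number of batch writes issued
}

impl BatchedApplier {
    fn new(flush_threshold: usize) -> Self {
        BatchedApplier {
            engine: BTreeMap::new(),
            write_batch: Vec::new(),
            batch_bytes: 0,
            flush_threshold,
            pending_acks: Vec::new(),
            acked: Vec::new(),
            engine_writes: 0,
        }
    }

    /// Buffers one write. If the batch already reached the threshold, it is
    /// written to the engine first, before the new write is buffered.
    fn apply_put(&mut self, request_id: u64, key: &str, value: &str) {
        if self.batch_bytes >= self.flush_threshold {
            self.write_to_db();
        }
        self.batch_bytes += key.len() + value.len();
        self.write_batch.push((key.to_string(), value.to_string()));
        self.pending_acks.push(request_id);
    }

    /// Writes the buffered batch, then acknowledges the waiting requests.
    fn write_to_db(&mut self) {
        if !self.write_batch.is_empty() {
            for (k, v) in self.write_batch.drain(..) {
                self.engine.insert(k, v);
            }
            self.engine_writes += 1;
            self.batch_bytes = 0;
        }
        // Acknowledge only after the data is in the engine.
        self.acked.append(&mut self.pending_acks);
    }
}
```

With a threshold of 8 bytes, the third 4-byte put triggers a write of the first two, so requests 1 and 2 are answered without waiting for the end of the loop.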

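In the end function at the end of an ApplyPoller loop, ApplyContext::flush is called, which sends the ApplyRes back to RaftBatchSystem through self.notifier.notify(apply_res) so that some in-memory structures can be updated. The details are omitted here.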

fn end(&mut self, fsms: &mut [Option<impl DerefMut<Target = ApplyFsm<EK>>>]) {
self.apply_ctx.flush();
for fsm in fsms.iter_mut().flatten() {
fsm.delegate.last_flush_applied_index = fsm.delegate.apply_state.get_applied_index();
fsm.delegate.update_memory_trace(&mut self.trace_event);
}
MEMTRACE_APPLYS.trace(mem::take(&mut self.trace_event));
}

/// Flush all pending writes to engines.
/// If it returns true, all pending writes are persisted in engines.
pub fn flush(&mut self) -> bool {
// TODO: this check is too hacky, need to be more verbose and less buggy.
let t = match self.timer.take() {
Some(t) => t,
None => return false,
};

// Write to engine
// raftstore.sync-log = true means we need prevent data loss when power failure.
// take raft log gc for example, we write kv WAL first, then write raft WAL,
// if power failure happen, raft WAL may synced to disk, but kv WAL may not.
// so we use sync-log flag here.
let is_synced = self.write_to_db();

if !self.apply_res.is_empty() {
fail_point!("before_nofity_apply_res");
let apply_res = mem::take(&mut self.apply_res);
self.notifier.notify(apply_res);
}

let elapsed = t.saturating_elapsed();
STORE_APPLY_LOG_HISTOGRAM.observe(duration_to_sec(elapsed) as f64);
for mut inspector in std::mem::take(&mut self.pending_latency_inspect) {
inspector.record_apply_process(elapsed);
inspector.finish();
}

slow_log!(
elapsed,
"{} handle ready {} committed entries",
self.tag,
self.committed_count
);
self.committed_count = 0;
is_synced
}
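
The hand-back of apply results at the end of the loop can be sketched as follows. This is a simplified illustration, not TiKV's notifier: `ResultCollector` is a hypothetical type, `ApplyRes` here carries only two made-up fields, and a standard library `Sender` plays the role of the route back to the raftstore side.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical result of applying entries for one region.
#[derive(Debug, PartialEq)]
struct ApplyRes {
    region_id: u64,
    applied_index: u64,
}

/// Accumulates results during a loop and sends them back in one batch.
struct ResultCollector {
    apply_res: Vec<ApplyRes>,
    notifier: Sender<Vec<ApplyRes>>,
}

impl ResultCollector {
    fn new(notifier: Sender<Vec<ApplyRes>>) -> Self {
        ResultCollector {
            apply_res: Vec::new(),
            notifier,
        }
    }

    /// Records one region's progress; nothing is sent yet.
    fn record(&mut self, region_id: u64, applied_index: u64) {
        self.apply_res.push(ApplyRes {
            region_id,
            applied_index,
        });
    }

    /// Sends everything recorded so far as a single batch.
    /// Returns true if a batch was delivered to the channel.
    fn flush(&mut self) -> bool {
        if self.apply_res.is_empty() {
            return false;
        }
        let batch = std::mem::take(&mut self.apply_res);
        self.notifier.send(batch).is_ok()
    }
}
```

The receiving side sees one message per loop rather than one per region, which mirrors the batching idea used throughout this section.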

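We hope this section has given you a picture of the complete flow of a PreWrite request, and with it the ability to trace other write requests end to end.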

Debugging

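Breakpoint debugging is an effective way to study source code. If you are not very familiar with tools such as gdb, a modern IDE makes the debugging experience much better.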

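I usually debug Rust code with the CLion + intellij-rust toolchain. Note that when debugging TiKV in CLion, you need to follow the Cargo book and change the debug option under [profile.test] and [profile.dev] in TiKV's Cargo.toml to enable debug information. Otherwise the stack information will not be visible when stepping through breakpoints in CLion.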
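
As a rough illustration of that change, the fragment below shows the `debug` key set to `true` in both profiles. This is a sketch based on the Cargo book's profile settings, not a copy of TiKV's Cargo.toml. Any other keys already present in those sections should be left as they are.

```toml
# Cargo.toml (fragment): keep debug info in dev and test builds
[profile.dev]
debug = true

[profile.test]
debug = true
```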

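In fact, the simplest way to trace the read and write paths described above end to end is to pick some cases from the integration tests, set breakpoints starting at the Service module, and run them under the debugger. The tests in integrations/server/kv_service.rs are recommended: each one builds a TiKVClient that sends real RPC requests, and the server side contains almost no mock components, so a single RPC can be traced through its entire path.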

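TiKV's code also contains many spawns and callbacks, so it may be hard to connect the flow at first. After the walkthrough above, though, you should have a rough understanding of how its asynchronous framework works, be able to set breakpoints in the right closures, and trace a single request end to end with ease.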

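Summary

This post walked through TiKV's end-to-end read and write paths from the perspective of the source code, and closed with a recommended breakpoint-debugging setup for studying the TiKV source.

We hope it helps newcomers who are interested in TiKV development get to know the TiKV codebase quickly.

Thanks for reading!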


Unless otherwise stated, all posts on this blog are licensed under CC BY-SA 4.0. Please credit the source when reposting.