/// The entry point of the storage scheduler. Not only transaction commands need
/// to access keys serially.
pub fn sched_txn_command<T: StorageCallbackType>(
    &self,
    cmd: TypedCommand<T>,
    callback: Callback<T>,
) -> Result<()> {
/// Executes the task in the sched pool.
fn execute(&self, mut task: Task) {
    set_tls_tracker_token(task.tracker);
    let sched = self.clone();
    self.get_sched_pool(task.cmd.priority())
        .pool
        .spawn(async move {
...
// The program is currently in scheduler worker threads.
// Safety: `self.inner.worker_pool` should ensure that a TLS engine exists.
match unsafe { with_tls_engine(|engine: &E| kv::snapshot(engine, snap_ctx)) }.await {
    Ok(snapshot) => {
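The `unsafe` call above leans on a thread-local engine that every worker thread in the pool is expected to have installed. Below is a minimal sketch of that thread-local pattern using only `std`; the `Engine` type and function bodies are illustrative stand-ins, not TiKV's real definitions.

use std::cell::RefCell;

// Illustrative stand-in for the storage engine handle; not TiKV's real type.
struct Engine {
    name: String,
}

thread_local! {
    // Each worker thread installs its own engine handle before running tasks.
    static TLS_ENGINE: RefCell<Option<Engine>> = RefCell::new(None);
}

/// Installs an engine for the current thread, done once per worker at startup.
fn set_tls_engine(engine: Engine) {
    TLS_ENGINE.with(|e| *e.borrow_mut() = Some(engine));
}

/// Runs `f` against the thread-local engine. Panics if the current thread never
/// installed one, which is why the real code needs the worker pool to guarantee
/// the TLS engine exists before this is called.
fn with_tls_engine<T>(f: impl FnOnce(&Engine) -> T) -> T {
    TLS_ENGINE.with(|e| {
        let engine = e.borrow();
        f(engine.as_ref().expect("TLS engine not set for this thread"))
    })
}

fn main() {
    set_tls_engine(Engine { name: "worker-0".into() });
    let snap = with_tls_engine(|eng| format!("snapshot from {}", eng.name));
    println!("{snap}");
}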
/// Process the task in the current thread.
async fn process(self, snapshot: E::Snap, task: Task) {
    if self.check_task_deadline_exceeded(&task) {
        return;
    }
match res {
    Err(e) => {
        cmd_resp::bind_error(&mut err_resp, e);
        cb.invoke_with_response(err_resp);
        self.post_propose_fail(req_admin_cmd_type);
        false
    }
    Ok(Either::Right(idx)) => {
        if !cb.is_none() {
            self.cmd_epoch_checker.attach_to_conflict_cmd(idx, cb);
        }
        self.post_propose_fail(req_admin_cmd_type);
        false
    }
    Ok(Either::Left(idx)) => {
        let has_applied_to_current_term = self.has_applied_to_current_term();
        if has_applied_to_current_term {
            // After this peer has applied to the current term and passed the above
            // checks, including `cmd_epoch_checker`, we can safely guarantee that this
            // proposal will be committed if there is no abnormal leader transfer in the
            // near future. Thus the proposed callback can be called.
            cb.invoke_proposed();
        }
        if is_urgent {
            self.last_urgent_proposal_idx = idx;
            // Eager flush to make urgent proposals be applied on all nodes as soon as
            // possible.
            self.raft_group.skip_bcast_commit(false);
        }
        self.should_wake_up = true;
        let p = Proposal {
            is_conf_change: req_admin_cmd_type == Some(AdminCmdType::ChangePeer)
                || req_admin_cmd_type == Some(AdminCmdType::ChangePeerV2),
            index: idx,
            term: self.term(),
            cb,
            propose_time: None,
            must_pass_epoch_check: has_applied_to_current_term,
        };
        if let Some(cmd_type) = req_admin_cmd_type {
            self.cmd_epoch_checker
                .post_propose(cmd_type, idx, self.term());
        }
        self.post_propose(ctx, p);
        true
    }
}
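The `cb` above is driven in stages: `invoke_proposed` fires once the entry is proposed on a leader that has applied to its current term, `invoke_committed` fires when commitment is guaranteed, and the response callback runs after apply. The following is a minimal, self-contained sketch of that staged-callback idea; the names and types are illustrative and deliberately simpler than TiKV's actual `Callback`.

/// A callback that can be notified at successive stages of a proposal's life:
/// when it is proposed, when it is known to be committed, and when applied.
struct StagedCallback {
    proposed: Option<Box<dyn FnOnce()>>,
    committed: Option<Box<dyn FnOnce()>>,
    applied: Option<Box<dyn FnOnce(String)>>,
}

impl StagedCallback {
    fn invoke_proposed(&mut self) {
        if let Some(cb) = self.proposed.take() {
            cb();
        }
    }
    fn invoke_committed(&mut self) {
        if let Some(cb) = self.committed.take() {
            cb();
        }
    }
    fn invoke_with_response(mut self, resp: String) {
        // Earlier stages may already have fired; applied is the final one.
        if let Some(cb) = self.applied.take() {
            cb(resp);
        }
    }
}

fn main() {
    let mut cb = StagedCallback {
        proposed: Some(Box::new(|| println!("proposed"))),
        committed: Some(Box::new(|| println!("committed"))),
        applied: Some(Box::new(|resp| println!("applied: {resp}"))),
    };
    cb.invoke_proposed();
    cb.invoke_committed();
    cb.invoke_with_response("ok".to_string());
}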
if !self.raft_group.has_ready() {
    fail_point!("before_no_ready_gen_snap_task", |_| None);
    // Generating snapshot task won't set ready for raft group.
    if let Some(gen_task) = self.mut_store().take_gen_snap_task() {
        self.pending_request_snapshot_count
            .fetch_add(1, Ordering::SeqCst);
        ctx.apply_router
            .schedule_task(self.region_id, ApplyTask::Snapshot(gen_task));
    }
    return None;
}
...
let mut ready = self.raft_group.ready();
...
if !ready.must_sync() {
    // If this ready does not need to be synced, the term and vote must not have
    // changed, and the entries and snapshot must be empty.
    if let Some(hs) = ready.hs() {
        assert_eq!(hs.get_term(), self.get_store().hard_state().get_term());
        assert_eq!(hs.get_vote(), self.get_store().hard_state().get_vote());
    }
    assert!(ready.entries().is_empty());
    assert!(ready.snapshot().is_empty());
}
            self.raft_group.advance_append_async(ready);
        }
    }
    HandleReadyResult::NoIoTask => {
        if let Some(last) = self.unpersisted_readies.back_mut() {
            // Attach to the last unpersisted ready so that it can be considered to be
            // persisted with the last ready at the same time.
            if ready_number <= last.max_empty_number {
                panic!(
                    "{} ready number is not monotonically increasing, {} <= {}",
                    self.tag, ready_number, last.max_empty_number
                );
            }
            last.max_empty_number = ready_number;
            if !persisted_msgs.is_empty() {
                self.unpersisted_message_count += persisted_msgs.capacity();
                last.raft_msgs.push(persisted_msgs);
            }
        } else {
            // If this ready doesn't need to be persisted and there is no previous
            // unpersisted ready, we can safely consider it persisted, so the persisted
            // msgs can be sent immediately.
            self.persisted_number = ready_number;
            // The commit index and messages of light ready should be empty because no
            // data needs to be persisted.
            let mut light_rd = self.raft_group.advance_append(ready);
            if let Some(idx) = light_rd.commit_index() {
                panic!(
                    "{} advance ready that has no io task but commit index is changed to {}",
                    self.tag, idx
                );
            }
            if !light_rd.messages().is_empty() {
                panic!(
                    "{} advance ready that has no io task but message is not empty {:?}",
                    self.tag, light_rd.messages()
                );
            }
            // The committed entries may not be empty when the size is too large to
            // be fetched in the previous ready.
            if !light_rd.committed_entries().is_empty() {
                self.handle_raft_committed_entries(ctx, light_rd.take_committed_entries());
            }
        }
    }
}
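To summarize the `NoIoTask` branch: a ready that produced no IO either piggybacks on the last unpersisted ready (recorded as its `max_empty_number`) or, when nothing is pending, is treated as persisted immediately. Below is a small sketch of that bookkeeping; the struct and method names are invented for illustration and omit the message handling.

use std::collections::VecDeque;

/// Illustrative record of a ready that produced an IO task and is still waiting
/// to be persisted; `max_empty_number` absorbs later readies that had no IO.
struct UnpersistedReady {
    number: u64,
    max_empty_number: u64,
}

struct ReadyTracker {
    unpersisted: VecDeque<UnpersistedReady>,
    persisted_number: u64,
}

impl ReadyTracker {
    /// Handles a ready that produced no IO task, mirroring the NoIoTask branch.
    fn on_ready_no_io(&mut self, ready_number: u64) {
        if let Some(last) = self.unpersisted.back_mut() {
            // Piggyback on the last unpersisted ready: this ready counts as
            // persisted only once that one is. Numbers must keep increasing.
            assert!(ready_number > last.max_empty_number, "ready number not monotonic");
            last.max_empty_number = ready_number;
        } else {
            // Nothing is pending, so the ready is effectively persisted already.
            self.persisted_number = ready_number;
        }
    }
}

fn main() {
    let mut t = ReadyTracker { unpersisted: VecDeque::new(), persisted_number: 0 };
    t.on_ready_no_io(1); // no pending IO, persisted immediately
    t.unpersisted.push_back(UnpersistedReady { number: 2, max_empty_number: 2 });
    t.on_ready_no_io(3); // attached to ready #2
    let last = t.unpersisted.back().unwrap();
    assert_eq!((last.number, last.max_empty_number), (2, 3));
    assert_eq!(t.persisted_number, 1);
}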
fn handle_raft_committed_entries<T>(
    &mut self,
    ctx: &mut PollContext<EK, ER, T>,
    committed_entries: Vec<Entry>,
) {
    if committed_entries.is_empty() {
        return;
    }
...
    if let Some(last_entry) = committed_entries.last() {
        self.last_applying_idx = last_entry.get_index();
        if self.last_applying_idx >= self.last_urgent_proposal_idx {
            // Urgent requests are flushed, make it lazy again.
            self.raft_group.skip_bcast_commit(true);
            self.last_urgent_proposal_idx = u64::MAX;
        }
        let cbs = if !self.proposals.is_empty() {
            let current_term = self.term();
            let cbs = committed_entries
                .iter()
                .filter_map(|e| {
                    self.proposals
                        .find_proposal(e.get_term(), e.get_index(), current_term)
                })
                .map(|mut p| {
                    if p.must_pass_epoch_check {
                        // In this case the apply can be guaranteed to be successful. Invoke the
                        // on_committed callback if necessary.
                        p.cb.invoke_committed();
                    }
                    p
                })
                .collect();
            self.proposals.gc();
            cbs
        } else {
            vec![]
        };
        // Note that the `commit_index` and `commit_term` here may be used to
        // forward the commit index. So it must be less than or equal to persist
        // index.
        let commit_index = cmp::min(
            self.raft_group.raft.raft_log.committed,
            self.raft_group.raft.raft_log.persisted,
        );
        let commit_term = self.get_store().term(commit_index).unwrap();
        let mut apply = Apply::new(
            self.peer_id(),
            self.region_id,
            self.term(),
            commit_index,
            commit_term,
            committed_entries,
            cbs,
            self.region_buckets.as_ref().map(|b| b.meta.clone()),
        );
        apply.on_schedule(&ctx.raft_metrics);
        self.mut_store()
            .trace_cached_entries(apply.entries[0].clone());
        if needs_evict_entry_cache(ctx.cfg.evict_cache_on_memory_ratio) {
            // Compact all cached entries instead of half evict.
            self.mut_store().evict_entry_cache(false);
        }
        ctx.apply_router
            .schedule_task(self.region_id, ApplyTask::apply(apply));
    }
    fail_point!("after_send_to_apply_1003", self.peer_id() == 1003, |_| {});
}
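Two details here are worth isolating: the commit index handed to the apply side is capped at the persisted index, and committed entries are matched back to pending proposals by their (term, index). Here is a simplified sketch of both, with made-up types; the real proposal queue does more, such as notifying skipped proposals appropriately.

use std::cmp;
use std::collections::VecDeque;

/// Illustrative pending proposal identified by the (term, index) it was
/// proposed at; the real proposals also carry callbacks and flags.
#[derive(Debug, PartialEq)]
struct Proposal {
    term: u64,
    index: u64,
}

/// Pops proposals until one matches the committed entry. Skipped proposals are
/// simply dropped in this sketch.
fn find_proposal(pending: &mut VecDeque<Proposal>, term: u64, index: u64) -> Option<Proposal> {
    while let Some(p) = pending.pop_front() {
        if p.term == term && p.index == index {
            return Some(p);
        }
    }
    None
}

fn main() {
    // The commit index forwarded to the apply side must not exceed what has been
    // persisted locally, hence the min() in the original code.
    let (committed, persisted) = (17u64, 15u64);
    let commit_index = cmp::min(committed, persisted);
    assert_eq!(commit_index, 15);

    let mut pending = VecDeque::from(vec![
        Proposal { term: 5, index: 14 },
        Proposal { term: 5, index: 15 },
    ]);
    let found = find_proposal(&mut pending, 5, 15);
    assert_eq!(found, Some(Proposal { term: 5, index: 15 }));
    assert!(pending.is_empty());
}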
    if normal.delegate.wait_merge_state.is_some() {
        // Check it again immediately as catching up logs can be very fast.
        handle_result = HandleResult::stop_at(0, false);
    } else if normal.delegate.yield_state.is_some() {
        // Let it continue to run next time.
        handle_result = HandleResult::KeepProcessing;
    }
    handle_result
}
/// Commits all changes that have been made for the delegate. `persistent`
/// indicates whether to write the changes into RocksDB.
///
/// This call is valid only when it's between a `prepare_for` and
/// `finish_for`.
pub fn commit(&mut self, delegate: &mut ApplyDelegate<EK>) {
    if delegate.last_flush_applied_index < delegate.apply_state.get_applied_index() {
        delegate.write_apply_state(self.kv_wb_mut());
    }
    self.commit_opt(delegate, true);
}

fn commit_opt(&mut self, delegate: &mut ApplyDelegate<EK>, persistent: bool) {
    delegate.update_metrics(self);
    if persistent {
        self.write_to_db();
        self.prepare_for(delegate);
        delegate.last_flush_applied_index = delegate.apply_state.get_applied_index();
    }
    self.kv_wb_last_bytes = self.kv_wb().data_size() as u64;
    self.kv_wb_last_keys = self.kv_wb().count() as u64;
}
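The `last_flush_applied_index` check means the apply state is only re-serialized into the write batch when the applied index has actually advanced since the last flush. A minimal sketch of that pattern with placeholder types:

/// Illustrative delegate whose apply state is flushed only when the applied
/// index has advanced past what was last flushed.
struct Delegate {
    applied_index: u64,
    last_flush_applied_index: u64,
}

/// Stand-in for the KV write batch.
struct Batch {
    entries: Vec<String>,
}

impl Batch {
    fn write_apply_state(&mut self, applied_index: u64) {
        self.entries.push(format!("apply_state(applied_index={applied_index})"));
    }
    fn persist(&mut self) {
        // Stand-in for writing the batch to the KV engine.
        println!("persisting {} entries", self.entries.len());
        self.entries.clear();
    }
}

/// Mirrors the shape of commit(): serialize the state only if it advanced,
/// then persist and remember the flushed position.
fn commit(delegate: &mut Delegate, batch: &mut Batch) {
    if delegate.last_flush_applied_index < delegate.applied_index {
        batch.write_apply_state(delegate.applied_index);
    }
    batch.persist();
    delegate.last_flush_applied_index = delegate.applied_index;
}

fn main() {
    let mut d = Delegate { applied_index: 10, last_flush_applied_index: 8 };
    let mut b = Batch { entries: Vec::new() };
    commit(&mut d, &mut b); // writes and persists the advanced apply state
    commit(&mut d, &mut b); // nothing new to serialize; persists an empty batch
}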
/// Writes all the changes into RocksDB.
/// If it returns true, all pending writes are persisted in engines.
pub fn write_to_db(&mut self) -> bool {
    let need_sync = self.sync_log_hint;
    // There may be put and delete requests after an ingest request in the same fsm.
    // To guarantee the correct order, we must ingest the pending_sst first, and
    // then persist the kv write batch to the engine.
    if !self.pending_ssts.is_empty() {
        let tag = self.tag.clone();
        self.importer
            .ingest(&self.pending_ssts, &self.engine)
            .unwrap_or_else(|e| {
                panic!(
                    "{} failed to ingest ssts {:?}: {:?}",
                    tag, self.pending_ssts, e
                );
            });
        self.pending_ssts = vec![];
    }
    if !self.kv_wb_mut().is_empty() {
        self.perf_context.start_observe();
        let mut write_opts = engine_traits::WriteOptions::new();
        write_opts.set_sync(need_sync);
        self.kv_wb_mut().write_opt(&write_opts).unwrap_or_else(|e| {
            panic!("failed to write to engine: {:?}", e);
        });
        let trackers: Vec<_> = self
            .applied_batch
            .cb_batch
            .iter()
            .flat_map(|(cb, _)| cb.write_trackers())
            .flat_map(|trackers| trackers.iter().map(|t| t.as_tracker_token()))
            .flatten()
            .collect();
        self.perf_context.report_metrics(&trackers);
        self.sync_log_hint = false;
        let data_size = self.kv_wb().data_size();
        if data_size > APPLY_WB_SHRINK_SIZE {
            // Control the memory usage for the WriteBatch.
            self.kv_wb = self.engine.write_batch_with_cap(DEFAULT_APPLY_WB_SIZE);
        } else {
            // Clear data and reuse the WriteBatch; this can reduce memory allocations
            // and deallocations.
            self.kv_wb_mut().clear();
        }
        self.kv_wb_last_bytes = 0;
        self.kv_wb_last_keys = 0;
    }
    if !self.delete_ssts.is_empty() {
        let tag = self.tag.clone();
        for sst in self.delete_ssts.drain(..) {
            self.importer.delete(&sst.meta).unwrap_or_else(|e| {
                panic!("{} cleanup ingested file {:?}: {:?}", tag, sst, e);
            });
        }
    }
    // Take the applied commands and their callbacks.
    let ApplyCallbackBatch {
        cmd_batch,
        batch_max_level,
        mut cb_batch,
    } = mem::replace(&mut self.applied_batch, ApplyCallbackBatch::new());
    // Call it before invoking the callbacks, to prevent a Commit from being executed
    // before its Prewrite is observed.
    self.host
        .on_flush_applied_cmd_batch(batch_max_level, cmd_batch, &self.engine);
    // Invoke callbacks.
    let now = std::time::Instant::now();
    for (cb, resp) in cb_batch.drain(..) {
        for tracker in cb.write_trackers().iter().flat_map(|v| *v) {
            tracker.observe(now, &self.apply_time, |t| &mut t.metrics.apply_time_nanos);
        }
        cb.invoke_with_response(resp);
    }
    self.apply_time.flush();
    self.apply_wait.flush();
    need_sync
}
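One small but practical point in `write_to_db` is how the write batch is recycled: if it grew beyond `APPLY_WB_SHRINK_SIZE` it is re-created at the default capacity to release memory, otherwise it is merely cleared and reused. Here is a generic sketch of that shrink-or-reuse policy; the constants and the `WriteBatch` type are invented for the example.

/// Illustrative shrink-or-reuse policy for a write batch: after a flush, a
/// batch that grew too large is replaced to return memory to the allocator,
/// otherwise it is cleared and reused.
const SHRINK_THRESHOLD: usize = 1024;
const DEFAULT_CAPACITY: usize = 64;

struct WriteBatch {
    buf: Vec<u8>,
}

impl WriteBatch {
    fn with_capacity(cap: usize) -> Self {
        WriteBatch { buf: Vec::with_capacity(cap) }
    }
    fn data_size(&self) -> usize {
        self.buf.len()
    }
}

fn flush_and_recycle(wb: &mut WriteBatch) {
    // ... pretend the contents were written to the engine here ...
    if wb.data_size() > SHRINK_THRESHOLD {
        // Drop the oversized allocation and start over at the default capacity.
        *wb = WriteBatch::with_capacity(DEFAULT_CAPACITY);
    } else {
        // Keep the allocation; clearing is enough to reuse it next round.
        wb.buf.clear();
    }
}

fn main() {
    let mut wb = WriteBatch::with_capacity(DEFAULT_CAPACITY);
    wb.buf.extend(std::iter::repeat(0u8).take(4 * SHRINK_THRESHOLD));
    flush_and_recycle(&mut wb);
    assert!(wb.buf.is_empty());
    println!("capacity after recycle: {}", wb.buf.capacity());
}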
/// Flush all pending writes to engines.
/// If it returns true, all pending writes are persisted in engines.
pub fn flush(&mut self) -> bool {
    // TODO: this check is too hacky, need to be more verbose and less buggy.
    let t = match self.timer.take() {
        Some(t) => t,
        None => return false,
    };
    // Write to engine.
    // raftstore.sync-log = true means we need to prevent data loss on power failure.
    // Take raft log gc as an example: we write the kv WAL first, then the raft WAL.
    // If a power failure happens, the raft WAL may have been synced to disk while the
    // kv WAL has not, so we use the sync-log flag here.
    let is_synced = self.write_to_db();
    if !self.apply_res.is_empty() {
        fail_point!("before_nofity_apply_res");
        let apply_res = mem::take(&mut self.apply_res);
        self.notifier.notify(apply_res);
    }
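The sync-log comment above boils down to a durability ordering: the KV data (and its fsync) must hit disk before the raft WAL records anything, such as a raft log GC, that would make the KV write unrecoverable. A tiny sketch of that ordering with plain files follows; the file names and record formats are placeholders, not anything TiKV actually writes.

use std::fs::OpenOptions;
use std::io::{Result, Write};

/// Illustrative ordering: the KV write is made durable first, and only then is
/// the raft-side record (e.g. a log GC marker) appended, so a power failure can
/// never leave a raft WAL that claims work the KV engine never persisted.
fn write_in_order(kv_payload: &[u8], raft_gc_record: &[u8]) -> Result<()> {
    let mut kv_wal = OpenOptions::new().create(true).append(true).open("kv.wal")?;
    kv_wal.write_all(kv_payload)?;
    kv_wal.sync_all()?; // the sync-log step: KV data is durable first

    let mut raft_wal = OpenOptions::new().create(true).append(true).open("raft.wal")?;
    raft_wal.write_all(raft_gc_record)?;
    raft_wal.sync_all()?;
    Ok(())
}

fn main() -> Result<()> {
    write_in_order(b"put k=v\n", b"gc raft log up to index 42\n")
}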