Commit

Reset proof manager and batch coordinator without inline payload changes (#12620)

vusirikala committed Mar 21, 2024
1 parent f789b74 commit 4174fd6
Showing 6 changed files with 25 additions and 209 deletions.
18 changes: 1 addition & 17 deletions consensus/src/quorum_store/batch_coordinator.rs
@@ -6,30 +6,24 @@ use crate::{
quorum_store::{
batch_store::{BatchStore, BatchWriter},
counters,
proof_manager::ProofManagerCommand,
types::{Batch, PersistedValue},
},
};
use anyhow::ensure;
use aptos_logger::prelude::*;
use aptos_types::PeerId;
use std::sync::Arc;
use tokio::sync::{
mpsc::{Receiver, Sender},
oneshot,
};
use tokio::sync::{mpsc::Receiver, oneshot};

#[derive(Debug)]
pub enum BatchCoordinatorCommand {
Shutdown(oneshot::Sender<()>),
NewBatches(PeerId, Vec<Batch>),
}

/// The `BatchCoordinator` is responsible for coordinating the receipt and persistence of batches.
pub struct BatchCoordinator {
my_peer_id: PeerId,
network_sender: Arc<NetworkSender>,
sender_to_proof_manager: Arc<Sender<ProofManagerCommand>>,
batch_store: Arc<BatchStore>,
max_batch_txns: u64,
max_batch_bytes: u64,
@@ -41,7 +35,6 @@ impl BatchCoordinator {
pub(crate) fn new(
my_peer_id: PeerId,
network_sender: NetworkSender,
sender_to_proof_manager: Sender<ProofManagerCommand>,
batch_store: Arc<BatchStore>,
max_batch_txns: u64,
max_batch_bytes: u64,
@@ -51,7 +44,6 @@
Self {
my_peer_id,
network_sender: Arc::new(network_sender),
sender_to_proof_manager: Arc::new(sender_to_proof_manager),
batch_store,
max_batch_txns,
max_batch_bytes,
@@ -67,22 +59,14 @@

let batch_store = self.batch_store.clone();
let network_sender = self.network_sender.clone();
let sender_to_proof_manager = self.sender_to_proof_manager.clone();
tokio::spawn(async move {
let peer_id = persist_requests[0].author();
let batches = persist_requests
.iter()
.map(|persisted_value| persisted_value.batch_info().clone())
.collect();
let signed_batch_infos = batch_store.persist(persist_requests);
if !signed_batch_infos.is_empty() {
network_sender
.send_signed_batch_info_msg(signed_batch_infos, vec![peer_id])
.await;
}
let _ = sender_to_proof_manager
.send(ProofManagerCommand::ReceiveBatches(batches))
.await;
});
}

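With this change, the coordinator's persist path reduces to persist-then-ack: batches are persisted and the signed acknowledgements are sent back to the author, with no `ReceiveBatches` hop to the proof manager. A minimal sketch of the resulting flow, using simplified stand-in types (`PersistedValue`, `SignedBatchInfo`, `BatchStore`, `NetworkSender` below are illustrative, not the real aptos APIs):

```rust
use std::sync::Arc;

// Simplified stand-ins for the real quorum-store types.
#[derive(Clone)]
struct PersistedValue {
    author: u64,
}

struct SignedBatchInfo;

struct BatchStore;

impl BatchStore {
    // Persist the batches locally; return signed acks for those that succeeded.
    fn persist(&self, requests: Vec<PersistedValue>) -> Vec<SignedBatchInfo> {
        requests.into_iter().map(|_| SignedBatchInfo).collect()
    }
}

struct NetworkSender;

impl NetworkSender {
    async fn send_signed_batch_info_msg(&self, infos: Vec<SignedBatchInfo>, peers: Vec<u64>) {
        println!("acked {} signed batch infos to peers {:?}", infos.len(), peers);
    }
}

// The post-change persist task: persist, then ack the author.
// Note there is no longer any message to the proof manager here.
async fn persist_and_ack(
    batch_store: Arc<BatchStore>,
    network_sender: Arc<NetworkSender>,
    persist_requests: Vec<PersistedValue>,
) {
    let peer_id = persist_requests[0].author;
    let signed_batch_infos = batch_store.persist(persist_requests);
    if !signed_batch_infos.is_empty() {
        network_sender
            .send_signed_batch_info_msg(signed_batch_infos, vec![peer_id])
            .await;
    }
}

#[tokio::main]
async fn main() {
    persist_and_ack(
        Arc::new(BatchStore),
        Arc::new(NetworkSender),
        vec![PersistedValue { author: 7 }],
    )
    .await;
}
```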
189 changes: 15 additions & 174 deletions consensus/src/quorum_store/proof_manager.rs
@@ -1,177 +1,59 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::batch_store::BatchStore;
use crate::{
monitor,
quorum_store::{
batch_generator::BackPressure,
counters,
utils::{BatchSortKey, ProofQueue},
},
quorum_store::{batch_generator::BackPressure, counters, utils::ProofQueue},
};
use aptos_consensus_types::{
common::{Payload, PayloadFilter, ProofWithData},
proof_of_store::{BatchInfo, ProofOfStore, ProofOfStoreMsg},
request_response::{GetPayloadCommand, GetPayloadResponse},
};
use aptos_logger::prelude::*;
use aptos_types::{transaction::SignedTransaction, PeerId};
use aptos_types::PeerId;
use futures::StreamExt;
use futures_channel::mpsc::Receiver;
use rand::{seq::SliceRandom, thread_rng};
use std::{
cmp::min,
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
};
use std::collections::HashSet;

#[derive(Debug)]
pub enum ProofManagerCommand {
ReceiveProofs(ProofOfStoreMsg),
ReceiveBatches(Vec<BatchInfo>),
CommitNotification(u64, Vec<BatchInfo>),
Shutdown(tokio::sync::oneshot::Sender<()>),
}

pub struct BatchQueue {
batch_store: Arc<BatchStore>,
// Queue per peer to ensure fairness between peers and priority within peer
author_to_batches: HashMap<PeerId, BTreeMap<BatchSortKey, BatchInfo>>,
}

impl BatchQueue {
pub fn new(batch_store: Arc<BatchStore>) -> Self {
Self {
batch_store,
author_to_batches: HashMap::new(),
}
}

pub fn add_batches(&mut self, batches: Vec<BatchInfo>) {
for batch in batches.into_iter() {
let queue = self.author_to_batches.entry(batch.author()).or_default();
queue.insert(BatchSortKey::from_info(&batch), batch.clone());
}
}

pub fn remove_batch(&mut self, batch: &BatchInfo) {
if let Some(batch_tree) = self.author_to_batches.get_mut(&batch.author()) {
batch_tree.remove(&BatchSortKey::from_info(batch));
}
}

pub fn remove_expired_batches(&mut self) {
let authors = self.author_to_batches.keys().cloned().collect::<Vec<_>>();
for author in authors {
if let Some(batch_tree) = self.author_to_batches.get_mut(&author) {
batch_tree.retain(|_batch_key, batch| !batch.is_expired());
}
}
}

pub fn len(&self) -> usize {
self.author_to_batches
.values()
.map(|batch_tree| batch_tree.len())
.sum()
}

pub fn pull_batches(
&mut self,
max_txns: u64,
max_bytes: u64,
excluded_batches: Vec<BatchInfo>,
) -> Vec<(BatchInfo, Vec<SignedTransaction>)> {
let mut result: Vec<(BatchInfo, Vec<SignedTransaction>)> = vec![];
let mut num_txns = 0;
let mut num_bytes = 0;
let mut iters = vec![];
let mut full = false;
for (_, batches) in self.author_to_batches.iter() {
iters.push(batches.iter().rev());
}
while !iters.is_empty() {
iters.shuffle(&mut thread_rng());
iters.retain_mut(|iter| {
if full {
return false;
}
if let Some((_sort_key, batch)) = iter.next() {
if excluded_batches.contains(batch) {
true
} else if num_txns + batch.num_txns() <= max_txns
&& num_bytes + batch.num_bytes() <= max_bytes
{
if let Ok(mut persisted_value) =
self.batch_store.get_batch_from_local(batch.digest())
{
if let Some(txns) = persisted_value.take_payload() {
num_txns += batch.num_txns();
num_bytes += batch.num_bytes();
result.push((batch.clone(), txns.clone()));
}
} else {
warn!("Couldn't find a batch in local storage while creating inline block: {:?}", batch.digest());
}
true
} else {
full = true;
false
}
} else {
false
}
})
}
result
}
}

pub struct ProofManager {
proofs_for_consensus: ProofQueue,
batch_queue: BatchQueue,
back_pressure_total_txn_limit: u64,
remaining_total_txn_num: u64,
back_pressure_total_proof_limit: u64,
remaining_total_proof_num: u64,
allow_batches_without_pos_in_proposal: bool,
}

impl ProofManager {
pub fn new(
my_peer_id: PeerId,
back_pressure_total_txn_limit: u64,
back_pressure_total_proof_limit: u64,
batch_store: Arc<BatchStore>,
allow_batches_without_pos_in_proposal: bool,
) -> Self {
Self {
proofs_for_consensus: ProofQueue::new(my_peer_id),
batch_queue: BatchQueue::new(batch_store),
back_pressure_total_txn_limit,
remaining_total_txn_num: 0,
back_pressure_total_proof_limit,
remaining_total_proof_num: 0,
allow_batches_without_pos_in_proposal,
}
}

pub(crate) fn receive_proofs(&mut self, proofs: Vec<ProofOfStore>) {
for proof in proofs.into_iter() {
self.batch_queue.remove_batch(proof.info());
self.proofs_for_consensus.push(proof);
}
(self.remaining_total_txn_num, self.remaining_total_proof_num) =
self.proofs_for_consensus.remaining_txns_and_proofs();
}

pub(crate) fn receive_batches(&mut self, batches: Vec<BatchInfo>) {
if self.allow_batches_without_pos_in_proposal {
self.batch_queue.add_batches(batches);
}
}

pub(crate) fn handle_commit_notification(
&mut self,
block_timestamp: u64,
@@ -181,10 +63,7 @@ impl ProofManager {
"QS: got clean request from execution at block timestamp {}",
block_timestamp
);
self.batch_queue.remove_expired_batches();
for batch in &batches {
self.batch_queue.remove_batch(batch);
}

self.proofs_for_consensus.mark_committed(batches);
self.proofs_for_consensus
.handle_updated_block_timestamp(block_timestamp);
@@ -197,8 +76,8 @@
GetPayloadCommand::GetPayloadRequest(
max_txns,
max_bytes,
max_inline_txns,
max_inline_bytes,
_max_inline_txns,
_max_inline_bytes,
return_non_full,
filter,
callback,
@@ -211,58 +90,23 @@
PayloadFilter::InQuorumStore(proofs) => proofs,
};

let (proof_block, proof_queue_fully_utilized) = self
.proofs_for_consensus
.pull_proofs(&excluded_batches, max_txns, max_bytes, return_non_full);

counters::NUM_BATCHES_WITHOUT_PROOF_OF_STORE.observe(self.batch_queue.len() as f64);
counters::PROOF_QUEUE_FULLY_UTILIZED
.observe(if proof_queue_fully_utilized { 1.0 } else { 0.0 });

let mut inline_block: Vec<(BatchInfo, Vec<SignedTransaction>)> = vec![];
let cur_txns: u64 = proof_block.iter().map(|p| p.num_txns()).sum();
let cur_bytes: u64 = proof_block.iter().map(|p| p.num_bytes()).sum();

if self.allow_batches_without_pos_in_proposal && proof_queue_fully_utilized {
inline_block = self.batch_queue.pull_batches(
min(max_txns - cur_txns, max_inline_txns),
min(max_bytes - cur_bytes, max_inline_bytes),
excluded_batches
.iter()
.cloned()
.chain(proof_block.iter().map(|proof| proof.info().clone()))
.collect(),
);
}
let inline_txns = inline_block
.iter()
.map(|(_, txns)| txns.len())
.sum::<usize>();
counters::NUM_INLINE_BATCHES.observe(inline_block.len() as f64);
counters::NUM_INLINE_TXNS.observe(inline_txns as f64);
let proof_block = self.proofs_for_consensus.pull_proofs(
&excluded_batches,
max_txns,
max_bytes,
return_non_full,
);

let res = GetPayloadResponse::GetPayloadResponse(
if proof_block.is_empty() && inline_block.is_empty() {
Payload::empty(true, self.allow_batches_without_pos_in_proposal)
} else if inline_block.is_empty() {
if proof_block.is_empty() {
Payload::empty(true, false)
} else {
trace!(
"QS: GetBlockRequest excluded len {}, block len {}",
excluded_batches.len(),
proof_block.len()
);
Payload::InQuorumStore(ProofWithData::new(proof_block))
} else {
trace!(
"QS: GetBlockRequest excluded len {}, block len {}, inline len {}",
excluded_batches.len(),
proof_block.len(),
inline_block.len()
);
Payload::QuorumStoreInlineHybrid(
inline_block,
ProofWithData::new(proof_block),
None,
)
},
);
match callback.send(Ok(res)) {
@@ -319,9 +163,6 @@ impl ProofManager {
ProofManagerCommand::ReceiveProofs(proofs) => {
self.receive_proofs(proofs.take());
},
ProofManagerCommand::ReceiveBatches(batches) => {
self.receive_batches(batches);
}
ProofManagerCommand::CommitNotification(block_timestamp, batches) => {
self.handle_commit_notification(
block_timestamp,
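With the `BatchQueue` and inline-batch pulling removed, a proposal request reduces to a single `pull_proofs` call followed by a two-way branch: an empty payload or `Payload::InQuorumStore`. A simplified, self-contained sketch of that shape (the types and the greedy pull below are stand-ins, not the real `ProofQueue` implementation):

```rust
// Simplified stand-ins; the real code lives in consensus/src/quorum_store.
#[derive(Clone, Debug, PartialEq)]
struct ProofOfStore {
    num_txns: u64,
    num_bytes: u64,
}

#[derive(Debug)]
enum Payload {
    Empty,
    InQuorumStore(Vec<ProofOfStore>),
}

struct ProofQueue {
    proofs: Vec<ProofOfStore>,
}

impl ProofQueue {
    // Greedy pull under txn/byte budgets, skipping excluded proofs.
    fn pull_proofs(
        &self,
        excluded: &[ProofOfStore],
        max_txns: u64,
        max_bytes: u64,
    ) -> Vec<ProofOfStore> {
        let (mut txns, mut bytes) = (0u64, 0u64);
        let mut block = Vec::new();
        for p in &self.proofs {
            if excluded.contains(p) {
                continue;
            }
            if txns + p.num_txns <= max_txns && bytes + p.num_bytes <= max_bytes {
                txns += p.num_txns;
                bytes += p.num_bytes;
                block.push(p.clone());
            }
        }
        block
    }
}

// Post-change request handling: no inline-batch branch remains,
// so the inline txn/byte budgets from the request go unused.
fn handle_proposal_request(queue: &ProofQueue, max_txns: u64, max_bytes: u64) -> Payload {
    let proof_block = queue.pull_proofs(&[], max_txns, max_bytes);
    if proof_block.is_empty() {
        Payload::Empty
    } else {
        Payload::InQuorumStore(proof_block)
    }
}

fn main() {
    let queue = ProofQueue {
        proofs: vec![ProofOfStore { num_txns: 10, num_bytes: 512 }],
    };
    println!("{:?}", handle_proposal_request(&queue, 100, 4096));
}
```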
3 changes: 0 additions & 3 deletions consensus/src/quorum_store/quorum_store_builder.rs
@@ -311,7 +311,6 @@ impl InnerBuilder {
let batch_coordinator = BatchCoordinator::new(
self.author,
self.network_sender.clone(),
self.proof_manager_cmd_tx.clone(),
self.batch_store.clone().unwrap(),
self.config.receiver_max_batch_txns as u64,
self.config.receiver_max_batch_bytes as u64,
@@ -351,8 +350,6 @@
.back_pressure
.backlog_per_validator_batch_limit_count
* self.num_validators,
self.batch_store.clone().unwrap(),
self.config.allow_batches_without_pos_in_proposal,
);
spawn_named!(
"proof_manager",
7 changes: 2 additions & 5 deletions consensus/src/quorum_store/tests/proof_manager_test.rs
@@ -1,9 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::quorum_store::{
proof_manager::ProofManager, tests::batch_store_test::batch_store_for_test,
};
use crate::quorum_store::proof_manager::ProofManager;
use aptos_consensus_types::{
common::{Payload, PayloadFilter},
proof_of_store::{BatchId, BatchInfo, ProofOfStore},
@@ -15,8 +13,7 @@ use futures::channel::oneshot;
use std::collections::HashSet;

fn create_proof_manager() -> ProofManager {
let batch_store = batch_store_for_test(5 * 1024 * 1024);
ProofManager::new(PeerId::random(), 10, 10, batch_store, true)
ProofManager::new(PeerId::random(), 10, 10)
}

fn create_proof(author: PeerId, expiration: u64, batch_sequence: u64) -> ProofOfStore {
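The test helper reflects the slimmer constructor: no `BatchStore` and no `allow_batches_without_pos_in_proposal` flag. A stand-in sketch mirroring the post-change signature (the `PeerId` and `ProofManager` here are simplified stand-ins; the real `PeerId::random()` is cryptographic):

```rust
#[derive(Clone, Copy, Debug)]
struct PeerId(u64);

impl PeerId {
    fn random() -> Self {
        PeerId(42) // stand-in only
    }
}

struct ProofManager {
    my_peer_id: PeerId,
    back_pressure_total_txn_limit: u64,
    back_pressure_total_proof_limit: u64,
}

impl ProofManager {
    // Post-change signature: just the peer id and the two back-pressure limits.
    fn new(my_peer_id: PeerId, txn_limit: u64, proof_limit: u64) -> Self {
        Self {
            my_peer_id,
            back_pressure_total_txn_limit: txn_limit,
            back_pressure_total_proof_limit: proof_limit,
        }
    }
}

// Mirrors the updated test helper in proof_manager_test.rs.
fn create_proof_manager() -> ProofManager {
    ProofManager::new(PeerId::random(), 10, 10)
}

fn main() {
    let pm = create_proof_manager();
    println!(
        "peer {:?}, limits {}/{}",
        pm.my_peer_id, pm.back_pressure_total_txn_limit, pm.back_pressure_total_proof_limit
    );
}
```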
