use futures::lock::Mutex;
use rand::prelude::*;
use rayon::prelude::*;
use sc_client_api::backend::AuxStore;
use sc_consensus::block_import::BlockImportParams;
use sc_consensus::import_queue::Verifier;
use sc_consensus_slots::check_equivocation;
use sc_proof_of_time::verifier::PotVerifier;
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_TRACE};
use schnorrkel::context::SigningContext;
use sp_api::ProvideRuntimeApi;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::BlockOrigin;
use sp_consensus_slots::Slot;
use sp_consensus_subspace::digests::{
extract_subspace_digest_items, CompatibleDigestItem, PreDigest, SubspaceDigestItems,
};
use sp_consensus_subspace::{ChainConstants, PotNextSlotInput, SubspaceApi, SubspaceJustification};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use sp_runtime::{DigestItem, Justifications};
use std::iter;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread::available_parallelism;
use subspace_core_primitives::{BlockNumber, PublicKey};
use subspace_kzg::Kzg;
use subspace_proof_of_space::Table;
use subspace_verification::{check_reward_signature, verify_solution, VerifySolutionParams};
use tokio::runtime::Handle;
use tracing::{debug, info, trace, warn};
/// Errors reported while verifying a block header (see `check_header` in this file).
#[derive(Debug, Eq, PartialEq, thiserror::Error)]
pub enum VerificationError<Header: HeaderT> {
/// The last digest item of the header could not be interpreted as a Subspace seal.
#[error("Header {0:?} has a bad seal")]
HeaderBadSeal(Header::Hash),
/// The header has no digest item that could be popped as a seal.
#[error("Header {0:?} is unsealed")]
HeaderUnsealed(Header::Hash),
/// The reward signature check over the header's pre-hash failed; carries that pre-hash.
#[error("Bad reward signature on {0:?}")]
BadRewardSignature(Header::Hash),
/// No Subspace justification was found among the block's justifications.
#[error("Missing Subspace justification")]
MissingSubspaceJustification,
/// `SubspaceJustification::try_from_justification` returned an error for a justification.
#[error("Invalid Subspace justification: {0}")]
InvalidSubspaceJustification(codec::Error),
/// The output of the last checkpoints entry in the justification does not match the future
/// proof of time from the pre-digest (this also covers an empty list of checkpoints).
#[error("Invalid Subspace justification contents")]
InvalidSubspaceJustificationContents,
/// Checkpoints failed verification, or the first slot to check could not be computed because
/// the subtraction would underflow.
#[error("Invalid proof of time")]
InvalidProofOfTime,
/// `verify_solution` rejected the solution claimed for the given slot.
#[error("Verification error on slot {0:?}: {1:?}")]
VerificationError(Slot, subspace_verification::Error),
}
/// Result of a successful `check_header` call.
struct CheckedHeader<H> {
// Header with the seal digest item already popped off
pre_header: H,
// Pre-digest taken from the header's Subspace digest items
pre_digest: PreDigest<PublicKey>,
// The seal digest item that was popped off the header
seal: DigestItem,
}
/// Inputs for `check_header`.
struct VerificationParams<'a, Header>
where
Header: HeaderT + 'a,
{
// The (still sealed) header to verify
header: Header,
// Parameters forwarded to `verify_solution`
verify_solution_params: &'a VerifySolutionParams,
}
/// Options used to construct [`SubspaceVerifier`].
pub struct SubspaceVerifierOptions<Client> {
/// Client used for chain info (best block number) and equivocation checks
pub client: Arc<Client>,
/// Chain constants (confirmation depth, block authoring delay and slot probability are read)
pub chain_constants: ChainConstants,
/// KZG instance passed to solution verification
pub kzg: Kzg,
/// Optional telemetry handle
pub telemetry: Option<TelemetryHandle>,
/// Signing context used when checking reward signatures
pub reward_signing_context: SigningContext,
/// Block number of the sync target, shared atomically and read with relaxed ordering
pub sync_target_block_number: Arc<AtomicU32>,
/// Whether this node is authoring blocks
pub is_authoring_blocks: bool,
/// Proof of time verifier used for checkpoints in block justifications
pub pot_verifier: PotVerifier,
}
/// Shared state behind [`SubspaceVerifier`]; holds everything needed to verify a block.
struct Inner<PosTable, Block, Client>
where
Block: BlockT,
{
// Client used for chain info (best block number) and equivocation checks
client: Arc<Client>,
// KZG instance passed to `verify_solution`
kzg: Kzg,
// Optional telemetry handle used by the `telemetry!` macro
telemetry: Option<TelemetryHandle>,
// Chain constants (confirmation depth, block authoring delay and slot probability are read)
chain_constants: ChainConstants,
// Signing context passed to `check_reward_signature`
reward_signing_context: SigningContext,
// Sync target block number, read with relaxed ordering
sync_target_block_number: Arc<AtomicU32>,
// Whether this node is authoring blocks
is_authoring_blocks: bool,
// Proof of time verifier used for checkpoints found in justifications
pot_verifier: PotVerifier,
// Held for the duration of `check_equivocation` so that one header is checked at a time
equivocation_mutex: Mutex<()>,
// Marker for the proof-of-space table type used in `verify_solution`
_pos_table: PhantomData<PosTable>,
// Marker for the block type
_block: PhantomData<Block>,
}
impl<PosTable, Block, Client> Inner<PosTable, Block, Client>
where
PosTable: Table,
Block: BlockT,
BlockNumber: From<NumberFor<Block>>,
Client: HeaderBackend<Block> + ProvideRuntimeApi<Block> + AuxStore + 'static,
Client::Api: BlockBuilderApi<Block> + SubspaceApi<Block, PublicKey>,
{
fn new(options: SubspaceVerifierOptions<Client>) -> Self {
let SubspaceVerifierOptions {
client,
chain_constants,
kzg,
telemetry,
reward_signing_context,
sync_target_block_number,
is_authoring_blocks,
pot_verifier,
} = options;
Self {
client,
kzg,
telemetry,
chain_constants,
reward_signing_context,
sync_target_block_number,
is_authoring_blocks,
pot_verifier,
equivocation_mutex: Mutex::default(),
_pos_table: Default::default(),
_block: Default::default(),
}
}
/// Decides whether proof of time for the block with `block_number` gets full verification.
///
/// Returns `true` unconditionally when the block is at or past the sync target, or at most
/// 1 581 blocks behind it. For blocks further behind, the decision is random: `true` with
/// probability `sample_size / (distance + 1)`, where `sample_size` depends on the distance
/// to the sync target.
fn full_pot_verification(&self, block_number: NumberFor<Block>) -> bool {
    let target: BlockNumber = self.sync_target_block_number.load(Ordering::Relaxed);

    let distance = match target.checked_sub(BlockNumber::from(block_number)) {
        Some(distance) => distance,
        // Block is ahead of the sync target
        None => return true,
    };

    // Close to the sync target everything is verified, no randomness involved
    if distance <= 1_581 {
        return true;
    }

    let sample_size = if distance <= 6_234 {
        1_581
    } else if distance <= 63_240 {
        3_162 * (distance - 3_162) / (distance - 1)
    } else if distance <= 3_162_000 {
        3_162
    } else {
        distance / 1_000
    };

    // Uniform draw over `distance + 1` values
    thread_rng().gen_range(0..=distance) < sample_size
}
/// Checks a sealed header: seal presence, proof-of-time checkpoints from the Subspace
/// justification, reward signature and solution.
///
/// On success returns the header with the seal removed, the pre-digest and the seal itself.
///
/// When `full_pot_verification` is `false`, checkpoints are not verified; they are passed to
/// `PotVerifier::inject_verified_checkpoints` instead.
///
/// # Errors
///
/// * `HeaderUnsealed` / `HeaderBadSeal` if the last digest item is missing or is not a
///   Subspace seal
/// * `MissingSubspaceJustification` / `InvalidSubspaceJustification` /
///   `InvalidSubspaceJustificationContents` for problems with the justification
/// * `InvalidProofOfTime` if checkpoints fail verification or the first slot to check
///   would underflow
/// * `BadRewardSignature` if the seal signature does not match the pre-hash
/// * `VerificationError` if `verify_solution` fails
fn check_header(
&self,
params: VerificationParams<'_, Block::Header>,
subspace_digest_items: SubspaceDigestItems<PublicKey>,
full_pot_verification: bool,
justifications: &Option<Justifications>,
) -> Result<CheckedHeader<Block::Header>, VerificationError<Block::Header>> {
let VerificationParams {
mut header,
verify_solution_params,
} = params;
let pre_digest = subspace_digest_items.pre_digest;
let slot = pre_digest.slot();
// The seal is the last digest item; it is removed from the header here
let seal = header
.digest_mut()
.pop()
.ok_or_else(|| VerificationError::HeaderUnsealed(header.hash()))?;
let signature = seal
.as_subspace_seal()
.ok_or_else(|| VerificationError::HeaderBadSeal(header.hash()))?;
// Hash of the header without the seal, this is what the signature is checked against below
let pre_hash = header.hash();
// Proof of time checkpoints from the justification
{
// Take the first justification that `try_from_justification` recognizes; a decoding error
// is reported as `InvalidSubspaceJustification`, absence as `MissingSubspaceJustification`
let Some(subspace_justification) = justifications
.as_ref()
.and_then(|justifications| {
justifications
.iter()
.find_map(SubspaceJustification::try_from_justification)
})
.transpose()
.map_err(VerificationError::InvalidSubspaceJustification)?
else {
return Err(VerificationError::MissingSubspaceJustification);
};
let SubspaceJustification::PotCheckpoints { seed, checkpoints } =
subspace_justification;
// Output of the last checkpoints entry must equal the future proof of time from the
// pre-digest; an empty list fails this comparison too, so `checkpoints` is non-empty below
if checkpoints.last().map(|checkpoints| checkpoints.output())
!= Some(pre_digest.pot_info().future_proof_of_time())
{
return Err(VerificationError::InvalidSubspaceJustificationContents);
}
// The last entry corresponds to `future_slot`, so the first one is `len - 1` slots earlier
let future_slot = slot + self.chain_constants.block_authoring_delay();
let first_slot_to_check = Slot::from(
future_slot
.checked_sub(checkpoints.len() as u64 - 1)
.ok_or(VerificationError::InvalidProofOfTime)?,
);
// Use slot iterations from the parameters change if it applies at or before the first slot
// to check, otherwise the current value from the digest
let slot_iterations = subspace_digest_items
.pot_parameters_change
.as_ref()
.and_then(|parameters_change| {
(parameters_change.slot <= first_slot_to_check)
.then_some(parameters_change.slot_iterations)
})
.unwrap_or(subspace_digest_items.pot_slot_iterations);
let mut pot_input = PotNextSlotInput {
slot: first_slot_to_check,
slot_iterations,
seed,
};
// Pair every checkpoints entry with the input it has to be checked against: the first entry
// uses the seed from the justification
let checkpoints_verification_input = iter::once((
pot_input,
*checkpoints
.first()
.expect("Not empty, contents was checked above; qed"),
));
// Every following entry uses an input derived from the previous input and the previous
// entry's output. The closure mutates `pot_input`, so this chain must be evaluated in order,
// which happens when it is collected into a `Vec` here
let checkpoints_verification_input = checkpoints_verification_input
.chain(checkpoints.windows(2).map(|checkpoints_pair| {
pot_input = PotNextSlotInput::derive(
pot_input.slot_iterations,
pot_input.slot,
checkpoints_pair[0].output(),
&subspace_digest_items.pot_parameters_change,
);
(pot_input, checkpoints_pair[1])
}))
.collect::<Vec<_>>();
let pot_verifier = &self.pot_verifier;
// Process all pairs in parallel, looking for the first error
checkpoints_verification_input
.into_par_iter()
.find_map_first(|(pot_input, checkpoints)| {
if full_pot_verification {
if !pot_verifier.verify_checkpoints(
pot_input.seed,
pot_input.slot_iterations,
&checkpoints,
) {
return Some(VerificationError::InvalidProofOfTime);
}
} else {
// No verification in this mode, checkpoints are handed to the verifier as already verified
pot_verifier.inject_verified_checkpoints(
pot_input.seed,
pot_input.slot_iterations,
checkpoints,
);
}
// `None` means no error for this pair
None
})
.map_or(Ok(()), Err)?;
}
// Seal signature must be valid for the pre-hash and the public key from the solution
if check_reward_signature(
pre_hash.as_ref(),
&signature,
&pre_digest.solution().public_key,
&self.reward_signing_context,
)
.is_err()
{
return Err(VerificationError::BadRewardSignature(pre_hash));
}
// Solution from the pre-digest must be valid for the claimed slot
verify_solution::<PosTable, _>(
pre_digest.solution(),
slot.into(),
verify_solution_params,
&self.kzg,
)
.map_err(|error| VerificationError::VerificationError(slot, error))?;
Ok(CheckedHeader {
pre_header: header,
pre_digest,
seal,
})
}
/// Checks whether `author` has equivocated with `header` at `slot` and logs it if so.
///
/// Blocks with origin `NetworkInitialSync` are skipped entirely. The equivocation check
/// itself runs under `equivocation_mutex`, so only one header is processed at a time.
///
/// No report is submitted by this method: a detected equivocation is only logged, with an
/// extra log line when the node is not authoring blocks.
///
/// # Errors
///
/// Returns the stringified error of `check_equivocation` if it fails.
async fn check_and_report_equivocation(
    &self,
    slot_now: Slot,
    slot: Slot,
    header: &Block::Header,
    author: &PublicKey,
    origin: &BlockOrigin,
) -> Result<(), String> {
    if matches!(origin, BlockOrigin::NetworkInitialSync) {
        return Ok(());
    }

    // Serializes equivocation checks, released when this method returns
    let _guard = self.equivocation_mutex.lock().await;

    let Some(equivocation_proof) =
        check_equivocation(&*self.client, slot_now, slot, header, author)
            .map_err(|error| error.to_string())?
    else {
        // No equivocation found
        return Ok(());
    };

    info!(
        "Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}",
        author,
        slot,
        equivocation_proof.first_header.hash(),
        equivocation_proof.second_header.hash(),
    );

    // Nothing is done with the proof when the node is authoring blocks
    if !self.is_authoring_blocks {
        info!("Not submitting equivocation report because node is not authoring blocks");
    }

    Ok(())
}
/// Verifies a block before import and prepares its import parameters.
///
/// Steps, in order:
/// 1. Blocks with origin `NetworkBroadcast` whose number plus the confirmation depth is
///    below the best block number are rejected.
/// 2. Subspace digest items are extracted and the header is checked with `check_header`.
/// 3. An equivocation check is attempted; its failure is only logged.
/// 4. The header is replaced with its unsealed version, the seal is pushed to
///    `post_digests` and `post_hash` is set to the hash of the original sealed header.
///
/// # Errors
///
/// Returns a string describing the failure if the block is rejected in step 1, if digest
/// items can't be extracted or if `check_header` fails.
async fn verify(
    &self,
    mut block: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
    trace!(
        origin = ?block.origin,
        header = ?block.header,
        justifications = ?block.justifications,
        body = ?block.body,
        "Verifying",
    );

    let best_number = self.client.info().best_number;
    let below_archiving_point = *block.header.number()
        + self.chain_constants.confirmation_depth_k().into()
        < best_number;
    let received_via_broadcast = matches!(block.origin, BlockOrigin::NetworkBroadcast);
    if below_archiving_point && received_via_broadcast {
        debug!(
            header = ?block.header,
            %best_number,
            "Rejecting block below archiving point"
        );

        return Err(format!(
            "Rejecting block #{} below archiving point",
            block.header.number()
        ));
    }

    // Hash of the sealed header, must be taken before the header is replaced below
    let post_hash = block.header.hash();

    debug!(
        "We have {:?} logs in this header",
        block.header.digest().logs().len()
    );

    let subspace_digest_items =
        extract_subspace_digest_items::<Block::Header, PublicKey>(&block.header)?;

    let full_pot_verification = self.full_pot_verification(*block.header.number());

    let CheckedHeader {
        pre_header,
        pre_digest,
        seal,
    } = {
        // Values are copied out of the digest items before those are moved into the call
        let verify_solution_params = VerifySolutionParams {
            proof_of_time: subspace_digest_items.pre_digest.pot_info().proof_of_time(),
            solution_range: subspace_digest_items.solution_range,
            piece_check_params: None,
        };

        self.check_header(
            VerificationParams {
                header: block.header.clone(),
                verify_solution_params: &verify_solution_params,
            },
            subspace_digest_items,
            full_pot_verification,
            &block.justifications,
        )
        .map_err(|error| error.to_string())?
    };

    let block_slot = pre_digest.slot();

    // Estimate of the current slot: the block's slot advanced by the number of blocks left to
    // the sync target, scaled by the slot probability
    let blocks_to_sync_target = self
        .sync_target_block_number
        .load(Ordering::Relaxed)
        .saturating_sub(BlockNumber::from(*pre_header.number()));
    let slot_now = match blocks_to_sync_target {
        0 => block_slot,
        blocks => {
            block_slot
                + Slot::from(
                    u64::from(blocks) * self.chain_constants.slot_probability().1
                        / self.chain_constants.slot_probability().0,
                )
        }
    };

    // Failure to check for equivocation does not fail verification, it is only logged
    let equivocation_check = self
        .check_and_report_equivocation(
            slot_now,
            block_slot,
            &block.header,
            &pre_digest.solution().public_key,
            &block.origin,
        )
        .await;
    if let Err(error) = equivocation_check {
        warn!(
            %error,
            "Error checking/reporting Subspace equivocation"
        );
    }

    trace!(?pre_header, "Checked header; importing");
    telemetry!(
        self.telemetry;
        CONSENSUS_TRACE;
        "subspace.checked_and_importing";
        "pre_header" => ?pre_header,
    );

    block.header = pre_header;
    block.post_digests.push(seal);
    block.post_hash = Some(post_hash);

    Ok(block)
}
}
/// Subspace block verifier, a thin handle around shared verification state.
pub struct SubspaceVerifier<PosTable, Block, Client>
where
Block: BlockT,
{
// Shared state, cloned (by reference count) into the blocking task that runs verification
inner: Arc<Inner<PosTable, Block, Client>>,
}
impl<PosTable, Block, Client> SubspaceVerifier<PosTable, Block, Client>
where
    PosTable: Table,
    Block: BlockT,
    BlockNumber: From<NumberFor<Block>>,
    Client: HeaderBackend<Block> + ProvideRuntimeApi<Block> + AuxStore + 'static,
    Client::Api: BlockBuilderApi<Block> + SubspaceApi<Block, PublicKey>,
{
    /// Creates a new verifier from `options`.
    pub fn new(options: SubspaceVerifierOptions<Client>) -> Self {
        let inner = Arc::new(Inner::new(options));

        Self { inner }
    }
}
#[async_trait::async_trait]
impl<PosTable, Block, Client> Verifier<Block> for SubspaceVerifier<PosTable, Block, Client>
where
    PosTable: Table,
    Block: BlockT,
    BlockNumber: From<NumberFor<Block>>,
    Client: HeaderBackend<Block> + ProvideRuntimeApi<Block> + AuxStore + 'static,
    Client::Api: BlockBuilderApi<Block> + SubspaceApi<Block, PublicKey>,
{
    /// Number of blocks that may be verified concurrently: the available parallelism reported
    /// by the standard library, or `1` if it can't be determined.
    fn verification_concurrency(&self) -> NonZeroUsize {
        match available_parallelism() {
            Ok(parallelism) => parallelism,
            Err(_) => NonZeroUsize::new(1).expect("Not zero; qed"),
        }
    }

    /// Verifies `block` on a blocking task, driving the inner async verification to completion
    /// there via the current runtime handle.
    ///
    /// # Errors
    ///
    /// Returns the verification error, or a join error message if the blocking task failed.
    async fn verify(
        &self,
        block: BlockImportParams<Block>,
    ) -> Result<BlockImportParams<Block>, String> {
        let inner = Arc::clone(&self.inner);
        let verification_task = tokio::task::spawn_blocking(move || {
            Handle::current().block_on(inner.verify(block))
        });

        match verification_task.await {
            Ok(verification_result) => verification_result,
            Err(error) => Err(format!("Failed to join block verification task: {error}")),
        }
    }
}