use crate::archiver::SegmentHeadersStore;
use crate::SubspaceLink;
use futures::channel::mpsc;
use futures::{StreamExt, TryFutureExt};
use sc_client_api::AuxStore;
use sc_consensus::block_import::{BlockImportParams, StateAction};
use sc_consensus::{BoxBlockImport, JustificationSyncLink, StorageChanges};
use sc_consensus_slots::{
BackoffAuthoringBlocksStrategy, SimpleSlotWorker, SlotInfo, SlotLenienceType, SlotProportion,
};
use sc_proof_of_time::verifier::PotVerifier;
use sc_proof_of_time::PotSlotWorker;
use sc_telemetry::TelemetryHandle;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use schnorrkel::context::SigningContext;
use sp_api::{ApiError, ApiExt, ProvideRuntimeApi};
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
use sp_consensus::{BlockOrigin, Environment, Error as ConsensusError, Proposer, SyncOracle};
use sp_consensus_slots::Slot;
use sp_consensus_subspace::digests::{
extract_pre_digest, CompatibleDigestItem, PreDigest, PreDigestPotInfo,
};
use sp_consensus_subspace::{
PotNextSlotInput, SignedVote, SubspaceApi, SubspaceJustification, Vote,
};
use sp_core::H256;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor, One, Saturating, Zero};
use sp_runtime::{DigestItem, Justification, Justifications};
use std::collections::BTreeMap;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use subspace_core_primitives::pot::{PotCheckpoints, PotOutput};
use subspace_core_primitives::sectors::SectorId;
use subspace_core_primitives::solutions::{RewardSignature, Solution, SolutionRange};
use subspace_core_primitives::{BlockNumber, PublicKey, REWARD_SIGNING_CONTEXT};
use subspace_proof_of_space::Table;
use subspace_verification::{
check_reward_signature, verify_solution, PieceCheckParams, VerifySolutionParams,
};
use tracing::{debug, error, info, warn};
const PENDING_SOLUTIONS_CHANNEL_CAPACITY: usize = 10;
#[derive(Debug, Clone)]
pub struct SubspaceSyncOracle<SO>
where
SO: SyncOracle + Send + Sync,
{
force_authoring: bool,
pause_sync: Arc<AtomicBool>,
inner: SO,
}
impl<SO> SyncOracle for SubspaceSyncOracle<SO>
where
SO: SyncOracle + Send + Sync,
{
fn is_major_syncing(&self) -> bool {
(!self.force_authoring && self.inner.is_major_syncing())
|| self.pause_sync.load(Ordering::Acquire)
}
fn is_offline(&self) -> bool {
self.inner.is_offline()
}
}
impl<SO> SubspaceSyncOracle<SO>
where
SO: SyncOracle + Send + Sync,
{
pub fn new(
force_authoring: bool,
pause_sync: Arc<AtomicBool>,
substrate_sync_oracle: SO,
) -> Self {
Self {
force_authoring,
pause_sync,
inner: substrate_sync_oracle,
}
}
}
#[derive(Debug, Copy, Clone)]
pub struct NewSlotInfo {
pub slot: Slot,
pub proof_of_time: PotOutput,
pub solution_range: SolutionRange,
pub voting_solution_range: SolutionRange,
}
#[derive(Debug, Clone)]
pub struct NewSlotNotification {
pub new_slot_info: NewSlotInfo,
pub solution_sender: mpsc::Sender<Solution<PublicKey>>,
}
#[derive(Debug, Clone)]
pub struct RewardSigningNotification {
pub hash: H256,
pub public_key: PublicKey,
pub signature_sender: TracingUnboundedSender<RewardSignature>,
}
pub struct SubspaceSlotWorkerOptions<Block, Client, E, SO, L, BS, AS>
where
Block: BlockT,
SO: SyncOracle + Send + Sync,
{
pub client: Arc<Client>,
pub env: E,
pub block_import: BoxBlockImport<Block>,
pub sync_oracle: SubspaceSyncOracle<SO>,
pub justification_sync_link: L,
pub force_authoring: bool,
pub backoff_authoring_blocks: Option<BS>,
pub subspace_link: SubspaceLink<Block>,
pub segment_headers_store: SegmentHeadersStore<AS>,
pub block_proposal_slot_portion: SlotProportion,
pub max_block_proposal_slot_portion: Option<SlotProportion>,
pub telemetry: Option<TelemetryHandle>,
pub offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
pub pot_verifier: PotVerifier,
}
pub struct SubspaceSlotWorker<PosTable, Block, Client, E, SO, L, BS, AS>
where
Block: BlockT,
SO: SyncOracle + Send + Sync,
{
client: Arc<Client>,
block_import: BoxBlockImport<Block>,
env: E,
sync_oracle: SubspaceSyncOracle<SO>,
justification_sync_link: L,
force_authoring: bool,
backoff_authoring_blocks: Option<BS>,
subspace_link: SubspaceLink<Block>,
reward_signing_context: SigningContext,
block_proposal_slot_portion: SlotProportion,
max_block_proposal_slot_portion: Option<SlotProportion>,
telemetry: Option<TelemetryHandle>,
offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
segment_headers_store: SegmentHeadersStore<AS>,
pending_solutions: BTreeMap<Slot, mpsc::Receiver<Solution<PublicKey>>>,
pot_checkpoints: BTreeMap<Slot, PotCheckpoints>,
pot_verifier: PotVerifier,
_pos_table: PhantomData<PosTable>,
}
impl<PosTable, Block, Client, E, SO, L, BS, AS> PotSlotWorker<Block>
for SubspaceSlotWorker<PosTable, Block, Client, E, SO, L, BS, AS>
where
Block: BlockT,
Client: HeaderBackend<Block> + ProvideRuntimeApi<Block>,
Client::Api: SubspaceApi<Block, PublicKey>,
SO: SyncOracle + Send + Sync,
{
fn on_proof(&mut self, slot: Slot, checkpoints: PotCheckpoints) {
self.pot_checkpoints
.retain(|&stored_slot, _checkpoints| stored_slot < slot);
self.pot_checkpoints.insert(slot, checkpoints);
if self.sync_oracle.is_major_syncing() {
debug!("Skipping farming slot {slot} due to sync");
return;
}
let maybe_root_plot_public_key = self
.client
.runtime_api()
.root_plot_public_key(self.client.info().best_hash)
.ok()
.flatten();
if maybe_root_plot_public_key.is_some() && !self.force_authoring {
debug!(
"Skipping farming slot {slot} due to root public key present and force authoring \
not enabled"
);
return;
}
let proof_of_time = checkpoints.output();
let best_hash = self.client.info().best_hash;
let (solution_range, voting_solution_range) =
match extract_solution_ranges_for_block(self.client.as_ref(), best_hash) {
Ok(solution_ranges) => solution_ranges,
Err(error) => {
warn!(
%slot,
%best_hash,
%error,
"Failed to extract solution ranges for block"
);
return;
}
};
let new_slot_info = NewSlotInfo {
slot,
proof_of_time,
solution_range,
voting_solution_range,
};
let (solution_sender, solution_receiver) =
mpsc::channel(PENDING_SOLUTIONS_CHANNEL_CAPACITY);
self.subspace_link
.new_slot_notification_sender
.notify(|| NewSlotNotification {
new_slot_info,
solution_sender,
});
self.pending_solutions.insert(slot, solution_receiver);
}
}
#[async_trait::async_trait]
impl<PosTable, Block, Client, E, Error, SO, L, BS, AS> SimpleSlotWorker<Block>
for SubspaceSlotWorker<PosTable, Block, Client, E, SO, L, BS, AS>
where
PosTable: Table,
Block: BlockT,
Client: ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = ClientError>
+ AuxStore
+ 'static,
Client::Api: SubspaceApi<Block, PublicKey>,
E: Environment<Block, Error = Error> + Send + Sync,
E::Proposer: Proposer<Block, Error = Error>,
SO: SyncOracle + Send + Sync,
L: JustificationSyncLink<Block>,
BS: BackoffAuthoringBlocksStrategy<NumberFor<Block>> + Send + Sync,
Error: std::error::Error + Send + From<ConsensusError> + 'static,
AS: AuxStore + Send + Sync + 'static,
BlockNumber: From<<Block::Header as Header>::Number>,
{
type BlockImport = BoxBlockImport<Block>;
type SyncOracle = SubspaceSyncOracle<SO>;
type JustificationSyncLink = L;
type CreateProposer =
Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
type Proposer = E::Proposer;
type Claim = (PreDigest<PublicKey>, SubspaceJustification);
type AuxData = ();
fn logging_target(&self) -> &'static str {
"subspace"
}
fn block_import(&mut self) -> &mut Self::BlockImport {
&mut self.block_import
}
fn aux_data(
&self,
_parent: &Block::Header,
_slot: Slot,
) -> Result<Self::AuxData, ConsensusError> {
Ok(())
}
fn authorities_len(&self, _epoch_data: &Self::AuxData) -> Option<usize> {
Some(2)
}
async fn claim_slot(
&mut self,
parent_header: &Block::Header,
slot: Slot,
_aux_data: &Self::AuxData,
) -> Option<Self::Claim> {
let parent_pre_digest = match extract_pre_digest(parent_header) {
Ok(pre_digest) => pre_digest,
Err(error) => {
error!(
%error,
"Failed to parse pre-digest out of parent header"
);
return None;
}
};
let parent_slot = parent_pre_digest.slot();
if slot <= parent_slot {
debug!(
"Skipping claiming slot {slot} it must be higher than parent slot {parent_slot}",
);
return None;
} else {
debug!(%slot, "Attempting to claim slot");
}
let chain_constants = self.subspace_link.chain_constants();
let parent_hash = parent_header.hash();
let runtime_api = self.client.runtime_api();
let (solution_range, voting_solution_range) =
extract_solution_ranges_for_block(self.client.as_ref(), parent_hash).ok()?;
let maybe_root_plot_public_key = runtime_api.root_plot_public_key(parent_hash).ok()?;
let parent_pot_parameters = runtime_api.pot_parameters(parent_hash).ok()?;
let parent_future_slot = if parent_header.number().is_zero() {
parent_slot
} else {
parent_slot + chain_constants.block_authoring_delay()
};
let (proof_of_time, future_proof_of_time, pot_justification) = {
self.pot_checkpoints
.retain(|&stored_slot, _checkpoints| stored_slot > parent_slot);
let proof_of_time = self.pot_checkpoints.get(&slot)?.output();
let future_slot = slot + chain_constants.block_authoring_delay();
let pot_input = if parent_header.number().is_zero() {
PotNextSlotInput {
slot: parent_slot + Slot::from(1),
slot_iterations: parent_pot_parameters.slot_iterations(),
seed: self.pot_verifier.genesis_seed(),
}
} else {
PotNextSlotInput::derive(
parent_pot_parameters.slot_iterations(),
parent_slot,
parent_pre_digest.pot_info().proof_of_time(),
&parent_pot_parameters.next_parameters_change(),
)
};
if !self.pot_verifier.is_output_valid(
pot_input,
slot - parent_slot,
proof_of_time,
parent_pot_parameters.next_parameters_change(),
) {
warn!(
%slot,
?pot_input,
?parent_pot_parameters,
"Proof of time is invalid, skipping block authoring at slot"
);
return None;
}
let mut checkpoints_pot_input = if parent_header.number().is_zero() {
PotNextSlotInput {
slot: parent_slot + Slot::from(1),
slot_iterations: parent_pot_parameters.slot_iterations(),
seed: self.pot_verifier.genesis_seed(),
}
} else {
let parent_pot_info = parent_pre_digest.pot_info();
PotNextSlotInput::derive(
parent_pot_parameters.slot_iterations(),
parent_future_slot,
parent_pot_info.future_proof_of_time(),
&parent_pot_parameters.next_parameters_change(),
)
};
let seed = checkpoints_pot_input.seed;
let mut checkpoints = Vec::with_capacity((*future_slot - *parent_future_slot) as usize);
for slot in *parent_future_slot + 1..=*future_slot {
let slot = Slot::from(slot);
let maybe_slot_checkpoints = self.pot_verifier.get_checkpoints(
checkpoints_pot_input.slot_iterations,
checkpoints_pot_input.seed,
);
let Some(slot_checkpoints) = maybe_slot_checkpoints else {
warn!("Proving failed during block authoring");
return None;
};
checkpoints.push(slot_checkpoints);
checkpoints_pot_input = PotNextSlotInput::derive(
checkpoints_pot_input.slot_iterations,
slot,
slot_checkpoints.output(),
&parent_pot_parameters.next_parameters_change(),
);
}
let future_proof_of_time = checkpoints
.last()
.expect("Never empty, there is at least one slot between blocks; qed")
.output();
let pot_justification = SubspaceJustification::PotCheckpoints { seed, checkpoints };
(proof_of_time, future_proof_of_time, pot_justification)
};
let mut solution_receiver = {
self.pending_solutions
.retain(|&stored_slot, _solution_receiver| stored_slot >= slot);
let mut solution_receiver = self.pending_solutions.remove(&slot)?;
solution_receiver.close();
solution_receiver
};
let mut maybe_pre_digest = None;
while let Some(solution) = solution_receiver.next().await {
if let Some(root_plot_public_key) = &maybe_root_plot_public_key {
if &solution.public_key != root_plot_public_key {
continue;
}
}
let sector_id = SectorId::new(
solution.public_key.hash(),
solution.sector_index,
solution.history_size,
);
let history_size = runtime_api.history_size(parent_hash).ok()?;
let max_pieces_in_sector = runtime_api.max_pieces_in_sector(parent_hash).ok()?;
let segment_index = sector_id
.derive_piece_index(
solution.piece_offset,
solution.history_size,
max_pieces_in_sector,
chain_constants.recent_segments(),
chain_constants.recent_history_fraction(),
)
.segment_index();
let maybe_segment_commitment = self
.segment_headers_store
.get_segment_header(segment_index)
.map(|segment_header| segment_header.segment_commitment());
let segment_commitment = match maybe_segment_commitment {
Some(segment_commitment) => segment_commitment,
None => {
warn!(
%slot,
%segment_index,
"Segment commitment not found",
);
continue;
}
};
let sector_expiration_check_segment_index = match solution
.history_size
.sector_expiration_check(chain_constants.min_sector_lifetime())
{
Some(sector_expiration_check) => sector_expiration_check.segment_index(),
None => {
continue;
}
};
let sector_expiration_check_segment_commitment = runtime_api
.segment_commitment(parent_hash, sector_expiration_check_segment_index)
.ok()?;
let solution_verification_result = verify_solution::<PosTable, _>(
&solution,
slot.into(),
&VerifySolutionParams {
proof_of_time,
solution_range: voting_solution_range,
piece_check_params: Some(PieceCheckParams {
max_pieces_in_sector,
segment_commitment,
recent_segments: chain_constants.recent_segments(),
recent_history_fraction: chain_constants.recent_history_fraction(),
min_sector_lifetime: chain_constants.min_sector_lifetime(),
current_history_size: history_size,
sector_expiration_check_segment_commitment,
}),
},
&self.subspace_link.kzg,
);
match solution_verification_result {
Ok(solution_distance) => {
if solution_distance <= solution_range / 2 {
if maybe_pre_digest.is_none() {
info!(%slot, "🚜 Claimed block at slot");
maybe_pre_digest.replace(PreDigest::V0 {
slot,
solution,
pot_info: PreDigestPotInfo::V0 {
proof_of_time,
future_proof_of_time,
},
});
} else {
info!(
%slot,
"Skipping solution that has quality sufficient for block because \
block pre-digest was already created",
);
}
} else if !parent_header.number().is_zero() {
info!(%slot, "🗳️ Claimed vote at slot");
self.create_vote(
parent_header,
slot,
solution,
proof_of_time,
future_proof_of_time,
)
.await;
}
}
Err(error @ subspace_verification::Error::OutsideSolutionRange { .. }) => {
if runtime_api
.solution_ranges(parent_hash)
.ok()
.and_then(|solution_ranges| solution_ranges.next)
.is_some()
{
debug!(
%slot,
%error,
"Invalid solution received",
);
} else {
warn!(
%slot,
%error,
"Invalid solution received",
);
}
}
Err(error) => {
warn!(
%slot,
%error,
"Invalid solution received",
);
}
}
}
maybe_pre_digest.map(|pre_digest| (pre_digest, pot_justification))
}
fn pre_digest_data(
&self,
_slot: Slot,
(pre_digest, _justification): &Self::Claim,
) -> Vec<DigestItem> {
vec![DigestItem::subspace_pre_digest(pre_digest)]
}
async fn block_import_params(
&self,
header: Block::Header,
header_hash: &Block::Hash,
body: Vec<Block::Extrinsic>,
storage_changes: sc_consensus_slots::StorageChanges<Block>,
(pre_digest, justification): Self::Claim,
_aux_data: Self::AuxData,
) -> Result<BlockImportParams<Block>, ConsensusError> {
let signature = self
.sign_reward(
H256::from_slice(header_hash.as_ref()),
pre_digest.solution().public_key,
)
.await?;
let digest_item = DigestItem::subspace_seal(signature);
let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(digest_item);
import_block.body = Some(body);
import_block.state_action =
StateAction::ApplyChanges(StorageChanges::Changes(storage_changes));
import_block
.justifications
.replace(Justifications::from(Justification::from(justification)));
Ok(import_block)
}
fn force_authoring(&self) -> bool {
self.force_authoring
}
fn should_backoff(&self, slot: Slot, chain_head: &Block::Header) -> bool {
if let Some(strategy) = &self.backoff_authoring_blocks {
if let Ok(chain_head_slot) = extract_pre_digest(chain_head).map(|digest| digest.slot())
{
return strategy.should_backoff(
*chain_head.number(),
chain_head_slot,
self.client.info().finalized_number,
slot,
self.logging_target(),
);
}
}
false
}
fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
&mut self.sync_oracle
}
fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
&mut self.justification_sync_link
}
fn proposer(&mut self, block: &Block::Header) -> Self::CreateProposer {
Box::pin(
self.env
.init(block)
.map_err(|e| ConsensusError::ClientImport(e.to_string())),
)
}
fn telemetry(&self) -> Option<TelemetryHandle> {
self.telemetry.clone()
}
fn proposing_remaining_duration(&self, slot_info: &SlotInfo<Block>) -> std::time::Duration {
let parent_slot = extract_pre_digest(&slot_info.chain_head)
.ok()
.map(|d| d.slot());
sc_consensus_slots::proposing_remaining_duration(
parent_slot,
slot_info,
&self.block_proposal_slot_portion,
self.max_block_proposal_slot_portion.as_ref(),
SlotLenienceType::Exponential,
self.logging_target(),
)
}
}
impl<PosTable, Block, Client, E, Error, SO, L, BS, AS>
SubspaceSlotWorker<PosTable, Block, Client, E, SO, L, BS, AS>
where
PosTable: Table,
Block: BlockT,
Client: ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = ClientError>
+ AuxStore
+ 'static,
Client::Api: SubspaceApi<Block, PublicKey>,
E: Environment<Block, Error = Error> + Send + Sync,
E::Proposer: Proposer<Block, Error = Error>,
SO: SyncOracle + Send + Sync,
L: JustificationSyncLink<Block>,
BS: BackoffAuthoringBlocksStrategy<NumberFor<Block>> + Send + Sync,
Error: std::error::Error + Send + From<ConsensusError> + 'static,
AS: AuxStore + Send + Sync + 'static,
BlockNumber: From<<Block::Header as Header>::Number>,
{
pub fn new(
SubspaceSlotWorkerOptions {
client,
env,
block_import,
sync_oracle,
justification_sync_link,
force_authoring,
backoff_authoring_blocks,
subspace_link,
segment_headers_store,
block_proposal_slot_portion,
max_block_proposal_slot_portion,
telemetry,
offchain_tx_pool_factory,
pot_verifier,
}: SubspaceSlotWorkerOptions<Block, Client, E, SO, L, BS, AS>,
) -> Self {
Self {
client: client.clone(),
block_import,
env,
sync_oracle,
justification_sync_link,
force_authoring,
backoff_authoring_blocks,
subspace_link,
reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT),
block_proposal_slot_portion,
max_block_proposal_slot_portion,
telemetry,
offchain_tx_pool_factory,
segment_headers_store,
pending_solutions: Default::default(),
pot_checkpoints: Default::default(),
pot_verifier,
_pos_table: PhantomData::<PosTable>,
}
}
async fn create_vote(
&self,
parent_header: &Block::Header,
slot: Slot,
solution: Solution<PublicKey>,
proof_of_time: PotOutput,
future_proof_of_time: PotOutput,
) {
let parent_hash = parent_header.hash();
let mut runtime_api = self.client.runtime_api();
runtime_api.register_extension(
self.offchain_tx_pool_factory
.offchain_transaction_pool(parent_hash),
);
if self.should_backoff(slot, parent_header) {
return;
}
let vote = Vote::V0 {
height: parent_header.number().saturating_add(One::one()),
parent_hash: parent_header.hash(),
slot,
solution: solution.clone(),
proof_of_time,
future_proof_of_time,
};
let signature = match self.sign_reward(vote.hash(), solution.public_key).await {
Ok(signature) => signature,
Err(error) => {
error!(
%slot,
%error,
"Failed to submit vote",
);
return;
}
};
let signed_vote = SignedVote { vote, signature };
if let Err(error) = runtime_api.submit_vote_extrinsic(parent_hash, signed_vote) {
error!(
%slot,
%error,
"Failed to submit vote",
);
}
}
async fn sign_reward(
&self,
hash: H256,
public_key: PublicKey,
) -> Result<RewardSignature, ConsensusError> {
let (signature_sender, mut signature_receiver) =
tracing_unbounded("subspace_signature_signing_stream", 100);
self.subspace_link
.reward_signing_notification_sender
.notify(|| RewardSigningNotification {
hash,
public_key,
signature_sender,
});
while let Some(signature) = signature_receiver.next().await {
if check_reward_signature(
hash.as_ref(),
&signature,
&public_key,
&self.reward_signing_context,
)
.is_err()
{
warn!(
%hash,
"Received invalid signature for reward"
);
continue;
}
return Ok(signature);
}
Err(ConsensusError::CannotSign(format!(
"Farmer didn't sign reward. Key: {:?}",
public_key
)))
}
}
pub(crate) fn extract_solution_ranges_for_block<Block, Client>(
client: &Client,
parent_hash: Block::Hash,
) -> Result<(u64, u64), ApiError>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block>,
Client::Api: SubspaceApi<Block, PublicKey>,
{
client
.runtime_api()
.solution_ranges(parent_hash)
.map(|solution_ranges| {
(
solution_ranges.next.unwrap_or(solution_ranges.current),
solution_ranges
.voting_next
.unwrap_or(solution_ranges.voting_current),
)
})
}