sc_consensus_subspace/
slot_worker.rs

1//! Slot worker drives block and vote production based on slots produced in [`sc_proof_of_time`].
2//!
3//! While slot worker uses [`sc_consensus_slots`], it is not driven by time, but instead by Proof of
4//! Time that is produced by [`PotSourceWorker`](sc_proof_of_time::source::PotSourceWorker).
5//!
6//! Each time a new proof is found, [`PotSlotWorker::on_proof`] is called and corresponding
7//! [`SlotInfo`] notification is sent ([`SubspaceLink::new_slot_notification_stream`]) to farmers to
8//! do the audit and try to prove they have a solution without actually waiting for the response.
9//! [`ChainConstants::block_authoring_delay`](sp_consensus_subspace::ChainConstants::block_authoring_delay)
10//! slots later (when corresponding future proof arrives) all the solutions produced by farmers so
11//! far are collected and corresponding block and/or votes are produced. In case PoT chain reorg
12//! happens, outdated solutions (they are tied to proofs of time) are thrown away.
13//!
14//! Custom [`SubspaceSyncOracle`] wrapper is introduced due to Subspace-specific changes comparing
15//! to the base Substrate behavior where major syncing is assumed to not happen in case authoring is
16//! forced.
17
18use crate::SubspaceLink;
19use crate::archiver::SegmentHeadersStore;
20use futures::channel::mpsc;
21use futures::{StreamExt, TryFutureExt};
22use sc_client_api::AuxStore;
23use sc_consensus::block_import::{BlockImportParams, StateAction};
24use sc_consensus::{BoxBlockImport, JustificationSyncLink, StorageChanges};
25use sc_consensus_slots::{
26    BackoffAuthoringBlocksStrategy, SimpleSlotWorker, SlotInfo, SlotLenienceType, SlotProportion,
27};
28use sc_proof_of_time::PotSlotWorker;
29use sc_proof_of_time::source::pot_next_slot_input;
30use sc_proof_of_time::verifier::PotVerifier;
31use sc_telemetry::TelemetryHandle;
32use sc_transaction_pool_api::OffchainTransactionPoolFactory;
33use sc_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
34use schnorrkel::context::SigningContext;
35use sp_api::{ApiError, ApiExt, ProvideRuntimeApi};
36use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
37use sp_consensus::{BlockOrigin, Environment, Error as ConsensusError, Proposer, SyncOracle};
38use sp_consensus_slots::Slot;
39use sp_consensus_subspace::digests::{
40    CompatibleDigestItem, PreDigest, PreDigestPotInfo, extract_pre_digest,
41};
42use sp_consensus_subspace::{
43    PotNextSlotInput, SignedVote, SubspaceApi, SubspaceJustification, Vote,
44};
45use sp_core::H256;
46use sp_runtime::traits::{Block as BlockT, Header, NumberFor, One, Saturating, Zero};
47use sp_runtime::{DigestItem, Justification, Justifications};
48use std::collections::BTreeMap;
49use std::future::Future;
50use std::marker::PhantomData;
51use std::pin::Pin;
52use std::sync::Arc;
53use std::sync::atomic::{AtomicBool, Ordering};
54use subspace_core_primitives::pot::{PotCheckpoints, PotOutput};
55use subspace_core_primitives::sectors::SectorId;
56use subspace_core_primitives::solutions::{RewardSignature, Solution, SolutionRange};
57use subspace_core_primitives::{BlockNumber, PublicKey, REWARD_SIGNING_CONTEXT};
58use subspace_proof_of_space::Table;
59use subspace_verification::{
60    PieceCheckParams, VerifySolutionParams, check_reward_signature, verify_solution,
61};
62use tracing::{debug, error, info, warn};
63
64/// Large enough size for any practical purposes, there shouldn't be even this many solutions.
65const PENDING_SOLUTIONS_CHANNEL_CAPACITY: usize = 10;
66
67/// Subspace sync oracle.
68///
69/// Subspace sync oracle that takes into account force authoring flag, allowing to bootstrap
70/// Subspace network from scratch due to our fork of Substrate where sync state of nodes depends on
71/// connected nodes (none of which will be synced initially). It also accounts for DSN sync, when
72/// normal Substrate sync is paused, which might happen before Substrate's internals decide there is
73/// a sync happening, but DSN sync is already in progress.
74#[derive(Debug, Clone)]
75pub struct SubspaceSyncOracle<SO>
76where
77    SO: SyncOracle + Send + Sync,
78{
79    force_authoring: bool,
80    pause_sync: Arc<AtomicBool>,
81    inner: SO,
82}
83
84impl<SO> SyncOracle for SubspaceSyncOracle<SO>
85where
86    SO: SyncOracle + Send + Sync,
87{
88    fn is_major_syncing(&self) -> bool {
89        // This allows slot worker to produce blocks even when it is offline, which according to
90        // modified Substrate fork will happen when node is offline or connected to non-synced peers
91        // (default state), it also accounts for DSN sync
92        (!self.force_authoring && self.inner.is_major_syncing())
93            || self.pause_sync.load(Ordering::Acquire)
94    }
95
96    fn is_offline(&self) -> bool {
97        self.inner.is_offline()
98    }
99}
100
101impl<SO> SubspaceSyncOracle<SO>
102where
103    SO: SyncOracle + Send + Sync,
104{
105    /// Create new instance
106    pub fn new(
107        force_authoring: bool,
108        pause_sync: Arc<AtomicBool>,
109        substrate_sync_oracle: SO,
110    ) -> Self {
111        Self {
112            force_authoring,
113            pause_sync,
114            inner: substrate_sync_oracle,
115        }
116    }
117}
118
119/// Information about new slot that just arrived
120#[derive(Debug, Copy, Clone)]
121pub struct NewSlotInfo {
122    /// Slot
123    pub slot: Slot,
124    /// The PoT output for `slot`
125    pub proof_of_time: PotOutput,
126    /// Acceptable solution range for block authoring
127    pub solution_range: SolutionRange,
128    /// Acceptable solution range for voting
129    pub voting_solution_range: SolutionRange,
130}
131
132/// New slot notification with slot information and sender for solution for the slot.
133#[derive(Debug, Clone)]
134pub struct NewSlotNotification {
135    /// New slot information.
136    pub new_slot_info: NewSlotInfo,
137    /// Sender that can be used to send solutions for the slot.
138    pub solution_sender: mpsc::Sender<Solution<PublicKey>>,
139}
140/// Notification with a hash that needs to be signed to receive reward and sender for signature.
141#[derive(Debug, Clone)]
142pub struct RewardSigningNotification {
143    /// Hash to be signed.
144    pub hash: H256,
145    /// Public key of the plot identity that should create signature.
146    pub public_key: PublicKey,
147    /// Sender that can be used to send signature for the header.
148    pub signature_sender: TracingUnboundedSender<RewardSignature>,
149}
150
151/// Parameters for [`SubspaceSlotWorker`]
152pub struct SubspaceSlotWorkerOptions<Block, Client, E, SO, L, BS, AS>
153where
154    Block: BlockT,
155    SO: SyncOracle + Send + Sync,
156{
157    /// The client to use
158    pub client: Arc<Client>,
159    /// The environment we are producing blocks for.
160    pub env: E,
161    /// The underlying block-import object to supply our produced blocks to.
162    /// This must be a `SubspaceBlockImport` or a wrapper of it, otherwise
163    /// critical consensus logic will be omitted.
164    pub block_import: BoxBlockImport<Block>,
165    /// A sync oracle
166    pub sync_oracle: SubspaceSyncOracle<SO>,
167    /// Hook into the sync module to control the justification sync process.
168    pub justification_sync_link: L,
169    /// Force authoring of blocks even if we are offline
170    pub force_authoring: bool,
171    /// Strategy and parameters for backing off block production.
172    pub backoff_authoring_blocks: Option<BS>,
173    /// The source of timestamps for relative slots
174    pub subspace_link: SubspaceLink<Block>,
175    /// Persistent storage of segment headers
176    pub segment_headers_store: SegmentHeadersStore<AS>,
177    /// The proportion of the slot dedicated to proposing.
178    ///
179    /// The block proposing will be limited to this proportion of the slot from the starting of the
180    /// slot. However, the proposing can still take longer when there is some lenience factor applied,
181    /// because there were no blocks produced for some slots.
182    pub block_proposal_slot_portion: SlotProportion,
183    /// The maximum proportion of the slot dedicated to proposing with any lenience factor applied
184    /// due to no blocks being produced.
185    pub max_block_proposal_slot_portion: Option<SlotProportion>,
186    /// Handle use to report telemetries.
187    pub telemetry: Option<TelemetryHandle>,
188    /// The offchain transaction pool factory.
189    ///
190    /// Will be used when sending equivocation reports and votes.
191    pub offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
192    /// Proof of time verifier
193    pub pot_verifier: PotVerifier,
194}
195
196/// Subspace slot worker responsible for block and vote production
197pub struct SubspaceSlotWorker<PosTable, Block, Client, E, SO, L, BS, AS>
198where
199    Block: BlockT,
200    SO: SyncOracle + Send + Sync,
201{
202    client: Arc<Client>,
203    block_import: BoxBlockImport<Block>,
204    env: E,
205    sync_oracle: SubspaceSyncOracle<SO>,
206    justification_sync_link: L,
207    force_authoring: bool,
208    backoff_authoring_blocks: Option<BS>,
209    subspace_link: SubspaceLink<Block>,
210    reward_signing_context: SigningContext,
211    block_proposal_slot_portion: SlotProportion,
212    max_block_proposal_slot_portion: Option<SlotProportion>,
213    telemetry: Option<TelemetryHandle>,
214    offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
215    segment_headers_store: SegmentHeadersStore<AS>,
216    /// Solution receivers for challenges that were sent to farmers and expected to be received
217    /// eventually
218    pending_solutions: BTreeMap<Slot, mpsc::Receiver<Solution<PublicKey>>>,
219    /// Collection of PoT slots that can be retrieved later if needed by block production
220    pot_checkpoints: BTreeMap<Slot, PotCheckpoints>,
221    pot_verifier: PotVerifier,
222    _pos_table: PhantomData<PosTable>,
223}
224
225impl<PosTable, Block, Client, E, SO, L, BS, AS> PotSlotWorker<Block>
226    for SubspaceSlotWorker<PosTable, Block, Client, E, SO, L, BS, AS>
227where
228    Block: BlockT,
229    Client: HeaderBackend<Block> + ProvideRuntimeApi<Block>,
230    Client::Api: SubspaceApi<Block, PublicKey>,
231    SO: SyncOracle + Send + Sync,
232{
233    fn on_proof(&mut self, slot: Slot, checkpoints: PotCheckpoints) {
234        // Remove checkpoints from future slots, if present they are out of date anyway
235        self.pot_checkpoints
236            .retain(|&stored_slot, _checkpoints| stored_slot < slot);
237
238        self.pot_checkpoints.insert(slot, checkpoints);
239
240        if self.sync_oracle.is_major_syncing() {
241            debug!("Skipping farming slot {slot} due to sync");
242            return;
243        }
244
245        let maybe_root_plot_public_key = self
246            .client
247            .runtime_api()
248            .root_plot_public_key(self.client.info().best_hash)
249            .ok()
250            .flatten();
251        if maybe_root_plot_public_key.is_some() && !self.force_authoring {
252            debug!(
253                "Skipping farming slot {slot} due to root public key present and force authoring \
254                not enabled"
255            );
256            return;
257        }
258
259        let proof_of_time = checkpoints.output();
260
261        // NOTE: Best hash is not necessarily going to be the parent of corresponding block, but
262        // solution range shouldn't be too far off
263        let best_hash = self.client.info().best_hash;
264        let (solution_range, voting_solution_range) =
265            match extract_solution_ranges_for_block(self.client.as_ref(), best_hash) {
266                Ok(solution_ranges) => solution_ranges,
267                Err(error) => {
268                    warn!(
269                        %slot,
270                        %best_hash,
271                        %error,
272                        "Failed to extract solution ranges for block"
273                    );
274                    return;
275                }
276            };
277
278        let new_slot_info = NewSlotInfo {
279            slot,
280            proof_of_time,
281            solution_range,
282            voting_solution_range,
283        };
284        let (solution_sender, solution_receiver) =
285            mpsc::channel(PENDING_SOLUTIONS_CHANNEL_CAPACITY);
286
287        self.subspace_link
288            .new_slot_notification_sender
289            .notify(|| NewSlotNotification {
290                new_slot_info,
291                solution_sender,
292            });
293
294        self.pending_solutions.insert(slot, solution_receiver);
295    }
296}
297
298#[async_trait::async_trait]
299impl<PosTable, Block, Client, E, Error, SO, L, BS, AS> SimpleSlotWorker<Block>
300    for SubspaceSlotWorker<PosTable, Block, Client, E, SO, L, BS, AS>
301where
302    PosTable: Table,
303    Block: BlockT,
304    Client: ProvideRuntimeApi<Block>
305        + HeaderBackend<Block>
306        + HeaderMetadata<Block, Error = ClientError>
307        + AuxStore
308        + 'static,
309    Client::Api: SubspaceApi<Block, PublicKey>,
310    E: Environment<Block, Error = Error> + Send + Sync,
311    E::Proposer: Proposer<Block, Error = Error>,
312    SO: SyncOracle + Send + Sync,
313    L: JustificationSyncLink<Block>,
314    BS: BackoffAuthoringBlocksStrategy<NumberFor<Block>> + Send + Sync,
315    Error: std::error::Error + Send + From<ConsensusError> + 'static,
316    AS: AuxStore + Send + Sync + 'static,
317    BlockNumber: From<NumberFor<Block>>,
318{
319    type BlockImport = BoxBlockImport<Block>;
320    type SyncOracle = SubspaceSyncOracle<SO>;
321    type JustificationSyncLink = L;
322    type CreateProposer =
323        Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
324    type Proposer = E::Proposer;
325    type Claim = (PreDigest<PublicKey>, SubspaceJustification);
326    type AuxData = ();
327
328    fn logging_target(&self) -> &'static str {
329        "subspace"
330    }
331
332    fn block_import(&mut self) -> &mut Self::BlockImport {
333        &mut self.block_import
334    }
335
336    fn aux_data(
337        &self,
338        _parent: &Block::Header,
339        _slot: Slot,
340    ) -> Result<Self::AuxData, ConsensusError> {
341        Ok(())
342    }
343
344    fn authorities_len(&self, _epoch_data: &Self::AuxData) -> Option<usize> {
345        // This function is used in `sc-consensus-slots` in order to determine whether it is
346        // possible to skip block production under certain circumstances, returning `None` or any
347        // number smaller or equal to `1` disables that functionality and we don't want that.
348        Some(2)
349    }
350
351    async fn claim_slot(
352        &mut self,
353        parent_header: &Block::Header,
354        slot: Slot,
355        _aux_data: &Self::AuxData,
356    ) -> Option<Self::Claim> {
357        let parent_pre_digest = match extract_pre_digest(parent_header) {
358            Ok(pre_digest) => pre_digest,
359            Err(error) => {
360                error!(
361                    %error,
362                    "Failed to parse pre-digest out of parent header"
363                );
364
365                return None;
366            }
367        };
368        let parent_slot = parent_pre_digest.slot();
369
370        if slot <= parent_slot {
371            debug!(
372                "Skipping claiming slot {slot} it must be higher than parent slot {parent_slot}",
373            );
374
375            return None;
376        } else {
377            debug!(%slot, "Attempting to claim slot");
378        }
379
380        let chain_constants = self.subspace_link.chain_constants();
381
382        let parent_hash = parent_header.hash();
383        let runtime_api = self.client.runtime_api();
384
385        let (solution_range, voting_solution_range) =
386            extract_solution_ranges_for_block(self.client.as_ref(), parent_hash).ok()?;
387
388        let maybe_root_plot_public_key = runtime_api.root_plot_public_key(parent_hash).ok()?;
389
390        let parent_pot_parameters = runtime_api.pot_parameters(parent_hash).ok()?;
391        let parent_future_slot = if parent_header.number().is_zero() {
392            parent_slot
393        } else {
394            parent_slot + chain_constants.block_authoring_delay()
395        };
396
397        let (proof_of_time, future_proof_of_time, pot_justification) = {
398            // Remove checkpoints from old slots we will not need anymore
399            self.pot_checkpoints
400                .retain(|&stored_slot, _checkpoints| stored_slot > parent_slot);
401
402            let proof_of_time = self.pot_checkpoints.get(&slot)?.output();
403
404            // Future slot for which proof must be available before authoring block at this slot
405            let future_slot = slot + chain_constants.block_authoring_delay();
406
407            let pot_input = pot_next_slot_input::<Block>(
408                parent_header.number(),
409                parent_slot,
410                &parent_pot_parameters,
411                self.pot_verifier.genesis_seed(),
412                parent_pre_digest.pot_info().proof_of_time(),
413            );
414
415            // Ensure proof of time is valid according to parent block
416            if !self.pot_verifier.is_output_valid(
417                pot_input,
418                slot - parent_slot,
419                proof_of_time,
420                parent_pot_parameters.next_parameters_change(),
421            ) {
422                warn!(
423                    %slot,
424                    ?pot_input,
425                    ?parent_pot_parameters,
426                    "Proof of time is invalid, skipping block authoring at slot"
427                );
428                return None;
429            }
430
431            let mut checkpoints_pot_input = if parent_header.number().is_zero() {
432                PotNextSlotInput {
433                    slot: parent_slot + Slot::from(1),
434                    slot_iterations: parent_pot_parameters.slot_iterations(),
435                    seed: self.pot_verifier.genesis_seed(),
436                }
437            } else {
438                let parent_pot_info = parent_pre_digest.pot_info();
439
440                PotNextSlotInput::derive(
441                    parent_pot_parameters.slot_iterations(),
442                    parent_future_slot,
443                    parent_pot_info.future_proof_of_time(),
444                    &parent_pot_parameters.next_parameters_change(),
445                )
446            };
447            let seed = checkpoints_pot_input.seed;
448
449            let mut checkpoints = Vec::with_capacity((*future_slot - *parent_future_slot) as usize);
450
451            for slot in *parent_future_slot + 1..=*future_slot {
452                let slot = Slot::from(slot);
453                let maybe_slot_checkpoints = self.pot_verifier.get_checkpoints(
454                    checkpoints_pot_input.slot_iterations,
455                    checkpoints_pot_input.seed,
456                );
457                let Some(slot_checkpoints) = maybe_slot_checkpoints else {
458                    warn!("Proving failed during block authoring");
459                    return None;
460                };
461
462                checkpoints.push(slot_checkpoints);
463
464                checkpoints_pot_input = PotNextSlotInput::derive(
465                    checkpoints_pot_input.slot_iterations,
466                    slot,
467                    slot_checkpoints.output(),
468                    &parent_pot_parameters.next_parameters_change(),
469                );
470            }
471
472            let future_proof_of_time = checkpoints
473                .last()
474                .expect("Never empty, there is at least one slot between blocks; qed")
475                .output();
476
477            let pot_justification = SubspaceJustification::PotCheckpoints { seed, checkpoints };
478
479            (proof_of_time, future_proof_of_time, pot_justification)
480        };
481
482        let mut solution_receiver = {
483            // Remove receivers for old slots we will not need anymore
484            self.pending_solutions
485                .retain(|&stored_slot, _solution_receiver| stored_slot >= slot);
486
487            let mut solution_receiver = self.pending_solutions.remove(&slot)?;
488            // Time is out, we will not accept any more solutions
489            solution_receiver.close();
490            solution_receiver
491        };
492
493        let mut maybe_pre_digest = None;
494
495        while let Some(solution) = solution_receiver.next().await {
496            if let Some(root_plot_public_key) = &maybe_root_plot_public_key
497                && &solution.public_key != root_plot_public_key
498            {
499                // Only root plot public key is allowed, no need to even try to claim block or
500                // vote.
501                continue;
502            }
503
504            let sector_id = SectorId::new(
505                solution.public_key.hash(),
506                solution.sector_index,
507                solution.history_size,
508            );
509
510            let history_size = runtime_api.history_size(parent_hash).ok()?;
511            let max_pieces_in_sector = runtime_api.max_pieces_in_sector(parent_hash).ok()?;
512
513            let segment_index = sector_id
514                .derive_piece_index(
515                    solution.piece_offset,
516                    solution.history_size,
517                    max_pieces_in_sector,
518                    chain_constants.recent_segments(),
519                    chain_constants.recent_history_fraction(),
520                )
521                .segment_index();
522            let maybe_segment_commitment = self
523                .segment_headers_store
524                .get_segment_header(segment_index)
525                .map(|segment_header| segment_header.segment_commitment());
526
527            let segment_commitment = match maybe_segment_commitment {
528                Some(segment_commitment) => segment_commitment,
529                None => {
530                    warn!(
531                        %slot,
532                        %segment_index,
533                        "Segment commitment not found",
534                    );
535                    continue;
536                }
537            };
538            let sector_expiration_check_segment_index = match solution
539                .history_size
540                .sector_expiration_check(chain_constants.min_sector_lifetime())
541            {
542                Some(sector_expiration_check) => sector_expiration_check.segment_index(),
543                None => {
544                    continue;
545                }
546            };
547            let sector_expiration_check_segment_commitment = self
548                .segment_headers_store
549                .get_segment_header(sector_expiration_check_segment_index)
550                .map(|segment_header| segment_header.segment_commitment());
551
552            let solution_verification_result = verify_solution::<PosTable, _>(
553                &solution,
554                slot.into(),
555                &VerifySolutionParams {
556                    proof_of_time,
557                    solution_range: voting_solution_range,
558                    piece_check_params: Some(PieceCheckParams {
559                        max_pieces_in_sector,
560                        segment_commitment,
561                        recent_segments: chain_constants.recent_segments(),
562                        recent_history_fraction: chain_constants.recent_history_fraction(),
563                        min_sector_lifetime: chain_constants.min_sector_lifetime(),
564                        current_history_size: history_size,
565                        sector_expiration_check_segment_commitment,
566                    }),
567                },
568                &self.subspace_link.kzg,
569            );
570
571            match solution_verification_result {
572                Ok(solution_distance) => {
573                    // If solution is of high enough quality and block pre-digest wasn't produced yet,
574                    // block reward is claimed
575                    if solution_distance <= solution_range / 2 {
576                        if maybe_pre_digest.is_none() {
577                            info!(%slot, "🚜 Claimed block at slot");
578                            maybe_pre_digest.replace(PreDigest::V0 {
579                                slot,
580                                solution,
581                                pot_info: PreDigestPotInfo::V0 {
582                                    proof_of_time,
583                                    future_proof_of_time,
584                                },
585                            });
586                        } else {
587                            info!(
588                                %slot,
589                                "Skipping solution that has quality sufficient for block because \
590                                block pre-digest was already created",
591                            );
592                        }
593                    } else if !parent_header.number().is_zero() {
594                        // Not sending vote on top of genesis block since segment headers since piece
595                        // verification wouldn't be possible due to missing (for now) segment commitment
596                        info!(%slot, "🗳️ Claimed vote at slot");
597
598                        self.create_vote(
599                            parent_header,
600                            slot,
601                            solution,
602                            proof_of_time,
603                            future_proof_of_time,
604                        )
605                        .await;
606                    }
607                }
608                Err(error @ subspace_verification::Error::OutsideSolutionRange { .. }) => {
609                    // Solution range might have just adjusted, but when farmer was auditing they
610                    // didn't know about this, so downgrade warning to debug message
611                    if runtime_api
612                        .solution_ranges(parent_hash)
613                        .ok()
614                        .and_then(|solution_ranges| solution_ranges.next)
615                        .is_some()
616                    {
617                        debug!(
618                            %slot,
619                            %error,
620                            "Invalid solution received",
621                        );
622                    } else {
623                        warn!(
624                            %slot,
625                            %error,
626                            "Invalid solution received",
627                        );
628                    }
629                }
630                Err(error) => {
631                    warn!(
632                        %slot,
633                        %error,
634                        "Invalid solution received",
635                    );
636                }
637            }
638        }
639
640        maybe_pre_digest.map(|pre_digest| (pre_digest, pot_justification))
641    }
642
643    fn pre_digest_data(
644        &self,
645        _slot: Slot,
646        (pre_digest, _justification): &Self::Claim,
647    ) -> Vec<DigestItem> {
648        vec![DigestItem::subspace_pre_digest(pre_digest)]
649    }
650
651    async fn block_import_params(
652        &self,
653        header: Block::Header,
654        header_hash: &Block::Hash,
655        body: Vec<Block::Extrinsic>,
656        storage_changes: sc_consensus_slots::StorageChanges<Block>,
657        (pre_digest, justification): Self::Claim,
658        _aux_data: Self::AuxData,
659    ) -> Result<BlockImportParams<Block>, ConsensusError> {
660        let signature = self
661            .sign_reward(
662                H256::from_slice(header_hash.as_ref()),
663                pre_digest.solution().public_key,
664            )
665            .await?;
666
667        let digest_item = DigestItem::subspace_seal(signature);
668
669        let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
670        import_block.post_digests.push(digest_item);
671        import_block.body = Some(body);
672        import_block.state_action =
673            StateAction::ApplyChanges(StorageChanges::Changes(storage_changes));
674        import_block
675            .justifications
676            .replace(Justifications::from(Justification::from(justification)));
677
678        Ok(import_block)
679    }
680
681    fn force_authoring(&self) -> bool {
682        self.force_authoring
683    }
684
685    fn should_backoff(&self, slot: Slot, chain_head: &Block::Header) -> bool {
686        if let Some(strategy) = &self.backoff_authoring_blocks
687            && let Ok(chain_head_slot) = extract_pre_digest(chain_head).map(|digest| digest.slot())
688        {
689            return strategy.should_backoff(
690                *chain_head.number(),
691                chain_head_slot,
692                self.client.info().finalized_number,
693                slot,
694                self.logging_target(),
695            );
696        }
697        false
698    }
699
700    fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
701        &mut self.sync_oracle
702    }
703
704    fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
705        &mut self.justification_sync_link
706    }
707
708    fn proposer(&mut self, block: &Block::Header) -> Self::CreateProposer {
709        Box::pin(
710            self.env
711                .init(block)
712                .map_err(|e| ConsensusError::ClientImport(e.to_string())),
713        )
714    }
715
716    fn telemetry(&self) -> Option<TelemetryHandle> {
717        self.telemetry.clone()
718    }
719
720    fn proposing_remaining_duration(&self, slot_info: &SlotInfo<Block>) -> std::time::Duration {
721        let parent_slot = extract_pre_digest(&slot_info.chain_head)
722            .ok()
723            .map(|d| d.slot());
724
725        sc_consensus_slots::proposing_remaining_duration(
726            parent_slot,
727            slot_info,
728            &self.block_proposal_slot_portion,
729            self.max_block_proposal_slot_portion.as_ref(),
730            SlotLenienceType::Exponential,
731            self.logging_target(),
732        )
733    }
734}
735
736impl<PosTable, Block, Client, E, Error, SO, L, BS, AS>
737    SubspaceSlotWorker<PosTable, Block, Client, E, SO, L, BS, AS>
738where
739    PosTable: Table,
740    Block: BlockT,
741    Client: ProvideRuntimeApi<Block>
742        + HeaderBackend<Block>
743        + HeaderMetadata<Block, Error = ClientError>
744        + AuxStore
745        + 'static,
746    Client::Api: SubspaceApi<Block, PublicKey>,
747    E: Environment<Block, Error = Error> + Send + Sync,
748    E::Proposer: Proposer<Block, Error = Error>,
749    SO: SyncOracle + Send + Sync,
750    L: JustificationSyncLink<Block>,
751    BS: BackoffAuthoringBlocksStrategy<NumberFor<Block>> + Send + Sync,
752    Error: std::error::Error + Send + From<ConsensusError> + 'static,
753    AS: AuxStore + Send + Sync + 'static,
754    BlockNumber: From<NumberFor<Block>>,
755{
756    /// Create new Subspace slot worker
757    pub fn new(
758        SubspaceSlotWorkerOptions {
759            client,
760            env,
761            block_import,
762            sync_oracle,
763            justification_sync_link,
764            force_authoring,
765            backoff_authoring_blocks,
766            subspace_link,
767            segment_headers_store,
768            block_proposal_slot_portion,
769            max_block_proposal_slot_portion,
770            telemetry,
771            offchain_tx_pool_factory,
772            pot_verifier,
773        }: SubspaceSlotWorkerOptions<Block, Client, E, SO, L, BS, AS>,
774    ) -> Self {
775        Self {
776            client: client.clone(),
777            block_import,
778            env,
779            sync_oracle,
780            justification_sync_link,
781            force_authoring,
782            backoff_authoring_blocks,
783            subspace_link,
784            reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT),
785            block_proposal_slot_portion,
786            max_block_proposal_slot_portion,
787            telemetry,
788            offchain_tx_pool_factory,
789            segment_headers_store,
790            pending_solutions: Default::default(),
791            pot_checkpoints: Default::default(),
792            pot_verifier,
793            _pos_table: PhantomData::<PosTable>,
794        }
795    }
796
797    async fn create_vote(
798        &self,
799        parent_header: &Block::Header,
800        slot: Slot,
801        solution: Solution<PublicKey>,
802        proof_of_time: PotOutput,
803        future_proof_of_time: PotOutput,
804    ) {
805        let parent_hash = parent_header.hash();
806        let mut runtime_api = self.client.runtime_api();
807        // Register the offchain tx pool to be able to use it from the runtime.
808        runtime_api.register_extension(
809            self.offchain_tx_pool_factory
810                .offchain_transaction_pool(parent_hash),
811        );
812
813        if self.should_backoff(slot, parent_header) {
814            return;
815        }
816
817        // Vote doesn't have extrinsics or state, hence dummy values
818        let vote = Vote::V0 {
819            height: parent_header.number().saturating_add(One::one()),
820            parent_hash: parent_header.hash(),
821            slot,
822            solution: solution.clone(),
823            proof_of_time,
824            future_proof_of_time,
825        };
826
827        let signature = match self.sign_reward(vote.hash(), solution.public_key).await {
828            Ok(signature) => signature,
829            Err(error) => {
830                error!(
831                    %slot,
832                    %error,
833                    "Failed to submit vote",
834                );
835                return;
836            }
837        };
838
839        let signed_vote = SignedVote { vote, signature };
840
841        if let Err(error) = runtime_api.submit_vote_extrinsic(parent_hash, signed_vote) {
842            error!(
843                %slot,
844                %error,
845                "Failed to submit vote",
846            );
847        }
848    }
849
850    async fn sign_reward(
851        &self,
852        hash: H256,
853        public_key: PublicKey,
854    ) -> Result<RewardSignature, ConsensusError> {
855        let (signature_sender, mut signature_receiver) =
856            tracing_unbounded("subspace_signature_signing_stream", 100);
857
858        self.subspace_link
859            .reward_signing_notification_sender
860            .notify(|| RewardSigningNotification {
861                hash,
862                public_key,
863                signature_sender,
864            });
865
866        while let Some(signature) = signature_receiver.next().await {
867            if check_reward_signature(
868                hash.as_ref(),
869                &signature,
870                &public_key,
871                &self.reward_signing_context,
872            )
873            .is_err()
874            {
875                warn!(
876                    %hash,
877                    "Received invalid signature for reward"
878                );
879                continue;
880            }
881
882            return Ok(signature);
883        }
884
885        Err(ConsensusError::CannotSign(format!(
886            "Farmer didn't sign reward. Key: {public_key:?}"
887        )))
888    }
889}
890
891/// Extract solution ranges for block and votes, given ID of the parent block.
892pub(crate) fn extract_solution_ranges_for_block<Block, Client>(
893    client: &Client,
894    parent_hash: Block::Hash,
895) -> Result<(u64, u64), ApiError>
896where
897    Block: BlockT,
898    Client: ProvideRuntimeApi<Block>,
899    Client::Api: SubspaceApi<Block, PublicKey>,
900{
901    client
902        .runtime_api()
903        .solution_ranges(parent_hash)
904        .map(|solution_ranges| {
905            (
906                solution_ranges.next.unwrap_or(solution_ranges.current),
907                solution_ranges
908                    .voting_next
909                    .unwrap_or(solution_ranges.voting_current),
910            )
911        })
912}