subspace_service/sync_from_dsn/
piece_validator.rs

1use crate::sync_from_dsn::LOG_TARGET;
2use async_trait::async_trait;
3use sc_client_api::AuxStore;
4use sc_consensus_subspace::archiver::SegmentHeadersStore;
5use subspace_core_primitives::pieces::{Piece, PieceIndex};
6use subspace_kzg::Kzg;
7use subspace_networking::libp2p::PeerId;
8use subspace_networking::utils::piece_provider::PieceValidator;
9use subspace_networking::Node;
10use subspace_verification::is_piece_valid;
11use tracing::{error, warn};
12
/// Piece validator that checks pieces received from DSN peers against the segment commitment
/// of the segment the piece belongs to, and bans peers that send invalid pieces.
13pub(crate) struct SegmentCommitmentPieceValidator<AS> {
    /// Local DSN node; its id is compared with the piece source and it is used to ban peers
14    dsn_node: Node,
    /// KZG instance passed to `is_piece_valid` (cloned for every validation)
15    kzg: Kzg,
    /// Store queried for the segment header matching the segment index of a piece
16    segment_headers_store: SegmentHeadersStore<AS>,
17}
18
19impl<AS> SegmentCommitmentPieceValidator<AS>
20where
21    AS: AuxStore + Send + Sync + 'static,
22{
23    /// Segment headers must be in order from 0 to the last one that exists
24    pub(crate) fn new(
25        dsn_node: Node,
26        kzg: Kzg,
27        segment_headers_store: SegmentHeadersStore<AS>,
28    ) -> Self {
29        Self {
30            dsn_node,
31            kzg,
32            segment_headers_store,
33        }
34    }
35}
36
37#[async_trait]
38impl<AS> PieceValidator for SegmentCommitmentPieceValidator<AS>
39where
40    AS: AuxStore + Send + Sync + 'static,
41{
42    async fn validate_piece(
43        &self,
44        source_peer_id: PeerId,
45        piece_index: PieceIndex,
46        piece: Piece,
47    ) -> Option<Piece> {
48        if source_peer_id == self.dsn_node.id() {
49            return Some(piece);
50        }
51
52        let segment_index = piece_index.segment_index();
53
54        let maybe_segment_header = self.segment_headers_store.get_segment_header(segment_index);
55        let segment_commitment = match maybe_segment_header {
56            Some(segment_header) => segment_header.segment_commitment(),
57            None => {
58                error!(target: LOG_TARGET, %segment_index, "No segment commitment in the cache.");
59
60                return None;
61            }
62        };
63
64        let is_valid_fut = tokio::task::spawn_blocking({
65            let kzg = self.kzg.clone();
66
67            move || {
68                is_piece_valid(&kzg, &piece, &segment_commitment, piece_index.position())
69                    .then_some(piece)
70            }
71        });
72
73        match is_valid_fut.await.unwrap_or_default() {
74            Some(piece) => Some(piece),
75            None => {
76                warn!(
77                    target: LOG_TARGET,
78                    %piece_index,
79                    %source_peer_id,
80                    "Received invalid piece from peer"
81                );
82
83                // We don't care about result here
84                let _ = self.dsn_node.ban_peer(source_peer_id).await;
85                None
86            }
87        }
88    }
89}