subspace_service/sync_from_dsn/
piece_validator.rs

1use async_trait::async_trait;
2use sc_client_api::AuxStore;
3use sc_consensus_subspace::archiver::SegmentHeadersStore;
4use subspace_core_primitives::pieces::{Piece, PieceIndex};
5use subspace_kzg::Kzg;
6use subspace_networking::Node;
7use subspace_networking::libp2p::PeerId;
8use subspace_networking::utils::piece_provider::PieceValidator;
9use subspace_verification::is_piece_valid;
10use tracing::{error, warn};
11
12pub(crate) struct SegmentCommitmentPieceValidator<AS> {
13    dsn_node: Node,
14    kzg: Kzg,
15    segment_headers_store: SegmentHeadersStore<AS>,
16}
17
18impl<AS> SegmentCommitmentPieceValidator<AS>
19where
20    AS: AuxStore + Send + Sync + 'static,
21{
22    /// Segment headers must be in order from 0 to the last one that exists
23    pub(crate) fn new(
24        dsn_node: Node,
25        kzg: Kzg,
26        segment_headers_store: SegmentHeadersStore<AS>,
27    ) -> Self {
28        Self {
29            dsn_node,
30            kzg,
31            segment_headers_store,
32        }
33    }
34}
35
36#[async_trait]
37impl<AS> PieceValidator for SegmentCommitmentPieceValidator<AS>
38where
39    AS: AuxStore + Send + Sync + 'static,
40{
41    async fn validate_piece(
42        &self,
43        source_peer_id: PeerId,
44        piece_index: PieceIndex,
45        piece: Piece,
46    ) -> Option<Piece> {
47        if source_peer_id == self.dsn_node.id() {
48            return Some(piece);
49        }
50
51        let segment_index = piece_index.segment_index();
52
53        let maybe_segment_header = self.segment_headers_store.get_segment_header(segment_index);
54        let segment_commitment = match maybe_segment_header {
55            Some(segment_header) => segment_header.segment_commitment(),
56            None => {
57                error!( %segment_index, "No segment commitment in the cache.");
58
59                return None;
60            }
61        };
62
63        let is_valid_fut = tokio::task::spawn_blocking({
64            let kzg = self.kzg.clone();
65
66            move || {
67                is_piece_valid(&kzg, &piece, &segment_commitment, piece_index.position())
68                    .then_some(piece)
69            }
70        });
71
72        match is_valid_fut.await.unwrap_or_default() {
73            Some(piece) => Some(piece),
74            None => {
75                warn!(
76                    %piece_index,
77                    %source_peer_id,
78                    "Received invalid piece from peer"
79                );
80
81                // We don't care about result here
82                let _ = self.dsn_node.ban_peer(source_peer_id).await;
83                None
84            }
85        }
86    }
87}