// subspace_farmer/farmer_piece_getter/piece_validator.rs
use crate::node_client::NodeClient;
4use async_trait::async_trait;
5use subspace_core_primitives::pieces::{Piece, PieceIndex};
6use subspace_kzg::Kzg;
7use subspace_networking::libp2p::PeerId;
8use subspace_networking::utils::piece_provider::PieceValidator;
9use subspace_networking::Node;
10use subspace_verification::is_piece_valid;
11use tracing::{error, warn};
12
13#[derive(Debug, Clone)]
17pub struct SegmentCommitmentPieceValidator<NC> {
18 dsn_node: Node,
19 node_client: NC,
20 kzg: Kzg,
21}
22
23impl<NC> SegmentCommitmentPieceValidator<NC> {
24 pub fn new(dsn_node: Node, node_client: NC, kzg: Kzg) -> Self {
26 Self {
27 dsn_node,
28 node_client,
29 kzg,
30 }
31 }
32}
33
34#[async_trait]
35impl<NC> PieceValidator for SegmentCommitmentPieceValidator<NC>
36where
37 NC: NodeClient,
38{
39 async fn validate_piece(
40 &self,
41 source_peer_id: PeerId,
42 piece_index: PieceIndex,
43 piece: Piece,
44 ) -> Option<Piece> {
45 if source_peer_id == self.dsn_node.id() {
46 return Some(piece);
47 }
48
49 let segment_index = piece_index.segment_index();
50
51 let segment_headers = match self.node_client.segment_headers(vec![segment_index]).await {
52 Ok(segment_headers) => segment_headers,
53 Err(error) => {
54 error!(
55 %piece_index,
56 ?error,
57 "Failed to retrieve segment headers from node"
58 );
59 return None;
60 }
61 };
62
63 let segment_commitment = match segment_headers.into_iter().next().flatten() {
64 Some(segment_header) => segment_header.segment_commitment(),
65 None => {
66 error!(
67 %piece_index,
68 %segment_index,
69 "Segment commitment for segment index wasn't found on node"
70 );
71 return None;
72 }
73 };
74
75 let is_valid_fut = tokio::task::spawn_blocking({
76 let kzg = self.kzg.clone();
77
78 move || {
79 is_piece_valid(&kzg, &piece, &segment_commitment, piece_index.position())
80 .then_some(piece)
81 }
82 });
83
84 match is_valid_fut.await.unwrap_or_default() {
85 Some(piece) => Some(piece),
86 None => {
87 warn!(
88 %piece_index,
89 %source_peer_id,
90 "Received invalid piece from peer"
91 );
92
93 let _ = self.dsn_node.ban_peer(source_peer_id).await;
95 None
96 }
97 }
98 }
99}