subspace_networking/protocols/request_response/handlers/
cached_piece_by_index.rs

1//! Helper for incoming cached piece requests.
2//!
3//! Request handler can be created with [`CachedPieceByIndexRequestHandler`].
4
5#[cfg(test)]
6mod tests;
7
8use crate::protocols::request_response::handlers::generic_request_handler::{
9    GenericRequest, GenericRequestHandler,
10};
11use derive_more::{Deref, DerefMut, From, Into};
12use libp2p::kad::K_VALUE;
13use libp2p::multiaddr::Protocol;
14use libp2p::{Multiaddr, PeerId};
15use multihash::Multihash;
16use parity_scale_codec::{Compact, CompactLen, Decode, Encode, EncodeLike, Input, Output};
17use std::sync::Arc;
18use subspace_core_primitives::pieces::{Piece, PieceIndex};
19
20/// Cached-piece-by-index request.
21///
22/// This is similar to `PieceByIndexRequest`, but will only respond with cached pieces.
23#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
24pub struct CachedPieceByIndexRequest {
25    /// Request key - piece index
26    pub piece_index: PieceIndex,
27    /// Additional pieces that requester is interested in if they are cached locally
28    // TODO: Use `Arc<[PieceIndex]>` once
29    //  https://github.com/paritytech/parity-scale-codec/issues/633 is resolved
30    pub cached_pieces: Arc<Vec<PieceIndex>>,
31}
32
33impl GenericRequest for CachedPieceByIndexRequest {
34    const PROTOCOL_NAME: &'static str = "/subspace/cached-piece-by-index/0.1.0";
35    const LOG_TARGET: &'static str = "cached-piece-by-index-request-response-handler";
36    type Response = CachedPieceByIndexResponse;
37}
38
39impl CachedPieceByIndexRequest {
40    /// Max number of cached piece indexes to accept per request, equals to the number of source shards in
41    /// a sector and fits nicely into a single TCP packet
42    pub const RECOMMENDED_LIMIT: usize = 128;
43}
44
45/// Closest peers
46#[derive(Debug, Default, PartialEq, Eq, Clone, From, Into, Deref, DerefMut)]
47pub struct ClosestPeers(Vec<(PeerId, Vec<Multiaddr>)>);
48
49impl Encode for ClosestPeers {
50    fn size_hint(&self) -> usize {
51        let mut size = Compact::compact_len(&(self.0.len() as u32));
52
53        for (peer_id, addresses) in &self.0 {
54            size += peer_id.as_ref().encoded_size();
55            size += Compact::compact_len(&(addresses.len() as u32));
56
57            for address in addresses {
58                size += address.as_ref().encoded_size();
59            }
60        }
61
62        size
63    }
64
65    fn encode_to<T: Output + ?Sized>(&self, dest: &mut T) {
66        Compact::from(self.0.len() as u32).encode_to(dest);
67
68        for (peer_id, addresses) in &self.0 {
69            peer_id.as_ref().encode_to(dest);
70            Compact::from(addresses.len() as u32).encode_to(dest);
71
72            for address in addresses {
73                address.as_ref().encode_to(dest);
74            }
75        }
76    }
77}
78
79impl EncodeLike for ClosestPeers {}
80
81impl Decode for ClosestPeers {
82    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
83        let mut closest_peers = Vec::with_capacity(K_VALUE.get());
84
85        let closest_peers_count = Compact::<u32>::decode(input)?.0 as usize;
86        for _ in 0..closest_peers_count {
87            let peer_id =
88                PeerId::from_multihash(Multihash::decode(input)?).map_err(|multihash| {
89                    parity_scale_codec::Error::from("Can't create `PeerId` from `Multihash`")
90                        .chain(format!("Code: {}", multihash.code()))
91                })?;
92            let p2p = Multiaddr::from(Protocol::P2p(peer_id));
93            let mut addresses = Vec::new();
94
95            let addresses_count = Compact::<u32>::decode(input)?.0 as usize;
96
97            if addresses_count == 0 {
98                return Err(parity_scale_codec::Error::from(
99                    "List of addresses must not be empty",
100                ));
101            }
102
103            for _ in 0..addresses_count {
104                let address = Multiaddr::try_from(Vec::<u8>::decode(input)?).map_err(|error| {
105                    parity_scale_codec::Error::from("Failed to decode `Multiaddr`")
106                        .chain(error.to_string())
107                })?;
108
109                if !address.ends_with(&p2p) {
110                    return Err(parity_scale_codec::Error::from(
111                        "`Multiaddr` doesn't end with correct p2p suffix",
112                    )
113                    .chain(format!("Address {address}, PeerId {p2p}")));
114                }
115
116                addresses.push(address);
117            }
118
119            closest_peers.push((peer_id, addresses));
120        }
121
122        Ok(Self(closest_peers))
123    }
124}
125
126/// Piece result contains either piece itself or the closest known peers to the piece index
127#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
128pub enum PieceResult {
129    /// Piece was cached locally
130    Piece(Piece),
131    /// Piece was not cached locally, but these are the closest known peers to the piece index
132    ClosestPeers(ClosestPeers),
133}
134
135/// Cached-piece-by-index response, may be cached piece or stored in one of the farms
136#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
137pub struct CachedPieceByIndexResponse {
138    /// Piece result
139    pub result: PieceResult,
140    /// Additional pieces that requester is interested in and are cached locally, order from request
141    /// is not preserved
142    pub cached_pieces: Vec<PieceIndex>,
143}
144
145/// Cached-piece-by-index request handler
146pub type CachedPieceByIndexRequestHandler = GenericRequestHandler<CachedPieceByIndexRequest>;