subspace_networking/protocols/request_response/handlers/
cached_piece_by_index.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
//! Helper for incoming cached piece requests.
//!
//! Request handler can be created with [`CachedPieceByIndexRequestHandler`].

#[cfg(test)]
mod tests;

use crate::protocols::request_response::handlers::generic_request_handler::{
    GenericRequest, GenericRequestHandler,
};
use derive_more::{Deref, DerefMut, From, Into};
use libp2p::kad::K_VALUE;
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
use multihash::Multihash;
use parity_scale_codec::{Compact, CompactLen, Decode, Encode, EncodeLike, Input, Output};
use std::sync::Arc;
use subspace_core_primitives::pieces::{Piece, PieceIndex};

/// Cached-piece-by-index request.
///
/// This is similar to `PieceByIndexRequest`, but will only respond with cached pieces.
///
/// The matching response type is [`CachedPieceByIndexResponse`]. The wire format comes from the
/// derived `Encode`/`Decode` implementations, so the order of the fields below is part of the
/// protocol.
#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
pub struct CachedPieceByIndexRequest {
    /// Request key - piece index
    pub piece_index: PieceIndex,
    /// Additional pieces that requester is interested in if they are cached locally.
    ///
    /// See [`CachedPieceByIndexRequest::RECOMMENDED_LIMIT`] for the suggested upper bound on the
    /// number of entries.
    // TODO: Use `Arc<[PieceIndex]>` once
    //  https://github.com/paritytech/parity-scale-codec/issues/633 is resolved
    pub cached_pieces: Arc<Vec<PieceIndex>>,
}

// Wires `CachedPieceByIndexRequest` into the generic request handler machinery by declaring its
// protocol name, log target and response type.
impl GenericRequest for CachedPieceByIndexRequest {
    // Protocol identifier; the trailing `0.1.0` looks like a protocol version, so presumably it
    // must change together with any wire-format change - confirm before editing.
    const PROTOCOL_NAME: &'static str = "/subspace/cached-piece-by-index/0.1.0";
    // Target string for log records related to this request type.
    const LOG_TARGET: &'static str = "cached-piece-by-index-request-response-handler";
    // Every request of this type is answered with a `CachedPieceByIndexResponse`.
    type Response = CachedPieceByIndexResponse;
}

impl CachedPieceByIndexRequest {
    /// Max number of cached piece indexes to accept per request; it is equal to the number of
    /// source shards in a sector and fits nicely into a single TCP packet.
    ///
    /// NOTE(review): nothing in this file enforces the limit (neither construction nor decoding
    /// of the request checks it), so it is presumably applied by the code that builds and
    /// handles requests - confirm against callers.
    pub const RECOMMENDED_LIMIT: usize = 128;
}

/// Closest peers
///
/// Newtype around a list of `(peer ID, addresses of that peer)` pairs. It exists so that SCALE
/// `Encode`/`Decode` can be implemented by hand for the contained `libp2p` types; decoding also
/// validates the data (non-empty address lists, matching `/p2p/<peer id>` suffix).
///
/// `Deref`/`DerefMut` expose the inner `Vec`, and `From`/`Into` convert to and from it.
#[derive(Debug, Default, PartialEq, Eq, Clone, From, Into, Deref, DerefMut)]
pub struct ClosestPeers(Vec<(PeerId, Vec<Multiaddr>)>);

impl Encode for ClosestPeers {
    /// Computes the number of bytes [`Self::encode_to`] is going to write.
    ///
    /// The layout is: compact peer count, then for every peer its multihash, a compact address
    /// count and finally each address as a length-prefixed byte string.
    fn size_hint(&self) -> usize {
        let Self(peers) = self;
        let peers_prefix_len = Compact::compact_len(&(peers.len() as u32));

        peers
            .iter()
            .fold(peers_prefix_len, |total, (peer_id, addresses)| {
                let addresses_prefix_len = Compact::compact_len(&(addresses.len() as u32));
                let addresses_len = addresses
                    .iter()
                    .map(|address| address.as_ref().encoded_size())
                    .sum::<usize>();

                total + peer_id.as_ref().encoded_size() + addresses_prefix_len + addresses_len
            })
    }

    /// Writes all peers into `dest` using the layout described on [`Self::size_hint`].
    fn encode_to<T>(&self, dest: &mut T)
    where
        T: Output + ?Sized,
    {
        let Self(peers) = self;

        // Number of peers that follow
        Compact::from(peers.len() as u32).encode_to(dest);

        for (peer_id, peer_addresses) in peers {
            // Peer ID is written as its underlying multihash
            peer_id.as_ref().encode_to(dest);

            // Number of addresses that follow for this peer
            Compact::from(peer_addresses.len() as u32).encode_to(dest);

            // Each address is written as raw bytes with a length prefix
            for peer_address in peer_addresses {
                peer_address.as_ref().encode_to(dest);
            }
        }
    }
}

// Marker implementation with no items of its own; it accompanies the manual `Encode`
// implementation of `ClosestPeers`.
impl EncodeLike for ClosestPeers {}

impl Decode for ClosestPeers {
    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
        let mut closest_peers = Vec::with_capacity(K_VALUE.get());

        let closest_peers_count = Compact::<u32>::decode(input)?.0 as usize;
        for _ in 0..closest_peers_count {
            let peer_id =
                PeerId::from_multihash(Multihash::decode(input)?).map_err(|multihash| {
                    parity_scale_codec::Error::from("Can't create `PeerId` from `Multihash`")
                        .chain(format!("Code: {}", multihash.code()))
                })?;
            let p2p = Multiaddr::from(Protocol::P2p(peer_id));
            let mut addresses = Vec::new();

            let addresses_count = Compact::<u32>::decode(input)?.0 as usize;

            if addresses_count == 0 {
                return Err(parity_scale_codec::Error::from(
                    "List of addresses must not be empty",
                ));
            }

            for _ in 0..addresses_count {
                let address = Multiaddr::try_from(Vec::<u8>::decode(input)?).map_err(|error| {
                    parity_scale_codec::Error::from("Failed to decode `Multiaddr`")
                        .chain(error.to_string())
                })?;

                if !address.ends_with(&p2p) {
                    return Err(parity_scale_codec::Error::from(
                        "`Multiaddr` doesn't end with correct p2p suffix",
                    )
                    .chain(format!("Address {address}, PeerId {p2p}")));
                }

                addresses.push(address);
            }

            closest_peers.push((peer_id, addresses));
        }

        Ok(Self(closest_peers))
    }
}

/// Piece result contains either piece itself or the closest known peers to the piece index
///
/// `Encode`/`Decode` are derived, so the order of the variants below is part of the wire format.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub enum PieceResult {
    /// Piece was cached locally
    Piece(Piece),
    /// Piece was not cached locally, but these are the closest known peers to the piece index
    ClosestPeers(ClosestPeers),
}

/// Cached-piece-by-index response.
///
/// Carries the outcome for the requested piece index (see [`PieceResult`]: either the cached
/// piece or the closest known peers) together with the list of additionally requested pieces
/// that are cached locally.
///
/// `Encode`/`Decode` are derived, so the order of the fields below is part of the wire format.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct CachedPieceByIndexResponse {
    /// Piece result
    pub result: PieceResult,
    /// Additional pieces that requester is interested in and are cached locally, order from request
    /// is not preserved
    pub cached_pieces: Vec<PieceIndex>,
}

/// Cached-piece-by-index request handler
///
/// This is [`GenericRequestHandler`] specialized for [`CachedPieceByIndexRequest`].
pub type CachedPieceByIndexRequestHandler = GenericRequestHandler<CachedPieceByIndexRequest>;