subspace_service/mmr/
request_handler.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Substrate.
3
4// Substrate is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Substrate is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
16
17#[cfg(test)]
18mod tests;
19
20use crate::mmr::sync::decode_mmr_data;
21use crate::mmr::{get_offchain_key, get_temp_key};
22use futures::channel::oneshot;
23use futures::stream::StreamExt;
24use parity_scale_codec::{Decode, Encode};
25use sc_client_api::{BlockBackend, ProofProvider};
26use sc_network::config::ProtocolId;
27use sc_network::request_responses::{IncomingRequest, OutgoingResponse};
28use sc_network::{NetworkBackend, PeerId};
29use schnellru::{ByLength, LruMap};
30use sp_blockchain::HeaderBackend;
31use sp_core::offchain::storage::OffchainDb;
32use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
33use sp_mmr_primitives::utils::NodesUtils;
34use sp_runtime::traits::Block as BlockT;
35use std::collections::BTreeMap;
36use std::marker::PhantomData;
37use std::sync::Arc;
38use std::time::Duration;
39use subspace_core_primitives::BlockNumber;
40use subspace_runtime_primitives::BlockHashFor;
41use tracing::{debug, error, trace};
42
43const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
44
45/// Defines max items per request
46pub const MAX_MMR_ITEMS: u32 = 20000;
47
48mod rep {
49    use sc_network::ReputationChange as Rep;
50
51    /// Reputation change when a peer sent us the same request multiple times.
52    pub const SAME_REQUEST: Rep = Rep::new(i32::MIN, "Same state request multiple times");
53}
54
55/// Generates a `RequestResponseProtocolConfig` for the state request protocol, refusing incoming
56/// requests.
57pub fn generate_protocol_config<Hash: AsRef<[u8]>, B: BlockT, N: NetworkBackend<B, B::Hash>>(
58    _: &ProtocolId,
59    genesis_hash: Hash,
60    fork_id: Option<&str>,
61    inbound_queue: async_channel::Sender<IncomingRequest>,
62) -> N::RequestResponseProtocolConfig {
63    N::request_response_config(
64        generate_protocol_name(genesis_hash, fork_id).into(),
65        Vec::new(),
66        1024 * 1024,
67        16 * 1024 * 1024,
68        Duration::from_secs(40),
69        Some(inbound_queue),
70    )
71}
72
73/// Generate the state protocol name from the genesis hash and fork id.
74pub fn generate_protocol_name<Hash: AsRef<[u8]>>(
75    genesis_hash: Hash,
76    fork_id: Option<&str>,
77) -> String {
78    let genesis_hash = genesis_hash.as_ref();
79    if let Some(fork_id) = fork_id {
80        format!(
81            "/{}/{}/mmr/1",
82            array_bytes::bytes2hex("", genesis_hash),
83            fork_id
84        )
85    } else {
86        format!("/{}/mmr/1", array_bytes::bytes2hex("", genesis_hash))
87    }
88}
89
90fn leaf_index_that_added_node(position: BlockNumber) -> BlockNumber {
91    NodesUtils::leaf_index_that_added_node(position.into())
92        .try_into()
93        .expect("Always its into a block number; qed")
94}
95
96/// The key of [`BlockRequestHandler::seen_requests`].
97#[derive(Eq, PartialEq, Clone, Hash)]
98struct SeenRequestsKey {
99    peer: PeerId,
100    starting_position: u32,
101}
102
103/// Request MMR data from a peer.
104#[derive(Clone, PartialEq, Encode, Decode, Debug)]
105pub struct MmrRequest {
106    /// Starting position for MMR node.
107    pub starting_position: u32,
108    /// Max returned nodes.
109    pub limit: u32,
110}
111
112#[derive(Clone, PartialEq, Encode, Decode, Debug)]
113pub struct MmrResponse {
114    /// MMR-nodes related to node position
115    pub mmr_data: BTreeMap<u32, Vec<u8>>,
116}
117
118/// The value of [`StateRequestHandler::seen_requests`].
119enum SeenRequestsValue {
120    /// First time we have seen the request.
121    First,
122    /// We have fulfilled the request `n` times.
123    Fulfilled(usize),
124}
125
126/// Handler for incoming block requests from a remote peer.
127pub struct MmrRequestHandler<Block, OS, Client>
128where
129    Block: BlockT,
130{
131    request_receiver: async_channel::Receiver<IncomingRequest>,
132    /// Maps from request to number of times we have seen this request.
133    ///
134    /// This is used to check if a peer is spamming us with the same request.
135    seen_requests: LruMap<SeenRequestsKey, SeenRequestsValue>,
136
137    offchain_db: OffchainDb<OS>,
138
139    client: Arc<Client>,
140
141    _phantom: PhantomData<Block>,
142}
143
144impl<Block, OS, Client> MmrRequestHandler<Block, OS, Client>
145where
146    Block: BlockT<Hash = sp_core::H256>,
147    Client:
148        HeaderBackend<Block> + BlockBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
149    OS: OffchainStorage,
150{
151    /// Create a new [`MmrRequestHandler`].
152    pub fn new<NB>(
153        protocol_id: &ProtocolId,
154        fork_id: Option<&str>,
155        client: Arc<Client>,
156        num_peer_hint: usize,
157        offchain_storage: OS,
158    ) -> (Self, NB::RequestResponseProtocolConfig)
159    where
160        NB: NetworkBackend<Block, BlockHashFor<Block>>,
161    {
162        // Reserve enough request slots for one request per peer when we are at the maximum
163        // number of peers.
164        let capacity = std::cmp::max(num_peer_hint, 1);
165        let (tx, request_receiver) = async_channel::bounded(capacity);
166
167        let protocol_config = generate_protocol_config::<_, Block, NB>(
168            protocol_id,
169            client
170                .block_hash(0u32.into())
171                .ok()
172                .flatten()
173                .expect("Genesis block exists; qed"),
174            fork_id,
175            tx,
176        );
177
178        let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2);
179        let seen_requests = LruMap::new(capacity);
180
181        (
182            Self {
183                client,
184                request_receiver,
185                seen_requests,
186                offchain_db: OffchainDb::new(offchain_storage),
187                _phantom: PhantomData,
188            },
189            protocol_config,
190        )
191    }
192
193    /// Run [`StateRequestHandler`].
194    pub async fn run(mut self) {
195        while let Some(request) = self.request_receiver.next().await {
196            let IncomingRequest {
197                peer,
198                payload,
199                pending_response,
200            } = request;
201
202            match self.handle_request(payload, pending_response, &peer) {
203                Ok(()) => debug!("Handled MMR request from {}.", peer),
204                Err(e) => {
205                    error!("Failed to handle MMR request from {}: {}", peer, e,)
206                }
207            }
208        }
209    }
210
211    fn handle_request(
212        &mut self,
213        payload: Vec<u8>,
214        pending_response: oneshot::Sender<OutgoingResponse>,
215        peer: &PeerId,
216    ) -> Result<(), HandleRequestError> {
217        let request = MmrRequest::decode(&mut payload.as_slice())?;
218
219        let key = SeenRequestsKey {
220            peer: *peer,
221            starting_position: request.starting_position,
222        };
223
224        let mut reputation_changes = Vec::new();
225
226        match self.seen_requests.get(&key) {
227            Some(SeenRequestsValue::First) => {}
228            Some(SeenRequestsValue::Fulfilled(requests)) => {
229                *requests = requests.saturating_add(1);
230
231                if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
232                    reputation_changes.push(rep::SAME_REQUEST);
233                }
234            }
235            None => {
236                self.seen_requests
237                    .insert(key.clone(), SeenRequestsValue::First);
238            }
239        }
240
241        trace!("Handle MMR request: {peer}, request: {request:?}",);
242
243        let result = if request.limit > MAX_MMR_ITEMS {
244            error!(
245                "Invalid MMR request from peer={peer}: {:?}",
246                HandleRequestError::MaxItemsLimitExceeded
247            );
248
249            Err(())
250        } else {
251            let mut mmr_data = BTreeMap::new();
252            for position in request.starting_position..(request.starting_position + request.limit) {
253                let canon_key = get_offchain_key(position.into());
254                let storage_value = self
255                    .offchain_db
256                    .local_storage_get(StorageKind::PERSISTENT, &canon_key);
257
258                let block_number = leaf_index_that_added_node(position);
259                trace!( %position, %block_number, "Storage data present: {}", storage_value.is_some());
260
261                if let Some(storage_value) = storage_value {
262                    mmr_data.insert(position, storage_value);
263                } else {
264                    if let Ok(Some(hash)) = self.client.hash(block_number.into()) {
265                        let temp_key = get_temp_key(position.into(), hash);
266                        let storage_value = self
267                            .offchain_db
268                            .local_storage_get(StorageKind::PERSISTENT, &temp_key);
269
270                        if let Some(storage_value) = storage_value {
271                            let data = decode_mmr_data(&storage_value);
272                            trace!( %position, %block_number,"MMR node: {data:?}");
273                            mmr_data.insert(position, storage_value);
274                            continue;
275                        } else {
276                            debug!( %position, %block_number, ?hash, "Didn't find value in storage.")
277                        }
278                    } else {
279                        debug!( %position, %block_number, "Didn't find hash.")
280                    }
281                    break; // No more storage values
282                }
283            }
284
285            if let Some(value) = self.seen_requests.get(&key) {
286                // If this is the first time we have processed this request, we need to change
287                // it to `Fulfilled`.
288                if let SeenRequestsValue::First = value {
289                    *value = SeenRequestsValue::Fulfilled(1);
290                }
291            }
292
293            let response = MmrResponse { mmr_data };
294
295            Ok(response.encode())
296        };
297
298        pending_response
299            .send(OutgoingResponse {
300                result,
301                reputation_changes,
302                sent_feedback: None,
303            })
304            .map_err(|_| HandleRequestError::SendResponse)
305    }
306}
307
308#[derive(Debug, thiserror::Error)]
309enum HandleRequestError {
310    #[error("Invalid request: max MMR nodes limit exceeded.")]
311    MaxItemsLimitExceeded,
312
313    #[error(transparent)]
314    Client(#[from] sp_blockchain::Error),
315
316    #[error("Failed to send response.")]
317    SendResponse,
318
319    #[error("Failed to decode request: {0}.")]
320    Decode(#[from] parity_scale_codec::Error),
321}