subspace_service/mmr/
request_handler.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Substrate.
3
4// Substrate is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Substrate is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Substrate.  If not, see <http://www.gnu.org/licenses/>.
16
17#[cfg(test)]
18mod tests;
19
20use crate::mmr::sync::decode_mmr_data;
21use crate::mmr::{get_offchain_key, get_temp_key};
22use crate::sync_from_dsn::LOG_TARGET;
23use futures::channel::oneshot;
24use futures::stream::StreamExt;
25use parity_scale_codec::{Decode, Encode};
26use sc_client_api::{BlockBackend, ProofProvider};
27use sc_network::config::ProtocolId;
28use sc_network::request_responses::{IncomingRequest, OutgoingResponse};
29use sc_network::{NetworkBackend, PeerId};
30use schnellru::{ByLength, LruMap};
31use sp_blockchain::HeaderBackend;
32use sp_core::offchain::storage::OffchainDb;
33use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
34use sp_mmr_primitives::utils::NodesUtils;
35use sp_runtime::traits::Block as BlockT;
36use std::collections::BTreeMap;
37use std::marker::PhantomData;
38use std::sync::Arc;
39use std::time::Duration;
40use subspace_core_primitives::BlockNumber;
41use subspace_runtime_primitives::BlockHashFor;
42use tracing::{debug, error, trace};
43
/// Threshold for repeated identical requests from a single peer.
///
/// Once the per-peer counter kept for an identical request (same peer and same starting
/// position) exceeds this value, `rep::SAME_REQUEST` is attached to the response.
const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
45
/// Maximum number of MMR items a single request may ask for.
///
/// A request whose `limit` is greater than this value is answered with an error result instead
/// of MMR data.
pub const MAX_MMR_ITEMS: u32 = 20000;
48
/// Reputation changes that the request handler attaches to its responses.
mod rep {
    use sc_network::ReputationChange as Rep;

    /// Reputation change when a peer sent us the same request multiple times.
    ///
    /// The change carries the value `i32::MIN`.
    // NOTE(review): the reason text mentions a "state request" although this handler serves MMR
    // requests; presumably carried over from the state request handler. The string is emitted at
    // runtime, so confirm nothing depends on it before rewording.
    pub const SAME_REQUEST: Rep = Rep::new(i32::MIN, "Same state request multiple times");
}
55
56/// Generates a `RequestResponseProtocolConfig` for the state request protocol, refusing incoming
57/// requests.
58pub fn generate_protocol_config<Hash: AsRef<[u8]>, B: BlockT, N: NetworkBackend<B, B::Hash>>(
59    _: &ProtocolId,
60    genesis_hash: Hash,
61    fork_id: Option<&str>,
62    inbound_queue: async_channel::Sender<IncomingRequest>,
63) -> N::RequestResponseProtocolConfig {
64    N::request_response_config(
65        generate_protocol_name(genesis_hash, fork_id).into(),
66        Vec::new(),
67        1024 * 1024,
68        16 * 1024 * 1024,
69        Duration::from_secs(40),
70        Some(inbound_queue),
71    )
72}
73
74/// Generate the state protocol name from the genesis hash and fork id.
75pub fn generate_protocol_name<Hash: AsRef<[u8]>>(
76    genesis_hash: Hash,
77    fork_id: Option<&str>,
78) -> String {
79    let genesis_hash = genesis_hash.as_ref();
80    if let Some(fork_id) = fork_id {
81        format!(
82            "/{}/{}/mmr/1",
83            array_bytes::bytes2hex("", genesis_hash),
84            fork_id
85        )
86    } else {
87        format!("/{}/mmr/1", array_bytes::bytes2hex("", genesis_hash))
88    }
89}
90
91fn leaf_index_that_added_node(position: BlockNumber) -> BlockNumber {
92    NodesUtils::leaf_index_that_added_node(position.into())
93        .try_into()
94        .expect("Always its into a block number; qed")
95}
96
/// The key of [`MmrRequestHandler::seen_requests`].
///
/// Two requests are considered identical when they come from the same peer and ask for the same
/// starting position; the requested `limit` is not part of the key.
#[derive(Eq, PartialEq, Clone, Hash)]
struct SeenRequestsKey {
    /// Peer that sent the request.
    peer: PeerId,
    /// Starting position taken from the request.
    starting_position: u32,
}
103
/// Request MMR data from a peer.
///
/// The handler reads consecutive positions beginning at `starting_position` and rejects
/// requests whose `limit` is greater than [`MAX_MMR_ITEMS`].
#[derive(Clone, PartialEq, Encode, Decode, Debug)]
pub struct MmrRequest {
    /// Starting position for MMR node.
    pub starting_position: u32,
    /// Max returned nodes.
    pub limit: u32,
}
112
/// Response to an [`MmrRequest`].
#[derive(Clone, PartialEq, Encode, Decode, Debug)]
pub struct MmrResponse {
    /// MMR-nodes related to node position
    ///
    /// Maps a node position to the bytes read from offchain storage for that position. The map
    /// may hold fewer entries than were requested.
    pub mmr_data: BTreeMap<u32, Vec<u8>>,
}
118
/// The value of [`MmrRequestHandler::seen_requests`].
enum SeenRequestsValue {
    /// First time we have seen the request.
    First,
    /// We have fulfilled the request `n` times.
    Fulfilled(usize),
}
126
/// Handler for incoming MMR requests from a remote peer.
pub struct MmrRequestHandler<Block, OS, Client>
where
    Block: BlockT,
{
    /// Receiving end of the channel on which incoming requests arrive.
    request_receiver: async_channel::Receiver<IncomingRequest>,
    /// Maps from request to number of times we have seen this request.
    ///
    /// This is used to check if a peer is spamming us with the same request.
    seen_requests: LruMap<SeenRequestsKey, SeenRequestsValue>,

    /// Offchain database the MMR node data is read from.
    offchain_db: OffchainDb<OS>,

    /// Client used to look up block hashes by block number.
    client: Arc<Client>,

    /// Marker for the block type; no value of `Block` is stored.
    _phantom: PhantomData<Block>,
}
144
145impl<Block, OS, Client> MmrRequestHandler<Block, OS, Client>
146where
147    Block: BlockT<Hash = sp_core::H256>,
148    Client:
149        HeaderBackend<Block> + BlockBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
150    OS: OffchainStorage,
151{
152    /// Create a new [`MmrRequestHandler`].
153    pub fn new<NB>(
154        protocol_id: &ProtocolId,
155        fork_id: Option<&str>,
156        client: Arc<Client>,
157        num_peer_hint: usize,
158        offchain_storage: OS,
159    ) -> (Self, NB::RequestResponseProtocolConfig)
160    where
161        NB: NetworkBackend<Block, BlockHashFor<Block>>,
162    {
163        // Reserve enough request slots for one request per peer when we are at the maximum
164        // number of peers.
165        let capacity = std::cmp::max(num_peer_hint, 1);
166        let (tx, request_receiver) = async_channel::bounded(capacity);
167
168        let protocol_config = generate_protocol_config::<_, Block, NB>(
169            protocol_id,
170            client
171                .block_hash(0u32.into())
172                .ok()
173                .flatten()
174                .expect("Genesis block exists; qed"),
175            fork_id,
176            tx,
177        );
178
179        let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2);
180        let seen_requests = LruMap::new(capacity);
181
182        (
183            Self {
184                client,
185                request_receiver,
186                seen_requests,
187                offchain_db: OffchainDb::new(offchain_storage),
188                _phantom: PhantomData,
189            },
190            protocol_config,
191        )
192    }
193
194    /// Run [`StateRequestHandler`].
195    pub async fn run(mut self) {
196        while let Some(request) = self.request_receiver.next().await {
197            let IncomingRequest {
198                peer,
199                payload,
200                pending_response,
201            } = request;
202
203            match self.handle_request(payload, pending_response, &peer) {
204                Ok(()) => debug!(target: LOG_TARGET, "Handled MMR request from {}.", peer),
205                Err(e) => {
206                    error!(target: LOG_TARGET, "Failed to handle MMR request from {}: {}", peer, e,)
207                }
208            }
209        }
210    }
211
212    fn handle_request(
213        &mut self,
214        payload: Vec<u8>,
215        pending_response: oneshot::Sender<OutgoingResponse>,
216        peer: &PeerId,
217    ) -> Result<(), HandleRequestError> {
218        let request = MmrRequest::decode(&mut payload.as_slice())?;
219
220        let key = SeenRequestsKey {
221            peer: *peer,
222            starting_position: request.starting_position,
223        };
224
225        let mut reputation_changes = Vec::new();
226
227        match self.seen_requests.get(&key) {
228            Some(SeenRequestsValue::First) => {}
229            Some(SeenRequestsValue::Fulfilled(ref mut requests)) => {
230                *requests = requests.saturating_add(1);
231
232                if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
233                    reputation_changes.push(rep::SAME_REQUEST);
234                }
235            }
236            None => {
237                self.seen_requests
238                    .insert(key.clone(), SeenRequestsValue::First);
239            }
240        }
241
242        trace!(target: LOG_TARGET, "Handle MMR request: {peer}, request: {request:?}",);
243
244        let result = if request.limit > MAX_MMR_ITEMS {
245            error!(
246                target: LOG_TARGET,
247                "Invalid MMR request from peer={peer}: {:?}",
248                HandleRequestError::MaxItemsLimitExceeded
249            );
250
251            Err(())
252        } else {
253            let mut mmr_data = BTreeMap::new();
254            for position in request.starting_position..(request.starting_position + request.limit) {
255                let canon_key = get_offchain_key(position.into());
256                let storage_value = self
257                    .offchain_db
258                    .local_storage_get(StorageKind::PERSISTENT, &canon_key);
259
260                let block_number = leaf_index_that_added_node(position);
261                trace!(target: LOG_TARGET, %position, %block_number, "Storage data present: {}", storage_value.is_some());
262
263                if let Some(storage_value) = storage_value {
264                    mmr_data.insert(position, storage_value);
265                } else {
266                    if let Ok(Some(hash)) = self.client.hash(block_number.into()) {
267                        let temp_key = get_temp_key(position.into(), hash);
268                        let storage_value = self
269                            .offchain_db
270                            .local_storage_get(StorageKind::PERSISTENT, &temp_key);
271
272                        if let Some(storage_value) = storage_value {
273                            let data = decode_mmr_data(&storage_value);
274                            trace!(target: LOG_TARGET, %position, %block_number,"MMR node: {data:?}");
275                            mmr_data.insert(position, storage_value);
276                            continue;
277                        } else {
278                            debug!(target: LOG_TARGET, %position, %block_number, ?hash, "Didn't find value in storage.")
279                        }
280                    } else {
281                        debug!(target: LOG_TARGET, %position, %block_number, "Didn't find hash.")
282                    }
283                    break; // No more storage values
284                }
285            }
286
287            if let Some(value) = self.seen_requests.get(&key) {
288                // If this is the first time we have processed this request, we need to change
289                // it to `Fulfilled`.
290                if let SeenRequestsValue::First = value {
291                    *value = SeenRequestsValue::Fulfilled(1);
292                }
293            }
294
295            let response = MmrResponse { mmr_data };
296
297            Ok(response.encode())
298        };
299
300        pending_response
301            .send(OutgoingResponse {
302                result,
303                reputation_changes,
304                sent_feedback: None,
305            })
306            .map_err(|_| HandleRequestError::SendResponse)
307    }
308}
309
/// Errors that can occur while handling an incoming MMR request.
#[derive(Debug, thiserror::Error)]
enum HandleRequestError {
    /// The request's `limit` is greater than [`MAX_MMR_ITEMS`].
    ///
    /// The handler uses this variant when logging such a request.
    #[error("Invalid request: max MMR nodes limit exceeded.")]
    MaxItemsLimitExceeded,

    /// An error converted from [`sp_blockchain::Error`].
    #[error(transparent)]
    Client(#[from] sp_blockchain::Error),

    /// The response could not be sent through the pending-response channel.
    #[error("Failed to send response.")]
    SendResponse,

    /// The request payload could not be decoded.
    #[error("Failed to decode request: {0}.")]
    Decode(#[from] parity_scale_codec::Error),
}