1#[cfg(test)]
18mod tests;
19
20use crate::mmr::sync::decode_mmr_data;
21use crate::mmr::{get_offchain_key, get_temp_key};
22use futures::channel::oneshot;
23use futures::stream::StreamExt;
24use parity_scale_codec::{Decode, Encode};
25use sc_client_api::{BlockBackend, ProofProvider};
26use sc_network::config::ProtocolId;
27use sc_network::request_responses::{IncomingRequest, OutgoingResponse};
28use sc_network::{NetworkBackend, PeerId};
29use schnellru::{ByLength, LruMap};
30use sp_blockchain::HeaderBackend;
31use sp_core::offchain::storage::OffchainDb;
32use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
33use sp_mmr_primitives::utils::NodesUtils;
34use sp_runtime::traits::Block as BlockT;
35use std::collections::BTreeMap;
36use std::marker::PhantomData;
37use std::sync::Arc;
38use std::time::Duration;
39use subspace_core_primitives::BlockNumber;
40use subspace_runtime_primitives::BlockHashFor;
41use tracing::{debug, error, trace};
42
43const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
44
45pub const MAX_MMR_ITEMS: u32 = 20000;
47
48mod rep {
49 use sc_network::ReputationChange as Rep;
50
51 pub const SAME_REQUEST: Rep = Rep::new(i32::MIN, "Same state request multiple times");
53}
54
55pub fn generate_protocol_config<Hash: AsRef<[u8]>, B: BlockT, N: NetworkBackend<B, B::Hash>>(
58 _: &ProtocolId,
59 genesis_hash: Hash,
60 fork_id: Option<&str>,
61 inbound_queue: async_channel::Sender<IncomingRequest>,
62) -> N::RequestResponseProtocolConfig {
63 N::request_response_config(
64 generate_protocol_name(genesis_hash, fork_id).into(),
65 Vec::new(),
66 1024 * 1024,
67 16 * 1024 * 1024,
68 Duration::from_secs(40),
69 Some(inbound_queue),
70 )
71}
72
73pub fn generate_protocol_name<Hash: AsRef<[u8]>>(
75 genesis_hash: Hash,
76 fork_id: Option<&str>,
77) -> String {
78 let genesis_hash = genesis_hash.as_ref();
79 if let Some(fork_id) = fork_id {
80 format!(
81 "/{}/{}/mmr/1",
82 array_bytes::bytes2hex("", genesis_hash),
83 fork_id
84 )
85 } else {
86 format!("/{}/mmr/1", array_bytes::bytes2hex("", genesis_hash))
87 }
88}
89
90fn leaf_index_that_added_node(position: BlockNumber) -> BlockNumber {
91 NodesUtils::leaf_index_that_added_node(position.into())
92 .try_into()
93 .expect("Always its into a block number; qed")
94}
95
96#[derive(Eq, PartialEq, Clone, Hash)]
98struct SeenRequestsKey {
99 peer: PeerId,
100 starting_position: u32,
101}
102
103#[derive(Clone, PartialEq, Encode, Decode, Debug)]
105pub struct MmrRequest {
106 pub starting_position: u32,
108 pub limit: u32,
110}
111
112#[derive(Clone, PartialEq, Encode, Decode, Debug)]
113pub struct MmrResponse {
114 pub mmr_data: BTreeMap<u32, Vec<u8>>,
116}
117
118enum SeenRequestsValue {
120 First,
122 Fulfilled(usize),
124}
125
126pub struct MmrRequestHandler<Block, OS, Client>
128where
129 Block: BlockT,
130{
131 request_receiver: async_channel::Receiver<IncomingRequest>,
132 seen_requests: LruMap<SeenRequestsKey, SeenRequestsValue>,
136
137 offchain_db: OffchainDb<OS>,
138
139 client: Arc<Client>,
140
141 _phantom: PhantomData<Block>,
142}
143
144impl<Block, OS, Client> MmrRequestHandler<Block, OS, Client>
145where
146 Block: BlockT<Hash = sp_core::H256>,
147 Client:
148 HeaderBackend<Block> + BlockBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
149 OS: OffchainStorage,
150{
151 pub fn new<NB>(
153 protocol_id: &ProtocolId,
154 fork_id: Option<&str>,
155 client: Arc<Client>,
156 num_peer_hint: usize,
157 offchain_storage: OS,
158 ) -> (Self, NB::RequestResponseProtocolConfig)
159 where
160 NB: NetworkBackend<Block, BlockHashFor<Block>>,
161 {
162 let capacity = std::cmp::max(num_peer_hint, 1);
165 let (tx, request_receiver) = async_channel::bounded(capacity);
166
167 let protocol_config = generate_protocol_config::<_, Block, NB>(
168 protocol_id,
169 client
170 .block_hash(0u32.into())
171 .ok()
172 .flatten()
173 .expect("Genesis block exists; qed"),
174 fork_id,
175 tx,
176 );
177
178 let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2);
179 let seen_requests = LruMap::new(capacity);
180
181 (
182 Self {
183 client,
184 request_receiver,
185 seen_requests,
186 offchain_db: OffchainDb::new(offchain_storage),
187 _phantom: PhantomData,
188 },
189 protocol_config,
190 )
191 }
192
193 pub async fn run(mut self) {
195 while let Some(request) = self.request_receiver.next().await {
196 let IncomingRequest {
197 peer,
198 payload,
199 pending_response,
200 } = request;
201
202 match self.handle_request(payload, pending_response, &peer) {
203 Ok(()) => debug!("Handled MMR request from {}.", peer),
204 Err(e) => {
205 error!("Failed to handle MMR request from {}: {}", peer, e,)
206 }
207 }
208 }
209 }
210
211 fn handle_request(
212 &mut self,
213 payload: Vec<u8>,
214 pending_response: oneshot::Sender<OutgoingResponse>,
215 peer: &PeerId,
216 ) -> Result<(), HandleRequestError> {
217 let request = MmrRequest::decode(&mut payload.as_slice())?;
218
219 let key = SeenRequestsKey {
220 peer: *peer,
221 starting_position: request.starting_position,
222 };
223
224 let mut reputation_changes = Vec::new();
225
226 match self.seen_requests.get(&key) {
227 Some(SeenRequestsValue::First) => {}
228 Some(SeenRequestsValue::Fulfilled(requests)) => {
229 *requests = requests.saturating_add(1);
230
231 if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
232 reputation_changes.push(rep::SAME_REQUEST);
233 }
234 }
235 None => {
236 self.seen_requests
237 .insert(key.clone(), SeenRequestsValue::First);
238 }
239 }
240
241 trace!("Handle MMR request: {peer}, request: {request:?}",);
242
243 let result = if request.limit > MAX_MMR_ITEMS {
244 error!(
245 "Invalid MMR request from peer={peer}: {:?}",
246 HandleRequestError::MaxItemsLimitExceeded
247 );
248
249 Err(())
250 } else {
251 let mut mmr_data = BTreeMap::new();
252 for position in request.starting_position..(request.starting_position + request.limit) {
253 let canon_key = get_offchain_key(position.into());
254 let storage_value = self
255 .offchain_db
256 .local_storage_get(StorageKind::PERSISTENT, &canon_key);
257
258 let block_number = leaf_index_that_added_node(position);
259 trace!( %position, %block_number, "Storage data present: {}", storage_value.is_some());
260
261 if let Some(storage_value) = storage_value {
262 mmr_data.insert(position, storage_value);
263 } else {
264 if let Ok(Some(hash)) = self.client.hash(block_number.into()) {
265 let temp_key = get_temp_key(position.into(), hash);
266 let storage_value = self
267 .offchain_db
268 .local_storage_get(StorageKind::PERSISTENT, &temp_key);
269
270 if let Some(storage_value) = storage_value {
271 let data = decode_mmr_data(&storage_value);
272 trace!( %position, %block_number,"MMR node: {data:?}");
273 mmr_data.insert(position, storage_value);
274 continue;
275 } else {
276 debug!( %position, %block_number, ?hash, "Didn't find value in storage.")
277 }
278 } else {
279 debug!( %position, %block_number, "Didn't find hash.")
280 }
281 break; }
283 }
284
285 if let Some(value) = self.seen_requests.get(&key) {
286 if let SeenRequestsValue::First = value {
289 *value = SeenRequestsValue::Fulfilled(1);
290 }
291 }
292
293 let response = MmrResponse { mmr_data };
294
295 Ok(response.encode())
296 };
297
298 pending_response
299 .send(OutgoingResponse {
300 result,
301 reputation_changes,
302 sent_feedback: None,
303 })
304 .map_err(|_| HandleRequestError::SendResponse)
305 }
306}
307
308#[derive(Debug, thiserror::Error)]
309enum HandleRequestError {
310 #[error("Invalid request: max MMR nodes limit exceeded.")]
311 MaxItemsLimitExceeded,
312
313 #[error(transparent)]
314 Client(#[from] sp_blockchain::Error),
315
316 #[error("Failed to send response.")]
317 SendResponse,
318
319 #[error("Failed to decode request: {0}.")]
320 Decode(#[from] parity_scale_codec::Error),
321}