1#[cfg(test)]
18mod tests;
19
20use crate::mmr::sync::decode_mmr_data;
21use crate::mmr::{get_offchain_key, get_temp_key};
22use crate::sync_from_dsn::LOG_TARGET;
23use futures::channel::oneshot;
24use futures::stream::StreamExt;
25use parity_scale_codec::{Decode, Encode};
26use sc_client_api::{BlockBackend, ProofProvider};
27use sc_network::config::ProtocolId;
28use sc_network::request_responses::{IncomingRequest, OutgoingResponse};
29use sc_network::{NetworkBackend, PeerId};
30use schnellru::{ByLength, LruMap};
31use sp_blockchain::HeaderBackend;
32use sp_core::offchain::storage::OffchainDb;
33use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
34use sp_mmr_primitives::utils::NodesUtils;
35use sp_runtime::traits::Block as BlockT;
36use std::collections::BTreeMap;
37use std::marker::PhantomData;
38use std::sync::Arc;
39use std::time::Duration;
40use subspace_core_primitives::BlockNumber;
41use subspace_runtime_primitives::BlockHashFor;
42use tracing::{debug, error, trace};
43
/// How many times a peer may repeat an already fulfilled request (same peer and same starting
/// position) before a reputation change is attached to the response.
const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
45
/// Upper bound for [`MmrRequest::limit`]; requests asking for more items than this are rejected.
pub const MAX_MMR_ITEMS: u32 = 20_000;
48
49mod rep {
50 use sc_network::ReputationChange as Rep;
51
52 pub const SAME_REQUEST: Rep = Rep::new(i32::MIN, "Same state request multiple times");
54}
55
56pub fn generate_protocol_config<Hash: AsRef<[u8]>, B: BlockT, N: NetworkBackend<B, B::Hash>>(
59 _: &ProtocolId,
60 genesis_hash: Hash,
61 fork_id: Option<&str>,
62 inbound_queue: async_channel::Sender<IncomingRequest>,
63) -> N::RequestResponseProtocolConfig {
64 N::request_response_config(
65 generate_protocol_name(genesis_hash, fork_id).into(),
66 Vec::new(),
67 1024 * 1024,
68 16 * 1024 * 1024,
69 Duration::from_secs(40),
70 Some(inbound_queue),
71 )
72}
73
74pub fn generate_protocol_name<Hash: AsRef<[u8]>>(
76 genesis_hash: Hash,
77 fork_id: Option<&str>,
78) -> String {
79 let genesis_hash = genesis_hash.as_ref();
80 if let Some(fork_id) = fork_id {
81 format!(
82 "/{}/{}/mmr/1",
83 array_bytes::bytes2hex("", genesis_hash),
84 fork_id
85 )
86 } else {
87 format!("/{}/mmr/1", array_bytes::bytes2hex("", genesis_hash))
88 }
89}
90
91fn leaf_index_that_added_node(position: BlockNumber) -> BlockNumber {
92 NodesUtils::leaf_index_that_added_node(position.into())
93 .try_into()
94 .expect("Always its into a block number; qed")
95}
96
97#[derive(Eq, PartialEq, Clone, Hash)]
99struct SeenRequestsKey {
100 peer: PeerId,
101 starting_position: u32,
102}
103
104#[derive(Clone, PartialEq, Encode, Decode, Debug)]
106pub struct MmrRequest {
107 pub starting_position: u32,
109 pub limit: u32,
111}
112
113#[derive(Clone, PartialEq, Encode, Decode, Debug)]
114pub struct MmrResponse {
115 pub mmr_data: BTreeMap<u32, Vec<u8>>,
117}
118
/// State stored in the seen-requests cache for one [`SeenRequestsKey`].
enum SeenRequestsValue {
    /// The request was seen but no response has been produced for it yet.
    First,
    /// The request was answered; the counter starts at 1 and grows with every repetition.
    Fulfilled(usize),
}
126
127pub struct MmrRequestHandler<Block, OS, Client>
129where
130 Block: BlockT,
131{
132 request_receiver: async_channel::Receiver<IncomingRequest>,
133 seen_requests: LruMap<SeenRequestsKey, SeenRequestsValue>,
137
138 offchain_db: OffchainDb<OS>,
139
140 client: Arc<Client>,
141
142 _phantom: PhantomData<Block>,
143}
144
145impl<Block, OS, Client> MmrRequestHandler<Block, OS, Client>
146where
147 Block: BlockT<Hash = sp_core::H256>,
148 Client:
149 HeaderBackend<Block> + BlockBackend<Block> + ProofProvider<Block> + Send + Sync + 'static,
150 OS: OffchainStorage,
151{
152 pub fn new<NB>(
154 protocol_id: &ProtocolId,
155 fork_id: Option<&str>,
156 client: Arc<Client>,
157 num_peer_hint: usize,
158 offchain_storage: OS,
159 ) -> (Self, NB::RequestResponseProtocolConfig)
160 where
161 NB: NetworkBackend<Block, BlockHashFor<Block>>,
162 {
163 let capacity = std::cmp::max(num_peer_hint, 1);
166 let (tx, request_receiver) = async_channel::bounded(capacity);
167
168 let protocol_config = generate_protocol_config::<_, Block, NB>(
169 protocol_id,
170 client
171 .block_hash(0u32.into())
172 .ok()
173 .flatten()
174 .expect("Genesis block exists; qed"),
175 fork_id,
176 tx,
177 );
178
179 let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2);
180 let seen_requests = LruMap::new(capacity);
181
182 (
183 Self {
184 client,
185 request_receiver,
186 seen_requests,
187 offchain_db: OffchainDb::new(offchain_storage),
188 _phantom: PhantomData,
189 },
190 protocol_config,
191 )
192 }
193
194 pub async fn run(mut self) {
196 while let Some(request) = self.request_receiver.next().await {
197 let IncomingRequest {
198 peer,
199 payload,
200 pending_response,
201 } = request;
202
203 match self.handle_request(payload, pending_response, &peer) {
204 Ok(()) => debug!(target: LOG_TARGET, "Handled MMR request from {}.", peer),
205 Err(e) => {
206 error!(target: LOG_TARGET, "Failed to handle MMR request from {}: {}", peer, e,)
207 }
208 }
209 }
210 }
211
212 fn handle_request(
213 &mut self,
214 payload: Vec<u8>,
215 pending_response: oneshot::Sender<OutgoingResponse>,
216 peer: &PeerId,
217 ) -> Result<(), HandleRequestError> {
218 let request = MmrRequest::decode(&mut payload.as_slice())?;
219
220 let key = SeenRequestsKey {
221 peer: *peer,
222 starting_position: request.starting_position,
223 };
224
225 let mut reputation_changes = Vec::new();
226
227 match self.seen_requests.get(&key) {
228 Some(SeenRequestsValue::First) => {}
229 Some(SeenRequestsValue::Fulfilled(ref mut requests)) => {
230 *requests = requests.saturating_add(1);
231
232 if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
233 reputation_changes.push(rep::SAME_REQUEST);
234 }
235 }
236 None => {
237 self.seen_requests
238 .insert(key.clone(), SeenRequestsValue::First);
239 }
240 }
241
242 trace!(target: LOG_TARGET, "Handle MMR request: {peer}, request: {request:?}",);
243
244 let result = if request.limit > MAX_MMR_ITEMS {
245 error!(
246 target: LOG_TARGET,
247 "Invalid MMR request from peer={peer}: {:?}",
248 HandleRequestError::MaxItemsLimitExceeded
249 );
250
251 Err(())
252 } else {
253 let mut mmr_data = BTreeMap::new();
254 for position in request.starting_position..(request.starting_position + request.limit) {
255 let canon_key = get_offchain_key(position.into());
256 let storage_value = self
257 .offchain_db
258 .local_storage_get(StorageKind::PERSISTENT, &canon_key);
259
260 let block_number = leaf_index_that_added_node(position);
261 trace!(target: LOG_TARGET, %position, %block_number, "Storage data present: {}", storage_value.is_some());
262
263 if let Some(storage_value) = storage_value {
264 mmr_data.insert(position, storage_value);
265 } else {
266 if let Ok(Some(hash)) = self.client.hash(block_number.into()) {
267 let temp_key = get_temp_key(position.into(), hash);
268 let storage_value = self
269 .offchain_db
270 .local_storage_get(StorageKind::PERSISTENT, &temp_key);
271
272 if let Some(storage_value) = storage_value {
273 let data = decode_mmr_data(&storage_value);
274 trace!(target: LOG_TARGET, %position, %block_number,"MMR node: {data:?}");
275 mmr_data.insert(position, storage_value);
276 continue;
277 } else {
278 debug!(target: LOG_TARGET, %position, %block_number, ?hash, "Didn't find value in storage.")
279 }
280 } else {
281 debug!(target: LOG_TARGET, %position, %block_number, "Didn't find hash.")
282 }
283 break; }
285 }
286
287 if let Some(value) = self.seen_requests.get(&key) {
288 if let SeenRequestsValue::First = value {
291 *value = SeenRequestsValue::Fulfilled(1);
292 }
293 }
294
295 let response = MmrResponse { mmr_data };
296
297 Ok(response.encode())
298 };
299
300 pending_response
301 .send(OutgoingResponse {
302 result,
303 reputation_changes,
304 sent_feedback: None,
305 })
306 .map_err(|_| HandleRequestError::SendResponse)
307 }
308}
309
310#[derive(Debug, thiserror::Error)]
311enum HandleRequestError {
312 #[error("Invalid request: max MMR nodes limit exceeded.")]
313 MaxItemsLimitExceeded,
314
315 #[error(transparent)]
316 Client(#[from] sp_blockchain::Error),
317
318 #[error("Failed to send response.")]
319 SendResponse,
320
321 #[error("Failed to decode request: {0}.")]
322 Decode(#[from] parity_scale_codec::Error),
323}