1use crate::mmr::get_offchain_key;
2use crate::mmr::request_handler::{generate_protocol_name, MmrRequest, MmrResponse, MAX_MMR_ITEMS};
3use crate::sync_from_dsn::LOG_TARGET;
4use futures::channel::oneshot;
5use parity_scale_codec::{Decode, Encode};
6use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure};
7use sc_network_sync::SyncingService;
8use sp_api::ProvideRuntimeApi;
9use sp_blockchain::HeaderBackend;
10use sp_core::offchain::storage::OffchainDb;
11use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
12use sp_core::{Hasher, H256};
13use sp_mmr_primitives::mmr_lib::{MMRStoreReadOps, MMRStoreWriteOps};
14use sp_mmr_primitives::utils::NodesUtils;
15use sp_mmr_primitives::{mmr_lib, DataOrHash, MmrApi};
16use sp_runtime::traits::{Block as BlockT, Keccak256, NumberFor};
17use sp_subspace_mmr::MmrLeaf;
18use std::cell::RefCell;
19use std::marker::PhantomData;
20use std::sync::Arc;
21use std::time::Duration;
22use subspace_core_primitives::{BlockHash, BlockNumber};
23use tokio::time::sleep;
24use tracing::{debug, error, trace};
25
/// Generic MMR node: either leaf data or a hash.
type Node<H, L> = DataOrHash<H, L>;
/// MMR leaf type parameterized with this chain's block number and block hash types.
type MmrLeafOf = MmrLeaf<BlockNumber, BlockHash>;
/// Concrete MMR node: Keccak-256 hashing over `MmrLeafOf` leaves.
type NodeOf = Node<Keccak256, MmrLeafOf>;
/// MMR over `NodeOf` nodes, merged with `MmrHasher` and stored through `OffchainMmrStorage`.
type MmrOf<OS> = mmr_lib::MMR<NodeOf, MmrHasher, OffchainMmrStorage<OS>>;
30
31pub(crate) fn decode_mmr_data(mut data: &[u8]) -> mmr_lib::Result<NodeOf> {
32 let node = match NodeOf::decode(&mut data) {
33 Ok(node) => node,
34 Err(err) => {
35 error!(target: LOG_TARGET, ?err, "Can't decode MMR data");
36
37 return Err(mmr_lib::Error::StoreError(
38 "Can't decode MMR data".to_string(),
39 ));
40 }
41 };
42
43 Ok(node)
44}
45
/// MMR node store backed by offchain storage.
///
/// The database handle is kept in a `RefCell` so that the read path, which only
/// receives `&self`, can still take the mutable borrow it uses for lookups.
struct OffchainMmrStorage<OS: OffchainStorage> {
    /// Offchain database wrapper built around the supplied storage backend.
    offchain_db: RefCell<OffchainDb<OS>>,
}
49
50impl<OS: OffchainStorage> OffchainMmrStorage<OS> {
51 fn new(offchain_storage: OS) -> Self {
52 let offchain_db = OffchainDb::new(offchain_storage);
53
54 Self {
55 offchain_db: RefCell::new(offchain_db),
56 }
57 }
58}
59
60impl<OS: OffchainStorage> MMRStoreReadOps<NodeOf> for OffchainMmrStorage<OS> {
61 fn get_elem(&self, pos: u64) -> mmr_lib::Result<Option<NodeOf>> {
62 let canon_key = get_offchain_key(pos);
63 let Some(data) = self
64 .offchain_db
65 .borrow_mut()
66 .local_storage_get(StorageKind::PERSISTENT, &canon_key)
67 else {
68 error!(target: LOG_TARGET, %pos, "Can't get MMR data.");
69
70 return Ok(None);
71 };
72
73 let node = decode_mmr_data(data.as_slice());
74
75 node.map(Some)
76 }
77}
78
79impl<OS: OffchainStorage> MMRStoreWriteOps<NodeOf> for OffchainMmrStorage<OS> {
80 fn append(&mut self, pos: u64, elems: Vec<NodeOf>) -> mmr_lib::Result<()> {
81 let mut current_pos = pos;
82 for elem in elems {
83 let data = elem.encode();
84
85 let canon_key = get_offchain_key(current_pos);
86 self.offchain_db.borrow_mut().local_storage_set(
87 StorageKind::PERSISTENT,
88 &canon_key,
89 &data,
90 );
91
92 current_pos += 1;
93 }
94
95 Ok(())
96 }
97}
98
/// Merge strategy for the MMR: a parent node is the Keccak-256 hash of its two
/// children's hashes, left first.
pub struct MmrHasher;
101
102impl mmr_lib::Merge for MmrHasher {
103 type Item = NodeOf;
104
105 fn merge(left: &Self::Item, right: &Self::Item) -> mmr_lib::Result<Self::Item> {
106 let mut concat = left.hash().as_ref().to_vec();
107 concat.extend_from_slice(right.hash().as_ref());
108
109 Ok(Node::Hash(Keccak256::hash(&concat)))
110 }
111}
112
/// Delay before the sync loop retries, used after a failed peers-info request
/// and after a full pass over the known peers.
const SYNC_PAUSE: Duration = Duration::from_secs(5);
114
/// Downloads MMR nodes from peers, rebuilds the MMR in offchain storage and
/// checks the resulting root against the runtime API.
pub(crate) struct MmrSync<Client, Block, OS: OffchainStorage> {
    /// Client handle used for chain info, block-hash lookups and runtime API calls.
    client: Arc<Client>,
    /// MMR being rebuilt on top of the offchain storage.
    mmr: MmrOf<OS>,
    /// Number of leaves pushed by the last successful `sync` call; also used as
    /// the block number whose root `verify_mmr_data` checks.
    leaves_number: u32,
    /// Ties the `Block` type parameter to the struct without storing a value.
    _data: PhantomData<Block>,
}
122
123impl<Client, Block, OS> MmrSync<Client, Block, OS>
124where
125 Block: BlockT,
126 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
127 Client::Api: MmrApi<Block, H256, NumberFor<Block>>,
128 OS: OffchainStorage,
129{
130 pub fn new(client: Arc<Client>, offchain_storage: OS) -> Self {
131 Self {
132 client,
133 mmr: MmrOf::new(0, OffchainMmrStorage::new(offchain_storage)),
134 leaves_number: 0,
135 _data: Default::default(),
136 }
137 }
138
139 pub(crate) async fn sync<NR>(
142 &mut self,
143 fork_id: Option<String>,
144 network_service: NR,
145 sync_service: Arc<SyncingService<Block>>,
146 target_block: BlockNumber,
147 ) -> Result<(), sp_blockchain::Error>
148 where
149 NR: NetworkRequest,
150 {
151 debug!(target: LOG_TARGET, "MMR sync started.");
152 let info = self.client.info();
153 let protocol_name = generate_protocol_name(info.genesis_hash, fork_id.as_deref());
154
155 let mut leaves_number = 0u32;
156 let mut starting_position = 0;
157
158 'outer: loop {
159 let peers_info = match sync_service.peers_info().await {
160 Ok(peers_info) => peers_info,
161 Err(error) => {
162 error!(target: LOG_TARGET, "Peers info request returned an error: {error}",);
163 sleep(SYNC_PAUSE).await;
164 continue;
165 }
166 };
167
168 'peers: for (peer_id, peer_info) in peers_info.iter() {
170 trace!(target: LOG_TARGET, "MMR sync. peer = {peer_id}, info = {:?}", peer_info);
171
172 if !peer_info.is_synced {
173 trace!(target: LOG_TARGET, "MMR sync skipped (not synced). peer = {peer_id}");
174
175 continue;
176 }
177
178 loop {
180 let target_position = {
181 let nodes = NodesUtils::new(target_block.into());
182
183 let target_position = nodes.size().saturating_sub(1);
184
185 debug!(
186 target: LOG_TARGET,
187 "MMR-sync. Target block={}, Node target position={}",
188 target_block, target_position
189 );
190
191 target_position
192 };
193
194 let request = MmrRequest {
195 starting_position,
196 limit: MAX_MMR_ITEMS,
197 };
198 let response = send_mmr_request(
199 protocol_name.clone(),
200 *peer_id,
201 request,
202 &network_service,
203 )
204 .await;
205
206 match response {
207 Ok(response) => {
208 trace!(target: LOG_TARGET, "Response: {:?}", response.mmr_data.len());
209
210 if response.mmr_data.is_empty() {
211 debug!(target: LOG_TARGET, "Empty response from peer={}", peer_id);
212 break;
213 }
214
215 'data: for (position, data) in response.mmr_data.iter() {
217 if *position != starting_position {
219 debug!(
220 target: LOG_TARGET,
221 ?peer_info,
222 %starting_position,
223 %position,
224 "MMR sync error: incorrect starting position."
225 );
226
227 continue 'peers;
228 }
229
230 let node = decode_mmr_data(data);
231
232 let node = match node {
233 Ok(node) => node,
234 Err(err) => {
235 debug!(target: LOG_TARGET, ?peer_info, ?err, %position, "Can't decode MMR data received from the peer.");
236
237 continue 'peers;
238 }
239 };
240
241 if matches!(node, Node::Data(_)) {
242 if let Err(err) = self.mmr.push(node) {
243 debug!(target: LOG_TARGET, ?peer_info, ?err, %position, "Can't add MMR data received from the peer.");
244
245 return Err(sp_blockchain::Error::Backend(
246 "Can't add MMR data to the MMR storage".to_string(),
247 ));
248 }
249
250 leaves_number += 1;
251 }
252
253 starting_position += 1;
254
255 if u64::from(*position) >= target_position {
257 debug!(target: LOG_TARGET, %target_position, "MMR-sync: target position reached.");
258 break 'data;
259 }
260 }
261 }
262 Err(error) => {
263 debug!(target: LOG_TARGET, "MMR sync request failed. peer = {peer_id}: {error}");
264
265 continue 'peers;
266 }
267 }
268
269 if target_position <= starting_position.into() {
271 if let Err(err) = self.mmr.commit() {
272 error!(target: LOG_TARGET, ?err, "MMR commit failed.");
273
274 return Err(sp_blockchain::Error::Application(
275 "Failed to commit MMR data.".into(),
276 ));
277 }
278
279 debug!(target: LOG_TARGET, "Target position reached: {target_position}");
282
283 break 'outer;
284 }
285 }
286 }
287 debug!(target: LOG_TARGET, %starting_position, "No synced peers to handle the MMR-sync. Pausing...",);
288 sleep(SYNC_PAUSE).await;
289 }
290
291 debug!(target: LOG_TARGET, "MMR sync finished.");
292 self.leaves_number = leaves_number;
293 Ok(())
294 }
295
296 pub(crate) fn verify_mmr_data(&self) -> Result<(), sp_blockchain::Error> {
297 debug!(target: LOG_TARGET, "Verifying MMR data...");
298
299 let block_number = self.leaves_number;
300 let Some(hash) = self.client.hash(block_number.into())? else {
301 error!(target: LOG_TARGET, %block_number, "MMR data verification: error during hash acquisition");
302 return Err(sp_blockchain::Error::UnknownBlock(
303 "Failed to get Block hash for Number: {block_number:?}".to_string(),
304 ));
305 };
306
307 let mmr_root = self.mmr.get_root();
308 trace!(target: LOG_TARGET, "MMR root: {:?}", mmr_root);
309 let api_root = self.client.runtime_api().mmr_root(hash);
310 trace!(target: LOG_TARGET, "API root: {:?}", api_root);
311
312 let Ok(Node::Hash(mmr_root_hash)) = mmr_root.clone() else {
313 error!(target: LOG_TARGET, %block_number, ?mmr_root, "Can't get MMR root from local storage.");
314 return Err(sp_blockchain::Error::Application(
315 "Can't get MMR root from local storage".into(),
316 ));
317 };
318
319 let Ok(Ok(api_root_hash)) = api_root else {
320 error!(target: LOG_TARGET, %block_number, ?mmr_root, "Can't get MMR root from API.");
321 return Err(sp_blockchain::Error::Application(
322 "Can't get MMR root from API.".into(),
323 ));
324 };
325
326 if api_root_hash != mmr_root_hash {
327 error!(target: LOG_TARGET, ?api_root_hash, ?mmr_root_hash, "MMR data hashes differ.");
328 return Err(sp_blockchain::Error::Application(
329 "MMR data hashes differ.".into(),
330 ));
331 }
332
333 debug!(target: LOG_TARGET, "MMR data verified");
334
335 Ok(())
336 }
337}
338
/// Failure modes of a single MMR request to a peer.
#[derive(Debug, thiserror::Error)]
pub enum MmrResponseError {
    /// The network layer reported a request failure.
    #[error("MMR request failed: {0}")]
    RequestFailed(#[from] RequestFailure),

    /// The response channel was closed before any result was delivered.
    #[error("MMR request canceled")]
    RequestCanceled,

    /// The response arrived under a protocol name different from the requested one.
    #[error("MMR request failed: invalid protocol")]
    InvalidProtocol,

    /// The response bytes could not be decoded; carries the decoder's message.
    #[error("Failed to decode response: {0}")]
    DecodeFailed(String),
}
354
355async fn send_mmr_request<NR: NetworkRequest>(
356 protocol_name: String,
357 peer_id: PeerId,
358 request: MmrRequest,
359 network_service: &NR,
360) -> Result<MmrResponse, MmrResponseError> {
361 let (tx, rx) = oneshot::channel();
362
363 debug!(target: LOG_TARGET, "Sending request: {request:?} (peer={peer_id})");
364
365 let encoded_request = request.encode();
366
367 network_service.start_request(
368 peer_id,
369 protocol_name.clone().into(),
370 encoded_request,
371 None,
372 tx,
373 IfDisconnected::ImmediateError,
374 );
375
376 let result = rx.await.map_err(|_| MmrResponseError::RequestCanceled)?;
377
378 match result {
379 Ok((data, response_protocol_name)) => {
380 if response_protocol_name != protocol_name.into() {
381 return Err(MmrResponseError::InvalidProtocol);
382 }
383
384 let response = decode_mmr_response(&data).map_err(MmrResponseError::DecodeFailed)?;
385
386 Ok(response)
387 }
388 Err(error) => Err(error.into()),
389 }
390}
391
392fn decode_mmr_response(mut response: &[u8]) -> Result<MmrResponse, String> {
393 let response = MmrResponse::decode(&mut response)
394 .map_err(|error| format!("Failed to decode state response: {error}"))?;
395
396 Ok(response)
397}