1use crate::mmr::get_offchain_key;
2use crate::mmr::request_handler::{MAX_MMR_ITEMS, MmrRequest, MmrResponse, generate_protocol_name};
3use futures::channel::oneshot;
4use parity_scale_codec::{Decode, Encode};
5use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure};
6use sc_network_sync::SyncingService;
7use sp_api::ProvideRuntimeApi;
8use sp_blockchain::HeaderBackend;
9use sp_core::offchain::storage::OffchainDb;
10use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
11use sp_core::{H256, Hasher};
12use sp_mmr_primitives::mmr_lib::{MMRStoreReadOps, MMRStoreWriteOps};
13use sp_mmr_primitives::utils::NodesUtils;
14use sp_mmr_primitives::{DataOrHash, MmrApi, mmr_lib};
15use sp_runtime::traits::{Block as BlockT, Keccak256, NumberFor};
16use sp_subspace_mmr::MmrLeaf;
17use std::cell::RefCell;
18use std::marker::PhantomData;
19use std::sync::Arc;
20use std::time::Duration;
21use subspace_core_primitives::{BlockHash, BlockNumber};
22use tokio::time::sleep;
23use tracing::{debug, error, trace};
24
25type Node<H, L> = DataOrHash<H, L>;
26type MmrLeafOf = MmrLeaf<BlockNumber, BlockHash>;
27type NodeOf = Node<Keccak256, MmrLeafOf>;
28type MmrOf<OS> = mmr_lib::MMR<NodeOf, MmrHasher, OffchainMmrStorage<OS>>;
29
30pub(crate) fn decode_mmr_data(mut data: &[u8]) -> mmr_lib::Result<NodeOf> {
31 let node = match NodeOf::decode(&mut data) {
32 Ok(node) => node,
33 Err(err) => {
34 error!(?err, "Can't decode MMR data");
35
36 return Err(mmr_lib::Error::StoreError(
37 "Can't decode MMR data".to_string(),
38 ));
39 }
40 };
41
42 Ok(node)
43}
44
45struct OffchainMmrStorage<OS: OffchainStorage> {
46 offchain_db: RefCell<OffchainDb<OS>>,
47}
48
49impl<OS: OffchainStorage> OffchainMmrStorage<OS> {
50 fn new(offchain_storage: OS) -> Self {
51 let offchain_db = OffchainDb::new(offchain_storage);
52
53 Self {
54 offchain_db: RefCell::new(offchain_db),
55 }
56 }
57}
58
59impl<OS: OffchainStorage> MMRStoreReadOps<NodeOf> for OffchainMmrStorage<OS> {
60 fn get_elem(&self, pos: u64) -> mmr_lib::Result<Option<NodeOf>> {
61 let canon_key = get_offchain_key(pos);
62 let Some(data) = self
63 .offchain_db
64 .borrow_mut()
65 .local_storage_get(StorageKind::PERSISTENT, &canon_key)
66 else {
67 error!( %pos, "Can't get MMR data.");
68 return Ok(None);
69 };
70
71 let node = decode_mmr_data(data.as_slice());
72
73 node.map(Some)
74 }
75}
76
77impl<OS: OffchainStorage> MMRStoreWriteOps<NodeOf> for OffchainMmrStorage<OS> {
78 fn append(&mut self, pos: u64, elems: Vec<NodeOf>) -> mmr_lib::Result<()> {
79 let mut current_pos = pos;
80 for elem in elems {
81 let data = elem.encode();
82
83 let canon_key = get_offchain_key(current_pos);
84 self.offchain_db.borrow_mut().local_storage_set(
85 StorageKind::PERSISTENT,
86 &canon_key,
87 &data,
88 );
89
90 current_pos += 1;
91 }
92
93 Ok(())
94 }
95}
96
97pub struct MmrHasher;
99
100impl mmr_lib::Merge for MmrHasher {
101 type Item = NodeOf;
102
103 fn merge(left: &Self::Item, right: &Self::Item) -> mmr_lib::Result<Self::Item> {
104 let mut concat = left.hash().as_ref().to_vec();
105 concat.extend_from_slice(right.hash().as_ref());
106
107 Ok(Node::Hash(Keccak256::hash(&concat)))
108 }
109}
110
111const SYNC_PAUSE: Duration = Duration::from_secs(5);
112
113pub(crate) struct MmrSync<Client, Block, OS: OffchainStorage> {
115 client: Arc<Client>,
116 mmr: MmrOf<OS>,
117 leaves_number: u32,
118 _data: PhantomData<Block>,
119}
120
121impl<Client, Block, OS> MmrSync<Client, Block, OS>
122where
123 Block: BlockT,
124 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block>,
125 Client::Api: MmrApi<Block, H256, NumberFor<Block>>,
126 OS: OffchainStorage,
127{
128 pub fn new(client: Arc<Client>, offchain_storage: OS) -> Self {
129 Self {
130 client,
131 mmr: MmrOf::new(0, OffchainMmrStorage::new(offchain_storage)),
132 leaves_number: 0,
133 _data: Default::default(),
134 }
135 }
136
137 pub(crate) async fn sync<NR>(
140 &mut self,
141 fork_id: Option<String>,
142 network_service: NR,
143 sync_service: Arc<SyncingService<Block>>,
144 target_block: BlockNumber,
145 ) -> Result<(), sp_blockchain::Error>
146 where
147 NR: NetworkRequest,
148 {
149 debug!("MMR sync started.");
150 let info = self.client.info();
151 let protocol_name = generate_protocol_name(info.genesis_hash, fork_id.as_deref());
152
153 let mut leaves_number = 0u32;
154 let mut starting_position = 0;
155
156 'outer: loop {
157 let peers_info = match sync_service.peers_info().await {
158 Ok(peers_info) => peers_info,
159 Err(error) => {
160 error!("Peers info request returned an error: {error}",);
161 sleep(SYNC_PAUSE).await;
162 continue;
163 }
164 };
165
166 'peers: for (peer_id, peer_info) in peers_info.iter() {
168 trace!("MMR sync. peer = {peer_id}, info = {:?}", peer_info);
169
170 if !peer_info.is_synced {
171 trace!("MMR sync skipped (not synced). peer = {peer_id}");
172
173 continue;
174 }
175
176 loop {
178 let target_position = {
179 let nodes = NodesUtils::new(target_block.into());
180
181 let target_position = nodes.size().saturating_sub(1);
182
183 debug!(
184 "MMR-sync. Target block={}, Node target position={}",
185 target_block, target_position
186 );
187
188 target_position
189 };
190
191 let request = MmrRequest {
192 starting_position,
193 limit: MAX_MMR_ITEMS,
194 };
195 let response = send_mmr_request(
196 protocol_name.clone(),
197 *peer_id,
198 request,
199 &network_service,
200 )
201 .await;
202
203 match response {
204 Ok(response) => {
205 trace!("Response: {:?}", response.mmr_data.len());
206
207 if response.mmr_data.is_empty() {
208 debug!("Empty response from peer={}", peer_id);
209 break;
210 }
211
212 'data: for (position, data) in response.mmr_data.iter() {
214 if *position != starting_position {
216 debug!(
217 ?peer_info,
218 %starting_position,
219 %position,
220 "MMR sync error: incorrect starting position."
221 );
222
223 continue 'peers;
224 }
225
226 let node = decode_mmr_data(data);
227
228 let node = match node {
229 Ok(node) => node,
230 Err(err) => {
231 debug!( ?peer_info, ?err, %position, "Can't decode MMR data received from the peer.");
232
233 continue 'peers;
234 }
235 };
236
237 if matches!(node, Node::Data(_)) {
238 if let Err(err) = self.mmr.push(node) {
239 debug!( ?peer_info, ?err, %position, "Can't add MMR data received from the peer.");
240
241 return Err(sp_blockchain::Error::Backend(
242 "Can't add MMR data to the MMR storage".to_string(),
243 ));
244 }
245
246 leaves_number += 1;
247 }
248
249 starting_position += 1;
250
251 if u64::from(*position) >= target_position {
253 debug!( %target_position, "MMR-sync: target position reached.");
254 break 'data;
255 }
256 }
257 }
258 Err(error) => {
259 debug!("MMR sync request failed. peer = {peer_id}: {error}");
260
261 continue 'peers;
262 }
263 }
264
265 if target_position <= starting_position.into() {
267 if let Err(err) = self.mmr.commit() {
268 error!(?err, "MMR commit failed.");
269
270 return Err(sp_blockchain::Error::Application(
271 "Failed to commit MMR data.".into(),
272 ));
273 }
274
275 debug!("Target position reached: {target_position}");
278
279 break 'outer;
280 }
281 }
282 }
283 debug!( %starting_position, "No synced peers to handle the MMR-sync. Pausing...",);
284 sleep(SYNC_PAUSE).await;
285 }
286
287 debug!("MMR sync finished.");
288 self.leaves_number = leaves_number;
289 Ok(())
290 }
291
292 pub(crate) fn verify_mmr_data(&self) -> Result<(), sp_blockchain::Error> {
293 debug!("Verifying MMR data...");
294
295 let block_number = self.leaves_number;
296 let Some(hash) = self.client.hash(block_number.into())? else {
297 error!( %block_number, "MMR data verification: error during hash acquisition");
298 return Err(sp_blockchain::Error::UnknownBlock(
299 "Failed to get Block hash for Number: {block_number:?}".to_string(),
300 ));
301 };
302
303 let mmr_root = self.mmr.get_root();
304 trace!("MMR root: {:?}", mmr_root);
305 let api_root = self.client.runtime_api().mmr_root(hash);
306 trace!("API root: {:?}", api_root);
307
308 let Ok(Node::Hash(mmr_root_hash)) = mmr_root.clone() else {
309 error!( %block_number, ?mmr_root, "Can't get MMR root from local storage.");
310 return Err(sp_blockchain::Error::Application(
311 "Can't get MMR root from local storage".into(),
312 ));
313 };
314
315 let Ok(Ok(api_root_hash)) = api_root else {
316 error!( %block_number, ?mmr_root, "Can't get MMR root from API.");
317 return Err(sp_blockchain::Error::Application(
318 "Can't get MMR root from API.".into(),
319 ));
320 };
321
322 if api_root_hash != mmr_root_hash {
323 error!(?api_root_hash, ?mmr_root_hash, "MMR data hashes differ.");
324 return Err(sp_blockchain::Error::Application(
325 "MMR data hashes differ.".into(),
326 ));
327 }
328
329 debug!("MMR data verified");
330
331 Ok(())
332 }
333}
334
335#[derive(Debug, thiserror::Error)]
337pub enum MmrResponseError {
338 #[error("MMR request failed: {0}")]
339 RequestFailed(#[from] RequestFailure),
340
341 #[error("MMR request canceled")]
342 RequestCanceled,
343
344 #[error("MMR request failed: invalid protocol")]
345 InvalidProtocol,
346
347 #[error("Failed to decode response: {0}")]
348 DecodeFailed(String),
349}
350
351async fn send_mmr_request<NR: NetworkRequest>(
352 protocol_name: String,
353 peer_id: PeerId,
354 request: MmrRequest,
355 network_service: &NR,
356) -> Result<MmrResponse, MmrResponseError> {
357 let (tx, rx) = oneshot::channel();
358
359 debug!("Sending request: {request:?} (peer={peer_id})");
360
361 let encoded_request = request.encode();
362
363 network_service.start_request(
364 peer_id,
365 protocol_name.clone().into(),
366 encoded_request,
367 None,
368 tx,
369 IfDisconnected::ImmediateError,
370 );
371
372 let result = rx.await.map_err(|_| MmrResponseError::RequestCanceled)?;
373
374 match result {
375 Ok((data, response_protocol_name)) => {
376 if response_protocol_name != protocol_name.into() {
377 return Err(MmrResponseError::InvalidProtocol);
378 }
379
380 let response = decode_mmr_response(&data).map_err(MmrResponseError::DecodeFailed)?;
381
382 Ok(response)
383 }
384 Err(error) => Err(error.into()),
385 }
386}
387
388fn decode_mmr_response(mut response: &[u8]) -> Result<MmrResponse, String> {
389 let response = MmrResponse::decode(&mut response)
390 .map_err(|error| format!("Failed to decode state response: {error}"))?;
391
392 Ok(response)
393}